diff options
-rw-r--r-- | src/couchdb/couch_app.erl | 2 | ||||
-rw-r--r-- | src/couchdb/couch_rep_changes_feed.erl | 71 | ||||
-rw-r--r-- | src/couchdb/couch_rep_httpc.erl | 4 | ||||
-rw-r--r-- | src/ibrowse/ibrowse_http_client.erl | 12 |
4 files changed, 42 insertions, 47 deletions
diff --git a/src/couchdb/couch_app.erl b/src/couchdb/couch_app.erl index 98615e50..1b64434a 100644 --- a/src/couchdb/couch_app.erl +++ b/src/couchdb/couch_app.erl @@ -20,7 +20,7 @@ start(_Type, DefaultIniFiles) -> IniFiles = get_ini_files(DefaultIniFiles), - case start_apps([crypto, sasl, inets, oauth, ibrowse, mochiweb]) of + case start_apps([crypto, sasl, inets, oauth, ssl, ibrowse, mochiweb]) of ok -> couch_server_sup:start_link(IniFiles); {error, Reason} -> diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index da324a0e..f301b6e7 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -30,7 +30,7 @@ reqid = nil, complete = false, count = 0, - partial_chunk = nil, + partial_chunk = <<>>, reply_to = nil, rows = queue:new() }). @@ -60,7 +60,7 @@ init([_Parent, #http_db{}=Source, Since, PostProps]) -> conn = Pid, options = [{stream_to, {self(), once}}, {response_format, binary}], headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}] - }, + }, {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req), receive @@ -127,8 +127,12 @@ handle_cast(_Msg, State) -> handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) -> handle_headers(list_to_integer(Code), Hdrs, State); -handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) -> - handle_response(Msg, State); +handle_info({ibrowse_async_response, Id, {error,E}}, #state{reqid=Id}=State) -> + {stop, {error, E}, State}; + +handle_info({ibrowse_async_response, Id, Chunk}, #state{reqid=Id}=State) -> + Messages = [M || M <- re:split(Chunk, ",?\n", [trim]), M =/= <<>>], + handle_messages(Messages, State); handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) -> handle_feed_completion(State); @@ -200,60 +204,41 @@ handle_headers(Code, Hdrs, State) -> [Code,Hdrs]), {stop, {error, Code}, State}. -handle_response({error, Reason}, State) -> - {stop, {error, Reason}, State}; -handle_response(<<"\n">>, State) -> - ?LOG_DEBUG("got a heartbeat from the remote server", []), - ok = maybe_stream_next(State), - {noreply, State}; -handle_response(<<"{\"results\":[\n">>, State) -> +handle_messages([], State) -> ok = maybe_stream_next(State), {noreply, State}; -handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) -> +handle_messages([<<"{\"results\":[">>|Rest], State) -> + handle_messages(Rest, State); +handle_messages([<<"]">>, <<"\"last_seq\":", LastSeqStr/binary>>], State) -> LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), - {noreply, State#state{last_seq = LastSeq}}; -handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) -> + handle_feed_completion(State#state{last_seq = LastSeq}); +handle_messages([<<"{\"last_seq\":", LastSeqStr/binary>>], State) -> LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), - {noreply, State#state{last_seq = LastSeq}}; -handle_response(Chunk, #state{partial_chunk=nil} = State) -> - #state{ - count = Count, - rows = Rows - } = State, - ok = maybe_stream_next(State), - try - Row = decode_row(Chunk), - case State of - #state{reply_to=nil} -> - {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}}; - #state{count=0, reply_to=From}-> - gen_server:reply(From, [Row]), - {noreply, State#state{reply_to=nil}} - end - catch - throw:{invalid_json, Bad} -> - {noreply, State#state{partial_chunk = Bad}} - end; -handle_response(Chunk, State) -> + handle_feed_completion(State#state{last_seq = LastSeq}); +handle_messages([Chunk|Rest], State) -> #state{ count = Count, partial_chunk = Partial, rows = Rows } = State, - ok = maybe_stream_next(State), - try + NewState = try Row = decode_row(<<Partial/binary, Chunk/binary>>), - {noreply, case State of + case State of #state{reply_to=nil} -> - State#state{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)}; + State#state{ + count = Count+1, + partial_chunk = <<>>, + rows=queue:in(Row,Rows) + }; #state{count=0, reply_to=From}-> gen_server:reply(From, [Row]), - State#state{reply_to=nil, partial_chunk=nil} - end} + State#state{reply_to = nil, partial_chunk = <<>>} + end catch throw:{invalid_json, Bad} -> - {noreply, State#state{partial_chunk = Bad}} - end. + State#state{partial_chunk = Bad} + end, + handle_messages(Rest, NewState). handle_feed_completion(#state{reply_to=nil} = State)-> {noreply, State#state{complete=true}}; diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl index 9e8bfb42..5689fa29 100644 --- a/src/couchdb/couch_rep_httpc.erl +++ b/src/couchdb/couch_rep_httpc.erl @@ -149,12 +149,12 @@ process_response({error, Reason}, Req) -> spawn_worker_process(Req) -> Url = ibrowse_lib:parse_url(Req#http_db.url), - {ok, Pid} = ibrowse:spawn_worker_process(Url#url.host, Url#url.port), + {ok, Pid} = ibrowse_http_client:start(Url), Pid. spawn_link_worker_process(Req) -> Url = ibrowse_lib:parse_url(Req#http_db.url), - {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port), + {ok, Pid} = ibrowse_http_client:start_link(Url), Pid. maybe_decompress(Headers, Body) -> diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl index dde258ef..5f62f705 100644 --- a/src/ibrowse/ibrowse_http_client.erl +++ b/src/ibrowse/ibrowse_http_client.erl @@ -113,6 +113,16 @@ init({Host, Port}) -> port = Port}, put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]), put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), + {ok, State}; +init(#url{host=Host, port=Port, protocol=Protocol}) -> + State = #state{ + host = Host, + port = Port, + is_ssl = (Protocol == https), + ssl_options = [{ssl_imp, new}] + }, + put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]), + put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), {ok, State}. %%-------------------------------------------------------------------- @@ -137,7 +147,7 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, handle_call(stop, _From, State) -> do_close(State), do_error_reply(State, closing_on_request), - {stop, normal, ok, State}; + {stop, normal, ok, State#state{socket=undefined}}; handle_call(Request, _From, State) -> Reply = {unknown_request, Request}, |