From 4536671ff6ca35e16665867ef40a1298e47f4626 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Wed, 2 Sep 2009 03:40:44 +0000 Subject: Support for replication over SSL. Resolves COUCHDB-491 This turned out to be a decent amount of work, since: 1) ibrowse did not use SSL on dedicated connections. Wrote a simplistic patch, will contact Chandru for further discussion. 2) When nginx is used for the SSL wrapper, it wants to buffer the changes feed. Setting "proxy_buffering off" in nginx.conf helps, but some buffering still occurred. Fixed by making couch_rep_changes_feed smart enough to split merged chunks. 3) The Erlang ssl application showed instabilities when used with {active,once}. Switched to the "new implementation" using {ssl_imp, new} and instabilities disappeared. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@810350 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_app.erl | 2 +- src/couchdb/couch_rep_changes_feed.erl | 71 ++++++++++++++-------------------- src/couchdb/couch_rep_httpc.erl | 4 +- 3 files changed, 31 insertions(+), 46 deletions(-) (limited to 'src/couchdb') diff --git a/src/couchdb/couch_app.erl b/src/couchdb/couch_app.erl index 98615e50..1b64434a 100644 --- a/src/couchdb/couch_app.erl +++ b/src/couchdb/couch_app.erl @@ -20,7 +20,7 @@ start(_Type, DefaultIniFiles) -> IniFiles = get_ini_files(DefaultIniFiles), - case start_apps([crypto, sasl, inets, oauth, ibrowse, mochiweb]) of + case start_apps([crypto, sasl, inets, oauth, ssl, ibrowse, mochiweb]) of ok -> couch_server_sup:start_link(IniFiles); {error, Reason} -> diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index da324a0e..f301b6e7 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -30,7 +30,7 @@ reqid = nil, complete = false, count = 0, - partial_chunk = nil, + partial_chunk = <<>>, reply_to = nil, rows = queue:new() }). @@ -60,7 +60,7 @@ init([_Parent, #http_db{}=Source, Since, PostProps]) -> conn = Pid, options = [{stream_to, {self(), once}}, {response_format, binary}], headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}] - }, + }, {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req), receive @@ -127,8 +127,12 @@ handle_cast(_Msg, State) -> handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) -> handle_headers(list_to_integer(Code), Hdrs, State); -handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) -> - handle_response(Msg, State); +handle_info({ibrowse_async_response, Id, {error,E}}, #state{reqid=Id}=State) -> + {stop, {error, E}, State}; + +handle_info({ibrowse_async_response, Id, Chunk}, #state{reqid=Id}=State) -> + Messages = [M || M <- re:split(Chunk, ",?\n", [trim]), M =/= <<>>], + handle_messages(Messages, State); handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) -> handle_feed_completion(State); @@ -200,60 +204,41 @@ handle_headers(Code, Hdrs, State) -> [Code,Hdrs]), {stop, {error, Code}, State}. -handle_response({error, Reason}, State) -> - {stop, {error, Reason}, State}; -handle_response(<<"\n">>, State) -> - ?LOG_DEBUG("got a heartbeat from the remote server", []), - ok = maybe_stream_next(State), - {noreply, State}; -handle_response(<<"{\"results\":[\n">>, State) -> +handle_messages([], State) -> ok = maybe_stream_next(State), {noreply, State}; -handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) -> +handle_messages([<<"{\"results\":[">>|Rest], State) -> + handle_messages(Rest, State); +handle_messages([<<"]">>, <<"\"last_seq\":", LastSeqStr/binary>>], State) -> LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), - {noreply, State#state{last_seq = LastSeq}}; -handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) -> + handle_feed_completion(State#state{last_seq = LastSeq}); +handle_messages([<<"{\"last_seq\":", LastSeqStr/binary>>], State) -> LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), - {noreply, State#state{last_seq = LastSeq}}; -handle_response(Chunk, #state{partial_chunk=nil} = State) -> - #state{ - count = Count, - rows = Rows - } = State, - ok = maybe_stream_next(State), - try - Row = decode_row(Chunk), - case State of - #state{reply_to=nil} -> - {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}}; - #state{count=0, reply_to=From}-> - gen_server:reply(From, [Row]), - {noreply, State#state{reply_to=nil}} - end - catch - throw:{invalid_json, Bad} -> - {noreply, State#state{partial_chunk = Bad}} - end; -handle_response(Chunk, State) -> + handle_feed_completion(State#state{last_seq = LastSeq}); +handle_messages([Chunk|Rest], State) -> #state{ count = Count, partial_chunk = Partial, rows = Rows } = State, - ok = maybe_stream_next(State), - try + NewState = try Row = decode_row(<>), - {noreply, case State of + case State of #state{reply_to=nil} -> - State#state{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)}; + State#state{ + count = Count+1, + partial_chunk = <<>>, + rows=queue:in(Row,Rows) + }; #state{count=0, reply_to=From}-> gen_server:reply(From, [Row]), - State#state{reply_to=nil, partial_chunk=nil} - end} + State#state{reply_to = nil, partial_chunk = <<>>} + end catch throw:{invalid_json, Bad} -> - {noreply, State#state{partial_chunk = Bad}} - end. + State#state{partial_chunk = Bad} + end, + handle_messages(Rest, NewState). handle_feed_completion(#state{reply_to=nil} = State)-> {noreply, State#state{complete=true}}; diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl index 9e8bfb42..5689fa29 100644 --- a/src/couchdb/couch_rep_httpc.erl +++ b/src/couchdb/couch_rep_httpc.erl @@ -149,12 +149,12 @@ process_response({error, Reason}, Req) -> spawn_worker_process(Req) -> Url = ibrowse_lib:parse_url(Req#http_db.url), - {ok, Pid} = ibrowse:spawn_worker_process(Url#url.host, Url#url.port), + {ok, Pid} = ibrowse_http_client:start(Url), Pid. spawn_link_worker_process(Req) -> Url = ibrowse_lib:parse_url(Req#http_db.url), - {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port), + {ok, Pid} = ibrowse_http_client:start_link(Url), Pid. maybe_decompress(Headers, Body) -> -- cgit v1.2.3