diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2009-09-02 03:40:44 +0000 |
---|---|---|
committer | Adam Kocoloski <kocolosk@apache.org> | 2009-09-02 03:40:44 +0000 |
commit | 4536671ff6ca35e16665867ef40a1298e47f4626 (patch) | |
tree | 651024567a36560bfc9b4b6ced02a74b6fb0f4e1 /src/couchdb/couch_rep_changes_feed.erl | |
parent | c1b797c10aef694415276f2fcb85676fddfb0ad8 (diff) |
Support for replication over SSL. Resolves COUCHDB-491
This turned out to be a decent amount of work, since:
1) ibrowse did not use SSL on dedicated connections. Wrote a simplistic patch,
will contact Chandru for further discussion.
2) When nginx is used for the SSL wrapper, it wants to buffer the changes feed.
Setting "proxy_buffering off" in nginx.conf helps, but some buffering still
occurred. Fixed by making couch_rep_changes_feed smart enough to split
merged chunks.
3) The Erlang ssl application showed instabilities when used with {active,once}.
Switched to the "new implementation" using {ssl_imp, new} and instabilities
disappeared.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@810350 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep_changes_feed.erl')
-rw-r--r-- | src/couchdb/couch_rep_changes_feed.erl | 71 |
1 files changed, 28 insertions, 43 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index da324a0e..f301b6e7 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -30,7 +30,7 @@ reqid = nil, complete = false, count = 0, - partial_chunk = nil, + partial_chunk = <<>>, reply_to = nil, rows = queue:new() }). @@ -60,7 +60,7 @@ init([_Parent, #http_db{}=Source, Since, PostProps]) -> conn = Pid, options = [{stream_to, {self(), once}}, {response_format, binary}], headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}] - }, + }, {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req), receive @@ -127,8 +127,12 @@ handle_cast(_Msg, State) -> handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) -> handle_headers(list_to_integer(Code), Hdrs, State); -handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) -> - handle_response(Msg, State); +handle_info({ibrowse_async_response, Id, {error,E}}, #state{reqid=Id}=State) -> + {stop, {error, E}, State}; + +handle_info({ibrowse_async_response, Id, Chunk}, #state{reqid=Id}=State) -> + Messages = [M || M <- re:split(Chunk, ",?\n", [trim]), M =/= <<>>], + handle_messages(Messages, State); handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) -> handle_feed_completion(State); @@ -200,60 +204,41 @@ handle_headers(Code, Hdrs, State) -> [Code,Hdrs]), {stop, {error, Code}, State}. -handle_response({error, Reason}, State) -> - {stop, {error, Reason}, State}; -handle_response(<<"\n">>, State) -> - ?LOG_DEBUG("got a heartbeat from the remote server", []), - ok = maybe_stream_next(State), - {noreply, State}; -handle_response(<<"{\"results\":[\n">>, State) -> +handle_messages([], State) -> ok = maybe_stream_next(State), {noreply, State}; -handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) -> +handle_messages([<<"{\"results\":[">>|Rest], State) -> + handle_messages(Rest, State); +handle_messages([<<"]">>, <<"\"last_seq\":", LastSeqStr/binary>>], State) -> LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), - {noreply, State#state{last_seq = LastSeq}}; -handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) -> + handle_feed_completion(State#state{last_seq = LastSeq}); +handle_messages([<<"{\"last_seq\":", LastSeqStr/binary>>], State) -> LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), - {noreply, State#state{last_seq = LastSeq}}; -handle_response(Chunk, #state{partial_chunk=nil} = State) -> - #state{ - count = Count, - rows = Rows - } = State, - ok = maybe_stream_next(State), - try - Row = decode_row(Chunk), - case State of - #state{reply_to=nil} -> - {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}}; - #state{count=0, reply_to=From}-> - gen_server:reply(From, [Row]), - {noreply, State#state{reply_to=nil}} - end - catch - throw:{invalid_json, Bad} -> - {noreply, State#state{partial_chunk = Bad}} - end; -handle_response(Chunk, State) -> + handle_feed_completion(State#state{last_seq = LastSeq}); +handle_messages([Chunk|Rest], State) -> #state{ count = Count, partial_chunk = Partial, rows = Rows } = State, - ok = maybe_stream_next(State), - try + NewState = try Row = decode_row(<<Partial/binary, Chunk/binary>>), - {noreply, case State of + case State of #state{reply_to=nil} -> - State#state{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)}; + State#state{ + count = Count+1, + partial_chunk = <<>>, + rows=queue:in(Row,Rows) + }; #state{count=0, reply_to=From}-> gen_server:reply(From, [Row]), - State#state{reply_to=nil, partial_chunk=nil} - end} + State#state{reply_to = nil, partial_chunk = <<>>} + end catch throw:{invalid_json, Bad} -> - {noreply, State#state{partial_chunk = Bad}} - end. + State#state{partial_chunk = Bad} + end, + handle_messages(Rest, NewState). handle_feed_completion(#state{reply_to=nil} = State)-> {noreply, State#state{complete=true}}; |