summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_changes_feed.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-09-02 03:40:44 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-09-02 03:40:44 +0000
commit4536671ff6ca35e16665867ef40a1298e47f4626 (patch)
tree651024567a36560bfc9b4b6ced02a74b6fb0f4e1 /src/couchdb/couch_rep_changes_feed.erl
parentc1b797c10aef694415276f2fcb85676fddfb0ad8 (diff)
Support for replication over SSL. Resolves COUCHDB-491
This turned out to be a decent amount of work, since: 1) ibrowse did not use SSL on dedicated connections. Wrote a simplistic patch, will contact Chandru for further discussion. 2) When nginx is used for the SSL wrapper, it wants to buffer the changes feed. Setting "proxy_buffering off" in nginx.conf helps, but some buffering still occurred. Fixed by making couch_rep_changes_feed smart enough to split merged chunks. 3) The Erlang ssl application showed instabilities when used with {active,once}. Switched to the "new implementation" using {ssl_imp, new} and instabilities disappeared. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@810350 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep_changes_feed.erl')
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl71
1 files changed, 28 insertions, 43 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index da324a0e..f301b6e7 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -30,7 +30,7 @@
reqid = nil,
complete = false,
count = 0,
- partial_chunk = nil,
+ partial_chunk = <<>>,
reply_to = nil,
rows = queue:new()
}).
@@ -60,7 +60,7 @@ init([_Parent, #http_db{}=Source, Since, PostProps]) ->
conn = Pid,
options = [{stream_to, {self(), once}}, {response_format, binary}],
headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
- },
+ },
{ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
receive
@@ -127,8 +127,12 @@ handle_cast(_Msg, State) ->
handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
handle_headers(list_to_integer(Code), Hdrs, State);
-handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) ->
- handle_response(Msg, State);
+handle_info({ibrowse_async_response, Id, {error,E}}, #state{reqid=Id}=State) ->
+ {stop, {error, E}, State};
+
+handle_info({ibrowse_async_response, Id, Chunk}, #state{reqid=Id}=State) ->
+ Messages = [M || M <- re:split(Chunk, ",?\n", [trim]), M =/= <<>>],
+ handle_messages(Messages, State);
handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) ->
handle_feed_completion(State);
@@ -200,60 +204,41 @@ handle_headers(Code, Hdrs, State) ->
[Code,Hdrs]),
{stop, {error, Code}, State}.
-handle_response({error, Reason}, State) ->
- {stop, {error, Reason}, State};
-handle_response(<<"\n">>, State) ->
- ?LOG_DEBUG("got a heartbeat from the remote server", []),
- ok = maybe_stream_next(State),
- {noreply, State};
-handle_response(<<"{\"results\":[\n">>, State) ->
+handle_messages([], State) ->
ok = maybe_stream_next(State),
{noreply, State};
-handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) ->
+handle_messages([<<"{\"results\":[">>|Rest], State) ->
+ handle_messages(Rest, State);
+handle_messages([<<"]">>, <<"\"last_seq\":", LastSeqStr/binary>>], State) ->
LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
- {noreply, State#state{last_seq = LastSeq}};
-handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) ->
+ handle_feed_completion(State#state{last_seq = LastSeq});
+handle_messages([<<"{\"last_seq\":", LastSeqStr/binary>>], State) ->
LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
- {noreply, State#state{last_seq = LastSeq}};
-handle_response(Chunk, #state{partial_chunk=nil} = State) ->
- #state{
- count = Count,
- rows = Rows
- } = State,
- ok = maybe_stream_next(State),
- try
- Row = decode_row(Chunk),
- case State of
- #state{reply_to=nil} ->
- {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}};
- #state{count=0, reply_to=From}->
- gen_server:reply(From, [Row]),
- {noreply, State#state{reply_to=nil}}
- end
- catch
- throw:{invalid_json, Bad} ->
- {noreply, State#state{partial_chunk = Bad}}
- end;
-handle_response(Chunk, State) ->
+ handle_feed_completion(State#state{last_seq = LastSeq});
+handle_messages([Chunk|Rest], State) ->
#state{
count = Count,
partial_chunk = Partial,
rows = Rows
} = State,
- ok = maybe_stream_next(State),
- try
+ NewState = try
Row = decode_row(<<Partial/binary, Chunk/binary>>),
- {noreply, case State of
+ case State of
#state{reply_to=nil} ->
- State#state{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
+ State#state{
+ count = Count+1,
+ partial_chunk = <<>>,
+ rows=queue:in(Row,Rows)
+ };
#state{count=0, reply_to=From}->
gen_server:reply(From, [Row]),
- State#state{reply_to=nil, partial_chunk=nil}
- end}
+ State#state{reply_to = nil, partial_chunk = <<>>}
+ end
catch
throw:{invalid_json, Bad} ->
- {noreply, State#state{partial_chunk = Bad}}
- end.
+ State#state{partial_chunk = Bad}
+ end,
+ handle_messages(Rest, NewState).
handle_feed_completion(#state{reply_to=nil} = State)->
{noreply, State#state{complete=true}};