summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-09-02 03:40:44 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-09-02 03:40:44 +0000
commit4536671ff6ca35e16665867ef40a1298e47f4626 (patch)
tree651024567a36560bfc9b4b6ced02a74b6fb0f4e1 /src
parentc1b797c10aef694415276f2fcb85676fddfb0ad8 (diff)
Support for replication over SSL. Resolves COUCHDB-491
This turned out to be a decent amount of work, since: 1) ibrowse did not use SSL on dedicated connections. Wrote a simplistic patch, will contact Chandru for further discussion. 2) When nginx is used for the SSL wrapper, it wants to buffer the changes feed. Setting "proxy_buffering off" in nginx.conf helps, but some buffering still occurred. Fixed by making couch_rep_changes_feed smart enough to split merged chunks. 3) The Erlang ssl application showed instabilities when used with {active,once}. Switched to the "new implementation" using {ssl_imp, new} and instabilities disappeared. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@810350 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_app.erl2
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl71
-rw-r--r--src/couchdb/couch_rep_httpc.erl4
-rw-r--r--src/ibrowse/ibrowse_http_client.erl12
4 files changed, 42 insertions, 47 deletions
diff --git a/src/couchdb/couch_app.erl b/src/couchdb/couch_app.erl
index 98615e50..1b64434a 100644
--- a/src/couchdb/couch_app.erl
+++ b/src/couchdb/couch_app.erl
@@ -20,7 +20,7 @@
start(_Type, DefaultIniFiles) ->
IniFiles = get_ini_files(DefaultIniFiles),
- case start_apps([crypto, sasl, inets, oauth, ibrowse, mochiweb]) of
+ case start_apps([crypto, sasl, inets, oauth, ssl, ibrowse, mochiweb]) of
ok ->
couch_server_sup:start_link(IniFiles);
{error, Reason} ->
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index da324a0e..f301b6e7 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -30,7 +30,7 @@
reqid = nil,
complete = false,
count = 0,
- partial_chunk = nil,
+ partial_chunk = <<>>,
reply_to = nil,
rows = queue:new()
}).
@@ -60,7 +60,7 @@ init([_Parent, #http_db{}=Source, Since, PostProps]) ->
conn = Pid,
options = [{stream_to, {self(), once}}, {response_format, binary}],
headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
- },
+ },
{ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
receive
@@ -127,8 +127,12 @@ handle_cast(_Msg, State) ->
handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
handle_headers(list_to_integer(Code), Hdrs, State);
-handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) ->
- handle_response(Msg, State);
+handle_info({ibrowse_async_response, Id, {error,E}}, #state{reqid=Id}=State) ->
+ {stop, {error, E}, State};
+
+handle_info({ibrowse_async_response, Id, Chunk}, #state{reqid=Id}=State) ->
+ Messages = [M || M <- re:split(Chunk, ",?\n", [trim]), M =/= <<>>],
+ handle_messages(Messages, State);
handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) ->
handle_feed_completion(State);
@@ -200,60 +204,41 @@ handle_headers(Code, Hdrs, State) ->
[Code,Hdrs]),
{stop, {error, Code}, State}.
-handle_response({error, Reason}, State) ->
- {stop, {error, Reason}, State};
-handle_response(<<"\n">>, State) ->
- ?LOG_DEBUG("got a heartbeat from the remote server", []),
- ok = maybe_stream_next(State),
- {noreply, State};
-handle_response(<<"{\"results\":[\n">>, State) ->
+handle_messages([], State) ->
ok = maybe_stream_next(State),
{noreply, State};
-handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) ->
+handle_messages([<<"{\"results\":[">>|Rest], State) ->
+ handle_messages(Rest, State);
+handle_messages([<<"]">>, <<"\"last_seq\":", LastSeqStr/binary>>], State) ->
LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
- {noreply, State#state{last_seq = LastSeq}};
-handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) ->
+ handle_feed_completion(State#state{last_seq = LastSeq});
+handle_messages([<<"{\"last_seq\":", LastSeqStr/binary>>], State) ->
LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
- {noreply, State#state{last_seq = LastSeq}};
-handle_response(Chunk, #state{partial_chunk=nil} = State) ->
- #state{
- count = Count,
- rows = Rows
- } = State,
- ok = maybe_stream_next(State),
- try
- Row = decode_row(Chunk),
- case State of
- #state{reply_to=nil} ->
- {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}};
- #state{count=0, reply_to=From}->
- gen_server:reply(From, [Row]),
- {noreply, State#state{reply_to=nil}}
- end
- catch
- throw:{invalid_json, Bad} ->
- {noreply, State#state{partial_chunk = Bad}}
- end;
-handle_response(Chunk, State) ->
+ handle_feed_completion(State#state{last_seq = LastSeq});
+handle_messages([Chunk|Rest], State) ->
#state{
count = Count,
partial_chunk = Partial,
rows = Rows
} = State,
- ok = maybe_stream_next(State),
- try
+ NewState = try
Row = decode_row(<<Partial/binary, Chunk/binary>>),
- {noreply, case State of
+ case State of
#state{reply_to=nil} ->
- State#state{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
+ State#state{
+ count = Count+1,
+ partial_chunk = <<>>,
+ rows=queue:in(Row,Rows)
+ };
#state{count=0, reply_to=From}->
gen_server:reply(From, [Row]),
- State#state{reply_to=nil, partial_chunk=nil}
- end}
+ State#state{reply_to = nil, partial_chunk = <<>>}
+ end
catch
throw:{invalid_json, Bad} ->
- {noreply, State#state{partial_chunk = Bad}}
- end.
+ State#state{partial_chunk = Bad}
+ end,
+ handle_messages(Rest, NewState).
handle_feed_completion(#state{reply_to=nil} = State)->
{noreply, State#state{complete=true}};
diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl
index 9e8bfb42..5689fa29 100644
--- a/src/couchdb/couch_rep_httpc.erl
+++ b/src/couchdb/couch_rep_httpc.erl
@@ -149,12 +149,12 @@ process_response({error, Reason}, Req) ->
spawn_worker_process(Req) ->
Url = ibrowse_lib:parse_url(Req#http_db.url),
- {ok, Pid} = ibrowse:spawn_worker_process(Url#url.host, Url#url.port),
+ {ok, Pid} = ibrowse_http_client:start(Url),
Pid.
spawn_link_worker_process(Req) ->
Url = ibrowse_lib:parse_url(Req#http_db.url),
- {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
+ {ok, Pid} = ibrowse_http_client:start_link(Url),
Pid.
maybe_decompress(Headers, Body) ->
diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl
index dde258ef..5f62f705 100644
--- a/src/ibrowse/ibrowse_http_client.erl
+++ b/src/ibrowse/ibrowse_http_client.erl
@@ -113,6 +113,16 @@ init({Host, Port}) ->
port = Port},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
+ {ok, State};
+init(#url{host=Host, port=Port, protocol=Protocol}) ->
+ State = #state{
+ host = Host,
+ port = Port,
+ is_ssl = (Protocol == https),
+ ssl_options = [{ssl_imp, new}]
+ },
+ put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
+ put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
{ok, State}.
%%--------------------------------------------------------------------
@@ -137,7 +147,7 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
handle_call(stop, _From, State) ->
do_close(State),
do_error_reply(State, closing_on_request),
- {stop, normal, ok, State};
+ {stop, normal, ok, State#state{socket=undefined}};
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},