From c71b9fa6a197db1657aac41f8fd2994f23364d3b Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Sat, 12 Jun 2010 16:35:37 +0000 Subject: Fix hanging replication. COUCHDB-793. Thanks Filipe and Paul Bonser. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@954027 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_att.erl | 2 +- src/couchdb/couch_rep_reader.erl | 8 +++++- src/couchdb/couch_rep_writer.erl | 59 ++++++++++++++++++++++++++++------------ 3 files changed, 49 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/couchdb/couch_rep_att.erl b/src/couchdb/couch_rep_att.erl index be10acc8..28b8945c 100644 --- a/src/couchdb/couch_rep_att.erl +++ b/src/couchdb/couch_rep_att.erl @@ -54,7 +54,7 @@ attachment_receiver(Ref, Request) -> receive_data(Ref, ReqId, ContentEncoding) end catch - throw:{attachment_request_failed, timeout} -> + throw:{attachment_request_failed, _} -> case {Request#http_db.retries, Request#http_db.pause} of {0, _} -> ?LOG_INFO("request for ~p failed", [Request#http_db.resource]), diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index 8b75258a..3edc1f37 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -108,6 +108,8 @@ code_change(_OldVsn, State, _Extra) -> %internal funs +handle_add_docs(_Seq, [], _From, State) -> + {reply, ok, State}; handle_add_docs(Seq, DocsToAdd, From, #state{reply_to=nil} = State) -> State1 = update_sequence_lists(Seq, State), NewState = State1#state{ @@ -151,9 +153,13 @@ handle_open_remote_doc(Id, Seq, Revs, _, #state{source=#http_db{}} = State) -> {_, _Ref} = spawn_document_request(Source, Id, Seq, Revs), {reply, ok, State#state{monitor_count = Count+1}}. -handle_monitor_down(normal, #state{pending_doc_request=nil, +handle_monitor_down(normal, #state{pending_doc_request=nil, reply_to=nil, monitor_count=1, complete=waiting_on_monitors} = State) -> {noreply, State#state{complete=true, monitor_count=0}}; +handle_monitor_down(normal, #state{pending_doc_request=nil, reply_to=From, + monitor_count=1, complete=waiting_on_monitors} = State) -> + gen_server:reply(From, {complete, calculate_new_high_seq(State)}), + {stop, normal, State#state{complete=true, monitor_count=0}}; handle_monitor_down(normal, #state{pending_doc_request=nil} = State) -> #state{monitor_count = Count} = State, {noreply, State#state{monitor_count = Count-1}}; diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index 3a337255..cdbbbee0 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -94,49 +94,72 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> ) ), Boundary = couch_uuids:random(), - {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( + {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( Boundary, JsonBytes, Atts, true ), - {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000), - _StreamerPid = spawn_link( - fun() -> - couch_doc:doc_to_multi_part_stream( - Boundary, - JsonBytes, - Atts, - fun(Data) -> couch_work_queue:queue(DataQueue, Data) end, - true - ), - couch_work_queue:close(DataQueue) - end + StreamerPid = spawn_link( + fun() -> streamer_fun(Boundary, JsonBytes, Atts) end ), BodyFun = fun(Acc) -> + DataQueue = case Acc of + nil -> + StreamerPid ! {start, self()}, + receive + {queue, Q} -> + Q + end; + Queue -> + Queue + end, case couch_work_queue:dequeue(DataQueue) of closed -> eof; {ok, Data} -> - {ok, iolist_to_binary(Data), Acc} + {ok, iolist_to_binary(Data), DataQueue} end end, Request = Db#http_db{ resource = couch_util:url_encode(Doc#doc.id), method = put, qs = [{new_edits, false}], - body = {BodyFun, ok}, + body = {BodyFun, nil}, headers = [ {"x-couch-full-commit", "false"}, - {"Content-Type", - "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""}, + {"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len} | Headers ] }, - case couch_rep_httpc:request(Request) of + Result = case couch_rep_httpc:request(Request) of {[{<<"error">>, Error}, {<<"reason">>, Reason}]} -> {Pos, [RevId | _]} = Doc#doc.revs, ErrId = couch_util:to_existing_atom(Error), [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}]; _ -> [] + end, + StreamerPid ! stop, + Result. + +streamer_fun(Boundary, JsonBytes, Atts) -> + receive + stop -> + ok; + {start, From} -> + % better use a brand new queue, to ensure there's no garbage from + % a previous (failed) iteration + {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000), + From ! {queue, DataQueue}, + couch_doc:doc_to_multi_part_stream( + Boundary, + JsonBytes, + Atts, + fun(Data) -> + couch_work_queue:queue(DataQueue, Data) + end, + true + ), + couch_work_queue:close(DataQueue), + streamer_fun(Boundary, JsonBytes, Atts) end. write_docs_1({Props}) -> -- cgit v1.2.3