From c71b9fa6a197db1657aac41f8fd2994f23364d3b Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Sat, 12 Jun 2010 16:35:37 +0000 Subject: Fix hanging replication. COUCHDB-793. Thanks Filipe and Paul Bonser. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@954027 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_writer.erl | 59 ++++++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 18 deletions(-) (limited to 'src/couchdb/couch_rep_writer.erl') diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index 3a337255..cdbbbee0 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -94,49 +94,72 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> ) ), Boundary = couch_uuids:random(), - {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( + {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( Boundary, JsonBytes, Atts, true ), - {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000), - _StreamerPid = spawn_link( - fun() -> - couch_doc:doc_to_multi_part_stream( - Boundary, - JsonBytes, - Atts, - fun(Data) -> couch_work_queue:queue(DataQueue, Data) end, - true - ), - couch_work_queue:close(DataQueue) - end + StreamerPid = spawn_link( + fun() -> streamer_fun(Boundary, JsonBytes, Atts) end ), BodyFun = fun(Acc) -> + DataQueue = case Acc of + nil -> + StreamerPid ! {start, self()}, + receive + {queue, Q} -> + Q + end; + Queue -> + Queue + end, case couch_work_queue:dequeue(DataQueue) of closed -> eof; {ok, Data} -> - {ok, iolist_to_binary(Data), Acc} + {ok, iolist_to_binary(Data), DataQueue} end end, Request = Db#http_db{ resource = couch_util:url_encode(Doc#doc.id), method = put, qs = [{new_edits, false}], - body = {BodyFun, ok}, + body = {BodyFun, nil}, headers = [ {"x-couch-full-commit", "false"}, - {"Content-Type", - "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""}, + {"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len} | Headers ] }, - case couch_rep_httpc:request(Request) of + Result = case couch_rep_httpc:request(Request) of {[{<<"error">>, Error}, {<<"reason">>, Reason}]} -> {Pos, [RevId | _]} = Doc#doc.revs, ErrId = couch_util:to_existing_atom(Error), [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}]; _ -> [] + end, + StreamerPid ! stop, + Result. + +streamer_fun(Boundary, JsonBytes, Atts) -> + receive + stop -> + ok; + {start, From} -> + % better use a brand new queue, to ensure there's no garbage from + % a previous (failed) iteration + {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000), + From ! {queue, DataQueue}, + couch_doc:doc_to_multi_part_stream( + Boundary, + JsonBytes, + Atts, + fun(Data) -> + couch_work_queue:queue(DataQueue, Data) + end, + true + ), + couch_work_queue:close(DataQueue), + streamer_fun(Boundary, JsonBytes, Atts) end. write_docs_1({Props}) -> -- cgit v1.2.3