summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_writer.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2010-06-12 16:35:37 +0000
committerAdam Kocoloski <kocolosk@apache.org>2010-06-12 16:35:37 +0000
commitc71b9fa6a197db1657aac41f8fd2994f23364d3b (patch)
tree7c241328061034c3bd59b7d7f68180b71e017cbf /src/couchdb/couch_rep_writer.erl
parent9a0de9a3f3828e5269f39f688c4e50db41527e8c (diff)
Fix hanging replication. COUCHDB-793. Thanks Filipe and Paul Bonser.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@954027 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep_writer.erl')
-rw-r--r--src/couchdb/couch_rep_writer.erl59
1 files changed, 41 insertions, 18 deletions
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 3a337255..cdbbbee0 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -94,49 +94,72 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
)
),
Boundary = couch_uuids:random(),
- {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
+ {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
Boundary, JsonBytes, Atts, true
),
- {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
- _StreamerPid = spawn_link(
- fun() ->
- couch_doc:doc_to_multi_part_stream(
- Boundary,
- JsonBytes,
- Atts,
- fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
- true
- ),
- couch_work_queue:close(DataQueue)
- end
+ StreamerPid = spawn_link(
+ fun() -> streamer_fun(Boundary, JsonBytes, Atts) end
),
BodyFun = fun(Acc) ->
+ DataQueue = case Acc of
+ nil ->
+ StreamerPid ! {start, self()},
+ receive
+ {queue, Q} ->
+ Q
+ end;
+ Queue ->
+ Queue
+ end,
case couch_work_queue:dequeue(DataQueue) of
closed ->
eof;
{ok, Data} ->
- {ok, iolist_to_binary(Data), Acc}
+ {ok, iolist_to_binary(Data), DataQueue}
end
end,
Request = Db#http_db{
resource = couch_util:url_encode(Doc#doc.id),
method = put,
qs = [{new_edits, false}],
- body = {BodyFun, ok},
+ body = {BodyFun, nil},
headers = [
{"x-couch-full-commit", "false"},
- {"Content-Type",
- "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
+ {"Content-Type", ?b2l(ContentType)},
{"Content-Length", Len} | Headers
]
},
- case couch_rep_httpc:request(Request) of
+ Result = case couch_rep_httpc:request(Request) of
{[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
{Pos, [RevId | _]} = Doc#doc.revs,
ErrId = couch_util:to_existing_atom(Error),
[{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}];
_ ->
[]
+ end,
+ StreamerPid ! stop,
+ Result.
+
+streamer_fun(Boundary, JsonBytes, Atts) ->
+ receive
+ stop ->
+ ok;
+ {start, From} ->
+ % better use a brand new queue, to ensure there's no garbage from
+ % a previous (failed) iteration
+ {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000),
+ From ! {queue, DataQueue},
+ couch_doc:doc_to_multi_part_stream(
+ Boundary,
+ JsonBytes,
+ Atts,
+ fun(Data) ->
+ couch_work_queue:queue(DataQueue, Data)
+ end,
+ true
+ ),
+ couch_work_queue:close(DataQueue),
+ streamer_fun(Boundary, JsonBytes, Atts)
end.
write_docs_1({Props}) ->