summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2010-06-12 16:35:37 +0000
committerAdam Kocoloski <kocolosk@apache.org>2010-06-12 16:35:37 +0000
commitc71b9fa6a197db1657aac41f8fd2994f23364d3b (patch)
tree7c241328061034c3bd59b7d7f68180b71e017cbf /src
parent9a0de9a3f3828e5269f39f688c4e50db41527e8c (diff)
Fix hanging replication. COUCHDB-793. Thanks Filipe and Paul Bonser.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@954027 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_rep_att.erl2
-rw-r--r--src/couchdb/couch_rep_reader.erl8
-rw-r--r--src/couchdb/couch_rep_writer.erl59
3 files changed, 49 insertions, 20 deletions
diff --git a/src/couchdb/couch_rep_att.erl b/src/couchdb/couch_rep_att.erl
index be10acc8..28b8945c 100644
--- a/src/couchdb/couch_rep_att.erl
+++ b/src/couchdb/couch_rep_att.erl
@@ -54,7 +54,7 @@ attachment_receiver(Ref, Request) ->
receive_data(Ref, ReqId, ContentEncoding)
end
catch
- throw:{attachment_request_failed, timeout} ->
+ throw:{attachment_request_failed, _} ->
case {Request#http_db.retries, Request#http_db.pause} of
{0, _} ->
?LOG_INFO("request for ~p failed", [Request#http_db.resource]),
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index 8b75258a..3edc1f37 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -108,6 +108,8 @@ code_change(_OldVsn, State, _Extra) ->
%internal funs
+handle_add_docs(_Seq, [], _From, State) ->
+ {reply, ok, State};
handle_add_docs(Seq, DocsToAdd, From, #state{reply_to=nil} = State) ->
State1 = update_sequence_lists(Seq, State),
NewState = State1#state{
@@ -151,9 +153,13 @@ handle_open_remote_doc(Id, Seq, Revs, _, #state{source=#http_db{}} = State) ->
{_, _Ref} = spawn_document_request(Source, Id, Seq, Revs),
{reply, ok, State#state{monitor_count = Count+1}}.
-handle_monitor_down(normal, #state{pending_doc_request=nil,
+handle_monitor_down(normal, #state{pending_doc_request=nil, reply_to=nil,
monitor_count=1, complete=waiting_on_monitors} = State) ->
{noreply, State#state{complete=true, monitor_count=0}};
+handle_monitor_down(normal, #state{pending_doc_request=nil, reply_to=From,
+ monitor_count=1, complete=waiting_on_monitors} = State) ->
+ gen_server:reply(From, {complete, calculate_new_high_seq(State)}),
+ {stop, normal, State#state{complete=true, monitor_count=0}};
handle_monitor_down(normal, #state{pending_doc_request=nil} = State) ->
#state{monitor_count = Count} = State,
{noreply, State#state{monitor_count = Count-1}};
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 3a337255..cdbbbee0 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -94,49 +94,72 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
)
),
Boundary = couch_uuids:random(),
- {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
+ {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
Boundary, JsonBytes, Atts, true
),
- {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
- _StreamerPid = spawn_link(
- fun() ->
- couch_doc:doc_to_multi_part_stream(
- Boundary,
- JsonBytes,
- Atts,
- fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
- true
- ),
- couch_work_queue:close(DataQueue)
- end
+ StreamerPid = spawn_link(
+ fun() -> streamer_fun(Boundary, JsonBytes, Atts) end
),
BodyFun = fun(Acc) ->
+ DataQueue = case Acc of
+ nil ->
+ StreamerPid ! {start, self()},
+ receive
+ {queue, Q} ->
+ Q
+ end;
+ Queue ->
+ Queue
+ end,
case couch_work_queue:dequeue(DataQueue) of
closed ->
eof;
{ok, Data} ->
- {ok, iolist_to_binary(Data), Acc}
+ {ok, iolist_to_binary(Data), DataQueue}
end
end,
Request = Db#http_db{
resource = couch_util:url_encode(Doc#doc.id),
method = put,
qs = [{new_edits, false}],
- body = {BodyFun, ok},
+ body = {BodyFun, nil},
headers = [
{"x-couch-full-commit", "false"},
- {"Content-Type",
- "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
+ {"Content-Type", ?b2l(ContentType)},
{"Content-Length", Len} | Headers
]
},
- case couch_rep_httpc:request(Request) of
+ Result = case couch_rep_httpc:request(Request) of
{[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
{Pos, [RevId | _]} = Doc#doc.revs,
ErrId = couch_util:to_existing_atom(Error),
[{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}];
_ ->
[]
+ end,
+ StreamerPid ! stop,
+ Result.
+
+streamer_fun(Boundary, JsonBytes, Atts) ->
+ receive
+ stop ->
+ ok;
+ {start, From} ->
+ % better use a brand new queue, to ensure there's no garbage from
+ % a previous (failed) iteration
+ {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000),
+ From ! {queue, DataQueue},
+ couch_doc:doc_to_multi_part_stream(
+ Boundary,
+ JsonBytes,
+ Atts,
+ fun(Data) ->
+ couch_work_queue:queue(DataQueue, Data)
+ end,
+ true
+ ),
+ couch_work_queue:close(DataQueue),
+ streamer_fun(Boundary, JsonBytes, Atts)
end.
write_docs_1({Props}) ->