summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-07-16 20:55:14 +0000
committerDamien F. Katz <damien@apache.org>2008-07-16 20:55:14 +0000
commit724d837ea4e009e5f3867d2e89f91b9e95ca75a1 (patch)
tree17707ed5054bab2222396d64940981cc6705811b /src/couchdb/couch_rep.erl
parentc3308660600cf7aee3eec6b67ccfe33658180739 (diff)
Fixed replication problems where read ad write queues can get backed up. With this fixed, throughput might be reduced.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@677426 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl103
1 files changed, 56 insertions, 47 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 0d914af0..452bb492 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -113,13 +113,15 @@ replicate(Source, Target, Options) ->
end.
pull_rep(DbTarget, DbSource, SourceSeqNum) ->
- Parent = self(),
SaveDocsPid = spawn_link(fun() ->
- save_docs_loop(Parent, DbTarget, 0) end),
+ save_docs_loop(DbTarget, 0) end),
OpenDocsPid = spawn_link(fun() ->
- open_doc_revs_loop(Parent, DbSource, SaveDocsPid, 0) end),
+ open_doc_revs_loop(DbSource, SaveDocsPid, 0) end),
+ OpenDocsPid ! got_it, % prime queue with got_it
MissingRevsPid = spawn_link(fun() ->
- get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, 0, 0) end),
+ get_missing_revs_loop(DbTarget, OpenDocsPid, 0, 0) end),
+ MissingRevsPid ! got_it, % prime queue with got_it
+ self() ! got_it,
{ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
fun(SrcDocInfo, _, _) ->
#doc_info{id=Id,
@@ -128,73 +130,73 @@ pull_rep(DbTarget, DbSource, SourceSeqNum) ->
deleted_conflict_revs=DelConflicts,
update_seq=Seq} = SrcDocInfo,
SrcRevs = [Rev | Conflicts] ++ DelConflicts,
- MissingRevsPid ! {Id, SrcRevs}, % send to the missing revs process
+ receive got_it -> ok end,
+ MissingRevsPid ! {self(), Id, SrcRevs}, % send to the missing revs process
{ok, Seq}
end, SourceSeqNum),
- MissingRevsPid ! shutdown,
+
+ receive got_it -> ok end,
+
+ MissingRevsPid ! {self(), shutdown},
receive {done, MissingRevsPid, Stats1} -> ok end,
- OpenDocsPid ! shutdown,
+ OpenDocsPid ! {self(), shutdown},
receive {done, OpenDocsPid, Stats2} -> ok end,
- SaveDocsPid ! shutdown,
+ SaveDocsPid ! {self(), shutdown},
receive {done, SaveDocsPid, Stats3} -> ok end,
{NewSeq, Stats1 ++ Stats2 ++ Stats3}.
-receive_id_revs() ->
- receive
- {Id, Revs} ->
- [{Id, Revs} | receive_id_revs()]
- after 1 ->
- []
- end.
-
-get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
+get_missing_revs_loop(DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
+ receive got_it -> ok end,
receive
- {Id, Revs} ->
- Changed = [{Id, Revs} | receive_id_revs()],
- {ok, Missing} = get_missing_revs(DbTarget, Changed),
- [OpenDocsPid ! {Id0, MissingRevs} || {Id0, MissingRevs} <- Missing],
- get_missing_revs_loop(Parent, DbTarget, OpenDocsPid,
- RevsChecked + length(Changed),
- MissingFound + length(Missing));
- shutdown ->
- Parent ! {done, self(), [{"missing_checked", RevsChecked},
+ {Src, Id, Revs} ->
+ Src ! got_it,
+ MissingRevs =
+ case get_missing_revs(DbTarget, [{Id, Revs}]) of
+ {ok, [{Id, MissingRevs0}]} ->
+ OpenDocsPid ! {self(), Id, MissingRevs0},
+ MissingRevs0;
+ {ok, []} ->
+ % prime our message queue
+ self() ! got_it,
+ []
+ end,
+ get_missing_revs_loop(DbTarget, OpenDocsPid,
+ RevsChecked + length(Revs),
+ MissingFound + length(MissingRevs));
+ {Src, shutdown} ->
+ Src ! {done, self(), [{"missing_checked", RevsChecked},
{"missing_found", MissingFound}]}
end.
-open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead) ->
+open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead) ->
+ receive got_it -> ok end,
receive
- {Id, MissingRevs} ->
+ {Src, Id, MissingRevs} ->
+ Src ! got_it,
{ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
% only save successful reads
Docs = [RevDoc || {ok, RevDoc} <- DocResults],
- SaveDocsPid ! Docs,
- open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead + length(Docs));
- shutdown ->
- Parent ! {done, self(), [{"docs_read", DocsRead}]}
+ SaveDocsPid ! {self(), docs, Docs},
+ open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead + length(Docs));
+ {Src, shutdown} ->
+ Src ! {done, self(), [{"docs_read", DocsRead}]}
end.
-receive_docs() ->
- receive
- Docs when is_list(Docs) ->
- Docs ++ receive_docs()
- after 1 ->
- []
- end.
-save_docs_loop(Parent, DbTarget, DocsWritten) ->
+save_docs_loop(DbTarget, DocsWritten) ->
receive
- Docs0 when is_list(Docs0) ->
- Docs = Docs0 ++ receive_docs(),
+ {Src, docs, Docs} ->
+ Src ! got_it,
ok = save_docs(DbTarget, Docs, []),
- save_docs_loop(Parent, DbTarget, DocsWritten + length(Docs));
- shutdown ->
- Parent ! {done, self(), [{"docs_written", DocsWritten}]}
+ save_docs_loop(DbTarget, DocsWritten + length(Docs));
+ {Src, shutdown} ->
+ Src ! {done, self(), [{"docs_written", DocsWritten}]}
end.
@@ -239,7 +241,7 @@ open_db(DbName)->
enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
- Url = DbUrl ++ "_all_docs_by_seq?startkey=" ++ integer_to_list(StartSeq),
+ Url = DbUrl ++ "_all_docs_by_seq?count=4&startkey=" ++ integer_to_list(StartSeq),
{obj, Results} = do_http_request(Url, get),
DocInfoList=
lists:map(fun({obj, RowInfoList}) ->
@@ -254,7 +256,14 @@ enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
tuple_to_list(proplists:get_value("deleted_conflicts", RowValueProps, {})),
deleted = proplists:get_value("deleted", RowValueProps, false)}
end, tuple_to_list(proplists:get_value("rows", Results))),
- {ok, enum_docs0(InFun, DocInfoList, InAcc)};
+ case DocInfoList of
+ [] ->
+ {ok, InAcc};
+ _ ->
+ Acc2 = enum_docs0(InFun, DocInfoList, InAcc),
+ #doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
+ enum_docs_since(DbUrl, LastSeq, InFun, Acc2)
+ end;
enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).