diff options
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r-- | src/couchdb/couch_rep.erl | 103 |
1 files changed, 56 insertions, 47 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 0d914af0..452bb492 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -113,13 +113,15 @@ replicate(Source, Target, Options) -> end. pull_rep(DbTarget, DbSource, SourceSeqNum) -> - Parent = self(), SaveDocsPid = spawn_link(fun() -> - save_docs_loop(Parent, DbTarget, 0) end), + save_docs_loop(DbTarget, 0) end), OpenDocsPid = spawn_link(fun() -> - open_doc_revs_loop(Parent, DbSource, SaveDocsPid, 0) end), + open_doc_revs_loop(DbSource, SaveDocsPid, 0) end), + OpenDocsPid ! got_it, % prime queue with got_it MissingRevsPid = spawn_link(fun() -> - get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, 0, 0) end), + get_missing_revs_loop(DbTarget, OpenDocsPid, 0, 0) end), + MissingRevsPid ! got_it, % prime queue with got_it + self() ! got_it, {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum, fun(SrcDocInfo, _, _) -> #doc_info{id=Id, @@ -128,73 +130,73 @@ pull_rep(DbTarget, DbSource, SourceSeqNum) -> deleted_conflict_revs=DelConflicts, update_seq=Seq} = SrcDocInfo, SrcRevs = [Rev | Conflicts] ++ DelConflicts, - MissingRevsPid ! {Id, SrcRevs}, % send to the missing revs process + receive got_it -> ok end, + MissingRevsPid ! {self(), Id, SrcRevs}, % send to the missing revs process {ok, Seq} end, SourceSeqNum), - MissingRevsPid ! shutdown, + + receive got_it -> ok end, + + MissingRevsPid ! {self(), shutdown}, receive {done, MissingRevsPid, Stats1} -> ok end, - OpenDocsPid ! shutdown, + OpenDocsPid ! {self(), shutdown}, receive {done, OpenDocsPid, Stats2} -> ok end, - SaveDocsPid ! shutdown, + SaveDocsPid ! {self(), shutdown}, receive {done, SaveDocsPid, Stats3} -> ok end, {NewSeq, Stats1 ++ Stats2 ++ Stats3}. -receive_id_revs() -> - receive - {Id, Revs} -> - [{Id, Revs} | receive_id_revs()] - after 1 -> - [] - end. - -get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, RevsChecked, MissingFound) -> +get_missing_revs_loop(DbTarget, OpenDocsPid, RevsChecked, MissingFound) -> + receive got_it -> ok end, receive - {Id, Revs} -> - Changed = [{Id, Revs} | receive_id_revs()], - {ok, Missing} = get_missing_revs(DbTarget, Changed), - [OpenDocsPid ! {Id0, MissingRevs} || {Id0, MissingRevs} <- Missing], - get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, - RevsChecked + length(Changed), - MissingFound + length(Missing)); - shutdown -> - Parent ! {done, self(), [{"missing_checked", RevsChecked}, + {Src, Id, Revs} -> + Src ! got_it, + MissingRevs = + case get_missing_revs(DbTarget, [{Id, Revs}]) of + {ok, [{Id, MissingRevs0}]} -> + OpenDocsPid ! {self(), Id, MissingRevs0}, + MissingRevs0; + {ok, []} -> + % prime our message queue + self() ! got_it, + [] + end, + get_missing_revs_loop(DbTarget, OpenDocsPid, + RevsChecked + length(Revs), + MissingFound + length(MissingRevs)); + {Src, shutdown} -> + Src ! {done, self(), [{"missing_checked", RevsChecked}, {"missing_found", MissingFound}]} end. -open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead) -> +open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead) -> + receive got_it -> ok end, receive - {Id, MissingRevs} -> + {Src, Id, MissingRevs} -> + Src ! got_it, {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]), % only save successful reads Docs = [RevDoc || {ok, RevDoc} <- DocResults], - SaveDocsPid ! Docs, - open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead + length(Docs)); - shutdown -> - Parent ! {done, self(), [{"docs_read", DocsRead}]} + SaveDocsPid ! {self(), docs, Docs}, + open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead + length(Docs)); + {Src, shutdown} -> + Src ! {done, self(), [{"docs_read", DocsRead}]} end. -receive_docs() -> - receive - Docs when is_list(Docs) -> - Docs ++ receive_docs() - after 1 -> - [] - end. -save_docs_loop(Parent, DbTarget, DocsWritten) -> +save_docs_loop(DbTarget, DocsWritten) -> receive - Docs0 when is_list(Docs0) -> - Docs = Docs0 ++ receive_docs(), + {Src, docs, Docs} -> + Src ! got_it, ok = save_docs(DbTarget, Docs, []), - save_docs_loop(Parent, DbTarget, DocsWritten + length(Docs)); - shutdown -> - Parent ! {done, self(), [{"docs_written", DocsWritten}]} + save_docs_loop(DbTarget, DocsWritten + length(Docs)); + {Src, shutdown} -> + Src ! {done, self(), [{"docs_written", DocsWritten}]} end. @@ -239,7 +241,7 @@ open_db(DbName)-> enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) -> - Url = DbUrl ++ "_all_docs_by_seq?startkey=" ++ integer_to_list(StartSeq), + Url = DbUrl ++ "_all_docs_by_seq?count=4&startkey=" ++ integer_to_list(StartSeq), {obj, Results} = do_http_request(Url, get), DocInfoList= lists:map(fun({obj, RowInfoList}) -> @@ -254,7 +256,14 @@ enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) -> tuple_to_list(proplists:get_value("deleted_conflicts", RowValueProps, {})), deleted = proplists:get_value("deleted", RowValueProps, false)} end, tuple_to_list(proplists:get_value("rows", Results))), - {ok, enum_docs0(InFun, DocInfoList, InAcc)}; + case DocInfoList of + [] -> + {ok, InAcc}; + _ -> + Acc2 = enum_docs0(InFun, DocInfoList, InAcc), + #doc_info{update_seq=LastSeq} = lists:last(DocInfoList), + enum_docs_since(DbUrl, LastSeq, InFun, Acc2) + end; enum_docs_since(DbSource, StartSeq, Fun, Acc) -> couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc). |