diff options
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r-- | src/couchdb/couch_rep.erl | 114 |
1 files changed, 79 insertions, 35 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index bead047c..f6fd0fad 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -16,14 +16,6 @@ -export([replicate/2, replicate/3]). --record(stats, { - docs_read=0, - read_errors=0, - docs_copied=0, - copy_errors=0 - }). - - url_encode([H|T]) -> if H >= $a, $z >= H -> @@ -87,7 +79,8 @@ replicate(Source, Target, Options) -> false -> SeqNum0 end, - {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum, #stats{}), + {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum), + case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of true -> % nothing changed, don't record results @@ -98,11 +91,7 @@ replicate(Source, Target, Options) -> [{"start_time", StartTime}, {"end_time", httpd_util:rfc1123_date()}, {"start_last_seq", SeqNum}, - {"end_last_seq", NewSeqNum}, - {"docs_read", Stats#stats.docs_read}, - {"read_errors", Stats#stats.read_errors}, - {"docs_copied", Stats#stats.docs_copied}, - {"copy_errors", Stats#stats.copy_errors}]} + {"end_last_seq", NewSeqNum} | Stats]} | tuple_to_list(proplists:get_value("history", OldRepHistoryProps, {}))], % something changed, record results NewRepHistory = @@ -116,34 +105,89 @@ replicate(Source, Target, Options) -> {ok, NewRepHistory} end. -pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) -> +pull_rep(DbTarget, DbSource, SourceSeqNum) -> + Parent = self(), + SaveDocsPid = spawn_link(fun() -> + save_docs_loop(Parent, DbTarget, 0) end), + OpenDocsPid = spawn_link(fun() -> + open_doc_revs_loop(Parent, DbSource, SaveDocsPid, 0) end), + MissingRevsPid = spawn_link(fun() -> + get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, 0, 0) end), {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum, - fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) -> - Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats), - {ok, {Seq, Stats2}} - end, {SourceSeqNum, Stats}), - NewSeq. - + fun(SrcDocInfo, _, _) -> + #doc_info{id=Id, + rev=Rev, + conflict_revs=Conflicts, + deleted_conflict_revs=DelConflicts, + update_seq=Seq} = SrcDocInfo, + SrcRevs = [Rev | Conflicts] ++ DelConflicts, + MissingRevsPid ! {Id, SrcRevs}, % send to the missing revs process + {ok, Seq} + end, SourceSeqNum), + MissingRevsPid ! shutdown, + receive {done, MissingRevsPid, Stats1} -> ok end, + + OpenDocsPid ! shutdown, + receive {done, OpenDocsPid, Stats2} -> ok end, + + SaveDocsPid ! shutdown, + receive {done, SaveDocsPid, Stats3} -> ok end, + + {NewSeq, Stats1 ++ Stats2 ++ Stats3}. + + +receive_id_revs() -> + receive + {Id, Revs} -> + [{Id, Revs} | receive_id_revs()] + after 1 -> + [] + end. -maybe_save_docs(DbTarget, DbSource, - #doc_info{id=Id, rev=Rev, conflict_revs=Conflicts, deleted_conflict_revs=DelConflicts}, - Stats) -> - SrcRevs = [Rev | Conflicts] ++ DelConflicts, - {ok, [{Id, MissingRevs}]} = get_missing_revs(DbTarget, [{Id, SrcRevs}]), +get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, RevsChecked, MissingFound) -> + receive + {Id, Revs} -> + Changed = [{Id, Revs} | receive_id_revs()], + {ok, Missing} = get_missing_revs(DbTarget, Changed), + [OpenDocsPid ! {Id0, MissingRevs} || {Id0, MissingRevs} <- Missing], + get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, + RevsChecked + length(Changed), + MissingFound + length(Missing)); + shutdown -> + Parent ! {done, self(), [{missing_checked, RevsChecked}, + {missing_found, MissingFound}]} + end. + - case MissingRevs of - [] -> - Stats; - _Else -> +open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead) -> + receive + {Id, MissingRevs} -> {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]), % only save successful reads Docs = [RevDoc || {ok, RevDoc} <- DocResults], - ok = save_docs(DbTarget, Docs, []), + SaveDocsPid ! Docs, + open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead + length(Docs)); + shutdown -> + Parent ! {done, self(), [{docs_read, DocsRead}]} + end. + - Stats#stats{ - docs_read=Stats#stats.docs_read + length(Docs), - read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs), - docs_copied=Stats#stats.docs_copied + length(Docs)} +receive_docs() -> + receive + Docs when is_list(Docs) -> + Docs ++ receive_docs() + after 1 -> + [] + end. + +save_docs_loop(Parent, DbTarget, DocsWritten) -> + receive + Docs0 when is_list(Docs0) -> + Docs = Docs0 ++ receive_docs(), + ok = save_docs(DbTarget, Docs, []), + save_docs_loop(Parent, DbTarget, DocsWritten + length(Docs)); + shutdown -> + Parent ! {done, self(), [{docs_written, DocsWritten}]} end. |