summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl114
1 files changed, 79 insertions, 35 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index bead047c..f6fd0fad 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -16,14 +16,6 @@
-export([replicate/2, replicate/3]).
--record(stats, {
- docs_read=0,
- read_errors=0,
- docs_copied=0,
- copy_errors=0
- }).
-
-
url_encode([H|T]) ->
if
H >= $a, $z >= H ->
@@ -87,7 +79,8 @@ replicate(Source, Target, Options) ->
false -> SeqNum0
end,
- {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum, #stats{}),
+ {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum),
+
case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
true ->
% nothing changed, don't record results
@@ -98,11 +91,7 @@ replicate(Source, Target, Options) ->
[{"start_time", StartTime},
{"end_time", httpd_util:rfc1123_date()},
{"start_last_seq", SeqNum},
- {"end_last_seq", NewSeqNum},
- {"docs_read", Stats#stats.docs_read},
- {"read_errors", Stats#stats.read_errors},
- {"docs_copied", Stats#stats.docs_copied},
- {"copy_errors", Stats#stats.copy_errors}]}
+ {"end_last_seq", NewSeqNum} | Stats]}
| tuple_to_list(proplists:get_value("history", OldRepHistoryProps, {}))],
% something changed, record results
NewRepHistory =
@@ -116,34 +105,89 @@ replicate(Source, Target, Options) ->
{ok, NewRepHistory}
end.
-pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) ->
+pull_rep(DbTarget, DbSource, SourceSeqNum) ->
+ Parent = self(),
+ SaveDocsPid = spawn_link(fun() ->
+ save_docs_loop(Parent, DbTarget, 0) end),
+ OpenDocsPid = spawn_link(fun() ->
+ open_doc_revs_loop(Parent, DbSource, SaveDocsPid, 0) end),
+ MissingRevsPid = spawn_link(fun() ->
+ get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, 0, 0) end),
{ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
- fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) ->
- Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats),
- {ok, {Seq, Stats2}}
- end, {SourceSeqNum, Stats}),
- NewSeq.
-
+ fun(SrcDocInfo, _, _) ->
+ #doc_info{id=Id,
+ rev=Rev,
+ conflict_revs=Conflicts,
+ deleted_conflict_revs=DelConflicts,
+ update_seq=Seq} = SrcDocInfo,
+ SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+ MissingRevsPid ! {Id, SrcRevs}, % send to the missing revs process
+ {ok, Seq}
+ end, SourceSeqNum),
+ MissingRevsPid ! shutdown,
+ receive {done, MissingRevsPid, Stats1} -> ok end,
+
+ OpenDocsPid ! shutdown,
+ receive {done, OpenDocsPid, Stats2} -> ok end,
+
+ SaveDocsPid ! shutdown,
+ receive {done, SaveDocsPid, Stats3} -> ok end,
+
+ {NewSeq, Stats1 ++ Stats2 ++ Stats3}.
+
+
+receive_id_revs() ->
+ receive
+ {Id, Revs} ->
+ [{Id, Revs} | receive_id_revs()]
+ after 1 ->
+ []
+ end.
-maybe_save_docs(DbTarget, DbSource,
- #doc_info{id=Id, rev=Rev, conflict_revs=Conflicts, deleted_conflict_revs=DelConflicts},
- Stats) ->
- SrcRevs = [Rev | Conflicts] ++ DelConflicts,
- {ok, [{Id, MissingRevs}]} = get_missing_revs(DbTarget, [{Id, SrcRevs}]),
+get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
+ receive
+ {Id, Revs} ->
+ Changed = [{Id, Revs} | receive_id_revs()],
+ {ok, Missing} = get_missing_revs(DbTarget, Changed),
+ [OpenDocsPid ! {Id0, MissingRevs} || {Id0, MissingRevs} <- Missing],
+ get_missing_revs_loop(Parent, DbTarget, OpenDocsPid,
+ RevsChecked + length(Changed),
+ MissingFound + length(Missing));
+ shutdown ->
+ Parent ! {done, self(), [{missing_checked, RevsChecked},
+ {missing_found, MissingFound}]}
+ end.
+
- case MissingRevs of
- [] ->
- Stats;
- _Else ->
+open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead) ->
+ receive
+ {Id, MissingRevs} ->
{ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
% only save successful reads
Docs = [RevDoc || {ok, RevDoc} <- DocResults],
- ok = save_docs(DbTarget, Docs, []),
+ SaveDocsPid ! Docs,
+ open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead + length(Docs));
+ shutdown ->
+ Parent ! {done, self(), [{docs_read, DocsRead}]}
+ end.
+
- Stats#stats{
- docs_read=Stats#stats.docs_read + length(Docs),
- read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs),
- docs_copied=Stats#stats.docs_copied + length(Docs)}
+receive_docs() ->
+ receive
+ Docs when is_list(Docs) ->
+ Docs ++ receive_docs()
+ after 1 ->
+ []
+ end.
+
+save_docs_loop(Parent, DbTarget, DocsWritten) ->
+ receive
+ Docs0 when is_list(Docs0) ->
+ Docs = Docs0 ++ receive_docs(),
+ ok = save_docs(DbTarget, Docs, []),
+ save_docs_loop(Parent, DbTarget, DocsWritten + length(Docs));
+ shutdown ->
+ Parent ! {done, self(), [{docs_written, DocsWritten}]}
end.