summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_rep.erl234
1 files changed, 129 insertions, 105 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 08d4df8f..4fc1cdd6 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -135,93 +135,11 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) ->
end.
pull_rep(DbTarget, DbSource, SourceSeqNum) ->
- SaveDocsPid = spawn_link(fun() ->
- save_docs_loop(DbTarget, 0) end),
- OpenDocsPid = spawn_link(fun() ->
- open_doc_revs_loop(DbSource, SaveDocsPid, 0) end),
- OpenDocsPid ! got_it, % prime queue with got_it
- MissingRevsPid = spawn_link(fun() ->
- get_missing_revs_loop(DbTarget, OpenDocsPid, 0, 0) end),
- MissingRevsPid ! got_it, % prime queue with got_it
- self() ! got_it,
- {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
- fun(SrcDocInfo, _, _) ->
- #doc_info{id=Id,
- rev=Rev,
- conflict_revs=Conflicts,
- deleted_conflict_revs=DelConflicts,
- update_seq=Seq} = SrcDocInfo,
- SrcRevs = [Rev | Conflicts] ++ DelConflicts,
- receive got_it -> ok end,
- MissingRevsPid ! {self(), Id, SrcRevs}, % send to the missing revs process
- {ok, Seq}
- end, SourceSeqNum),
-
- receive got_it -> ok end,
-
- MissingRevsPid ! {self(), shutdown},
- receive {done, MissingRevsPid, Stats1} -> ok end,
-
- OpenDocsPid ! {self(), shutdown},
- receive {done, OpenDocsPid, Stats2} -> ok end,
-
- SaveDocsPid ! {self(), shutdown},
- receive {done, SaveDocsPid, Stats3} -> ok end,
-
- {NewSeq, Stats1 ++ Stats2 ++ Stats3}.
-
-
-get_missing_revs_loop(DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
- receive got_it -> ok end,
- receive
- {Src, Id, Revs} ->
- Src ! got_it,
-
- MissingRevs =
- case get_missing_revs(DbTarget, [{Id, Revs}]) of
- {ok, [{Id, MissingRevs0}]} ->
- OpenDocsPid ! {self(), Id, MissingRevs0},
- MissingRevs0;
- {ok, []} ->
- % prime our message queue
- self() ! got_it,
- []
- end,
- get_missing_revs_loop(DbTarget, OpenDocsPid,
- RevsChecked + length(Revs),
- MissingFound + length(MissingRevs));
- {Src, shutdown} ->
- Src ! {done, self(), [{<<"missing_checked">>, RevsChecked},
- {<<"missing_found">>, MissingFound}]}
- end.
-
-
-open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead) ->
- receive got_it -> ok end,
- receive
- {Src, Id, MissingRevs} ->
- Src ! got_it,
- {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
- % only save successful reads
- Docs = [RevDoc || {ok, RevDoc} <- DocResults],
- SaveDocsPid ! {self(), docs, Docs},
- open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead + length(Docs));
- {Src, shutdown} ->
- Src ! {done, self(), [{<<"docs_read">>, DocsRead}]}
- end.
-
-
-
-save_docs_loop(DbTarget, DocsWritten) ->
- receive
- {Src, docs, Docs} ->
- Src ! got_it,
- ok = update_docs(DbTarget, Docs, [], false),
- save_docs_loop(DbTarget, DocsWritten + length(Docs));
- {Src, shutdown} ->
- Src ! {done, self(), [{<<"docs_written">>, DocsWritten}]}
- end.
-
+ http:set_options([{max_pipeline_length, 101}, {pipeline_timeout, 5000}]),
+ {ok, {NewSeq, Stats}} =
+ enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}),
+ http:set_options([{max_pipeline_length, 2}, {pipeline_timeout, 0}]),
+ {NewSeq, Stats}.
do_http_request(Url, Action, Headers) ->
do_http_request(Url, Action, Headers, []).
@@ -243,14 +161,96 @@ do_http_request(Url, Action, Headers, JsonBody) ->
?JSON_DECODE(ResponseBody)
end.
-enum_docs0(_InFun, [], Acc) ->
- Acc;
-enum_docs0(InFun, [DocInfo | Rest], Acc) ->
- case InFun(DocInfo, 0, Acc) of
- {ok, Acc2} -> enum_docs0(InFun, Rest, Acc2);
- {stop, Acc2} -> Acc2
+save_docs_buffer(DbTarget, DocsBuffer, []) ->
+ receive
+ {Src, shutdown} ->
+ ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false),
+ Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
+ end;
+save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences) ->
+ [NextSeq|Rest] = UpdateSequences,
+ receive
+ {Src, skip, NextSeq} ->
+ Src ! got_it,
+ save_docs_buffer(DbTarget, DocsBuffer, Rest);
+ {Src, docs, {NextSeq, Docs}} ->
+ Src ! got_it,
+ case couch_util:should_flush() of
+ true ->
+ ok = update_docs(DbTarget, lists:reverse(Docs++DocsBuffer), [],
+ false),
+ save_docs_buffer(DbTarget, [], Rest);
+ false ->
+ save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest)
+ end;
+ {Src, shutdown} ->
+ ?LOG_ERROR("received shutdown while waiting for more update_seqs", []),
+ ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false),
+ Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
end.
+pmap(F,List) ->
+ [wait_result(Worker) || Worker <- [spawn_worker(self(),F,E) || E <- List]].
+
+spawn_worker(Parent, F, E) ->
+ erlang:spawn_monitor(fun() -> Parent ! {self(), F(E)} end).
+
+wait_result({Pid,Ref}) ->
+ receive
+ {'DOWN', Ref, _, _, normal} -> receive {Pid,Result} -> Result end;
+ {'DOWN', Ref, _, _, Reason} -> exit(Reason)
+end.
+
+enum_docs_parallel(DbS, DbT, DocInfoList) ->
+ UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
+ SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end),
+
+ Stats = pmap(fun(SrcDocInfo) ->
+ #doc_info{id=Id,
+ rev=Rev,
+ conflict_revs=Conflicts,
+ deleted_conflict_revs=DelConflicts,
+ update_seq=Seq} = SrcDocInfo,
+ SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+
+ case get_missing_revs(DbT, [{Id, SrcRevs}]) of
+ {ok, [{Id, MissingRevs}]} ->
+ {ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]),
+
+ % only save successful reads
+ Docs = [RevDoc || {ok, RevDoc} <- DocResults],
+
+ % include update_seq so we save docs in order
+ SaveDocsPid ! {self(), docs, {Seq, Docs}},
+ receive got_it -> ok end,
+ [{missing_checked, length(SrcRevs)},
+ {missing_found, length(MissingRevs)},
+ {docs_read, length(Docs)}];
+ {ok, []} ->
+ SaveDocsPid ! {self(), skip, Seq},
+ receive got_it -> ok end,
+ [{missing_checked, length(SrcRevs)}]
+ end
+ end, DocInfoList),
+
+ SaveDocsPid ! {self(), shutdown},
+
+ {MissingChecked, MissingFound, DocsRead} = lists:foldl(fun(S, {C, F, R}) ->
+ C1 = C + proplists:get_value(missing_checked, S, 0),
+ F1 = F + proplists:get_value(missing_found, S, 0),
+ R1 = R + proplists:get_value(docs_read, S, 0),
+ {C1, F1, R1}
+ end, {0, 0, 0}, Stats),
+
+ receive
+ {done, SaveDocsPid, [{<<"docs_written">>, DocsWritten}]} -> ok
+ end,
+
+ [ {<<"missing_checked">>, MissingChecked},
+ {<<"missing_found">>, MissingFound},
+ {<<"docs_read">>, DocsRead},
+ {<<"docs_written">>, DocsWritten} ].
+
fix_url(UrlBin) ->
Url = binary_to_list(UrlBin),
case lists:last(Url) of
@@ -276,12 +276,10 @@ close_db(#http_db{})->
close_db(Db)->
couch_db:close(Db).
-
-enum_docs_since(#http_db{uri=DbUrl, headers=Headers}=Db, Start, InFun, InAcc)->
- Url = DbUrl ++ "_all_docs_by_seq?count=100&startkey="
- ++ integer_to_list(Start),
+get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) ->
+ Url = DbUrl ++ "_all_docs_by_seq?count=100&startkey="
+ ++ integer_to_list(StartSeq),
{Results} = do_http_request(Url, get, Headers),
- DocInfoList=
lists:map(fun({RowInfoList}) ->
{RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
#doc_info{
@@ -292,18 +290,44 @@ enum_docs_since(#http_db{uri=DbUrl, headers=Headers}=Db, Start, InFun, InAcc)->
proplists:get_value(<<"conflicts">>, RowValueProps, []),
deleted_conflict_revs =
proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []),
- deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)}
- end, proplists:get_value(<<"rows">>, Results)),
+ deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)
+ }
+ end, proplists:get_value(<<"rows">>, Results));
+get_doc_info_list(DbSource, StartSeq) ->
+ {ok, {_Count, DocInfoList}} = couch_db:enum_docs_since(DbSource, StartSeq,
+ fun (_, _, {100, DocInfoList}) ->
+ {stop, {100, DocInfoList}};
+ (DocInfo, _, {Count, DocInfoList}) ->
+ {ok, {Count+1, [DocInfo|DocInfoList]}}
+ end, {0, []}),
+ lists:reverse(DocInfoList).
+
+enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) ->
+ DocInfoList = get_doc_info_list(DbSource, StartSeq),
case DocInfoList of
[] ->
{ok, InAcc};
_ ->
- Acc2 = enum_docs0(InFun, DocInfoList, InAcc),
+ Stats = enum_docs_parallel(DbSource, DbTarget, DocInfoList),
+ OldStats = element(2, InAcc),
+ TotalStats = [
+ {<<"missing_checked">>,
+ proplists:get_value(<<"missing_checked">>, OldStats, 0) +
+ proplists:get_value(<<"missing_checked">>, Stats, 0)},
+ {<<"missing_found">>,
+ proplists:get_value(<<"missing_found">>, OldStats, 0) +
+ proplists:get_value(<<"missing_found">>, Stats, 0)},
+ {<<"docs_read">>,
+ proplists:get_value(<<"docs_read">>, OldStats, 0) +
+ proplists:get_value(<<"docs_read">>, Stats, 0)},
+ {<<"docs_written">>,
+ proplists:get_value(<<"docs_written">>, OldStats, 0) +
+ proplists:get_value(<<"docs_written">>, Stats, 0)}
+ ],
+
#doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
- enum_docs_since(Db, LastSeq, InFun, Acc2)
- end;
-enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
- couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).
+ enum_docs_since(DbSource, DbTarget, LastSeq, {LastSeq, TotalStats})
+ end.
get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) ->
{ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers,