diff options
-rw-r--r-- | src/couchdb/couch_rep.erl | 234 |
1 files changed, 129 insertions, 105 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 08d4df8f..4fc1cdd6 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -135,93 +135,11 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> end. pull_rep(DbTarget, DbSource, SourceSeqNum) -> - SaveDocsPid = spawn_link(fun() -> - save_docs_loop(DbTarget, 0) end), - OpenDocsPid = spawn_link(fun() -> - open_doc_revs_loop(DbSource, SaveDocsPid, 0) end), - OpenDocsPid ! got_it, % prime queue with got_it - MissingRevsPid = spawn_link(fun() -> - get_missing_revs_loop(DbTarget, OpenDocsPid, 0, 0) end), - MissingRevsPid ! got_it, % prime queue with got_it - self() ! got_it, - {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum, - fun(SrcDocInfo, _, _) -> - #doc_info{id=Id, - rev=Rev, - conflict_revs=Conflicts, - deleted_conflict_revs=DelConflicts, - update_seq=Seq} = SrcDocInfo, - SrcRevs = [Rev | Conflicts] ++ DelConflicts, - receive got_it -> ok end, - MissingRevsPid ! {self(), Id, SrcRevs}, % send to the missing revs process - {ok, Seq} - end, SourceSeqNum), - - receive got_it -> ok end, - - MissingRevsPid ! {self(), shutdown}, - receive {done, MissingRevsPid, Stats1} -> ok end, - - OpenDocsPid ! {self(), shutdown}, - receive {done, OpenDocsPid, Stats2} -> ok end, - - SaveDocsPid ! {self(), shutdown}, - receive {done, SaveDocsPid, Stats3} -> ok end, - - {NewSeq, Stats1 ++ Stats2 ++ Stats3}. - - -get_missing_revs_loop(DbTarget, OpenDocsPid, RevsChecked, MissingFound) -> - receive got_it -> ok end, - receive - {Src, Id, Revs} -> - Src ! got_it, - - MissingRevs = - case get_missing_revs(DbTarget, [{Id, Revs}]) of - {ok, [{Id, MissingRevs0}]} -> - OpenDocsPid ! {self(), Id, MissingRevs0}, - MissingRevs0; - {ok, []} -> - % prime our message queue - self() ! got_it, - [] - end, - get_missing_revs_loop(DbTarget, OpenDocsPid, - RevsChecked + length(Revs), - MissingFound + length(MissingRevs)); - {Src, shutdown} -> - Src ! {done, self(), [{<<"missing_checked">>, RevsChecked}, - {<<"missing_found">>, MissingFound}]} - end. - - -open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead) -> - receive got_it -> ok end, - receive - {Src, Id, MissingRevs} -> - Src ! got_it, - {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]), - % only save successful reads - Docs = [RevDoc || {ok, RevDoc} <- DocResults], - SaveDocsPid ! {self(), docs, Docs}, - open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead + length(Docs)); - {Src, shutdown} -> - Src ! {done, self(), [{<<"docs_read">>, DocsRead}]} - end. - - - -save_docs_loop(DbTarget, DocsWritten) -> - receive - {Src, docs, Docs} -> - Src ! got_it, - ok = update_docs(DbTarget, Docs, [], false), - save_docs_loop(DbTarget, DocsWritten + length(Docs)); - {Src, shutdown} -> - Src ! {done, self(), [{<<"docs_written">>, DocsWritten}]} - end. - + http:set_options([{max_pipeline_length, 101}, {pipeline_timeout, 5000}]), + {ok, {NewSeq, Stats}} = + enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}), + http:set_options([{max_pipeline_length, 2}, {pipeline_timeout, 0}]), + {NewSeq, Stats}. do_http_request(Url, Action, Headers) -> do_http_request(Url, Action, Headers, []). @@ -243,14 +161,96 @@ do_http_request(Url, Action, Headers, JsonBody) -> ?JSON_DECODE(ResponseBody) end. -enum_docs0(_InFun, [], Acc) -> - Acc; -enum_docs0(InFun, [DocInfo | Rest], Acc) -> - case InFun(DocInfo, 0, Acc) of - {ok, Acc2} -> enum_docs0(InFun, Rest, Acc2); - {stop, Acc2} -> Acc2 +save_docs_buffer(DbTarget, DocsBuffer, []) -> + receive + {Src, shutdown} -> + ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false), + Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]} + end; +save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences) -> + [NextSeq|Rest] = UpdateSequences, + receive + {Src, skip, NextSeq} -> + Src ! got_it, + save_docs_buffer(DbTarget, DocsBuffer, Rest); + {Src, docs, {NextSeq, Docs}} -> + Src ! got_it, + case couch_util:should_flush() of + true -> + ok = update_docs(DbTarget, lists:reverse(Docs++DocsBuffer), [], + false), + save_docs_buffer(DbTarget, [], Rest); + false -> + save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest) + end; + {Src, shutdown} -> + ?LOG_ERROR("received shutdown while waiting for more update_seqs", []), + ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false), + Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]} end. +pmap(F,List) -> + [wait_result(Worker) || Worker <- [spawn_worker(self(),F,E) || E <- List]]. + +spawn_worker(Parent, F, E) -> + erlang:spawn_monitor(fun() -> Parent ! {self(), F(E)} end). + +wait_result({Pid,Ref}) -> + receive + {'DOWN', Ref, _, _, normal} -> receive {Pid,Result} -> Result end; + {'DOWN', Ref, _, _, Reason} -> exit(Reason) +end. + +enum_docs_parallel(DbS, DbT, DocInfoList) -> + UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList], + SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end), + + Stats = pmap(fun(SrcDocInfo) -> + #doc_info{id=Id, + rev=Rev, + conflict_revs=Conflicts, + deleted_conflict_revs=DelConflicts, + update_seq=Seq} = SrcDocInfo, + SrcRevs = [Rev | Conflicts] ++ DelConflicts, + + case get_missing_revs(DbT, [{Id, SrcRevs}]) of + {ok, [{Id, MissingRevs}]} -> + {ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]), + + % only save successful reads + Docs = [RevDoc || {ok, RevDoc} <- DocResults], + + % include update_seq so we save docs in order + SaveDocsPid ! {self(), docs, {Seq, Docs}}, + receive got_it -> ok end, + [{missing_checked, length(SrcRevs)}, + {missing_found, length(MissingRevs)}, + {docs_read, length(Docs)}]; + {ok, []} -> + SaveDocsPid ! {self(), skip, Seq}, + receive got_it -> ok end, + [{missing_checked, length(SrcRevs)}] + end + end, DocInfoList), + + SaveDocsPid ! {self(), shutdown}, + + {MissingChecked, MissingFound, DocsRead} = lists:foldl(fun(S, {C, F, R}) -> + C1 = C + proplists:get_value(missing_checked, S, 0), + F1 = F + proplists:get_value(missing_found, S, 0), + R1 = R + proplists:get_value(docs_read, S, 0), + {C1, F1, R1} + end, {0, 0, 0}, Stats), + + receive + {done, SaveDocsPid, [{<<"docs_written">>, DocsWritten}]} -> ok + end, + + [ {<<"missing_checked">>, MissingChecked}, + {<<"missing_found">>, MissingFound}, + {<<"docs_read">>, DocsRead}, + {<<"docs_written">>, DocsWritten} ]. + fix_url(UrlBin) -> Url = binary_to_list(UrlBin), case lists:last(Url) of @@ -276,12 +276,10 @@ close_db(#http_db{})-> close_db(Db)-> couch_db:close(Db). - -enum_docs_since(#http_db{uri=DbUrl, headers=Headers}=Db, Start, InFun, InAcc)-> - Url = DbUrl ++ "_all_docs_by_seq?count=100&startkey=" - ++ integer_to_list(Start), +get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) -> + Url = DbUrl ++ "_all_docs_by_seq?count=100&startkey=" + ++ integer_to_list(StartSeq), {Results} = do_http_request(Url, get, Headers), - DocInfoList= lists:map(fun({RowInfoList}) -> {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList), #doc_info{ @@ -292,18 +290,44 @@ enum_docs_since(#http_db{uri=DbUrl, headers=Headers}=Db, Start, InFun, InAcc)-> proplists:get_value(<<"conflicts">>, RowValueProps, []), deleted_conflict_revs = proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []), - deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)} - end, proplists:get_value(<<"rows">>, Results)), + deleted = proplists:get_value(<<"deleted">>, RowValueProps, false) + } + end, proplists:get_value(<<"rows">>, Results)); +get_doc_info_list(DbSource, StartSeq) -> + {ok, {_Count, DocInfoList}} = couch_db:enum_docs_since(DbSource, StartSeq, + fun (_, _, {100, DocInfoList}) -> + {stop, {100, DocInfoList}}; + (DocInfo, _, {Count, DocInfoList}) -> + {ok, {Count+1, [DocInfo|DocInfoList]}} + end, {0, []}), + lists:reverse(DocInfoList). + +enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) -> + DocInfoList = get_doc_info_list(DbSource, StartSeq), case DocInfoList of [] -> {ok, InAcc}; _ -> - Acc2 = enum_docs0(InFun, DocInfoList, InAcc), + Stats = enum_docs_parallel(DbSource, DbTarget, DocInfoList), + OldStats = element(2, InAcc), + TotalStats = [ + {<<"missing_checked">>, + proplists:get_value(<<"missing_checked">>, OldStats, 0) + + proplists:get_value(<<"missing_checked">>, Stats, 0)}, + {<<"missing_found">>, + proplists:get_value(<<"missing_found">>, OldStats, 0) + + proplists:get_value(<<"missing_found">>, Stats, 0)}, + {<<"docs_read">>, + proplists:get_value(<<"docs_read">>, OldStats, 0) + + proplists:get_value(<<"docs_read">>, Stats, 0)}, + {<<"docs_written">>, + proplists:get_value(<<"docs_written">>, OldStats, 0) + + proplists:get_value(<<"docs_written">>, Stats, 0)} + ], + #doc_info{update_seq=LastSeq} = lists:last(DocInfoList), - enum_docs_since(Db, LastSeq, InFun, Acc2) - end; -enum_docs_since(DbSource, StartSeq, Fun, Acc) -> - couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc). + enum_docs_since(DbSource, DbTarget, LastSeq, {LastSeq, TotalStats}) + end. get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) -> {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, |