diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_rep.erl | 83 | ||||
-rw-r--r-- | src/couchdb/couch_rep_reader.erl | 63 | ||||
-rw-r--r-- | src/couchdb/couch_rep_writer.erl | 9 |
3 files changed, 131 insertions, 24 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 98413f23..c182c001 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -45,7 +45,8 @@ complete = false, committed_seq = 0, - stats = nil + stats = nil, + doc_ids = nil }). %% convenience function to do a simple replication from the shell @@ -102,26 +103,49 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> SourceProps = proplists:get_value(<<"source">>, PostProps), TargetProps = proplists:get_value(<<"target">>, PostProps), + DocIds = proplists:get_value(<<"doc_ids">>, PostProps, nil), Continuous = proplists:get_value(<<"continuous">>, PostProps, false), CreateTarget = proplists:get_value(<<"create_target">>, PostProps, false), Source = open_db(SourceProps, UserCtx), Target = open_db(TargetProps, UserCtx, CreateTarget), - SourceLog = open_replication_log(Source, RepId), - TargetLog = open_replication_log(Target, RepId), - SourceInfo = dbinfo(Source), TargetInfo = dbinfo(Target), + + case DocIds of + List when is_list(List) -> + % Fast replication using only a list of doc IDs to replicate. + % Replication sessions, checkpoints and logs are not created + % since the update sequence number of the source DB is not used + % for determining which documents are copied into the target DB. + SourceLog = nil, + TargetLog = nil, + + StartSeq = nil, + History = nil, + + ChangesFeed = nil, + MissingRevs = nil, + + {ok, Reader} = + couch_rep_reader:start_link(self(), Source, DocIds, PostProps); + + _ -> + % Replication using the _changes API (DB sequence update numbers). + SourceLog = open_replication_log(Source, RepId), + TargetLog = open_replication_log(Target, RepId), - {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), - - {ok, ChangesFeed} = - couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps), - {ok, MissingRevs} = - couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), - {ok, Reader} = - couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps), + {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), + + {ok, ChangesFeed} = + couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps), + {ok, MissingRevs} = + couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), + {ok, Reader} = + couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps) + end, + {ok, Writer} = couch_rep_writer:start_link(self(), Target, Reader, PostProps), @@ -156,7 +180,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> target_log = TargetLog, rep_starttime = httpd_util:rfc1123_date(), src_starttime = proplists:get_value(instance_start_time, SourceInfo), - tgt_starttime = proplists:get_value(instance_start_time, TargetInfo) + tgt_starttime = proplists:get_value(instance_start_time, TargetInfo), + doc_ids = DocIds }, {ok, State}. @@ -325,20 +350,34 @@ dbinfo(Db) -> {ok, Info} = couch_db:get_db_info(Db), Info. +do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) -> + #state{ + listeners = Listeners, + rep_starttime = ReplicationStartTime, + stats = Stats + } = State, + + RepByDocsJson = {[ + {<<"start_time">>, ?l2b(ReplicationStartTime)}, + {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())}, + {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, + {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, + {<<"doc_write_failures">>, + ets:lookup_element(Stats, doc_write_failures, 2)} + ]}, + + terminate_cleanup(State), + [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)]; + do_terminate(State) -> #state{ checkpoint_history = CheckpointHistory, committed_seq = NewSeq, listeners = Listeners, source = Source, - target = Target, continuous = Continuous, - stats = Stats, source_log = #doc{body={OldHistory}} } = State, - couch_task_status:update("Finishing"), - ets:delete(Stats), - close_db(Target), NewRepHistory = case CheckpointHistory of nil -> @@ -366,7 +405,13 @@ do_terminate(State) -> false -> [gen_server:reply(R, retry) || R <- OtherListeners] end, - close_db(Source). + terminate_cleanup(State). + +terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) -> + couch_task_status:update("Finishing"), + close_db(Target), + close_db(Source), + ets:delete(Stats). has_session_id(_SessionId, []) -> false; diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index a66454c8..b8326fb3 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -43,13 +43,15 @@ opened_seqs = [] }). -start_link(Parent, Source, MissingRevs, PostProps) -> - gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []). +start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) -> + gen_server:start_link( + ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], [] + ). next(Pid) -> gen_server:call(Pid, next_docs, infinity). -init([Parent, Source, MissingRevs, _PostProps]) -> +init([Parent, Source, MissingRevs_or_DocIds, PostProps]) -> process_flag(trap_exit, true), if is_record(Source, http_db) -> #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url), @@ -57,7 +59,15 @@ init([Parent, Source, MissingRevs, _PostProps]) -> ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE); true -> ok end, Self = self(), - ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end), + ReaderLoop = spawn_link( + fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end + ), + MissingRevs = case MissingRevs_or_DocIds of + Pid when is_pid(Pid) -> + Pid; + _ListDocIds -> + nil + end, State = #state{ parent = Parent, source = Source, @@ -167,6 +177,8 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) -> handle_reader_loop_complete(State) -> {noreply, State#state{complete = waiting_on_monitors}}. +calculate_new_high_seq(#state{missing_revs=nil}) -> + nil; calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) -> Open; calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]}) @@ -191,6 +203,8 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> % opened seqs greater than the smallest outstanding request. I believe its the % minimal set of info needed to correctly calculate which seqs have been % replicated (because remote docs can be opened out-of-order) -- APK +update_sequence_lists(_Seq, #state{missing_revs=nil} = State) -> + State; update_sequence_lists(Seq, State) -> Requested = lists:delete(Seq, State#state.requested_seqs), AllOpened = lists:merge([Seq], State#state.opened_seqs), @@ -234,6 +248,37 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) -> end, [Transform(Result) || Result <- JsonResults]. +open_doc(#http_db{} = DbS, DocId) -> + % get latest rev of the doc + Req = DbS#http_db{resource=url_encode(DocId)}, + case couch_rep_httpc:request(Req) of + {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} -> + []; + Json -> + #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), + [Doc#doc{ + atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts] + }] + end. + +reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) -> + case Source of + #http_db{} -> + [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil}, + infinity) || Id <- DocIds]; + _LocalDb -> + Docs = lists:foldr(fun(Id, Acc) -> + case couch_db:open_doc(Source, Id) of + {ok, Doc} -> + [Doc | Acc]; + _ -> + Acc + end + end, [], DocIds), + gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity) + end, + exit(complete); + reader_loop(ReaderServer, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> @@ -267,6 +312,8 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> maybe_reopen_db(Db, _HighSeq) -> Db. +spawn_document_request(Source, Id, nil, nil) -> + spawn_document_request(Source, Id); spawn_document_request(Source, Id, Seq, Revs) -> Server = self(), SpawnFun = fun() -> @@ -274,3 +321,11 @@ spawn_document_request(Source, Id, Seq, Revs) -> gen_server:call(Server, {add_docs, Seq, Results}, infinity) end, spawn_monitor(SpawnFun). + +spawn_document_request(Source, Id) -> + Server = self(), + SpawnFun = fun() -> + Results = open_doc(Source, Id), + gen_server:call(Server, {add_docs, nil, Results}, infinity) + end, + spawn_monitor(SpawnFun). diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index b86028ce..269b9799 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -21,6 +21,8 @@ start_link(Parent, Target, Reader, _PostProps) -> writer_loop(Parent, Reader, Target) -> case couch_rep_reader:next(Reader) of + {complete, nil} -> + ok; {complete, FinalSeq} -> Parent ! {writer_checkpoint, FinalSeq}, ok; @@ -38,7 +40,12 @@ writer_loop(Parent, Reader, Target) -> ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]), exit({attachment_request_failed, Err, Docs}) end, - Parent ! {writer_checkpoint, HighSeq}, + case HighSeq of + nil -> + ok; + _SeqNumber -> + Parent ! {writer_checkpoint, HighSeq} + end, couch_rep_att:cleanup(), couch_util:should_flush(), writer_loop(Parent, Reader, Target) |