From 0fc0c2d630abe0f4b6cc37c7f92728d1fe156ff3 Mon Sep 17 00:00:00 2001
From: John Christopher Anderson
Date: Fri, 29 Jan 2010 20:08:54 +0000
Subject: Thanks Filipe Manana. Closes COUCHDB-631.

Replicator option to replicate a list of docids (bypasses by_seq index).

Usage: POST to /_replicate with a JSON body of:
{"source": "myfoo", "target": "http://remotedb.com/theirfoo", "doc_ids": ["foo1", "foo3", "foo666"]}

This will copy the listed docs from the source to the target database.

git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@904615 13f79535-47bb-0310-9956-ffa450edef68
---
 src/couchdb/couch_rep.erl | 83 ++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 64 insertions(+), 19 deletions(-)

(limited to 'src/couchdb/couch_rep.erl')

diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 98413f23..c182c001 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -45,7 +45,8 @@
     complete = false,
     committed_seq = 0,
 
-    stats = nil
+    stats = nil,
+    doc_ids = nil
 }).
 
 %% convenience function to do a simple replication from the shell
@@ -102,26 +103,49 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
     SourceProps = proplists:get_value(<<"source">>, PostProps),
     TargetProps = proplists:get_value(<<"target">>, PostProps),
 
+    DocIds = proplists:get_value(<<"doc_ids">>, PostProps, nil),
     Continuous = proplists:get_value(<<"continuous">>, PostProps, false),
     CreateTarget = proplists:get_value(<<"create_target">>, PostProps, false),
 
     Source = open_db(SourceProps, UserCtx),
     Target = open_db(TargetProps, UserCtx, CreateTarget),
 
-    SourceLog = open_replication_log(Source, RepId),
-    TargetLog = open_replication_log(Target, RepId),
-
     SourceInfo = dbinfo(Source),
     TargetInfo = dbinfo(Target),
+
+    case DocIds of
+    List when is_list(List) ->
+        % Fast replication using only a list of doc IDs to replicate.
+        % Replication sessions, checkpoints and logs are not created
+        % since the update sequence number of the source DB is not used
+        % for determining which documents are copied into the target DB.
+        SourceLog = nil,
+        TargetLog = nil,
+
+        StartSeq = nil,
+        History = nil,
+
+        ChangesFeed = nil,
+        MissingRevs = nil,
+
+        {ok, Reader} =
+            couch_rep_reader:start_link(self(), Source, DocIds, PostProps);
+
+    _ ->
+        % Replication using the _changes API (DB sequence update numbers).
+        SourceLog = open_replication_log(Source, RepId),
+        TargetLog = open_replication_log(Target, RepId),
 
-    {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
-
-    {ok, ChangesFeed} =
-        couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
-    {ok, MissingRevs} =
-        couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
-    {ok, Reader} =
-        couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
+        {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
+
+        {ok, ChangesFeed} =
+            couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
+        {ok, MissingRevs} =
+            couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
+        {ok, Reader} =
+            couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps)
+    end,
+
     {ok, Writer} =
     couch_rep_writer:start_link(self(), Target, Reader, PostProps),
 
@@ -156,7 +180,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
         target_log = TargetLog,
         rep_starttime = httpd_util:rfc1123_date(),
         src_starttime = proplists:get_value(instance_start_time, SourceInfo),
-        tgt_starttime = proplists:get_value(instance_start_time, TargetInfo)
+        tgt_starttime = proplists:get_value(instance_start_time, TargetInfo),
+        doc_ids = DocIds
     },
     {ok, State}.
 
@@ -325,20 +350,34 @@ dbinfo(Db) ->
     {ok, Info} = couch_db:get_db_info(Db),
     Info.
 
+do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) ->
+    #state{
+        listeners = Listeners,
+        rep_starttime = ReplicationStartTime,
+        stats = Stats
+    } = State,
+
+    RepByDocsJson = {[
+        {<<"start_time">>, ?l2b(ReplicationStartTime)},
+        {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
+        {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+        {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+        {<<"doc_write_failures">>,
+            ets:lookup_element(Stats, doc_write_failures, 2)}
+    ]},
+
+    terminate_cleanup(State),
+    [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)];
+
 do_terminate(State) ->
     #state{
         checkpoint_history = CheckpointHistory,
         committed_seq = NewSeq,
         listeners = Listeners,
         source = Source,
-        target = Target,
         continuous = Continuous,
-        stats = Stats,
         source_log = #doc{body={OldHistory}}
     } = State,
-    couch_task_status:update("Finishing"),
-    ets:delete(Stats),
-    close_db(Target),
 
     NewRepHistory = case CheckpointHistory of
     nil ->
@@ -366,7 +405,13 @@ do_terminate(State) ->
     false ->
         [gen_server:reply(R, retry) || R <- OtherListeners]
     end,
-    close_db(Source).
+    terminate_cleanup(State).
+
+terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) ->
+    couch_task_status:update("Finishing"),
+    close_db(Target),
+    close_db(Source),
+    ets:delete(Stats).
 
 has_session_id(_SessionId, []) ->
     false;
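For reference, below is a minimal sketch (not part of the patch) of driving the new doc_ids replication from the Erlang shell instead of over HTTP. It assumes couch_rep:replicate/2 accepts the decoded _replicate body plus a #user_ctx{} record, as the module's existing "convenience function to do a simple replication from the shell" suggests; the admin context and variable names are illustrative only.

%% Hypothetical shell usage; #user_ctx{} is defined in couch_db.hrl, so load
%% the record definitions first (e.g. rr("src/couchdb/couch_db.hrl")).
PostBody = {[
    {<<"source">>, <<"myfoo">>},
    {<<"target">>, <<"http://remotedb.com/theirfoo">>},
    {<<"doc_ids">>, [<<"foo1">>, <<"foo3">>, <<"foo666">>]}
]},
%% Blocks until the listed docs have been copied; no replication log or
%% checkpoint is written for this kind of replication.
{ok, _Result} = couch_rep:replicate(PostBody, #user_ctx{roles = [<<"_admin">>]}).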
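When such a replication finishes, the new do_terminate/1 clause replies to each listener with {ok, RepByDocsJson}, an EJSON term roughly like the one below (the counts and timestamps are illustrative). Unlike a normal replication result it carries no session_id or history, since no checkpoints are recorded.

{[
    {<<"start_time">>, <<"Fri, 29 Jan 2010 20:08:54 GMT">>},
    {<<"end_time">>, <<"Fri, 29 Jan 2010 20:08:55 GMT">>},
    {<<"docs_read">>, 3},
    {<<"docs_written">>, 3},
    {<<"doc_write_failures">>, 0}
]}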