summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_rep.erl83
-rw-r--r--src/couchdb/couch_rep_reader.erl63
-rw-r--r--src/couchdb/couch_rep_writer.erl9
3 files changed, 131 insertions, 24 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 98413f23..c182c001 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -45,7 +45,8 @@
complete = false,
committed_seq = 0,
- stats = nil
+ stats = nil,
+ doc_ids = nil
}).
%% convenience function to do a simple replication from the shell
@@ -102,26 +103,49 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
SourceProps = proplists:get_value(<<"source">>, PostProps),
TargetProps = proplists:get_value(<<"target">>, PostProps),
+ DocIds = proplists:get_value(<<"doc_ids">>, PostProps, nil),
Continuous = proplists:get_value(<<"continuous">>, PostProps, false),
CreateTarget = proplists:get_value(<<"create_target">>, PostProps, false),
Source = open_db(SourceProps, UserCtx),
Target = open_db(TargetProps, UserCtx, CreateTarget),
- SourceLog = open_replication_log(Source, RepId),
- TargetLog = open_replication_log(Target, RepId),
-
SourceInfo = dbinfo(Source),
TargetInfo = dbinfo(Target),
+
+ case DocIds of
+ List when is_list(List) ->
+ % Fast replication using only a list of doc IDs to replicate.
+ % Replication sessions, checkpoints and logs are not created
+ % since the update sequence number of the source DB is not used
+ % for determining which documents are copied into the target DB.
+ SourceLog = nil,
+ TargetLog = nil,
+
+ StartSeq = nil,
+ History = nil,
+
+ ChangesFeed = nil,
+ MissingRevs = nil,
+
+ {ok, Reader} =
+ couch_rep_reader:start_link(self(), Source, DocIds, PostProps);
+
+ _ ->
+ % Replication using the _changes API (DB sequence update numbers).
+ SourceLog = open_replication_log(Source, RepId),
+ TargetLog = open_replication_log(Target, RepId),
- {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
-
- {ok, ChangesFeed} =
- couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
- {ok, MissingRevs} =
- couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
- {ok, Reader} =
- couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
+ {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
+
+ {ok, ChangesFeed} =
+ couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
+ {ok, MissingRevs} =
+ couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
+ {ok, Reader} =
+ couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps)
+ end,
+
{ok, Writer} =
couch_rep_writer:start_link(self(), Target, Reader, PostProps),
@@ -156,7 +180,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
target_log = TargetLog,
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = proplists:get_value(instance_start_time, SourceInfo),
- tgt_starttime = proplists:get_value(instance_start_time, TargetInfo)
+ tgt_starttime = proplists:get_value(instance_start_time, TargetInfo),
+ doc_ids = DocIds
},
{ok, State}.
@@ -325,20 +350,34 @@ dbinfo(Db) ->
{ok, Info} = couch_db:get_db_info(Db),
Info.
+do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) ->
+ #state{
+ listeners = Listeners,
+ rep_starttime = ReplicationStartTime,
+ stats = Stats
+ } = State,
+
+ RepByDocsJson = {[
+ {<<"start_time">>, ?l2b(ReplicationStartTime)},
+ {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
+ {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+ {<<"doc_write_failures">>,
+ ets:lookup_element(Stats, doc_write_failures, 2)}
+ ]},
+
+ terminate_cleanup(State),
+ [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)];
+
do_terminate(State) ->
#state{
checkpoint_history = CheckpointHistory,
committed_seq = NewSeq,
listeners = Listeners,
source = Source,
- target = Target,
continuous = Continuous,
- stats = Stats,
source_log = #doc{body={OldHistory}}
} = State,
- couch_task_status:update("Finishing"),
- ets:delete(Stats),
- close_db(Target),
NewRepHistory = case CheckpointHistory of
nil ->
@@ -366,7 +405,13 @@ do_terminate(State) ->
false ->
[gen_server:reply(R, retry) || R <- OtherListeners]
end,
- close_db(Source).
+ terminate_cleanup(State).
+
+terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) ->
+ couch_task_status:update("Finishing"),
+ close_db(Target),
+ close_db(Source),
+ ets:delete(Stats).
has_session_id(_SessionId, []) ->
false;
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index a66454c8..b8326fb3 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -43,13 +43,15 @@
opened_seqs = []
}).
-start_link(Parent, Source, MissingRevs, PostProps) ->
- gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
+start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) ->
+ gen_server:start_link(
+ ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], []
+ ).
next(Pid) ->
gen_server:call(Pid, next_docs, infinity).
-init([Parent, Source, MissingRevs, _PostProps]) ->
+init([Parent, Source, MissingRevs_or_DocIds, PostProps]) ->
process_flag(trap_exit, true),
if is_record(Source, http_db) ->
#url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
@@ -57,7 +59,15 @@ init([Parent, Source, MissingRevs, _PostProps]) ->
ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
true -> ok end,
Self = self(),
- ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
+ ReaderLoop = spawn_link(
+ fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
+ ),
+ MissingRevs = case MissingRevs_or_DocIds of
+ Pid when is_pid(Pid) ->
+ Pid;
+ _ListDocIds ->
+ nil
+ end,
State = #state{
parent = Parent,
source = Source,
@@ -167,6 +177,8 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) ->
handle_reader_loop_complete(State) ->
{noreply, State#state{complete = waiting_on_monitors}}.
+calculate_new_high_seq(#state{missing_revs=nil}) ->
+ nil;
calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
Open;
calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
@@ -191,6 +203,8 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
% opened seqs greater than the smallest outstanding request. I believe its the
% minimal set of info needed to correctly calculate which seqs have been
% replicated (because remote docs can be opened out-of-order) -- APK
+update_sequence_lists(_Seq, #state{missing_revs=nil} = State) ->
+ State;
update_sequence_lists(Seq, State) ->
Requested = lists:delete(Seq, State#state.requested_seqs),
AllOpened = lists:merge([Seq], State#state.opened_seqs),
@@ -234,6 +248,37 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
end,
[Transform(Result) || Result <- JsonResults].
+open_doc(#http_db{} = DbS, DocId) ->
+ % get latest rev of the doc
+ Req = DbS#http_db{resource=url_encode(DocId)},
+ case couch_rep_httpc:request(Req) of
+ {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} ->
+ [];
+ Json ->
+ #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
+ [Doc#doc{
+ atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
+ }]
+ end.
+
+reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
+ case Source of
+ #http_db{} ->
+ [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
+ infinity) || Id <- DocIds];
+ _LocalDb ->
+ Docs = lists:foldr(fun(Id, Acc) ->
+ case couch_db:open_doc(Source, Id) of
+ {ok, Doc} ->
+ [Doc | Acc];
+ _ ->
+ Acc
+ end
+ end, [], DocIds),
+ gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity)
+ end,
+ exit(complete);
+
reader_loop(ReaderServer, Source, MissingRevsServer) ->
case couch_rep_missing_revs:next(MissingRevsServer) of
complete ->
@@ -267,6 +312,8 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
maybe_reopen_db(Db, _HighSeq) ->
Db.
+spawn_document_request(Source, Id, nil, nil) ->
+ spawn_document_request(Source, Id);
spawn_document_request(Source, Id, Seq, Revs) ->
Server = self(),
SpawnFun = fun() ->
@@ -274,3 +321,11 @@ spawn_document_request(Source, Id, Seq, Revs) ->
gen_server:call(Server, {add_docs, Seq, Results}, infinity)
end,
spawn_monitor(SpawnFun).
+
+spawn_document_request(Source, Id) ->
+ Server = self(),
+ SpawnFun = fun() ->
+ Results = open_doc(Source, Id),
+ gen_server:call(Server, {add_docs, nil, Results}, infinity)
+ end,
+ spawn_monitor(SpawnFun).
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index b86028ce..269b9799 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -21,6 +21,8 @@ start_link(Parent, Target, Reader, _PostProps) ->
writer_loop(Parent, Reader, Target) ->
case couch_rep_reader:next(Reader) of
+ {complete, nil} ->
+ ok;
{complete, FinalSeq} ->
Parent ! {writer_checkpoint, FinalSeq},
ok;
@@ -38,7 +40,12 @@ writer_loop(Parent, Reader, Target) ->
?LOG_DEBUG("writer failed to write an attachment ~p", [Err]),
exit({attachment_request_failed, Err, Docs})
end,
- Parent ! {writer_checkpoint, HighSeq},
+ case HighSeq of
+ nil ->
+ ok;
+ _SeqNumber ->
+ Parent ! {writer_checkpoint, HighSeq}
+ end,
couch_rep_att:cleanup(),
couch_util:should_flush(),
writer_loop(Parent, Reader, Target)