summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2010-11-16 20:12:05 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2010-11-16 20:12:05 +0000
commit5afece2131f4ef4000d768b671bddbbf714303d9 (patch)
tree921522f0085d0de787e7857a73769d76863087e4 /src/couchdb/couch_rep.erl
parent2bc2dc1ad85943214a21597723ab4b357fbe1766 (diff)
Replicator: use the new builtin _doc_ids filter for the by doc IDs replication.
This reduces code complexity and allows for continuous by doc IDs replication. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1035780 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl105
1 files changed, 40 insertions, 65 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 88912fbf..d35471c5 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -53,7 +53,6 @@
committed_seq = 0,
stats = nil,
- doc_ids = nil,
rep_doc = nil
}).
@@ -129,7 +128,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
SourceProps = couch_util:get_value(<<"source">>, PostProps),
TargetProps = couch_util:get_value(<<"target">>, PostProps),
- DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil),
Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),
@@ -143,40 +141,16 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
maybe_set_triggered(RepDoc, RepId),
- case DocIds of
- List when is_list(List) ->
- % Fast replication using only a list of doc IDs to replicate.
- % Replication sessions, checkpoints and logs are not created
- % since the update sequence number of the source DB is not used
- % for determining which documents are copied into the target DB.
- SourceLog = nil,
- TargetLog = nil,
+ [SourceLog, TargetLog] = find_replication_logs(
+ [Source, Target], RepId, {PostProps}, UserCtx),
+ {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
- StartSeq = nil,
- History = nil,
-
- ChangesFeed = nil,
- MissingRevs = nil,
-
- {ok, Reader} =
- couch_rep_reader:start_link(self(), Source, DocIds, PostProps);
-
- _ ->
- % Replication using the _changes API (DB sequence update numbers).
-
- [SourceLog, TargetLog] = find_replication_logs(
- [Source, Target], RepId, {PostProps}, UserCtx),
-
- {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
-
- {ok, ChangesFeed} =
+ {ok, ChangesFeed} =
couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
- {ok, MissingRevs} =
+ {ok, MissingRevs} =
couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
- {ok, Reader} =
- couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps)
- end,
-
+ {ok, Reader} =
+ couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
{ok, Writer} =
couch_rep_writer:start_link(self(), Target, Reader, PostProps),
@@ -213,7 +187,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- doc_ids = DocIds,
rep_doc = RepDoc
},
{ok, State}.
@@ -390,25 +363,6 @@ dbinfo(Db) ->
{ok, Info} = couch_db:get_db_info(Db),
Info.
-do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) ->
- #state{
- listeners = Listeners,
- rep_starttime = ReplicationStartTime,
- stats = Stats
- } = State,
-
- RepByDocsJson = {[
- {<<"start_time">>, ?l2b(ReplicationStartTime)},
- {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>,
- ets:lookup_element(Stats, doc_write_failures, 2)}
- ]},
-
- terminate_cleanup(State),
- [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)];
-
do_terminate(State) ->
#state{
checkpoint_history = CheckpointHistory,
@@ -659,32 +613,53 @@ do_checkpoint(State) ->
rep_starttime = ReplicationStartTime,
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
- stats = Stats
+ stats = Stats,
+ rep_doc = {RepDoc}
} = State,
case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
[dbname(Source), dbname(Target), NewSeqNum]),
+ EndTime = ?l2b(httpd_util:rfc1123_date()),
+ StartTime = ?l2b(ReplicationStartTime),
+ DocsRead = ets:lookup_element(Stats, docs_read, 2),
+ DocsWritten = ets:lookup_element(Stats, docs_written, 2),
+ DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2),
NewHistoryEntry = {[
{<<"session_id">>, SessionId},
- {<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
{<<"start_last_seq">>, StartSeqNum},
{<<"end_last_seq">>, NewSeqNum},
{<<"recorded_seq">>, NewSeqNum},
{<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
{<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>,
- ets:lookup_element(Stats, doc_write_failures, 2)}
+ {<<"docs_read">>, DocsRead},
+ {<<"docs_written">>, DocsWritten},
+ {<<"doc_write_failures">>, DocWriteFailures}
]},
- % limit history to 50 entries
- NewRepHistory = {[
+ BaseHistory = [
{<<"session_id">>, SessionId},
- {<<"source_last_seq">>, NewSeqNum},
- {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
- ]},
+ {<<"source_last_seq">>, NewSeqNum}
+ ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of
+ undefined ->
+ [];
+ DocIds when is_list(DocIds) ->
+ % backwards compatibility with the result of a replication by
+ % doc IDs in versions 0.11.x and 1.0.x
+ [
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"docs_read">>, DocsRead},
+ {<<"docs_written">>, DocsWritten},
+ {<<"doc_write_failures">>, DocWriteFailures}
+ ]
+ end,
+ % limit history to 50 entries
+ NewRepHistory = {
+ BaseHistory ++
+ [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+ },
try
{SrcRevPos,SrcRevId} =