diff options
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r-- | src/couchdb/couch_rep.erl | 144 |
1 files changed, 15 insertions, 129 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 4fabdd99..a9c156e9 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -16,12 +16,10 @@ code_change/3]). -export([replicate/2, checkpoint/1]). --export([ensure_rep_db_exists/0, make_replication_id/2]). +-export([make_replication_id/2]). -export([start_replication/3, end_replication/1, get_result/4]). --export([update_rep_doc/2]). -include("couch_db.hrl"). --include("couch_js_functions.hrl"). -include("../ibrowse/ibrowse.hrl"). -define(REP_ID_VERSION, 2). @@ -54,7 +52,6 @@ committed_seq = 0, stats = nil, - rep_doc = nil, source_db_update_notifier = nil, target_db_update_notifier = nil }). @@ -94,11 +91,11 @@ end_replication({BaseId, Extension}) -> end end. -start_replication(RepDoc, {BaseId, Extension}, UserCtx) -> +start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) -> Replicator = { BaseId ++ Extension, {gen_server, start_link, - [?MODULE, [BaseId, RepDoc, UserCtx], []]}, + [?MODULE, [RepId, RepDoc, UserCtx], []]}, temporary, 1, worker, @@ -135,7 +132,7 @@ init(InitArgs) -> {stop, Error} end. -do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> +do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), @@ -152,10 +149,8 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> SourceInfo = dbinfo(Source), TargetInfo = dbinfo(Target), - maybe_set_triggered(RepDoc, RepId), - [SourceLog, TargetLog] = find_replication_logs( - [Source, Target], RepId, {PostProps}, UserCtx), + [Source, Target], BaseId, {PostProps}, UserCtx), {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), {ok, ChangesFeed} = @@ -174,10 +169,12 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> ets:insert(Stats, {docs_written, 0}), ets:insert(Stats, {doc_write_failures, 0}), - {ShortId, _} = lists:split(6, RepId), + {ShortId, _} = lists:split(6, BaseId), couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", [ShortId, dbname(Source), dbname(Target)]), "Starting"), + couch_replication_manager:replication_started(RepId), + State = #state{ changes_feed = ChangesFeed, missing_revs = MissingRevs, @@ -200,7 +197,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - rep_doc = RepDoc, source_db_update_notifier = source_db_update_notifier(Source), target_db_update_notifier = target_db_update_notifier(Target) }, @@ -272,27 +268,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -terminate(normal, #state{checkpoint_scheduled=nil} = State) -> +terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) -> do_terminate(State), - update_rep_doc( - State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]); + couch_replication_manager:replication_completed(RepId); -terminate(normal, State) -> +terminate(normal, #state{init_args=[RepId | _]} = State) -> timer:cancel(State#state.checkpoint_scheduled), do_terminate(do_checkpoint(State)), - update_rep_doc( - State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]); + couch_replication_manager:replication_completed(RepId); terminate(shutdown, #state{listeners = Listeners} = State) -> % continuous replication stopped [gen_server:reply(L, {ok, stopped}) || L <- Listeners], terminate_cleanup(State); -terminate(Reason, #state{listeners = Listeners} = State) -> +terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], terminate_cleanup(State), - update_rep_doc( - State#state.rep_doc, [{<<"_replication_state">>, <<"error">>}]). + couch_replication_manager:replication_error(RepId, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -698,7 +691,7 @@ do_checkpoint(State) -> src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, stats = Stats, - rep_doc = {RepDoc} + init_args = [_RepId, {RepDoc} | _] } = State, case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> @@ -901,113 +894,6 @@ parse_proxy_params(ProxyUrl) -> [{proxy_user, User}, {proxy_password, Passwd}] end. -update_rep_doc({Props} = _RepDoc, KVs) -> - case couch_util:get_value(<<"_id">>, Props) of - undefined -> - % replication triggered by POSTing to _replicate/ - ok; - RepDocId -> - % replication triggered by adding a Rep Doc to the replicator DB - {ok, RepDb} = ensure_rep_db_exists(), - case couch_db:open_doc(RepDb, RepDocId, []) of - {ok, LatestRepDoc} -> - update_rep_doc(RepDb, LatestRepDoc, KVs); - _ -> - ok - end, - couch_db:close(RepDb) - end. - -update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> - NewRepDocBody = lists:foldl( - fun({<<"_replication_state">> = K, State} = KV, Body) -> - case couch_util:get_value(K, Body) of - State -> - Body; - _ -> - Body1 = lists:keystore(K, 1, Body, KV), - lists:keystore( - <<"_replication_state_time">>, 1, - Body1, {<<"_replication_state_time">>, timestamp()}) - end; - ({K, _V} = KV, Body) -> - lists:keystore(K, 1, Body, KV) - end, - RepDocBody, - KVs - ), - case NewRepDocBody of - RepDocBody -> - ok; - _ -> - % might not succeed - when the replication doc is deleted right - % before this update (not an error) - couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []) - end. - -% RFC3339 timestamps. -% Note: doesn't include the time seconds fraction (RFC3339 says it's optional). -timestamp() -> - {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()), - UTime = erlang:universaltime(), - LocalTime = calendar:universal_time_to_local_time(UTime), - DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) - - calendar:datetime_to_gregorian_seconds(UTime), - zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60), - iolist_to_binary( - io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s", - [Year, Month, Day, Hour, Min, Sec, - zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])). - -zone(Hr, Min) when Hr >= 0, Min >= 0 -> - io_lib:format("+~2..0w:~2..0w", [Hr, Min]); -zone(Hr, Min) -> - io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]). - - -maybe_set_triggered({RepProps} = RepDoc, RepId) -> - case couch_util:get_value(<<"_replication_state">>, RepProps) of - <<"triggered">> -> - ok; - _ -> - update_rep_doc( - RepDoc, - [ - {<<"_replication_state">>, <<"triggered">>}, - {<<"_replication_id">>, ?l2b(RepId)} - ] - ) - end. - -ensure_rep_db_exists() -> - DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), - Opts = [ - {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, - sys_db - ], - case couch_db:open(DbName, Opts) of - {ok, Db} -> - Db; - _Error -> - {ok, Db} = couch_db:create(DbName, Opts) - end, - ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), - {ok, Db}. - -ensure_rep_ddoc_exists(RepDb, DDocID) -> - case couch_db:open_doc(RepDb, DDocID, []) of - {ok, _Doc} -> - ok; - _ -> - DDoc = couch_doc:from_json_obj({[ - {<<"_id">>, DDocID}, - {<<"language">>, <<"javascript">>}, - {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} - ]}), - {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) - end, - ok. - source_db_update_notifier(#db{name = DbName}) -> Server = self(), {ok, Notifier} = couch_db_update_notifier:start_link( |