diff options
author | Robert Newson <robert.newson@cloudant.com> | 2011-07-14 11:25:58 +0100 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2011-08-12 21:28:27 -0400 |
commit | faf9071260147275bbac1633b599e85b4a302e8b (patch) | |
tree | f39e9d5a014f66c260905e825c56809bb2f22297 | |
parent | ceabaac4e8b213126ea7cbdeab9a91009f9bf900 (diff) |
allow replication callback module to be chosen at runtime.
-rw-r--r-- | apps/couch/src/couch_rep.erl | 31 | ||||
-rw-r--r-- | apps/couch/src/couch_replication_manager.erl | 2 |
2 files changed, 18 insertions, 15 deletions
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index eabef37b..2d011aab 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -15,9 +15,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([replicate/2, checkpoint/1]). +-export([replicate/2, replicate/3, checkpoint/1]). -export([make_replication_id/2]). --export([start_replication/3, end_replication/1, get_result/4]). +-export([start_replication/4, end_replication/1, get_result/4]). -include("couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). @@ -65,13 +65,16 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{}); %% function handling POST to _replicate -replicate({Props}=PostBody, UserCtx) -> +replicate(PostBody, UserCtx) -> + replicate(PostBody, UserCtx, couch_replication_manager). + +replicate({Props}=PostBody, UserCtx, Module) -> RepId = make_replication_id(PostBody, UserCtx), case couch_util:get_value(<<"cancel">>, Props, false) of true -> end_replication(RepId); false -> - Server = start_replication(PostBody, RepId, UserCtx), + Server = start_replication(PostBody, RepId, UserCtx, Module), get_result(Server, RepId, PostBody, UserCtx) end. @@ -91,11 +94,11 @@ end_replication({BaseId, Extension}) -> end end. -start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) -> +start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx, Module) -> Replicator = { BaseId ++ Extension, {gen_server, start_link, - [?MODULE, [RepId, RepDoc, UserCtx], []]}, + [?MODULE, [RepId, RepDoc, UserCtx, Module], []]}, temporary, 1, worker, @@ -132,7 +135,7 @@ init(InitArgs) -> {stop, Error} end. -do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) -> +do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx, Module] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), @@ -173,7 +176,7 @@ do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) -> couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", [ShortId, dbname(Source), dbname(Target)]), "Starting"), - couch_replication_manager:replication_started(RepId), + Module:replication_started(RepId), State = #state{ changes_feed = ChangesFeed, @@ -268,24 +271,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) -> +terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId, _, _, Module]} = State) -> do_terminate(State), - couch_replication_manager:replication_completed(RepId); + Module:replication_completed(RepId); -terminate(normal, #state{init_args=[RepId | _]} = State) -> +terminate(normal, #state{init_args=[RepId, _, _, Module]} = State) -> timer:cancel(State#state.checkpoint_scheduled), do_terminate(do_checkpoint(State)), - couch_replication_manager:replication_completed(RepId); + Module:replication_completed(RepId); terminate(shutdown, #state{listeners = Listeners} = State) -> % continuous replication stopped [gen_server:reply(L, {ok, stopped}) || L <- Listeners], terminate_cleanup(State); -terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) -> +terminate(Reason, #state{listeners = Listeners, init_args=[RepId, _, _, Module]} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], terminate_cleanup(State), - couch_replication_manager:replication_error(RepId, Reason). + Module:replication_error(RepId, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl index b3c6db11..3f7cc27c 100644 --- a/apps/couch/src/couch_replication_manager.erl +++ b/apps/couch/src/couch_replication_manager.erl @@ -408,7 +408,7 @@ maybe_tag_rep_doc(DocId, {RepProps}, RepId) -> start_replication(Server, RepDoc, RepId, UserCtx, Wait) -> ok = timer:sleep(Wait * 1000), - case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of + case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx, ?MODULE)) of Pid when is_pid(Pid) -> ok = gen_server:call(Server, {rep_started, RepId}, infinity), couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); |