summaryrefslogtreecommitdiff
path: root/apps/couch/src
diff options
context:
space:
mode:
authorRobert Newson <robert.newson@cloudant.com>2011-07-14 11:25:58 +0100
committerAdam Kocoloski <adam@cloudant.com>2011-08-12 21:28:27 -0400
commitfaf9071260147275bbac1633b599e85b4a302e8b (patch)
treef39e9d5a014f66c260905e825c56809bb2f22297 /apps/couch/src
parentceabaac4e8b213126ea7cbdeab9a91009f9bf900 (diff)
allow replication callback module to be chosen at runtime.
Diffstat (limited to 'apps/couch/src')
-rw-r--r--apps/couch/src/couch_rep.erl31
-rw-r--r--apps/couch/src/couch_replication_manager.erl2
2 files changed, 18 insertions, 15 deletions
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl
index eabef37b..2d011aab 100644
--- a/apps/couch/src/couch_rep.erl
+++ b/apps/couch/src/couch_rep.erl
@@ -15,9 +15,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/2, checkpoint/1]).
+-export([replicate/2, replicate/3, checkpoint/1]).
-export([make_replication_id/2]).
--export([start_replication/3, end_replication/1, get_result/4]).
+-export([start_replication/4, end_replication/1, get_result/4]).
-include("couch_db.hrl").
-include_lib("ibrowse/include/ibrowse.hrl").
@@ -65,13 +65,16 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{});
%% function handling POST to _replicate
-replicate({Props}=PostBody, UserCtx) ->
+replicate(PostBody, UserCtx) ->
+ replicate(PostBody, UserCtx, couch_replication_manager).
+
+replicate({Props}=PostBody, UserCtx, Module) ->
RepId = make_replication_id(PostBody, UserCtx),
case couch_util:get_value(<<"cancel">>, Props, false) of
true ->
end_replication(RepId);
false ->
- Server = start_replication(PostBody, RepId, UserCtx),
+ Server = start_replication(PostBody, RepId, UserCtx, Module),
get_result(Server, RepId, PostBody, UserCtx)
end.
@@ -91,11 +94,11 @@ end_replication({BaseId, Extension}) ->
end
end.
-start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) ->
+start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx, Module) ->
Replicator = {
BaseId ++ Extension,
{gen_server, start_link,
- [?MODULE, [RepId, RepDoc, UserCtx], []]},
+ [?MODULE, [RepId, RepDoc, UserCtx, Module], []]},
temporary,
1,
worker,
@@ -132,7 +135,7 @@ init(InitArgs) ->
{stop, Error}
end.
-do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) ->
+do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx, Module] = InitArgs) ->
process_flag(trap_exit, true),
SourceProps = couch_util:get_value(<<"source">>, PostProps),
@@ -173,7 +176,7 @@ do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) ->
couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
[ShortId, dbname(Source), dbname(Target)]), "Starting"),
- couch_replication_manager:replication_started(RepId),
+ Module:replication_started(RepId),
State = #state{
changes_feed = ChangesFeed,
@@ -268,24 +271,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
-terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) ->
+terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId, _, _, Module]} = State) ->
do_terminate(State),
- couch_replication_manager:replication_completed(RepId);
+ Module:replication_completed(RepId);
-terminate(normal, #state{init_args=[RepId | _]} = State) ->
+terminate(normal, #state{init_args=[RepId, _, _, Module]} = State) ->
timer:cancel(State#state.checkpoint_scheduled),
do_terminate(do_checkpoint(State)),
- couch_replication_manager:replication_completed(RepId);
+ Module:replication_completed(RepId);
terminate(shutdown, #state{listeners = Listeners} = State) ->
% continuous replication stopped
[gen_server:reply(L, {ok, stopped}) || L <- Listeners],
terminate_cleanup(State);
-terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) ->
+terminate(Reason, #state{listeners = Listeners, init_args=[RepId, _, _, Module]} = State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
terminate_cleanup(State),
- couch_replication_manager:replication_error(RepId, Reason).
+ Module:replication_error(RepId, Reason).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl
index b3c6db11..3f7cc27c 100644
--- a/apps/couch/src/couch_replication_manager.erl
+++ b/apps/couch/src/couch_replication_manager.erl
@@ -408,7 +408,7 @@ maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
start_replication(Server, RepDoc, RepId, UserCtx, Wait) ->
ok = timer:sleep(Wait * 1000),
- case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
+ case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx, ?MODULE)) of
Pid when is_pid(Pid) ->
ok = gen_server:call(Server, {rep_started, RepId}, infinity),
couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);