summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_rep.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam.kocoloski@gmail.com>2011-01-20 12:43:37 -0500
committerAdam Kocoloski <adam.kocoloski@gmail.com>2011-01-20 13:05:41 -0500
commitf79d0a666a5fb9541a0925db5111208a94631065 (patch)
tree392d85a8a9887ddc8f6268a48a65537b21734a7d /apps/couch/src/couch_rep.erl
parent2ea18bdaa19ea7f2da1a5dccce65d50cf0efc64d (diff)
parent94286611038e661487382ed834103853e88fdf69 (diff)
Merge CouchDB 1.0.2 release candidate
Conflicts: Makefile.am acinclude.m4.in apps/couch/src/couch_db.erl apps/couch/src/couch_db_updater.erl apps/couch/src/couch_rep.erl apps/couch/src/couch_rep_reader.erl apps/couch/src/couch_view.erl apps/couch/src/couch_view_group.erl rel/overlay/etc/default.ini share/Makefile.am src/couchdb/couch_query_servers.erl src/ibrowse/Makefile.am src/ibrowse/ibrowse.app.in src/ibrowse/ibrowse.erl src/ibrowse/ibrowse_app.erl src/ibrowse/ibrowse_http_client.erl src/ibrowse/ibrowse_lb.erl src/ibrowse/ibrowse_lib.erl src/ibrowse/ibrowse_sup.erl src/ibrowse/ibrowse_test.erl src/mochiweb/mochijson2.erl test/etap/112-replication-missing-revs.t test/etap/113-replication-attachment-comp.t test/etap/140-attachment-comp.t
Diffstat (limited to 'apps/couch/src/couch_rep.erl')
-rw-r--r--apps/couch/src/couch_rep.erl324
1 files changed, 162 insertions, 162 deletions
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl
index 126639e0..c804b49d 100644
--- a/apps/couch/src/couch_rep.erl
+++ b/apps/couch/src/couch_rep.erl
@@ -15,7 +15,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/2, checkpoint/1, start_link/3]).
+-export([replicate/2, checkpoint/1]).
-include("couch_db.hrl").
@@ -34,6 +34,7 @@
start_seq,
history,
+ session_id,
source_log,
target_log,
rep_starttime,
@@ -46,12 +47,11 @@
committed_seq = 0,
stats = nil,
- doc_ids = nil
+ doc_ids = nil,
+ source_db_update_notifier = nil,
+ target_db_update_notifier = nil
}).
-start_link(Id, PostBody, UserCtx) ->
- gen_server:start_link(?MODULE, [Id, PostBody, UserCtx], []).
-
%% convenience function to do a simple replication from the shell
replicate(Source, Target) when is_list(Source) ->
replicate(?l2b(Source), Target);
@@ -64,7 +64,7 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
replicate({Props}=PostBody, UserCtx) ->
{BaseId, Extension} = make_replication_id(PostBody, UserCtx),
Replicator = {BaseId ++ Extension,
- {?MODULE, start_link, [BaseId, PostBody, UserCtx]},
+ {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
temporary,
1,
worker,
@@ -83,15 +83,10 @@ replicate({Props}=PostBody, UserCtx) ->
false ->
Server = start_replication_server(Replicator),
- Continuous = couch_util:get_value(<<"continuous">>, Props, false),
- Async = couch_util:get_value(<<"async">>, Props, false),
- case {Continuous, Async} of
- {true, _} ->
+ case couch_util:get_value(<<"continuous">>, Props, false) of
+ true ->
{ok, {continuous, ?l2b(BaseId)}};
- {_, true} ->
- spawn(fun() -> get_result(Server, PostBody, UserCtx) end),
- Server;
- _ ->
+ false ->
get_result(Server, PostBody, UserCtx)
end
end.
@@ -113,8 +108,10 @@ get_result(Server, PostBody, UserCtx) ->
end.
init(InitArgs) ->
- try do_init(InitArgs)
- catch _:Error ->
+ try
+ do_init(InitArgs)
+ catch
+ throw:Error ->
{stop, Error}
end.
@@ -199,12 +196,15 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
start_seq = StartSeq,
history = History,
+ session_id = couch_uuids:random(),
source_log = SourceLog,
target_log = TargetLog,
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- doc_ids = DocIds
+ doc_ids = DocIds,
+ source_db_update_notifier = source_db_update_notifier(Source),
+ target_db_update_notifier = target_db_update_notifier(Target)
},
{ok, State}.
@@ -212,7 +212,21 @@ handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
{stop, normal, State#state{listeners=[From]}};
handle_call(get_result, From, State) ->
Listeners = State#state.listeners,
- {noreply, State#state{listeners=[From|Listeners]}}.
+ {noreply, State#state{listeners=[From|Listeners]}};
+
+handle_call(get_source_db, _From, #state{source = Source} = State) ->
+ {reply, {ok, Source}, State};
+
+handle_call(get_target_db, _From, #state{target = Target} = State) ->
+ {reply, {ok, Target}, State}.
+
+handle_cast(reopen_source_db, #state{source = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#state{source = NewSource}};
+
+handle_cast(reopen_target_db, #state{target = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#state{target = NewTarget}};
handle_cast(do_checkpoint, State) ->
{noreply, do_checkpoint(State)};
@@ -221,16 +235,14 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
- couch_task_status:update("MR Processed source update #~p of ~p",
- [SourceSeq, seqnum(State#state.source)]),
+ couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
{noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
when SourceSeq > N ->
MissingRevs = State#state.missing_revs,
ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
- couch_task_status:update("W Processed source update #~p of ~p",
- [SourceSeq, seqnum(State#state.source)]),
+ couch_task_status:update("W Processed source update #~p", [SourceSeq]),
{noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, _}, State) ->
{noreply, State};
@@ -239,14 +251,8 @@ handle_info({update_stats, Key, N}, State) ->
ets:update_counter(State#state.stats, Key, N),
{noreply, State};
-handle_info({'DOWN', _, _, Pid, _}, State) ->
- Me = node(),
- case erlang:node(Pid) of
- Me ->
- ?LOG_INFO("replication terminating - local DB is shutting down", []);
- Node ->
- ?LOG_INFO("replication terminating - DB on ~p is shutting down", [Node])
- end,
+handle_info({'DOWN', _, _, _, _}, State) ->
+ ?LOG_INFO("replication terminating because local DB is shutting down", []),
timer:cancel(State#state.checkpoint_scheduled),
{stop, shutdown, State};
@@ -293,35 +299,40 @@ code_change(_OldVsn, State, _Extra) ->
% internal funs
start_replication_server(Replicator) ->
- start_replication_server(Replicator, fun start_child/1).
-
-start_replication_server(Replicator, StartFun) ->
- case StartFun(Replicator) of
+ RepId = element(1, Replicator),
+ case supervisor:start_child(couch_rep_sup, Replicator) of
{ok, Pid} ->
+ ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
Pid;
{error, already_present} ->
- start_replication_server(Replicator, fun restart_child/1);
+ case supervisor:restart_child(couch_rep_sup, RepId) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
+ Pid;
+ {error, running} ->
+ %% this error occurs if multiple replicators are racing
+ %% each other to start and somebody else won. Just grab
+ %% the Pid by calling start_child again.
+ {error, {already_started, Pid}} =
+ supervisor:start_child(couch_rep_sup, Replicator),
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+ Pid;
+ {error, {db_not_found, DbUrl}} ->
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+ {error, {unauthorized, DbUrl}} ->
+ throw({unauthorized,
+ <<"unauthorized to access database ", DbUrl/binary>>})
+ end;
{error, {already_started, Pid}} ->
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
Pid;
- {error, running} ->
- Children = supervisor:which_children(couch_rep_sup),
- {value, {_, Pid, _, _}} = lists:keysearch(Replicator, 1, Children),
- Pid;
- % sadly both seem to be needed. I don't know why.
{error, {{db_not_found, DbUrl}, _}} ->
throw({db_not_found, <<"could not open ", DbUrl/binary>>});
- {error, {db_not_found, DbUrl}} ->
- throw({db_not_found, <<"could not open ", DbUrl/binary>>});
- {error, {node_not_connected, Node}} ->
- throw({node_not_connected, Node})
+ {error, {{unauthorized, DbUrl}, _}} ->
+ throw({unauthorized,
+ <<"unauthorized to access database ", DbUrl/binary>>})
end.
-start_child(Replicator) ->
- supervisor:start_child(couch_rep_sup, Replicator).
-
-restart_child(Replicator) ->
- supervisor:restart_child(couch_rep_sup, element(1, Replicator)).
-
compare_replication_logs(SrcDoc, TgtDoc) ->
#doc{body={RepRecProps}} = SrcDoc,
#doc{body={RepRecPropsTgt}} = TgtDoc,
@@ -373,15 +384,9 @@ close_db(Db) ->
couch_db:close(Db).
dbname(#http_db{url = Url}) ->
- strip_password(Url);
-dbname(#db{name = Name, main_pid = MainPid}) ->
- ?l2b([Name, " (", pid_to_list(MainPid), ")"]).
-
-strip_password(Url) ->
- re:replace(Url,
- "http(s)?://([^:]+):[^@]+@(.*)$",
- "http\\1://\\2:*****@\\3",
- [{return, list}]).
+ couch_util:url_strip_password(Url);
+dbname(#db{name = Name}) ->
+ Name.
dbinfo(#http_db{} = Db) ->
{DbProps} = couch_rep_httpc:request(Db),
@@ -445,13 +450,20 @@ do_terminate(State) ->
false ->
[gen_server:reply(R, retry) || R <- OtherListeners]
end,
+ couch_task_status:update("Finishing"),
terminate_cleanup(State).
-terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) ->
- couch_task_status:update("Finishing"),
- close_db(Target),
- close_db(Source),
- ets:delete(Stats).
+terminate_cleanup(State) ->
+ close_db(State#state.source),
+ close_db(State#state.target),
+ stop_db_update_notifier(State#state.source_db_update_notifier),
+ stop_db_update_notifier(State#state.target_db_update_notifier),
+ ets:delete(State#state.stats).
+
+stop_db_update_notifier(nil) ->
+ ok;
+stop_db_update_notifier(Notifier) ->
+ couch_db_update_notifier:stop(Notifier).
has_session_id(_SessionId, []) ->
false;
@@ -476,12 +488,7 @@ maybe_append_options(Options, Props) ->
make_replication_id({Props}, UserCtx) ->
%% funky algorithm to preserve backwards compatibility
- case couch_util:get_value(<<"use_hostname">>, Props, false) of
- true ->
- {ok, HostName} = inet:gethostname();
- false ->
- HostName = couch_config:get("replication", "hostname", "cloudant.com")
- end,
+ {ok, HostName} = inet:gethostname(),
% Port = mochiweb_socket_server:get(couch_httpd, port),
Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
@@ -504,22 +511,15 @@ make_replication_id({Props}, UserCtx) ->
maybe_add_trailing_slash(Url) ->
re:replace(Url, "[^/]$", "&/", [{return, list}]).
-get_rep_endpoint(UserCtx, {Props}) ->
- case couch_util:get_value(<<"url">>, Props) of
+get_rep_endpoint(_UserCtx, {Props}) ->
+ Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
+ {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
+ {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
+ case couch_util:get_value(<<"oauth">>, Auth) of
undefined ->
- Node = couch_util:get_value(<<"node">>, Props),
- Name = couch_util:get_value(<<"name">>, Props),
- {Node, Name, UserCtx};
- RawUrl ->
- Url = maybe_add_trailing_slash(RawUrl),
- {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
- {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
- case couch_util:get_value(<<"oauth">>, Auth) of
- undefined ->
- {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
- {OAuth} ->
- {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
- end
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+ {OAuth} ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
end;
get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
{remote, maybe_add_trailing_slash(Url), []};
@@ -533,43 +533,27 @@ open_replication_log(#http_db{}=Db, RepId) ->
Req = Db#http_db{resource=couch_util:url_encode(DocId)},
case couch_rep_httpc:request(Req) of
{[{<<"error">>, _}, {<<"reason">>, _}]} ->
- % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
+ ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
#doc{id=?l2b(DocId)};
Doc ->
- % ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
+ ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
couch_doc:from_json_obj(Doc)
end;
open_replication_log(Db, RepId) ->
DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
case couch_db:open_doc(Db, DocId, []) of
{ok, Doc} ->
- % ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
+ ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
Doc;
_ ->
- % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
+ ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
#doc{id=DocId}
end.
open_db(Props, UserCtx, ProxyParams) ->
open_db(Props, UserCtx, ProxyParams, false).
-open_db(<<"http://",_/binary>>=Url, _, ProxyParams, Create) ->
- open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
-open_db(<<"https://",_/binary>>=Url, _, ProxyParams, Create) ->
- open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
-open_db({Props}, UserCtx, ProxyParams, Create) ->
- case couch_util:get_value(<<"url">>, Props) of
- undefined ->
- Node = couch_util:get_value(<<"node">>, Props, node()),
- DbName = couch_util:get_value(<<"name">>, Props),
- open_local_db(Node, DbName, UserCtx, Create);
- _Url ->
- open_remote_db({Props}, ProxyParams, Create)
- end;
-open_db(<<DbName/binary>>, UserCtx, _ProxyParams, Create) ->
- open_local_db(node(), DbName, UserCtx, Create).
-
-open_remote_db({Props}, ProxyParams, CreateTarget) ->
+open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
{AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
{BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
@@ -580,33 +564,34 @@ open_remote_db({Props}, ProxyParams, CreateTarget) ->
auth = AuthProps,
headers = lists:ukeymerge(1, Headers, DefaultHeaders)
},
- Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams},
- couch_rep_httpc:db_exists(Db, CreateTarget).
-
-open_local_db(Node, DbName, UserCtx, Create) when is_binary(Node) ->
- try open_local_db(list_to_existing_atom(?b2l(Node)), DbName, UserCtx, Create)
- catch error:badarg ->
- ?LOG_ERROR("unknown replication node ~s", [Node]),
- throw({node_not_connected, Node}) end;
-open_local_db(Node, DbName, UserCtx, Create) when is_atom(Node) ->
- case catch gen_server:call({couch_server, Node}, {open, DbName, []}, infinity) of
- {ok, #db{} = Db} ->
- couch_db:monitor(Db),
- Db#db{fd_monitor = erlang:monitor(process, Db#db.fd)};
- {ok, MainPid} when is_pid(MainPid) ->
- {ok, Db} = couch_db:open_ref_counted(MainPid, UserCtx),
- couch_db:monitor(Db),
- Db;
- {not_found, no_db_file} when Create =:= false->
- throw({db_not_found, DbName});
- {not_found, no_db_file} ->
- ok = couch_httpd:verify_is_server_admin(UserCtx),
- couch_server:create(DbName, [{user_ctx, UserCtx}]);
- {'EXIT', {{nodedown, Node}, _Stack}} ->
- throw({node_not_connected, couch_util:to_binary(Node)});
- {'EXIT', {noproc, {gen_server,call,_}}} ->
- timer:sleep(1000),
- throw({noproc, couch_server, Node})
+ Db = Db1#http_db{
+ options = Db1#http_db.options ++ ProxyParams ++
+ couch_rep_httpc:ssl_options(Db1)
+ },
+ couch_rep_httpc:db_exists(Db, CreateTarget);
+open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
+ open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
+open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
+ open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
+open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
+ try
+ case CreateTarget of
+ true ->
+ ok = couch_httpd:verify_is_server_admin(UserCtx),
+ couch_server:create(DbName, [{user_ctx, UserCtx}]);
+ false ->
+ ok
+ end,
+
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} ->
+ couch_db:monitor(Db),
+ Db;
+ {not_found, no_db_file} ->
+ throw({db_not_found, DbName})
+ end
+ catch throw:{unauthorized, _} ->
+ throw({unauthorized, DbName})
end.
schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
@@ -628,6 +613,7 @@ do_checkpoint(State) ->
committed_seq = NewSeqNum,
start_seq = StartSeqNum,
history = OldHistory,
+ session_id = SessionId,
source_log = SourceLog,
target_log = TargetLog,
rep_starttime = ReplicationStartTime,
@@ -637,14 +623,8 @@ do_checkpoint(State) ->
} = State,
case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
- ?LOG_DEBUG("recording a checkpoint for ~s -> ~s at source update_seq ~p"
- " of ~p", [dbname(Source), dbname(Target), NewSeqNum, seqnum(Source)]),
- SessionId = couch_uuids:new(),
- TargetNode = case Target of #db{main_pid=MainPid} ->
- erlang:node(MainPid);
- _ ->
- http
- end,
+ ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
+ [dbname(Source), dbname(Target), NewSeqNum]),
NewHistoryEntry = {[
{<<"session_id">>, SessionId},
{<<"start_time">>, list_to_binary(ReplicationStartTime)},
@@ -663,7 +643,6 @@ do_checkpoint(State) ->
NewRepHistory = {[
{<<"session_id">>, SessionId},
{<<"source_last_seq">>, NewSeqNum},
- {<<"target_node">>, TargetNode},
{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
]},
@@ -683,9 +662,7 @@ do_checkpoint(State) ->
"yourself?)", []),
State
end;
- Else ->
- ?LOG_INFO("wanted ~p, got ~p from commit_to_both", [
- {SrcInstanceStartTime, TgtInstanceStartTime}, Else]),
+ _Else ->
?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
[dbname(Source), dbname(Target)]),
#state{
@@ -717,31 +694,30 @@ commit_to_both(Source, Target, RequiredSeq) ->
{SrcCommitPid, Timestamp} ->
Timestamp;
{'EXIT', SrcCommitPid, {http_request_failed, _}} ->
- nil;
- {'EXIT', SrcCommitPid, {noproc, {gen_server, call, [_]}}} ->
- nil; % DB crashed, this should trigger a reboot
- {'EXIT', SrcCommitPid, Else} ->
- ?LOG_ERROR("new error code for crashed replication commit ~p", [Else]),
- nil
+ exit(replication_link_failure)
end,
{SourceStartTime, TargetStartTime}.
ensure_full_commit(#http_db{headers = Headers} = Target) ->
+ Headers1 = [
+ {"Content-Length", 0} |
+ couch_util:proplist_apply_field(
+ {"Content-Type", "application/json"}, Headers)
+ ],
Req = Target#http_db{
resource = "_ensure_full_commit",
method = post,
- headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers)
+ headers = Headers1
},
{ResultProps} = couch_rep_httpc:request(Req),
true = couch_util:get_value(<<"ok">>, ResultProps),
couch_util:get_value(<<"instance_start_time">>, ResultProps);
-ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) ->
- TargetNode = erlang:node(Pid),
- {ok, NewDb} = rpc:call(TargetNode, couch_db, open_int, [DbName, []]),
+ensure_full_commit(Target) ->
+ {ok, NewDb} = couch_db:open_int(Target#db.name, []),
UpdateSeq = couch_db:get_update_seq(Target),
CommitSeq = couch_db:get_committed_update_seq(NewDb),
InstanceStartTime = NewDb#db.instance_start_time,
- catch couch_db:close(NewDb),
+ couch_db:close(NewDb),
if UpdateSeq > CommitSeq ->
?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
[UpdateSeq, CommitSeq]),
@@ -753,11 +729,16 @@ ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) ->
end.
ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) ->
+ Headers1 = [
+ {"Content-Length", 0} |
+ couch_util:proplist_apply_field(
+ {"Content-Type", "application/json"}, Headers)
+ ],
Req = Source#http_db{
resource = "_ensure_full_commit",
method = post,
qs = [{seq, RequiredSeq}],
- headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers)
+ headers = Headers1
},
{ResultProps} = couch_rep_httpc:request(Req),
case couch_util:get_value(<<"ok">>, ResultProps) of
@@ -801,11 +782,6 @@ up_to_date(Source, Seq) ->
couch_db:close(NewDb),
T.
-seqnum(#http_db{}) ->
- -1;
-seqnum(Db) ->
- Db#db.update_seq.
-
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->
@@ -820,3 +796,27 @@ parse_proxy_params(ProxyUrl) ->
true ->
[{proxy_user, User}, {proxy_password, Passwd}]
end.
+
+source_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_source_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+source_db_update_notifier(_) ->
+ nil.
+
+target_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_target_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+target_db_update_notifier(_) ->
+ nil.