author     Adam Kocoloski <adam@cloudant.com>    2010-08-18 11:51:03 -0400
committer  Adam Kocoloski <adam@cloudant.com>    2010-08-18 14:24:57 -0400
commit     7393d62b7b630bee50f609d0ae8125d33f7cda2b (patch)
tree       754e9ab17a586319c562de488e60056feff60bb8 /apps/couch/src/couch_rep.erl
parent     c0cb2625f25a2b51485c164bea1d8822f449ce14 (diff)
Grab bag of Cloudant patches to couch OTP application
- Removal of couch_db and couch_ref_counter processes. Active DBs are
  accessible through a protected ets table owned by couch_server.
- #full_doc_info{} in by_id and by_seq trees for faster compaction at the
  expense of more disk usage afterwards. Proposed as COUCHDB-738 but not
  accepted upstream.
- Replication via distributed Erlang.
- Better hot upgrade support (uses exported functions much more often).
- Configurable btree chunk sizes allow for larger (but still bounded)
  reductions.
- Shorter names for btree fields in #db{} and #db_header{}.
- couch_view_group does not keep a reference to the #db{}.
- Terms are stored compressed (again).
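With replication via distributed Erlang, an endpoint can be named by node and database name instead of a URL. A minimal sketch of such a call under this patch (the node and database names are illustrative, and #user_ctx{} requires the record definition from couch_db.hrl):

    %% Replicate a local db to a db owned by another connected node.
    PostBody = {[
        {<<"source">>, <<"mydb">>},
        {<<"target">>, {[{<<"node">>, <<"node2@db2.example.com">>},
                         {<<"name">>, <<"mydb">>}]}}
    ]},
    couch_rep:replicate(PostBody, #user_ctx{roles = [<<"_admin">>]}).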
Diffstat (limited to 'apps/couch/src/couch_rep.erl')
-rw-r--r--  apps/couch/src/couch_rep.erl  214
1 file changed, 144 insertions(+), 70 deletions(-)
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl
index 65573e8c..126639e0 100644
--- a/apps/couch/src/couch_rep.erl
+++ b/apps/couch/src/couch_rep.erl
@@ -15,7 +15,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/2, checkpoint/1]).
+-export([replicate/2, checkpoint/1, start_link/3]).
-include("couch_db.hrl").
@@ -49,6 +49,9 @@
doc_ids = nil
}).
+start_link(Id, PostBody, UserCtx) ->
+ gen_server:start_link(?MODULE, [Id, PostBody, UserCtx], []).
+
%% convenience function to do a simple replication from the shell
replicate(Source, Target) when is_list(Source) ->
replicate(?l2b(Source), Target);
@@ -61,7 +64,7 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
replicate({Props}=PostBody, UserCtx) ->
{BaseId, Extension} = make_replication_id(PostBody, UserCtx),
Replicator = {BaseId ++ Extension,
- {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
+ {?MODULE, start_link, [BaseId, PostBody, UserCtx]},
temporary,
1,
worker,
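Exporting start_link/3 and pointing the child spec at {?MODULE, start_link, ...} instead of {gen_server, start_link, ...} fits the commit's hot-upgrade theme: a fully-qualified call to an exported function always dispatches to the newest loaded version of couch_rep. The resulting old-style child spec has roughly this shape (the Modules field is assumed, since the hunk cuts off before it):

    %% {Id, {Module, Function, Args}, Restart, Shutdown, Type, Modules}
    {BaseId ++ Extension,
     {couch_rep, start_link, [BaseId, PostBody, UserCtx]},
     temporary, 1, worker, [couch_rep]}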
@@ -80,10 +83,15 @@ replicate({Props}=PostBody, UserCtx) ->
false ->
Server = start_replication_server(Replicator),
- case couch_util:get_value(<<"continuous">>, Props, false) of
- true ->
+ Continuous = couch_util:get_value(<<"continuous">>, Props, false),
+ Async = couch_util:get_value(<<"async">>, Props, false),
+ case {Continuous, Async} of
+ {true, _} ->
{ok, {continuous, ?l2b(BaseId)}};
- false ->
+ {_, true} ->
+ spawn(fun() -> get_result(Server, PostBody, UserCtx) end),
+ Server;
+ _ ->
get_result(Server, PostBody, UserCtx)
end
end.
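The new "async" option returns the replicator pid to the caller right away; a throwaway process is spawned to consume get_result/3 so the gen_server's reply is not lost. A hypothetical call (database names are illustrative):

    PostBody = {[{<<"source">>, <<"db_a">>},
                 {<<"target">>, <<"db_b">>},
                 {<<"async">>, true}]},
    Pid = couch_rep:replicate(PostBody, UserCtx).  %% returns the server pid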
@@ -106,7 +114,9 @@ get_result(Server, PostBody, UserCtx) ->
init(InitArgs) ->
try do_init(InitArgs)
- catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end.
+ catch _:Error ->
+ {stop, Error}
+ end.
do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
process_flag(trap_exit, true),
@@ -211,14 +221,16 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
- couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
+ couch_task_status:update("MR Processed source update #~p of ~p",
+ [SourceSeq, seqnum(State#state.source)]),
{noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
when SourceSeq > N ->
MissingRevs = State#state.missing_revs,
ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
- couch_task_status:update("W Processed source update #~p", [SourceSeq]),
+ couch_task_status:update("W Processed source update #~p of ~p",
+ [SourceSeq, seqnum(State#state.source)]),
{noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, _}, State) ->
{noreply, State};
@@ -227,8 +239,14 @@ handle_info({update_stats, Key, N}, State) ->
ets:update_counter(State#state.stats, Key, N),
{noreply, State};
-handle_info({'DOWN', _, _, _, _}, State) ->
- ?LOG_INFO("replication terminating because local DB is shutting down", []),
+handle_info({'DOWN', _, _, Pid, _}, State) ->
+ Me = node(),
+ case erlang:node(Pid) of
+ Me ->
+ ?LOG_INFO("replication terminating - local DB is shutting down", []);
+ Node ->
+ ?LOG_INFO("replication terminating - DB on ~p is shutting down", [Node])
+ end,
timer:cancel(State#state.checkpoint_scheduled),
{stop, shutdown, State};
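The reworked 'DOWN' clause relies on erlang:node/1, which reports the node a pid lives on, so the shutdown message can say whether the local or a remote database went away. The test in isolation (sketch):

    %% Pid is the monitored db process from the 'DOWN' message.
    case erlang:node(Pid) =:= node() of
        true  -> local_db_is_shutting_down;
        false -> remote_db_is_shutting_down
    end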
@@ -275,34 +293,35 @@ code_change(_OldVsn, State, _Extra) ->
% internal funs
start_replication_server(Replicator) ->
- RepId = element(1, Replicator),
- case supervisor:start_child(couch_rep_sup, Replicator) of
+ start_replication_server(Replicator, fun start_child/1).
+
+start_replication_server(Replicator, StartFun) ->
+ case StartFun(Replicator) of
{ok, Pid} ->
- ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
Pid;
{error, already_present} ->
- case supervisor:restart_child(couch_rep_sup, RepId) of
- {ok, Pid} ->
- ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
- Pid;
- {error, running} ->
- %% this error occurs if multiple replicators are racing
- %% each other to start and somebody else won. Just grab
- %% the Pid by calling start_child again.
- {error, {already_started, Pid}} =
- supervisor:start_child(couch_rep_sup, Replicator),
- ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
- Pid;
- {error, {db_not_found, DbUrl}} ->
- throw({db_not_found, <<"could not open ", DbUrl/binary>>})
- end;
+ start_replication_server(Replicator, fun restart_child/1);
{error, {already_started, Pid}} ->
- ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
Pid;
+ {error, running} ->
+ Children = supervisor:which_children(couch_rep_sup),
{value, {_, Pid, _, _}} = lists:keysearch(element(1, Replicator), 1, Children),
+ Pid;
+ % sadly both seem to be needed. I don't know why.
{error, {{db_not_found, DbUrl}, _}} ->
- throw({db_not_found, <<"could not open ", DbUrl/binary>>})
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+ {error, {db_not_found, DbUrl}} ->
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+ {error, {node_not_connected, Node}} ->
+ throw({node_not_connected, Node})
end.
+start_child(Replicator) ->
+ supervisor:start_child(couch_rep_sup, Replicator).
+
+restart_child(Replicator) ->
+ supervisor:restart_child(couch_rep_sup, element(1, Replicator)).
+
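supervisor:which_children/1 returns one {Id, Child, Type, Modules} tuple per child, keyed by the child spec id, so the {error, running} race is settled by looking the pid up by element 1 of the replicator spec. The recovery path in isolation (sketch):

    %% Another caller won the race to start this replication; find its pid.
    Children = supervisor:which_children(couch_rep_sup),
    {value, {_Id, Pid, worker, _Mods}} =
        lists:keysearch(element(1, Replicator), 1, Children),
    Pid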
compare_replication_logs(SrcDoc, TgtDoc) ->
#doc{body={RepRecProps}} = SrcDoc,
#doc{body={RepRecPropsTgt}} = TgtDoc,
@@ -355,8 +374,8 @@ close_db(Db) ->
dbname(#http_db{url = Url}) ->
strip_password(Url);
-dbname(#db{name = Name}) ->
- Name.
+dbname(#db{name = Name, main_pid = MainPid}) ->
+ ?l2b([Name, " (", pid_to_list(MainPid), ")"]).
strip_password(Url) ->
re:replace(Url,
@@ -457,7 +476,12 @@ maybe_append_options(Options, Props) ->
make_replication_id({Props}, UserCtx) ->
%% funky algorithm to preserve backwards compatibility
- {ok, HostName} = inet:gethostname(),
+ case couch_util:get_value(<<"use_hostname">>, Props, false) of
+ true ->
+ {ok, HostName} = inet:gethostname();
+ false ->
+ HostName = couch_config:get("replication", "hostname", "cloudant.com")
+ end,
% Port = mochiweb_socket_server:get(couch_httpd, port),
Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
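Unless "use_hostname" is set in the request, the host component of the replication id now comes from server configuration rather than inet:gethostname/0, which keeps ids stable across a cluster. A hypothetical ini entry (the value is illustrative):

    [replication]
    hostname = db1.example.com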
@@ -480,15 +504,22 @@ make_replication_id({Props}, UserCtx) ->
maybe_add_trailing_slash(Url) ->
re:replace(Url, "[^/]$", "&/", [{return, list}]).
-get_rep_endpoint(_UserCtx, {Props}) ->
- Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
- {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
- {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
- case couch_util:get_value(<<"oauth">>, Auth) of
+get_rep_endpoint(UserCtx, {Props}) ->
+ case couch_util:get_value(<<"url">>, Props) of
undefined ->
- {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
- {OAuth} ->
- {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+ Node = couch_util:get_value(<<"node">>, Props),
+ Name = couch_util:get_value(<<"name">>, Props),
+ {Node, Name, UserCtx};
+ RawUrl ->
+ Url = maybe_add_trailing_slash(RawUrl),
+ {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
+ {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
+ case couch_util:get_value(<<"oauth">>, Auth) of
+ undefined ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+ {OAuth} ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+ end
end;
get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
{remote, maybe_add_trailing_slash(Url), []};
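After this change get_rep_endpoint/2 yields one of three shapes: the two pre-existing remote tuples, or a {Node, Name, UserCtx} triple for distributed-Erlang endpoints. Illustrative return values (names are not from this commit):

    {remote, "http://db2.example.com/mydb/", Headers}    %% plain HTTP
    {remote, Url, Headers, OAuthProps}                   %% HTTP with OAuth
    {'node2@db2.example.com', <<"mydb">>, UserCtx}       %% distributed Erlang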
@@ -502,27 +533,43 @@ open_replication_log(#http_db{}=Db, RepId) ->
Req = Db#http_db{resource=couch_util:url_encode(DocId)},
case couch_rep_httpc:request(Req) of
{[{<<"error">>, _}, {<<"reason">>, _}]} ->
- ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
+ % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
#doc{id=?l2b(DocId)};
Doc ->
- ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
+ % ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
couch_doc:from_json_obj(Doc)
end;
open_replication_log(Db, RepId) ->
DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
case couch_db:open_doc(Db, DocId, []) of
{ok, Doc} ->
- ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
+ % ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
Doc;
_ ->
- ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
+ % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
#doc{id=DocId}
end.
open_db(Props, UserCtx, ProxyParams) ->
open_db(Props, UserCtx, ProxyParams, false).
-open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
+open_db(<<"http://",_/binary>>=Url, _, ProxyParams, Create) ->
+ open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
+open_db(<<"https://",_/binary>>=Url, _, ProxyParams, Create) ->
+ open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
+open_db({Props}, UserCtx, ProxyParams, Create) ->
+ case couch_util:get_value(<<"url">>, Props) of
+ undefined ->
+ Node = couch_util:get_value(<<"node">>, Props, node()),
+ DbName = couch_util:get_value(<<"name">>, Props),
+ open_local_db(Node, DbName, UserCtx, Create);
+ _Url ->
+ open_remote_db({Props}, ProxyParams, Create)
+ end;
+open_db(<<DbName/binary>>, UserCtx, _ProxyParams, Create) ->
+ open_local_db(node(), DbName, UserCtx, Create).
+
+open_remote_db({Props}, ProxyParams, CreateTarget) ->
Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
{AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
{BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
@@ -534,24 +581,32 @@ open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
headers = lists:ukeymerge(1, Headers, DefaultHeaders)
},
Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams},
- couch_rep_httpc:db_exists(Db, CreateTarget);
-open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
- open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
-open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
- open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
-open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
- case CreateTarget of
- true ->
- ok = couch_httpd:verify_is_server_admin(UserCtx),
- couch_server:create(DbName, [{user_ctx, UserCtx}]);
- false -> ok
- end,
-
- case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db} ->
+ couch_rep_httpc:db_exists(Db, CreateTarget).
+
+open_local_db(Node, DbName, UserCtx, Create) when is_binary(Node) ->
+ try open_local_db(list_to_existing_atom(?b2l(Node)), DbName, UserCtx, Create)
+ catch error:badarg ->
+ ?LOG_ERROR("unknown replication node ~s", [Node]),
+ throw({node_not_connected, Node}) end;
+open_local_db(Node, DbName, UserCtx, Create) when is_atom(Node) ->
+ case catch gen_server:call({couch_server, Node}, {open, DbName, []}, infinity) of
+ {ok, #db{} = Db} ->
+ couch_db:monitor(Db),
+ Db#db{fd_monitor = erlang:monitor(process, Db#db.fd)};
+ {ok, MainPid} when is_pid(MainPid) ->
+ {ok, Db} = couch_db:open_ref_counted(MainPid, UserCtx),
couch_db:monitor(Db),
Db;
- {not_found, no_db_file} -> throw({db_not_found, DbName})
{not_found, no_db_file} when Create =:= false ->
+ throw({db_not_found, DbName});
+ {not_found, no_db_file} ->
+ ok = couch_httpd:verify_is_server_admin(UserCtx),
+ couch_server:create(DbName, [{user_ctx, UserCtx}]);
+ {'EXIT', {{nodedown, Node}, _Stack}} ->
+ throw({node_not_connected, couch_util:to_binary(Node)});
+ {'EXIT', {noproc, {gen_server,call,_}}} ->
+ timer:sleep(1000),
+ throw({noproc, couch_server, Node})
end.
schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
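open_local_db/4 reaches the target couch_server by registered name on the remote node, and the catch converts nodedown and noproc exits into replication-level errors. The underlying mechanism in isolation (node and db names are illustrative; the nodes must already be connected):

    case catch gen_server:call({couch_server, 'node2@db2.example.com'},
            {open, <<"mydb">>, []}, infinity) of
        {ok, Db} ->
            Db;
        {'EXIT', {{nodedown, Node}, _Stack}} ->
            throw({node_not_connected, couch_util:to_binary(Node)})
    end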
@@ -582,9 +637,14 @@ do_checkpoint(State) ->
} = State,
case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
- ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
- [dbname(Source), dbname(Target), NewSeqNum]),
- SessionId = couch_uuids:random(),
+ ?LOG_DEBUG("recording a checkpoint for ~s -> ~s at source update_seq ~p"
+ " of ~p", [dbname(Source), dbname(Target), NewSeqNum, seqnum(Source)]),
+ SessionId = couch_uuids:new(),
+ TargetNode = case Target of #db{main_pid=MainPid} ->
+ erlang:node(MainPid);
+ _ ->
+ http
+ end,
NewHistoryEntry = {[
{<<"session_id">>, SessionId},
{<<"start_time">>, list_to_binary(ReplicationStartTime)},
@@ -603,6 +663,7 @@ do_checkpoint(State) ->
NewRepHistory = {[
{<<"session_id">>, SessionId},
{<<"source_last_seq">>, NewSeqNum},
+ {<<"target_node">>, TargetNode},
{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
]},
@@ -622,7 +683,9 @@ do_checkpoint(State) ->
"yourself?)", []),
State
end;
- _Else ->
+ Else ->
+ ?LOG_INFO("wanted ~p, got ~p from commit_to_both", [
+ {SrcInstanceStartTime, TgtInstanceStartTime}, Else]),
?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
[dbname(Source), dbname(Target)]),
#state{
@@ -654,7 +717,12 @@ commit_to_both(Source, Target, RequiredSeq) ->
{SrcCommitPid, Timestamp} ->
Timestamp;
{'EXIT', SrcCommitPid, {http_request_failed, _}} ->
- exit(replication_link_failure)
+ nil;
+ {'EXIT', SrcCommitPid, {noproc, {gen_server, call, [_]}}} ->
+ nil; % DB crashed, this should trigger a reboot
+ {'EXIT', SrcCommitPid, Else} ->
+ ?LOG_ERROR("new error code for crashed replication commit ~p", [Else]),
+ nil
end,
{SourceStartTime, TargetStartTime}.
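commit_to_both/3 now maps a crashed commit to a nil timestamp instead of exiting the replicator; do_checkpoint/1 then sees a start-time mismatch, logs it, and reboots from the last known checkpoint. The sentinel pattern in isolation (sketch):

    SourceStartTime = receive
        {SrcCommitPid, Timestamp} ->
            Timestamp;
        {'EXIT', SrcCommitPid, _Why} ->
            nil  %% caller sees mismatched times and reboots the replication
    end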
@@ -667,12 +735,13 @@ ensure_full_commit(#http_db{headers = Headers} = Target) ->
{ResultProps} = couch_rep_httpc:request(Req),
true = couch_util:get_value(<<"ok">>, ResultProps),
couch_util:get_value(<<"instance_start_time">>, ResultProps);
-ensure_full_commit(Target) ->
- {ok, NewDb} = couch_db:open_int(Target#db.name, []),
+ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) ->
+ TargetNode = erlang:node(Pid),
+ {ok, NewDb} = rpc:call(TargetNode, couch_db, open_int, [DbName, []]),
UpdateSeq = couch_db:get_update_seq(Target),
CommitSeq = couch_db:get_committed_update_seq(NewDb),
InstanceStartTime = NewDb#db.instance_start_time,
- couch_db:close(NewDb),
+ catch couch_db:close(NewDb),
if UpdateSeq > CommitSeq ->
?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
[UpdateSeq, CommitSeq]),
@@ -732,6 +801,11 @@ up_to_date(Source, Seq) ->
couch_db:close(NewDb),
T.
+seqnum(#http_db{}) ->
+ -1;
+seqnum(Db) ->
+ Db#db.update_seq.
+
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->