author     Adam Kocoloski <adam@cloudant.com>    2010-08-18 11:51:03 -0400
committer  Adam Kocoloski <adam@cloudant.com>    2010-08-18 14:24:57 -0400
commit     7393d62b7b630bee50f609d0ae8125d33f7cda2b
tree       754e9ab17a586319c562de488e60056feff60bb8 /apps/couch/src/couch_rep.erl
parent     c0cb2625f25a2b51485c164bea1d8822f449ce14
Grab bag of Cloudant patches to couch OTP application
- Removal of couch_db and couch_ref_counter processes. Active DBs are
accessible through a protected ets table owned by couch_server.
- #full_doc_info{} in by_id and by_seq trees for faster compaction at the
expense of more disk usage afterwards. Proposed as COUCHDB-738 but not
accepted upstream.
- Replication via distributed Erlang (a request sketch follows this message).
- Better hot upgrade support (uses exported functions much more often).
- Configurable btree chunk sizes allow for larger (but still bounded)
reductions.
- Shorter names for btree fields in #db{} and #db_header{}.
- couch_view_group does not keep a reference to the #db{}.
- Terms are stored compressed (again).
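
Note (illustrative, not part of the patch): with the changes below, source and
target endpoints may be given as node/name pairs instead of URLs, and an
"async" option makes replicate/2 return the replicator pid without blocking.
A minimal shell sketch; the node, database, and URL values are invented, only
the option and field names come from the patch:

    %% Assumes the records from couch_db.hrl are loaded in the shell,
    %% e.g. rr("apps/couch/src/couch_db.hrl").
    UserCtx = #user_ctx{roles = [<<"_admin">>]},
    PostBody = {[
        {<<"source">>, {[{<<"node">>, <<"db1@node.example">>},
                         {<<"name">>, <<"my_database">>}]}},
        {<<"target">>, {[{<<"url">>, <<"http://backup.example:5984/my_database/">>}]}},
        {<<"async">>, true}    % return the replicator pid instead of blocking
    ]},
    Pid = couch_rep:replicate(PostBody, UserCtx).

With "continuous" the same call returns {ok, {continuous, BaseId}}; with
neither option it blocks in get_result/3 until the replication completes.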
Diffstat (limited to 'apps/couch/src/couch_rep.erl')
-rw-r--r--  apps/couch/src/couch_rep.erl  214
1 file changed, 144 insertions, 70 deletions
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl
index 65573e8c..126639e0 100644
--- a/apps/couch/src/couch_rep.erl
+++ b/apps/couch/src/couch_rep.erl
@@ -15,7 +15,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
     code_change/3]).
 
--export([replicate/2, checkpoint/1]).
+-export([replicate/2, checkpoint/1, start_link/3]).
 
 -include("couch_db.hrl").
 
@@ -49,6 +49,9 @@
     doc_ids = nil
 }).
 
+start_link(Id, PostBody, UserCtx) ->
+    gen_server:start_link(?MODULE, [Id, PostBody, UserCtx], []).
+
 %% convenience function to do a simple replication from the shell
 replicate(Source, Target) when is_list(Source) ->
     replicate(?l2b(Source), Target);
@@ -61,7 +64,7 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
 replicate({Props}=PostBody, UserCtx) ->
     {BaseId, Extension} = make_replication_id(PostBody, UserCtx),
     Replicator = {BaseId ++ Extension,
-        {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
+        {?MODULE, start_link, [BaseId, PostBody, UserCtx]},
         temporary,
         1,
         worker,
@@ -80,10 +83,15 @@ replicate({Props}=PostBody, UserCtx) ->
     false ->
         Server = start_replication_server(Replicator),
 
-        case couch_util:get_value(<<"continuous">>, Props, false) of
-        true ->
+        Continuous = couch_util:get_value(<<"continuous">>, Props, false),
+        Async = couch_util:get_value(<<"async">>, Props, false),
+        case {Continuous, Async} of
+        {true, _} ->
             {ok, {continuous, ?l2b(BaseId)}};
-        false ->
+        {_, true} ->
+            spawn(fun() -> get_result(Server, PostBody, UserCtx) end),
+            Server;
+        _ ->
             get_result(Server, PostBody, UserCtx)
         end
     end.
@@ -106,7 +114,9 @@ get_result(Server, PostBody, UserCtx) ->
 
 init(InitArgs) ->
     try do_init(InitArgs)
-    catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end.
+    catch _:Error ->
+        {stop, Error}
+    end.
 
 do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
     process_flag(trap_exit, true),
@@ -211,14 +221,16 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
-    couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
+    couch_task_status:update("MR Processed source update #~p of ~p",
+        [SourceSeq, seqnum(State#state.source)]),
     {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
 
 handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
         when SourceSeq > N ->
     MissingRevs = State#state.missing_revs,
     ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
-    couch_task_status:update("W Processed source update #~p", [SourceSeq]),
+    couch_task_status:update("W Processed source update #~p of ~p",
+        [SourceSeq, seqnum(State#state.source)]),
     {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
 handle_info({writer_checkpoint, _}, State) ->
     {noreply, State};
@@ -227,8 +239,14 @@ handle_info({update_stats, Key, N}, State) ->
     ets:update_counter(State#state.stats, Key, N),
     {noreply, State};
 
-handle_info({'DOWN', _, _, _, _}, State) ->
-    ?LOG_INFO("replication terminating because local DB is shutting down", []),
+handle_info({'DOWN', _, _, Pid, _}, State) ->
+    Me = node(),
+    case erlang:node(Pid) of
+    Me ->
+        ?LOG_INFO("replication terminating - local DB is shutting down", []);
+    Node ->
+        ?LOG_INFO("replication terminating - DB on ~p is shutting down", [Node])
+    end,
     timer:cancel(State#state.checkpoint_scheduled),
     {stop, shutdown, State};
 
@@ -275,34 +293,35 @@ code_change(_OldVsn, State, _Extra) ->
 
 % internal funs
 
 start_replication_server(Replicator) ->
-    RepId = element(1, Replicator),
-    case supervisor:start_child(couch_rep_sup, Replicator) of
+    start_replication_server(Replicator, fun start_child/1).
+
+start_replication_server(Replicator, StartFun) ->
+    case StartFun(Replicator) of
     {ok, Pid} ->
-        ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
         Pid;
     {error, already_present} ->
-        case supervisor:restart_child(couch_rep_sup, RepId) of
-        {ok, Pid} ->
-            ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
-            Pid;
-        {error, running} ->
-            %% this error occurs if multiple replicators are racing
-            %% each other to start and somebody else won. Just grab
-            %% the Pid by calling start_child again.
-            {error, {already_started, Pid}} =
-                supervisor:start_child(couch_rep_sup, Replicator),
-            ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
-            Pid;
-        {error, {db_not_found, DbUrl}} ->
-            throw({db_not_found, <<"could not open ", DbUrl/binary>>})
-        end;
+        start_replication_server(Replicator, fun restart_child/1);
     {error, {already_started, Pid}} ->
-        ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
         Pid;
+    {error, running} ->
+        Children = supervisor:which_children(couch_rep_sup),
+        {value, {_, Pid, _, _}} = lists:keysearch(Replicator, 1, Children),
+        Pid;
+    % sadly both seem to be needed. I don't know why.
     {error, {{db_not_found, DbUrl}, _}} ->
-        throw({db_not_found, <<"could not open ", DbUrl/binary>>})
+        throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+    {error, {db_not_found, DbUrl}} ->
+        throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+    {error, {node_not_connected, Node}} ->
+        throw({node_not_connected, Node})
     end.
 
+start_child(Replicator) ->
+    supervisor:start_child(couch_rep_sup, Replicator).
+
+restart_child(Replicator) ->
+    supervisor:restart_child(couch_rep_sup, element(1, Replicator)).
+
 compare_replication_logs(SrcDoc, TgtDoc) ->
     #doc{body={RepRecProps}} = SrcDoc,
     #doc{body={RepRecPropsTgt}} = TgtDoc,
@@ -355,8 +374,8 @@ close_db(Db) ->
 
 dbname(#http_db{url = Url}) ->
     strip_password(Url);
-dbname(#db{name = Name}) ->
-    Name.
+dbname(#db{name = Name, main_pid = MainPid}) ->
+    ?l2b([Name, " (", pid_to_list(MainPid), ")"]).
 
 strip_password(Url) ->
     re:replace(Url,
@@ -457,7 +476,12 @@ maybe_append_options(Options, Props) ->
 
 make_replication_id({Props}, UserCtx) ->
     %% funky algorithm to preserve backwards compatibility
-    {ok, HostName} = inet:gethostname(),
+    case couch_util:get_value(<<"use_hostname">>, Props, false) of
+    true ->
+        {ok, HostName} = inet:gethostname();
+    false ->
+        HostName = couch_config:get("replication", "hostname", "cloudant.com")
+    end,
     % Port = mochiweb_socket_server:get(couch_httpd, port),
     Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
     Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
@@ -480,15 +504,22 @@ make_replication_id({Props}, UserCtx) ->
 maybe_add_trailing_slash(Url) ->
     re:replace(Url, "[^/]$", "&/", [{return, list}]).
 
-get_rep_endpoint(_UserCtx, {Props}) ->
-    Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
-    {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
-    {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
-    case couch_util:get_value(<<"oauth">>, Auth) of
+get_rep_endpoint(UserCtx, {Props}) ->
+    case couch_util:get_value(<<"url">>, Props) of
     undefined ->
-        {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
-    {OAuth} ->
-        {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+        Node = couch_util:get_value(<<"node">>, Props),
+        Name = couch_util:get_value(<<"name">>, Props),
+        {Node, Name, UserCtx};
+    RawUrl ->
+        Url = maybe_add_trailing_slash(RawUrl),
+        {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
+        {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
+        case couch_util:get_value(<<"oauth">>, Auth) of
+        undefined ->
+            {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+        {OAuth} ->
+            {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+        end
     end;
 get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
     {remote, maybe_add_trailing_slash(Url), []};
@@ -502,27 +533,43 @@ open_replication_log(#http_db{}=Db, RepId) ->
     Req = Db#http_db{resource=couch_util:url_encode(DocId)},
     case couch_rep_httpc:request(Req) of
     {[{<<"error">>, _}, {<<"reason">>, _}]} ->
-        ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
+        % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
         #doc{id=?l2b(DocId)};
     Doc ->
-        ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
+        % ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
         couch_doc:from_json_obj(Doc)
     end;
 open_replication_log(Db, RepId) ->
     DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
     case couch_db:open_doc(Db, DocId, []) of
     {ok, Doc} ->
-        ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
+        % ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
         Doc;
     _ ->
-        ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
+        % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
        #doc{id=DocId}
     end.
 
 open_db(Props, UserCtx, ProxyParams) ->
     open_db(Props, UserCtx, ProxyParams, false).
 
-open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
+open_db(<<"http://",_/binary>>=Url, _, ProxyParams, Create) ->
+    open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
+open_db(<<"https://",_/binary>>=Url, _, ProxyParams, Create) ->
+    open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
+open_db({Props}, UserCtx, ProxyParams, Create) ->
+    case couch_util:get_value(<<"url">>, Props) of
+    undefined ->
+        Node = couch_util:get_value(<<"node">>, Props, node()),
+        DbName = couch_util:get_value(<<"name">>, Props),
+        open_local_db(Node, DbName, UserCtx, Create);
+    _Url ->
+        open_remote_db({Props}, ProxyParams, Create)
+    end;
+open_db(<<DbName/binary>>, UserCtx, _ProxyParams, Create) ->
+    open_local_db(node(), DbName, UserCtx, Create).
+
+open_remote_db({Props}, ProxyParams, CreateTarget) ->
     Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
     {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
     {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
@@ -534,24 +581,32 @@ open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
         headers = lists:ukeymerge(1, Headers, DefaultHeaders)
     },
     Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams},
-    couch_rep_httpc:db_exists(Db, CreateTarget);
-open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
-    open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
-open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
-    open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
-open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
-    case CreateTarget of
-    true ->
-        ok = couch_httpd:verify_is_server_admin(UserCtx),
-        couch_server:create(DbName, [{user_ctx, UserCtx}]);
-    false -> ok
-    end,
-
-    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
-    {ok, Db} ->
+    couch_rep_httpc:db_exists(Db, CreateTarget).
+
+open_local_db(Node, DbName, UserCtx, Create) when is_binary(Node) ->
+    try open_local_db(list_to_existing_atom(?b2l(Node)), DbName, UserCtx, Create)
+    catch error:badarg ->
+        ?LOG_ERROR("unknown replication node ~s", [Node]),
+        throw({node_not_connected, Node})
+    end;
+open_local_db(Node, DbName, UserCtx, Create) when is_atom(Node) ->
+    case catch gen_server:call({couch_server, Node}, {open, DbName, []}, infinity) of
+    {ok, #db{} = Db} ->
+        couch_db:monitor(Db),
+        Db#db{fd_monitor = erlang:monitor(process, Db#db.fd)};
+    {ok, MainPid} when is_pid(MainPid) ->
+        {ok, Db} = couch_db:open_ref_counted(MainPid, UserCtx),
         couch_db:monitor(Db),
         Db;
-    {not_found, no_db_file} -> throw({db_not_found, DbName})
+    {not_found, no_db_file} when Create =:= false ->
+        throw({db_not_found, DbName});
+    {not_found, no_db_file} ->
+        ok = couch_httpd:verify_is_server_admin(UserCtx),
+        couch_server:create(DbName, [{user_ctx, UserCtx}]);
+    {'EXIT', {{nodedown, Node}, _Stack}} ->
+        throw({node_not_connected, couch_util:to_binary(Node)});
+    {'EXIT', {noproc, {gen_server,call,_}}} ->
+        timer:sleep(1000),
+        throw({noproc, couch_server, Node})
    end.
 
 schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
@@ -582,9 +637,14 @@ do_checkpoint(State) ->
     } = State,
     case commit_to_both(Source, Target, NewSeqNum) of
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
-        ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
-            [dbname(Source), dbname(Target), NewSeqNum]),
-        SessionId = couch_uuids:random(),
+        ?LOG_DEBUG("recording a checkpoint for ~s -> ~s at source update_seq ~p"
+            " of ~p", [dbname(Source), dbname(Target), NewSeqNum, seqnum(Source)]),
+        SessionId = couch_uuids:new(),
+        TargetNode = case Target of #db{main_pid=MainPid} ->
+            erlang:node(MainPid);
+        _ ->
+            http
+        end,
         NewHistoryEntry = {[
             {<<"session_id">>, SessionId},
             {<<"start_time">>, list_to_binary(ReplicationStartTime)},
@@ -603,6 +663,7 @@ do_checkpoint(State) ->
         NewRepHistory = {[
             {<<"session_id">>, SessionId},
             {<<"source_last_seq">>, NewSeqNum},
+            {<<"target_node">>, TargetNode},
             {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
         ]},
 
@@ -622,7 +683,9 @@ do_checkpoint(State) ->
                 "yourself?)", []),
             State
         end;
-    _Else ->
+    Else ->
+        ?LOG_INFO("wanted ~p, got ~p from commit_to_both", [
+            {SrcInstanceStartTime, TgtInstanceStartTime}, Else]),
         ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
             [dbname(Source), dbname(Target)]),
         #state{
@@ -654,7 +717,12 @@ commit_to_both(Source, Target, RequiredSeq) ->
     {SrcCommitPid, Timestamp} ->
         Timestamp;
     {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
-        exit(replication_link_failure)
+        nil;
+    {'EXIT', SrcCommitPid, {noproc, {gen_server, call, [_]}}} ->
+        nil; % DB crashed, this should trigger a reboot
+    {'EXIT', SrcCommitPid, Else} ->
+        ?LOG_ERROR("new error code for crashed replication commit ~p", [Else]),
+        nil
     end,
     {SourceStartTime, TargetStartTime}.
 
@@ -667,12 +735,13 @@ ensure_full_commit(#http_db{headers = Headers} = Target) ->
     {ResultProps} = couch_rep_httpc:request(Req),
     true = couch_util:get_value(<<"ok">>, ResultProps),
     couch_util:get_value(<<"instance_start_time">>, ResultProps);
-ensure_full_commit(Target) ->
-    {ok, NewDb} = couch_db:open_int(Target#db.name, []),
+ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) ->
+    TargetNode = erlang:node(Pid),
+    {ok, NewDb} = rpc:call(TargetNode, couch_db, open_int, [DbName, []]),
     UpdateSeq = couch_db:get_update_seq(Target),
     CommitSeq = couch_db:get_committed_update_seq(NewDb),
     InstanceStartTime = NewDb#db.instance_start_time,
-    couch_db:close(NewDb),
+    catch couch_db:close(NewDb),
     if UpdateSeq > CommitSeq ->
         ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
             [UpdateSeq, CommitSeq]),
@@ -732,6 +801,11 @@ up_to_date(Source, Seq) ->
     couch_db:close(NewDb),
     T.
 
+seqnum(#http_db{}) ->
+    -1;
+seqnum(Db) ->
+    Db#db.update_seq.
+
 parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
     parse_proxy_params(?b2l(ProxyUrl));
 parse_proxy_params([]) ->
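
Note (illustrative, not part of the patch): with the target_node field added
in do_checkpoint/1 above, a checkpoint body written against a local target
decodes to something along these lines; the session id, sequence number, and
node name are invented:

    {[
        {<<"session_id">>, <<"f2bbca0f2b4a4520a1ee34b4c0a1d135">>},
        {<<"source_last_seq">>, 4242},
        {<<"target_node">>, 'db2@node.example'},   % the atom 'http' for remote targets
        {<<"history">>, []}                        % newest entry first, capped at 50
    ]}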
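Note (illustrative): make_replication_id/2 now takes its hostname from the
server config unless "use_hostname" is passed in the request; the lookup is
couch_config:get("replication", "hostname", "cloudant.com"), so the value can
be pinned in the ini files with an entry like:

    [replication]
    hostname = db-cluster.example.com    ; value invented for illustration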