diff options
Diffstat (limited to 'apps/couch/src/couch_rep.erl')
-rw-r--r-- | apps/couch/src/couch_rep.erl | 450 |
1 files changed, 315 insertions, 135 deletions
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index c804b49d..482b84dc 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -16,8 +16,15 @@ code_change/3]). -export([replicate/2, checkpoint/1]). +-export([ensure_rep_db_exists/0, make_replication_id/2]). +-export([start_replication/3, end_replication/1, get_result/4]). +-export([update_rep_doc/2]). -include("couch_db.hrl"). +-include("couch_js_functions.hrl"). +-include_lib("ibrowse/include/ibrowse.hrl"). + +-define(REP_ID_VERSION, 2). -record(state, { changes_feed, @@ -47,7 +54,7 @@ committed_seq = 0, stats = nil, - doc_ids = nil, + rep_doc = nil, source_db_update_notifier = nil, target_db_update_notifier = nil }). @@ -62,39 +69,51 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> %% function handling POST to _replicate replicate({Props}=PostBody, UserCtx) -> - {BaseId, Extension} = make_replication_id(PostBody, UserCtx), - Replicator = {BaseId ++ Extension, - {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, + RepId = make_replication_id(PostBody, UserCtx), + case couch_util:get_value(<<"cancel">>, Props, false) of + true -> + end_replication(RepId); + false -> + Server = start_replication(PostBody, RepId, UserCtx), + get_result(Server, RepId, PostBody, UserCtx) + end. + +end_replication({BaseId, Extension}) -> + RepId = BaseId ++ Extension, + case supervisor:terminate_child(couch_rep_sup, RepId) of + {error, not_found} = R -> + R; + ok -> + case supervisor:delete_child(couch_rep_sup, RepId) of + ok -> + {ok, {cancelled, ?l2b(BaseId)}}; + {error, not_found} -> + {ok, {cancelled, ?l2b(BaseId)}}; + {error, _} = Error -> + Error + end + end. + +start_replication(RepDoc, {BaseId, Extension}, UserCtx) -> + Replicator = { + BaseId ++ Extension, + {gen_server, start_link, + [?MODULE, [BaseId, RepDoc, UserCtx], []]}, temporary, 1, worker, [?MODULE] }, + start_replication_server(Replicator). - case couch_util:get_value(<<"cancel">>, Props, false) of - true -> - case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of - {error, not_found} -> - {error, not_found}; - ok -> - ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension), - {ok, {cancelled, ?l2b(BaseId)}} - end; - false -> - Server = start_replication_server(Replicator), +checkpoint(Server) -> + gen_server:cast(Server, do_checkpoint). +get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) -> case couch_util:get_value(<<"continuous">>, Props, false) of true -> {ok, {continuous, ?l2b(BaseId)}}; false -> - get_result(Server, PostBody, UserCtx) - end - end. - -checkpoint(Server) -> - gen_server:cast(Server, do_checkpoint). - -get_result(Server, PostBody, UserCtx) -> try gen_server:call(Server, get_result, infinity) of retry -> replicate(PostBody, UserCtx); Else -> Else @@ -105,6 +124,7 @@ get_result(Server, PostBody, UserCtx) -> exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> %% we made the call during terminate replicate(PostBody, UserCtx) + end end. init(InitArgs) -> @@ -115,13 +135,12 @@ init(InitArgs) -> {stop, Error} end. -do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> +do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), TargetProps = couch_util:get_value(<<"target">>, PostProps), - DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil), Continuous = couch_util:get_value(<<"continuous">>, PostProps, false), CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false), @@ -133,29 +152,10 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> SourceInfo = dbinfo(Source), TargetInfo = dbinfo(Target), - case DocIds of - List when is_list(List) -> - % Fast replication using only a list of doc IDs to replicate. - % Replication sessions, checkpoints and logs are not created - % since the update sequence number of the source DB is not used - % for determining which documents are copied into the target DB. - SourceLog = nil, - TargetLog = nil, + maybe_set_triggered(RepDoc, RepId), - StartSeq = nil, - History = nil, - - ChangesFeed = nil, - MissingRevs = nil, - - {ok, Reader} = - couch_rep_reader:start_link(self(), Source, DocIds, PostProps); - - _ -> - % Replication using the _changes API (DB sequence update numbers). - SourceLog = open_replication_log(Source, RepId), - TargetLog = open_replication_log(Target, RepId), - + [SourceLog, TargetLog] = find_replication_logs( + [Source, Target], RepId, {PostProps}, UserCtx), {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), {ok, ChangesFeed} = @@ -163,9 +163,7 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> {ok, MissingRevs} = couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), {ok, Reader} = - couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps) - end, - + couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps), {ok, Writer} = couch_rep_writer:start_link(self(), Target, Reader, PostProps), @@ -202,7 +200,7 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds, + rep_doc = RepDoc, source_db_update_notifier = source_db_update_notifier(Source), target_db_update_notifier = target_db_update_notifier(Target) }, @@ -275,23 +273,26 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. terminate(normal, #state{checkpoint_scheduled=nil} = State) -> - do_terminate(State); + do_terminate(State), + update_rep_doc( + State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]); terminate(normal, State) -> timer:cancel(State#state.checkpoint_scheduled), - do_terminate(do_checkpoint(State)); + do_terminate(do_checkpoint(State)), + update_rep_doc( + State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]); -terminate(Reason, State) -> - #state{ - listeners = Listeners, - source = Source, - target = Target, - stats = Stats - } = State, +terminate(shutdown, #state{listeners = Listeners} = State) -> + % continuous replication stopped + [gen_server:reply(L, {ok, stopped}) || L <- Listeners], + terminate_cleanup(State); + +terminate(Reason, #state{listeners = Listeners} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], - ets:delete(Stats), - close_db(Target), - close_db(Source). + terminate_cleanup(State), + update_rep_doc( + State#state.rep_doc, [{<<"_replication_state">>, <<"error">>}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -321,7 +322,14 @@ start_replication_server(Replicator) -> throw({db_not_found, <<"could not open ", DbUrl/binary>>}); {error, {unauthorized, DbUrl}} -> throw({unauthorized, - <<"unauthorized to access database ", DbUrl/binary>>}) + <<"unauthorized to access database ", DbUrl/binary>>}); + {error, {'EXIT', {badarg, + [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} -> + % Clause to deal with a change in the supervisor module introduced + % in R14B02. For more details consult the thread at: + % http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html + _ = supervisor:delete_child(couch_rep_sup, RepId), + start_replication_server(Replicator) end; {error, {already_started, Pid}} -> ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), @@ -390,30 +398,11 @@ dbname(#db{name = Name}) -> dbinfo(#http_db{} = Db) -> {DbProps} = couch_rep_httpc:request(Db), - [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]; + [{couch_util:to_existing_atom(K), V} || {K,V} <- DbProps]; dbinfo(Db) -> {ok, Info} = couch_db:get_db_info(Db), Info. -do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) -> - #state{ - listeners = Listeners, - rep_starttime = ReplicationStartTime, - stats = Stats - } = State, - - RepByDocsJson = {[ - {<<"start_time">>, ?l2b(ReplicationStartTime)}, - {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, - {<<"doc_write_failures">>, - ets:lookup_element(Stats, doc_write_failures, 2)} - ]}, - - terminate_cleanup(State), - [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)]; - do_terminate(State) -> #state{ checkpoint_history = CheckpointHistory, @@ -475,7 +464,7 @@ has_session_id(SessionId, [{Props} | Rest]) -> has_session_id(SessionId, Rest) end. -maybe_append_options(Options, Props) -> +maybe_append_options(Options, {Props}) -> lists:foldl(fun(Option, Acc) -> Acc ++ case couch_util:get_value(Option, Props, false) of @@ -486,13 +475,29 @@ maybe_append_options(Options, Props) -> end end, [], Options). -make_replication_id({Props}, UserCtx) -> - %% funky algorithm to preserve backwards compatibility +make_replication_id(RepProps, UserCtx) -> + BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION), + Extension = maybe_append_options( + [<<"continuous">>, <<"create_target">>], RepProps), + {BaseId, Extension}. + +% Versioned clauses for generating replication ids +% If a change is made to how replications are identified +% add a new clause and increase ?REP_ID_VERSION at the top +make_replication_id({Props}, UserCtx, 2) -> + {ok, HostName} = inet:gethostname(), + Port = mochiweb_socket_server:get(couch_httpd, port), + Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), + Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), + maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx); +make_replication_id({Props}, UserCtx, 1) -> {ok, HostName} = inet:gethostname(), - % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), - Base = [HostName, Src, Tgt] ++ + maybe_append_filters({Props}, [HostName, Src, Tgt], UserCtx). + +maybe_append_filters({Props}, Base, UserCtx) -> + Base2 = Base ++ case couch_util:get_value(<<"filter">>, Props) of undefined -> case couch_util:get_value(<<"doc_ids">>, Props) of @@ -502,11 +507,26 @@ make_replication_id({Props}, UserCtx) -> [DocIds] end; Filter -> - [Filter, couch_util:get_value(<<"query_params">>, Props, {[]})] + [filter_code(Filter, Props, UserCtx), + couch_util:get_value(<<"query_params">>, Props, {[]})] end, - Extension = maybe_append_options( - [<<"continuous">>, <<"create_target">>], Props), - {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}. + couch_util:to_hex(couch_util:md5(term_to_binary(Base2))). + +filter_code(Filter, Props, UserCtx) -> + {match, [DDocName, FilterName]} = + re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]), + ProxyParams = parse_proxy_params( + couch_util:get_value(<<"proxy">>, Props, [])), + Source = open_db( + couch_util:get_value(<<"source">>, Props), UserCtx, ProxyParams), + try + {ok, DDoc} = open_doc(Source, <<"_design/", DDocName/binary>>), + Code = couch_util:get_nested_json_value( + DDoc#doc.body, [<<"filters">>, FilterName]), + re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}]) + after + close_db(Source) + end. maybe_add_trailing_slash(Url) -> re:replace(Url, "[^/]$", "&/", [{return, list}]). @@ -528,27 +548,54 @@ get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) -> get_rep_endpoint(UserCtx, <<DbName/binary>>) -> {local, DbName, UserCtx}. -open_replication_log(#http_db{}=Db, RepId) -> - DocId = ?LOCAL_DOC_PREFIX ++ RepId, - Req = Db#http_db{resource=couch_util:url_encode(DocId)}, +find_replication_logs(DbList, RepId, RepProps, UserCtx) -> + LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), + fold_replication_logs(DbList, ?REP_ID_VERSION, + LogId, LogId, RepProps, UserCtx, []). + +% Accumulate the replication logs +% Falls back to older log document ids and migrates them +fold_replication_logs([], _Vsn, _LogId, _NewId, _RepProps, _UserCtx, Acc) -> + lists:reverse(Acc); +fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId, + RepProps, UserCtx, Acc) -> + case open_replication_log(Db, LogId) of + {error, not_found} when Vsn > 1 -> + OldRepId = make_replication_id(RepProps, UserCtx, Vsn - 1), + fold_replication_logs(Dbs, Vsn - 1, + ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, RepProps, UserCtx, Acc); + {error, not_found} -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + RepProps, UserCtx, [#doc{id=NewId}|Acc]); + {ok, Doc} when LogId =:= NewId -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + RepProps, UserCtx, [Doc|Acc]); + {ok, Doc} -> + MigratedLog = #doc{id=NewId,body=Doc#doc.body}, + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + RepProps, UserCtx, [MigratedLog|Acc]) + end. + +open_replication_log(Db, DocId) -> + case open_doc(Db, DocId) of + {ok, Doc} -> + ?LOG_DEBUG("found a replication log for ~s", [dbname(Db)]), + {ok, Doc}; + _ -> + ?LOG_DEBUG("didn't find a replication log for ~s", [dbname(Db)]), + {error, not_found} + end. + +open_doc(#http_db{} = Db, DocId) -> + Req = Db#http_db{resource = couch_util:encode_doc_id(DocId)}, case couch_rep_httpc:request(Req) of {[{<<"error">>, _}, {<<"reason">>, _}]} -> - ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), - #doc{id=?l2b(DocId)}; + {error, not_found}; Doc -> - ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), - couch_doc:from_json_obj(Doc) + {ok, couch_doc:from_json_obj(Doc)} end; -open_replication_log(Db, RepId) -> - DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), - case couch_db:open_doc(Db, DocId, []) of - {ok, Doc} -> - ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), - Doc; - _ -> - ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), - #doc{id=DocId} - end. +open_doc(Db, DocId) -> + couch_db:open_doc(Db, DocId). open_db(Props, UserCtx, ProxyParams) -> open_db(Props, UserCtx, ProxyParams, false). @@ -575,18 +622,18 @@ open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) -> try - case CreateTarget of - true -> - ok = couch_httpd:verify_is_server_admin(UserCtx), - couch_server:create(DbName, [{user_ctx, UserCtx}]); + case CreateTarget of + true -> + ok = couch_httpd:verify_is_server_admin(UserCtx), + couch_server:create(DbName, [{user_ctx, UserCtx}]); false -> ok - end, + end, - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> - couch_db:monitor(Db), - Db; + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> + couch_db:monitor(Db), + Db; {not_found, no_db_file} -> throw({db_not_found, DbName}) end @@ -619,32 +666,54 @@ do_checkpoint(State) -> rep_starttime = ReplicationStartTime, src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, - stats = Stats + stats = Stats, + rep_doc = {RepDoc} } = State, case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p", [dbname(Source), dbname(Target), NewSeqNum]), + EndTime = ?l2b(httpd_util:rfc1123_date()), + StartTime = ?l2b(ReplicationStartTime), + DocsRead = ets:lookup_element(Stats, docs_read, 2), + DocsWritten = ets:lookup_element(Stats, docs_written, 2), + DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2), NewHistoryEntry = {[ {<<"session_id">>, SessionId}, - {<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, {<<"start_last_seq">>, StartSeqNum}, {<<"end_last_seq">>, NewSeqNum}, {<<"recorded_seq">>, NewSeqNum}, {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, - {<<"doc_write_failures">>, - ets:lookup_element(Stats, doc_write_failures, 2)} + {<<"docs_read">>, DocsRead}, + {<<"docs_written">>, DocsWritten}, + {<<"doc_write_failures">>, DocWriteFailures} ]}, - % limit history to 50 entries - NewRepHistory = {[ + BaseHistory = [ {<<"session_id">>, SessionId}, {<<"source_last_seq">>, NewSeqNum}, - {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} - ]}, + {<<"replication_id_version">>, ?REP_ID_VERSION} + ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of + undefined -> + []; + DocIds when is_list(DocIds) -> + % backwards compatibility with the result of a replication by + % doc IDs in versions 0.11.x and 1.0.x + [ + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, + {<<"docs_read">>, DocsRead}, + {<<"docs_written">>, DocsWritten}, + {<<"doc_write_failures">>, DocWriteFailures} + ] + end, + % limit history to 50 entries + NewRepHistory = { + BaseHistory ++ + [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}] + }, try {SrcRevPos,SrcRevId} = @@ -760,9 +829,9 @@ ensure_full_commit(Source, RequiredSeq) -> InstanceStartTime end. -update_local_doc(#http_db{} = Db, #doc{id=DocId} = Doc) -> +update_local_doc(#http_db{} = Db, Doc) -> Req = Db#http_db{ - resource = couch_util:url_encode(DocId), + resource = couch_util:encode_doc_id(Doc), method = put, body = couch_doc:to_json_obj(Doc, [attachments]), headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers] @@ -787,9 +856,13 @@ parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> parse_proxy_params([]) -> []; parse_proxy_params(ProxyUrl) -> - {url, _, Base, Port, User, Passwd, _Path, _Proto} = - ibrowse_lib:parse_url(ProxyUrl), - [{proxy_host, Base}, {proxy_port, Port}] ++ + #url{ + host = Host, + port = Port, + username = User, + password = Passwd + } = ibrowse_lib:parse_url(ProxyUrl), + [{proxy_host, Host}, {proxy_port, Port}] ++ case is_list(User) andalso is_list(Passwd) of false -> []; @@ -797,6 +870,113 @@ parse_proxy_params(ProxyUrl) -> [{proxy_user, User}, {proxy_password, Passwd}] end. +update_rep_doc({Props} = _RepDoc, KVs) -> + case couch_util:get_value(<<"_id">>, Props) of + undefined -> + % replication triggered by POSTing to _replicate/ + ok; + RepDocId -> + % replication triggered by adding a Rep Doc to the replicator DB + {ok, RepDb} = ensure_rep_db_exists(), + case couch_db:open_doc(RepDb, RepDocId, []) of + {ok, LatestRepDoc} -> + update_rep_doc(RepDb, LatestRepDoc, KVs); + _ -> + ok + end, + couch_db:close(RepDb) + end. + +update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> + NewRepDocBody = lists:foldl( + fun({<<"_replication_state">> = K, State} = KV, Body) -> + case couch_util:get_value(K, Body) of + State -> + Body; + _ -> + Body1 = lists:keystore(K, 1, Body, KV), + lists:keystore( + <<"_replication_state_time">>, 1, + Body1, {<<"_replication_state_time">>, timestamp()}) + end; + ({K, _V} = KV, Body) -> + lists:keystore(K, 1, Body, KV) + end, + RepDocBody, + KVs + ), + case NewRepDocBody of + RepDocBody -> + ok; + _ -> + % might not succeed - when the replication doc is deleted right + % before this update (not an error) + couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []) + end. + +% RFC3339 timestamps. +% Note: doesn't include the time seconds fraction (RFC3339 says it's optional). +timestamp() -> + {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()), + UTime = erlang:universaltime(), + LocalTime = calendar:universal_time_to_local_time(UTime), + DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) - + calendar:datetime_to_gregorian_seconds(UTime), + zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60), + iolist_to_binary( + io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s", + [Year, Month, Day, Hour, Min, Sec, + zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])). + +zone(Hr, Min) when Hr >= 0, Min >= 0 -> + io_lib:format("+~2..0w:~2..0w", [Hr, Min]); +zone(Hr, Min) -> + io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]). + + +maybe_set_triggered({RepProps} = RepDoc, RepId) -> + case couch_util:get_value(<<"_replication_state">>, RepProps) of + <<"triggered">> -> + ok; + _ -> + update_rep_doc( + RepDoc, + [ + {<<"_replication_state">>, <<"triggered">>}, + {<<"_replication_id">>, ?l2b(RepId)} + ] + ) + end. + +ensure_rep_db_exists() -> + DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), + Opts = [ + {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, + sys_db + ], + case couch_db:open(DbName, Opts) of + {ok, Db} -> + Db; + _Error -> + {ok, Db} = couch_db:create(DbName, Opts) + end, + ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), + {ok, Db}. + +ensure_rep_ddoc_exists(RepDb, DDocID) -> + case couch_db:open_doc(RepDb, DDocID, []) of + {ok, _Doc} -> + ok; + _ -> + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} + ]}), + {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) + end, + ok. + source_db_update_notifier(#db{name = DbName}) -> Server = self(), {ok, Notifier} = couch_db_update_notifier:start_link( |