From e59afb307256a648172c156b0d478ad0f32d2dd6 Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Thu, 16 Jun 2011 17:57:24 +0100 Subject: support _design and _doc_ids filters. --- apps/couch/src/couch_changes.erl | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) (limited to 'apps') diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl index 8d5eae70..44d0ad46 100644 --- a/apps/couch/src/couch_changes.erl +++ b/apps/couch/src/couch_changes.erl @@ -177,6 +177,17 @@ configure_filter(Filter, Style, Req, Db) when is_list(Filter) -> #doc{body={Props}} = DDoc, couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), {custom, Style, {Db, JsonReq, DDoc, FName}}; + [<<"_doc_ids">>] -> + DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")), + case is_list(DocIds) of + true -> ok; + false -> throw({bad_request, "`doc_ids` filter parameter is not a list."}) + end, + {builtin, Style, {doc_ids, DocIds}}; + [<<"_design">>] -> + {builtin, Style, design}; + [<<"_", _/binary>>] -> + throw({bad_request, "unknown builtin filter name"}); _Else -> throw({bad_request, "filter parameter must be of the form `designname/filtername`"}) @@ -191,7 +202,11 @@ filter(#doc_info{revs=Revs}, all_docs) -> filter(#doc_info{id=Id, revs=RevInfos}, {custom, main_only, Acc}) -> custom_filter(Id, [(hd(RevInfos))#rev_info.rev], Acc); filter(#doc_info{id=Id, revs=RevInfos}, {custom, all_docs, Acc}) -> - custom_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc). + custom_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc); +filter(#doc_info{id=Id, revs=RevInfos}, {builtin, main_only, Acc}) -> + builtin_filter(Id, [(hd(RevInfos))#rev_info.rev], Acc); +filter(#doc_info{id=Id, revs=RevInfos}, {builtin, all_docs, Acc}) -> + builtin_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc). custom_filter(Id, Revs, {Db, JsonReq, DDoc, Filter}) -> {ok, Results} = fabric:open_revs(Db, Id, Revs, [deleted, conflicts]), @@ -203,6 +218,21 @@ custom_filter(Id, Revs, {Db, JsonReq, DDoc, Filter}) -> || {Pass, #doc{revs={RevPos,[RevId|_]}}} <- lists:zip(Passes, Docs), Pass == true]. +builtin_filter(Id, Revs, design) -> + case Id of + <<"_design", _/binary>> -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || Rev <- Revs]; + _ -> + [] + end; +builtin_filter(Id, Revs, {doc_ids, DocIds}) -> + case lists:member(Id, DocIds) of + true -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || Rev <- Revs]; + false -> + [] + end. + get_changes_timeout(Args, Callback) -> #changes_args{ heartbeat = Heartbeat, -- cgit v1.2.3 From 2e119bc5ddb41f9f95af567ba4c6fd02933766a6 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Thu, 23 Jun 2011 09:46:46 +0000 Subject: Merged revision 1104168 from trunk Add infinity timeout to couch_ref_counter calls After compacting a very large database, the updater calls the couch_db gen_server with a db record that contains a new ref counter. The couch_db gen_server calls drop on the old ref counter and calls add on the new ref counter. However since the system is busy deleting the old db file or garbage collecting, one of the ref counter calls times out, causing couch_db's terminate to invoked and terminate calls shutdown on the updater. However the updater is waiting for the call it made to couch_db to complete, which can't complete since it's waiting for the updater. 
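(A note for context on the failure mode described above, not part of the patch: gen_server:call/2 uses a default timeout of 5000 ms, after which the caller exits with a timeout error. The sketch below uses hypothetical function names; the actual change is the two-line diff that follows.)

    %% Minimal sketch of the difference between the two call forms.
    %% call/2 gives up after 5 seconds; call/3 with 'infinity' waits for the
    %% reply however long the busy ref counter takes to produce it.
    drop_with_default_timeout(RefCounterPid, Pid) ->
        gen_server:call(RefCounterPid, {drop, Pid}).            % exits with a timeout under load

    drop_with_infinite_timeout(RefCounterPid, Pid) ->
        gen_server:call(RefCounterPid, {drop, Pid}, infinity).  % blocks until the reply arrives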
git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1138799 13f79535-47bb-0310-9956-ffa450edef68 --- apps/couch/src/couch_ref_counter.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'apps') diff --git a/apps/couch/src/couch_ref_counter.erl b/apps/couch/src/couch_ref_counter.erl index 5a111ab6..a774f469 100644 --- a/apps/couch/src/couch_ref_counter.erl +++ b/apps/couch/src/couch_ref_counter.erl @@ -24,14 +24,14 @@ drop(RefCounterPid) -> drop(RefCounterPid, self()). drop(RefCounterPid, Pid) -> - gen_server:call(RefCounterPid, {drop, Pid}). + gen_server:call(RefCounterPid, {drop, Pid}, infinity). add(RefCounterPid) -> add(RefCounterPid, self()). add(RefCounterPid, Pid) -> - gen_server:call(RefCounterPid, {add, Pid}). + gen_server:call(RefCounterPid, {add, Pid}, infinity). count(RefCounterPid) -> gen_server:call(RefCounterPid, count). -- cgit v1.2.3 From d6b9664ef4b928585ddb76df5b693d9d66f61ba0 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Tue, 28 Jun 2011 23:01:30 +0000 Subject: Backport revision 1140886 from trunk Improved error message in the replicator git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1140887 13f79535-47bb-0310-9956-ffa450edef68 --- apps/couch/src/couch_rep.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'apps') diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index 966d6872..02f7bee7 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -322,7 +322,7 @@ start_replication_server(Replicator) -> throw({db_not_found, <<"could not open ", DbUrl/binary>>}); {error, {unauthorized, DbUrl}} -> throw({unauthorized, - <<"unauthorized to access database ", DbUrl/binary>>}); + <<"unauthorized to access or create database ", DbUrl/binary>>}); {error, {'EXIT', {badarg, [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} -> % Clause to deal with a change in the supervisor module introduced @@ -338,7 +338,7 @@ start_replication_server(Replicator) -> throw({db_not_found, <<"could not open ", DbUrl/binary>>}); {error, {{unauthorized, DbUrl}, _}} -> throw({unauthorized, - <<"unauthorized to access database ", DbUrl/binary>>}) + <<"unauthorized to access or create database ", DbUrl/binary>>}) end. compare_replication_logs(SrcDoc, TgtDoc) -> -- cgit v1.2.3 From 82fe0bb6384d7b9a6528e02aa4597a3c1431419a Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Sat, 2 Jul 2011 18:44:00 +0000 Subject: Restart replications on error If a replication transitions to the "error" state, attempt to restart it up to "max_replication_retry_count" times (.ini configuration parameter). This number of retry attempts can now be set to "infinity" as well. This was already current behaviour in trunk (upcoming 1.2). Closes COUCHDB-1194. git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1142258 13f79535-47bb-0310-9956-ffa450edef68 Conflicts: apps/couch/src/couch_rep.erl --- apps/couch/src/couch_rep.erl | 144 +-------- apps/couch/src/couch_replication_manager.erl | 424 ++++++++++++++++++++------- 2 files changed, 327 insertions(+), 241 deletions(-) (limited to 'apps') diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index 02f7bee7..eabef37b 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -16,12 +16,10 @@ code_change/3]). -export([replicate/2, checkpoint/1]). --export([ensure_rep_db_exists/0, make_replication_id/2]). +-export([make_replication_id/2]). 
-export([start_replication/3, end_replication/1, get_result/4]). --export([update_rep_doc/2]). -include("couch_db.hrl"). --include("couch_js_functions.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). -define(REP_ID_VERSION, 2). @@ -54,7 +52,6 @@ committed_seq = 0, stats = nil, - rep_doc = nil, source_db_update_notifier = nil, target_db_update_notifier = nil }). @@ -94,11 +91,11 @@ end_replication({BaseId, Extension}) -> end end. -start_replication(RepDoc, {BaseId, Extension}, UserCtx) -> +start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) -> Replicator = { BaseId ++ Extension, {gen_server, start_link, - [?MODULE, [BaseId, RepDoc, UserCtx], []]}, + [?MODULE, [RepId, RepDoc, UserCtx], []]}, temporary, 1, worker, @@ -135,7 +132,7 @@ init(InitArgs) -> {stop, Error} end. -do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> +do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), @@ -152,10 +149,8 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> SourceInfo = dbinfo(Source), TargetInfo = dbinfo(Target), - maybe_set_triggered(RepDoc, RepId), - [SourceLog, TargetLog] = find_replication_logs( - [Source, Target], RepId, {PostProps}, UserCtx), + [Source, Target], BaseId, {PostProps}, UserCtx), {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), {ok, ChangesFeed} = @@ -174,10 +169,12 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> ets:insert(Stats, {docs_written, 0}), ets:insert(Stats, {doc_write_failures, 0}), - {ShortId, _} = lists:split(6, RepId), + {ShortId, _} = lists:split(6, BaseId), couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", [ShortId, dbname(Source), dbname(Target)]), "Starting"), + couch_replication_manager:replication_started(RepId), + State = #state{ changes_feed = ChangesFeed, missing_revs = MissingRevs, @@ -200,7 +197,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - rep_doc = RepDoc, source_db_update_notifier = source_db_update_notifier(Source), target_db_update_notifier = target_db_update_notifier(Target) }, @@ -272,27 +268,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. 
-terminate(normal, #state{checkpoint_scheduled=nil} = State) -> +terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) -> do_terminate(State), - update_rep_doc( - State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]); + couch_replication_manager:replication_completed(RepId); -terminate(normal, State) -> +terminate(normal, #state{init_args=[RepId | _]} = State) -> timer:cancel(State#state.checkpoint_scheduled), do_terminate(do_checkpoint(State)), - update_rep_doc( - State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]); + couch_replication_manager:replication_completed(RepId); terminate(shutdown, #state{listeners = Listeners} = State) -> % continuous replication stopped [gen_server:reply(L, {ok, stopped}) || L <- Listeners], terminate_cleanup(State); -terminate(Reason, #state{listeners = Listeners} = State) -> +terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], terminate_cleanup(State), - update_rep_doc( - State#state.rep_doc, [{<<"_replication_state">>, <<"error">>}]). + couch_replication_manager:replication_error(RepId, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -698,7 +691,7 @@ do_checkpoint(State) -> src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, stats = Stats, - rep_doc = {RepDoc} + init_args = [_RepId, {RepDoc} | _] } = State, case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> @@ -901,113 +894,6 @@ parse_proxy_params(ProxyUrl) -> [{proxy_user, User}, {proxy_password, Passwd}] end. -update_rep_doc({Props} = _RepDoc, KVs) -> - case couch_util:get_value(<<"_id">>, Props) of - undefined -> - % replication triggered by POSTing to _replicate/ - ok; - RepDocId -> - % replication triggered by adding a Rep Doc to the replicator DB - {ok, RepDb} = ensure_rep_db_exists(), - case couch_db:open_doc(RepDb, RepDocId, []) of - {ok, LatestRepDoc} -> - update_rep_doc(RepDb, LatestRepDoc, KVs); - _ -> - ok - end, - couch_db:close(RepDb) - end. - -update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> - NewRepDocBody = lists:foldl( - fun({<<"_replication_state">> = K, State} = KV, Body) -> - case couch_util:get_value(K, Body) of - State -> - Body; - _ -> - Body1 = lists:keystore(K, 1, Body, KV), - lists:keystore( - <<"_replication_state_time">>, 1, - Body1, {<<"_replication_state_time">>, timestamp()}) - end; - ({K, _V} = KV, Body) -> - lists:keystore(K, 1, Body, KV) - end, - RepDocBody, - KVs - ), - case NewRepDocBody of - RepDocBody -> - ok; - _ -> - % might not succeed - when the replication doc is deleted right - % before this update (not an error) - couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []) - end. - -% RFC3339 timestamps. -% Note: doesn't include the time seconds fraction (RFC3339 says it's optional). -timestamp() -> - {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()), - UTime = erlang:universaltime(), - LocalTime = calendar:universal_time_to_local_time(UTime), - DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) - - calendar:datetime_to_gregorian_seconds(UTime), - zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60), - iolist_to_binary( - io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s", - [Year, Month, Day, Hour, Min, Sec, - zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])). 
- -zone(Hr, Min) when Hr >= 0, Min >= 0 -> - io_lib:format("+~2..0w:~2..0w", [Hr, Min]); -zone(Hr, Min) -> - io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]). - - -maybe_set_triggered({RepProps} = RepDoc, RepId) -> - case couch_util:get_value(<<"_replication_state">>, RepProps) of - <<"triggered">> -> - ok; - _ -> - update_rep_doc( - RepDoc, - [ - {<<"_replication_state">>, <<"triggered">>}, - {<<"_replication_id">>, ?l2b(RepId)} - ] - ) - end. - -ensure_rep_db_exists() -> - DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), - Opts = [ - {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, - sys_db - ], - case couch_db:open(DbName, Opts) of - {ok, Db} -> - Db; - _Error -> - {ok, Db} = couch_db:create(DbName, Opts) - end, - ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), - {ok, Db}. - -ensure_rep_ddoc_exists(RepDb, DDocID) -> - case couch_db:open_doc(RepDb, DDocID, []) of - {ok, _Doc} -> - ok; - _ -> - DDoc = couch_doc:from_json_obj({[ - {<<"_id">>, DDocID}, - {<<"language">>, <<"javascript">>}, - {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} - ]}), - {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) - end, - ok. - source_db_update_notifier(#db{name = DbName}) -> Server = self(), {ok, Notifier} = couch_db_update_notifier:start_link( diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl index e3d97c37..7e2c8118 100644 --- a/apps/couch/src/couch_replication_manager.erl +++ b/apps/couch/src/couch_replication_manager.erl @@ -13,14 +13,20 @@ -module(couch_replication_manager). -behaviour(gen_server). +% public API +-export([replication_started/1, replication_completed/1, replication_error/2]). + +% gen_server callbacks -export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]). -export([code_change/3, terminate/2]). -include("couch_db.hrl"). +-include("couch_js_functions.hrl"). --define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). --define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). --define(INITIAL_WAIT, 5). +-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id). +-define(REP_TO_STATE, couch_rep_id_to_rep_state). +-define(INITIAL_WAIT, 2.5). % seconds +-define(MAX_WAIT, 600). % seconds -record(state, { changes_feed_loop = nil, @@ -30,6 +36,16 @@ max_retries }). +-record(rep_state, { + doc_id, + user_ctx, + doc, + starting, + retries_left, + max_retries, + wait = ?INITIAL_WAIT +}). + -import(couch_util, [ get_value/2, get_value/3, @@ -40,17 +56,56 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +replication_started({BaseId, _} = RepId) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{doc_id = DocId} -> + update_rep_doc(DocId, [ + {<<"_replication_state">>, <<"triggered">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity), + ?LOG_INFO("Document `~s` triggered replication `~s`", + [DocId, pp_rep_id(RepId)]) + end. + + +replication_completed(RepId) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{doc_id = DocId} = St -> + update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]), + ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity), + ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", + [pp_rep_id(RepId), DocId]) + end. 
+ + +replication_error({BaseId, _} = RepId, Error) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{doc_id = DocId} -> + % TODO: maybe add error reason to replication document + update_rep_doc(DocId, [ + {<<"_replication_state">>, <<"error">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity) + end. + + init(_) -> process_flag(trap_exit, true), - _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]), - _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]), + ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]), + ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]), Server = self(), ok = couch_config:register( fun("replicator", "db", NewName) -> ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)}); - ("replicator", "max_replication_retry_count", NewMaxRetries1) -> - NewMaxRetries = list_to_integer(NewMaxRetries1), - ok = gen_server:cast(Server, {set_max_retries, NewMaxRetries}) + ("replicator", "max_replication_retry_count", V) -> + ok = gen_server:cast(Server, {set_max_retries, retries_value(V)}) end ), {Loop, RepDbName} = changes_feed_loop(), @@ -58,7 +113,7 @@ init(_) -> changes_feed_loop = Loop, rep_db_name = RepDbName, db_notifier = db_update_notifier(), - max_retries = list_to_integer( + max_retries = retries_value( couch_config:get("replicator", "max_replication_retry_count", "10")) }}. @@ -68,32 +123,35 @@ handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) -> process_update(State, Change) catch _Tag:Error -> - JsonRepDoc = get_value(doc, ChangeProps), - rep_db_update_error(Error, JsonRepDoc), + {RepProps} = get_value(doc, ChangeProps), + DocId = get_value(<<"_id">>, RepProps), + rep_db_update_error(Error, DocId), State end, {reply, ok, NewState}; -handle_call({triggered, {BaseId, _}}, _From, State) -> - [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId), - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}), +handle_call({rep_started, RepId}, _From, State) -> + case rep_state(RepId) of + nil -> + ok; + RepState -> + NewRepState = RepState#rep_state{ + starting = false, + retries_left = State#state.max_retries, + max_retries = State#state.max_retries, + wait = ?INITIAL_WAIT + }, + true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}) + end, {reply, ok, State}; -handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) -> - DocId = get_value(<<"_id">>, Props), - [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup( - ?DOC_ID_TO_REP_ID, DocId), - ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using " - "the document `~s`. 
Last error reason was: ~p", - [pp_rep_id(RepId), MaxRetries, DocId, Error]), - couch_rep:update_rep_doc( - RepDoc, - [{<<"_replication_state">>, <<"error">>}, - {<<"_replication_id">>, ?l2b(BaseId)}]), - true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), - true = ets:delete(?DOC_ID_TO_REP_ID, DocId), +handle_call({rep_complete, RepId}, _From, State) -> + true = ets:delete(?REP_TO_STATE, RepId), {reply, ok, State}; +handle_call({rep_error, RepId, Error}, _From, State) -> + {reply, ok, replication_error(State, RepId, Error)}; + handle_call(Msg, From, State) -> ?LOG_ERROR("Replication manager received unexpected call ~p from ~p", [Msg, From]), @@ -150,8 +208,8 @@ terminate(_Reason, State) -> catch exit(Pid, stop) end, [Loop | StartPids]), - true = ets:delete(?REP_ID_TO_DOC_ID), - true = ets:delete(?DOC_ID_TO_REP_ID), + true = ets:delete(?REP_TO_STATE), + true = ets:delete(?DOC_TO_REP), couch_db_update_notifier:stop(Notifier). @@ -160,7 +218,7 @@ code_change(_OldVsn, State, _Extra) -> changes_feed_loop() -> - {ok, RepDb} = couch_rep:ensure_rep_db_exists(), + {ok, RepDb} = ensure_rep_db_exists(), Server = self(), Pid = spawn_link( fun() -> @@ -245,21 +303,20 @@ process_update(State, {Change}) -> State; false -> case get_value(<<"_replication_state">>, RepProps) of + undefined -> + maybe_start_replication(State, DocId, JsonRepDoc); + <<"triggered">> -> + maybe_start_replication(State, DocId, JsonRepDoc); <<"completed">> -> replication_complete(DocId), State; - <<"error">> -> - stop_replication(DocId), - State; - <<"triggered">> -> - maybe_start_replication(State, DocId, JsonRepDoc); - undefined -> - maybe_start_replication(State, DocId, JsonRepDoc) + _ -> + State end end. -rep_db_update_error(Error, {Props} = JsonRepDoc) -> +rep_db_update_error(Error, DocId) -> case Error of {bad_rep_doc, Reason} -> ok; @@ -267,9 +324,8 @@ rep_db_update_error(Error, {Props} = JsonRepDoc) -> Reason = to_binary(Error) end, ?LOG_ERROR("Replication manager, error processing document `~s`: ~s", - [get_value(<<"_id">>, Props), Reason]), - couch_rep:update_rep_doc( - JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]). + [DocId, Reason]), + update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]). rep_user_ctx({RepDoc}) -> @@ -284,30 +340,39 @@ rep_user_ctx({RepDoc}) -> end. 
-maybe_start_replication(#state{max_retries = MaxRetries} = State, - DocId, JsonRepDoc) -> - UserCtx = rep_user_ctx(JsonRepDoc), - {BaseId, _} = RepId = make_rep_id(JsonRepDoc, UserCtx), - case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of - [] -> - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}), - true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}), +maybe_start_replication(State, DocId, RepDoc) -> + UserCtx = rep_user_ctx(RepDoc), + {BaseId, _} = RepId = make_rep_id(RepDoc, UserCtx), + case rep_state(RepId) of + nil -> + RepState = #rep_state{ + doc_id = DocId, + user_ctx = UserCtx, + doc = RepDoc, + starting = true, + retries_left = State#state.max_retries, + max_retries = State#state.max_retries + }, + true = ets:insert(?REP_TO_STATE, {RepId, RepState}), + true = ets:insert(?DOC_TO_REP, {DocId, RepId}), + ?LOG_INFO("Attempting to start replication `~s` (document `~s`).", + [pp_rep_id(RepId), DocId]), Server = self(), Pid = spawn_link(fun() -> - start_replication(Server, JsonRepDoc, RepId, UserCtx, MaxRetries) + start_replication(Server, RepDoc, RepId, UserCtx, 0) end), State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; - [{BaseId, {DocId, _}}] -> + #rep_state{doc_id = DocId} -> State; - [{BaseId, {OtherDocId, false}}] -> + #rep_state{starting = false, doc_id = OtherDocId} -> ?LOG_INFO("The replication specified by the document `~s` was already" " triggered by the document `~s`", [DocId, OtherDocId]), - maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), + maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)), State; - [{BaseId, {OtherDocId, true}}] -> + #rep_state{starting = true, doc_id = OtherDocId} -> ?LOG_INFO("The replication specified by the document `~s` is already" " being triggered by the document `~s`", [DocId, OtherDocId]), - maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), + maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)), State end. @@ -323,98 +388,233 @@ make_rep_id(RepDoc, UserCtx) -> end. -maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> - case get_value(<<"_replication_id">>, Props) of +maybe_tag_rep_doc(DocId, {RepProps}, RepId) -> + case get_value(<<"_replication_id">>, RepProps) of RepId -> ok; _ -> - couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}]) + update_rep_doc(DocId, [{<<"_replication_id">>, RepId}]) end. -start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) -> +start_replication(Server, RepDoc, RepId, UserCtx, Wait) -> + ok = timer:sleep(Wait * 1000), case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of Pid when is_pid(Pid) -> - ?LOG_INFO("Document `~s` triggered replication `~s`", - [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]), - ok = gen_server:call(Server, {triggered, RepId}, infinity), + ok = gen_server:call(Server, {rep_started, RepId}, infinity), couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); Error -> - couch_rep:update_rep_doc( - RepDoc, - [{<<"_replication_state">>, <<"error">>}, - {<<"_replication_id">>, ?l2b(element(1, RepId))}]), - keep_retrying( - Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries) + replication_error(RepId, Error) end. -keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) -> - ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity); - -keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) -> - {RepProps} = RepDoc, - DocId = get_value(<<"_id">>, RepProps), - ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. 
" - "Retrying in ~p seconds", [pp_rep_id(RepId), DocId, Error, Wait]), - ok = timer:sleep(Wait * 1000), - case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of - Pid when is_pid(Pid) -> - ok = gen_server:call(Server, {triggered, RepId}, infinity), - [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId), - ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts", - [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]), - couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); - NewError -> - keep_retrying( - Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1) +replication_complete(DocId) -> + case ets:lookup(?DOC_TO_REP, DocId) of + [{DocId, RepId}] -> + case rep_state(RepId) of + nil -> + couch_rep:end_replication(RepId); + #rep_state{} -> + ok + end, + true = ets:delete(?DOC_TO_REP, DocId); + _ -> + ok end. rep_doc_deleted(DocId) -> - case stop_replication(DocId) of - {ok, RepId} -> + case ets:lookup(?DOC_TO_REP, DocId) of + [{DocId, RepId}] -> + couch_rep:end_replication(RepId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), ?LOG_INFO("Stopped replication `~s` because replication document `~s`" " was deleted", [pp_rep_id(RepId), DocId]); - none -> + [] -> ok end. -replication_complete(DocId) -> - case stop_replication(DocId) of - {ok, RepId} -> - ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", - [pp_rep_id(RepId), DocId]); - none -> - ok +replication_error(State, RepId, Error) -> + case rep_state(RepId) of + nil -> + State; + RepState -> + maybe_retry_replication(RepId, RepState, Error, State) end. - -stop_replication(DocId) -> - case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of - [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] -> - couch_rep:end_replication(RepId), - true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), - true = ets:delete(?DOC_ID_TO_REP_ID, DocId), - {ok, RepId}; - [] -> - none - end. +maybe_retry_replication(RepId, #rep_state{retries_left = 0} = RepState, Error, State) -> + #rep_state{ + doc_id = DocId, + max_retries = MaxRetries + } = RepState, + couch_rep:end_replication(RepId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), + ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s" + "~nReached maximum retry attempts (~p).", + [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]), + State; + +maybe_retry_replication(RepId, RepState, Error, State) -> + #rep_state{ + doc_id = DocId, + user_ctx = UserCtx, + doc = RepDoc + } = RepState, + #rep_state{wait = Wait} = NewRepState = state_after_error(RepState), + true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}), + ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s" + "~nRestarting replication in ~p seconds.", + [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]), + Server = self(), + Pid = spawn_link(fun() -> + start_replication(Server, RepDoc, RepId, UserCtx, Wait) + end), + State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}. stop_all_replications() -> ?LOG_INFO("Stopping all ongoing replications because the replicator" " database was deleted or changed", []), ets:foldl( - fun({_, {RepId, _}}, _) -> + fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end, - ok, ?DOC_ID_TO_REP_ID), - true = ets:delete_all_objects(?REP_ID_TO_DOC_ID), - true = ets:delete_all_objects(?DOC_ID_TO_REP_ID). + ok, ?DOC_TO_REP), + true = ets:delete_all_objects(?REP_TO_STATE), + true = ets:delete_all_objects(?DOC_TO_REP). 
+ + +update_rep_doc(RepDocId, KVs) -> + {ok, RepDb} = ensure_rep_db_exists(), + try + case couch_db:open_doc(RepDb, RepDocId, []) of + {ok, LatestRepDoc} -> + update_rep_doc(RepDb, LatestRepDoc, KVs); + _ -> + ok + end + catch throw:conflict -> + % Shouldn't happen, as by default only the role _replicator can + % update replication documents. + ?LOG_ERROR("Conflict error when updating replication document `~s`." + " Retrying.", [RepDocId]), + ok = timer:sleep(5), + update_rep_doc(RepDocId, KVs) + after + couch_db:close(RepDb) + end. + +update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> + NewRepDocBody = lists:foldl( + fun({<<"_replication_state">> = K, State} = KV, Body) -> + case get_value(K, Body) of + State -> + Body; + _ -> + Body1 = lists:keystore(K, 1, Body, KV), + lists:keystore( + <<"_replication_state_time">>, 1, Body1, + {<<"_replication_state_time">>, timestamp()}) + end; + ({K, _V} = KV, Body) -> + lists:keystore(K, 1, Body, KV) + end, + RepDocBody, KVs), + case NewRepDocBody of + RepDocBody -> + ok; + _ -> + % Might not succeed - when the replication doc is deleted right + % before this update (not an error, ignore). + couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []) + end. + + +% RFC3339 timestamps. +% Note: doesn't include the time seconds fraction (RFC3339 says it's optional). +timestamp() -> + {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()), + UTime = erlang:universaltime(), + LocalTime = calendar:universal_time_to_local_time(UTime), + DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) - + calendar:datetime_to_gregorian_seconds(UTime), + zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60), + iolist_to_binary( + io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s", + [Year, Month, Day, Hour, Min, Sec, + zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])). + +zone(Hr, Min) when Hr >= 0, Min >= 0 -> + io_lib:format("+~2..0w:~2..0w", [Hr, Min]); +zone(Hr, Min) -> + io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]). + + +ensure_rep_db_exists() -> + DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), + Opts = [ + {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, + sys_db + ], + case couch_db:open(DbName, Opts) of + {ok, Db} -> + Db; + _Error -> + {ok, Db} = couch_db:create(DbName, Opts) + end, + ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), + {ok, Db}. + + +ensure_rep_ddoc_exists(RepDb, DDocID) -> + case couch_db:open_doc(RepDb, DDocID, []) of + {ok, _Doc} -> + ok; + _ -> + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} + ]}), + {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) + end, + ok. % pretty-print replication id pp_rep_id({Base, Extension}) -> Base ++ Extension. + + +rep_state(RepId) -> + case ets:lookup(?REP_TO_STATE, RepId) of + [{RepId, RepState}] -> + RepState; + [] -> + nil + end. + + +error_reason({error, Reason}) -> + Reason; +error_reason(Reason) -> + Reason. + + +retries_value("infinity") -> + infinity; +retries_value(Value) -> + list_to_integer(Value). + + +state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) -> + Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT), + case Left of + infinity -> + State#rep_state{wait = Wait2}; + _ -> + State#rep_state{retries_left = Left - 1, wait = Wait2} + end. 
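%% Editorial note, not part of the patch: with ?INITIAL_WAIT = 2.5 and
%% ?MAX_WAIT = 600, state_after_error/1 doubles the wait before each retry,
%% so a failing replication is restarted after roughly 5, 10, 20, 40, 80,
%% 160, 320 and then 600 seconds (the cap) between attempts. When
%% max_replication_retry_count is "infinity", retries_left is never
%% decremented and retries continue indefinitely at the capped interval.
%% The .ini section read by the couch_config calls above, with hypothetical
%% example values:
%%
%%   [replicator]
%%   db = _replicator
%%   max_replication_retry_count = infinity   ; or any positive integer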
-- cgit v1.2.3 From 571d63d20b8f20dc662d755248c9fafe47811782 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Sat, 2 Jul 2011 18:47:54 +0000 Subject: Merge revision 1142259 from trunk Replication manager, ignore db monitor messages Ignore local database monitor messages. These messages are received in some circunstances because replication processes spawn local database monitors but never do the corresponding erlang:demonitor/1,2 calls. git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1142260 13f79535-47bb-0310-9956-ffa450edef68 --- apps/couch/src/couch_replication_manager.erl | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'apps') diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl index 7e2c8118..943cafa8 100644 --- a/apps/couch/src/couch_replication_manager.erl +++ b/apps/couch/src/couch_replication_manager.erl @@ -190,6 +190,10 @@ handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) -> % one of the replication start processes terminated successfully {noreply, State#state{rep_start_pids = Pids -- [From]}}; +handle_info({'DOWN', _Ref, _, _, _}, State) -> + % From a db monitor created by a replication process. Ignore. + {noreply, State}; + handle_info(Msg, State) -> ?LOG_ERROR("Replication manager received unexpected message ~p", [Msg]), {stop, {unexpected_msg, Msg}, State}. -- cgit v1.2.3 From d83235b3ccf4e15ceac51dd78d11b810e0e87cbe Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Sat, 2 Jul 2011 18:52:14 +0000 Subject: Merged revision 1142262 from trunk On server startup, restart replications in error If we setup a continuous replication which goes into an error state and restart Couch just before the replication is retried (before it transitions to the triggered state), the user has to manually restart the replication (recreating the document or deleting its _replication_state field). git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1142263 13f79535-47bb-0310-9956-ffa450edef68 --- apps/couch/src/couch_replication_manager.erl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'apps') diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl index 943cafa8..b3fc3e3c 100644 --- a/apps/couch/src/couch_replication_manager.erl +++ b/apps/couch/src/couch_replication_manager.erl @@ -314,8 +314,13 @@ process_update(State, {Change}) -> <<"completed">> -> replication_complete(DocId), State; - _ -> - State + <<"error">> -> + case ets:lookup(?DOC_TO_REP, DocId) of + [] -> + maybe_start_replication(State, DocId, JsonRepDoc); + _ -> + State + end end end. 
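%% Editorial note, not part of the patch: a replication document normally
%% moves through the _replication_state values undefined -> triggered ->
%% completed or error. With the "error" clause added above, a document that
%% is still in the error state when the manager scans the replicator
%% database (for example after a server restart) is handed back to
%% maybe_start_replication/3 unless it is already tracked in ?DOC_TO_REP,
%% so the user no longer has to recreate the document or delete its
%% _replication_state field by hand.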
-- cgit v1.2.3 From 70b6c534731f856d899d1fe26454548ee9111db3 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Sun, 3 Jul 2011 09:58:54 +0000 Subject: Remove warning about unused variable git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1142410 13f79535-47bb-0310-9956-ffa450edef68 --- apps/couch/src/couch_replication_manager.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'apps') diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl index b3fc3e3c..b3c6db11 100644 --- a/apps/couch/src/couch_replication_manager.erl +++ b/apps/couch/src/couch_replication_manager.erl @@ -75,7 +75,7 @@ replication_completed(RepId) -> case rep_state(RepId) of nil -> ok; - #rep_state{doc_id = DocId} = St -> + #rep_state{doc_id = DocId} -> update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]), ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity), ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", -- cgit v1.2.3 From ceabaac4e8b213126ea7cbdeab9a91009f9bf900 Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Wed, 13 Jul 2011 16:02:39 +0100 Subject: move couch_js_functions.hrl to include dir --- apps/couch/include/couch_js_functions.hrl | 238 ++++++++++++++++++++++++++++++ apps/couch/src/couch_js_functions.hrl | 238 ------------------------------ 2 files changed, 238 insertions(+), 238 deletions(-) create mode 100644 apps/couch/include/couch_js_functions.hrl delete mode 100644 apps/couch/src/couch_js_functions.hrl (limited to 'apps') diff --git a/apps/couch/include/couch_js_functions.hrl b/apps/couch/include/couch_js_functions.hrl new file mode 100644 index 00000000..d07eead5 --- /dev/null +++ b/apps/couch/include/couch_js_functions.hrl @@ -0,0 +1,238 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-define(AUTH_DB_DOC_VALIDATE_FUNCTION, <<" + function(newDoc, oldDoc, userCtx) { + if (newDoc._deleted === true) { + // allow deletes by admins and matching users + // without checking the other fields + if ((userCtx.roles.indexOf('_admin') !== -1) || + (userCtx.name == oldDoc.name)) { + return; + } else { + throw({forbidden: 'Only admins may delete other user docs.'}); + } + } + + if ((oldDoc && oldDoc.type !== 'user') || newDoc.type !== 'user') { + throw({forbidden : 'doc.type must be user'}); + } // we only allow user docs for now + + if (!newDoc.name) { + throw({forbidden: 'doc.name is required'}); + } + + if (newDoc.roles && !isArray(newDoc.roles)) { + throw({forbidden: 'doc.roles must be an array'}); + } + + if (newDoc._id !== ('org.couchdb.user:' + newDoc.name)) { + throw({ + forbidden: 'Doc ID must be of the form org.couchdb.user:name' + }); + } + + if (oldDoc) { // validate all updates + if (oldDoc.name !== newDoc.name) { + throw({forbidden: 'Usernames can not be changed.'}); + } + } + + if (newDoc.password_sha && !newDoc.salt) { + throw({ + forbidden: 'Users with password_sha must have a salt.' + + 'See /_utils/script/couch.js for example code.' 
+ }); + } + + if (userCtx.roles.indexOf('_admin') === -1) { + if (oldDoc) { // validate non-admin updates + if (userCtx.name !== newDoc.name) { + throw({ + forbidden: 'You may only update your own user document.' + }); + } + // validate role updates + var oldRoles = oldDoc.roles.sort(); + var newRoles = newDoc.roles.sort(); + + if (oldRoles.length !== newRoles.length) { + throw({forbidden: 'Only _admin may edit roles'}); + } + + for (var i = 0; i < oldRoles.length; i++) { + if (oldRoles[i] !== newRoles[i]) { + throw({forbidden: 'Only _admin may edit roles'}); + } + } + } else if (newDoc.roles.length > 0) { + throw({forbidden: 'Only _admin may set roles'}); + } + } + + // no system roles in users db + for (var i = 0; i < newDoc.roles.length; i++) { + if (newDoc.roles[i][0] === '_') { + throw({ + forbidden: + 'No system roles (starting with underscore) in users db.' + }); + } + } + + // no system names as names + if (newDoc.name[0] === '_') { + throw({forbidden: 'Username may not start with underscore.'}); + } + } +">>). + + +-define(REP_DB_DOC_VALIDATE_FUN, <<" + function(newDoc, oldDoc, userCtx) { + function reportError(error_msg) { + log('Error writing document `' + newDoc._id + + '\\' to the replicator database: ' + error_msg); + throw({forbidden: error_msg}); + } + + function validateEndpoint(endpoint, fieldName) { + if ((typeof endpoint !== 'string') && + ((typeof endpoint !== 'object') || (endpoint === null))) { + + reportError('The `' + fieldName + '\\' property must exist' + + ' and be either a string or an object.'); + } + + if (typeof endpoint === 'object') { + if ((typeof endpoint.url !== 'string') || !endpoint.url) { + reportError('The url property must exist in the `' + + fieldName + '\\' field and must be a non-empty string.'); + } + + if ((typeof endpoint.auth !== 'undefined') && + ((typeof endpoint.auth !== 'object') || + endpoint.auth === null)) { + + reportError('`' + fieldName + + '.auth\\' must be a non-null object.'); + } + + if ((typeof endpoint.headers !== 'undefined') && + ((typeof endpoint.headers !== 'object') || + endpoint.headers === null)) { + + reportError('`' + fieldName + + '.headers\\' must be a non-null object.'); + } + } + } + + var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0); + var isAdmin = (userCtx.roles.indexOf('_admin') >= 0); + + if (oldDoc && !newDoc._deleted && !isReplicator && + (oldDoc._replication_state === 'triggered')) { + reportError('Only the replicator can edit replication documents ' + + 'that are in the triggered state.'); + } + + if (!newDoc._deleted) { + validateEndpoint(newDoc.source, 'source'); + validateEndpoint(newDoc.target, 'target'); + + if ((typeof newDoc.create_target !== 'undefined') && + (typeof newDoc.create_target !== 'boolean')) { + + reportError('The `create_target\\' field must be a boolean.'); + } + + if ((typeof newDoc.continuous !== 'undefined') && + (typeof newDoc.continuous !== 'boolean')) { + + reportError('The `continuous\\' field must be a boolean.'); + } + + if ((typeof newDoc.doc_ids !== 'undefined') && + !isArray(newDoc.doc_ids)) { + + reportError('The `doc_ids\\' field must be an array of strings.'); + } + + if ((typeof newDoc.filter !== 'undefined') && + ((typeof newDoc.filter !== 'string') || !newDoc.filter)) { + + reportError('The `filter\\' field must be a non-empty string.'); + } + + if ((typeof newDoc.query_params !== 'undefined') && + ((typeof newDoc.query_params !== 'object') || + newDoc.query_params === null)) { + + reportError('The `query_params\\' field must be an object.'); + } + + 
if (newDoc.user_ctx) { + var user_ctx = newDoc.user_ctx; + + if ((typeof user_ctx !== 'object') || (user_ctx === null)) { + reportError('The `user_ctx\\' property must be a ' + + 'non-null object.'); + } + + if (!(user_ctx.name === null || + (typeof user_ctx.name === 'undefined') || + ((typeof user_ctx.name === 'string') && + user_ctx.name.length > 0))) { + + reportError('The `user_ctx.name\\' property must be a ' + + 'non-empty string or null.'); + } + + if (!isAdmin && (user_ctx.name !== userCtx.name)) { + reportError('The given `user_ctx.name\\' is not valid'); + } + + if (user_ctx.roles && !isArray(user_ctx.roles)) { + reportError('The `user_ctx.roles\\' property must be ' + + 'an array of strings.'); + } + + if (!isAdmin && user_ctx.roles) { + for (var i = 0; i < user_ctx.roles.length; i++) { + var role = user_ctx.roles[i]; + + if (typeof role !== 'string' || role.length === 0) { + reportError('Roles must be non-empty strings.'); + } + if (userCtx.roles.indexOf(role) === -1) { + reportError('Invalid role (`' + role + + '\\') in the `user_ctx\\''); + } + } + } + } else { + if (!isAdmin) { + reportError('The `user_ctx\\' property is missing (it is ' + + 'optional for admins only).'); + } + } + } else { + if (!isAdmin) { + if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) { + reportError('Replication documents can only be deleted by ' + + 'admins or by the users who created them.'); + } + } + } + } +">>). diff --git a/apps/couch/src/couch_js_functions.hrl b/apps/couch/src/couch_js_functions.hrl deleted file mode 100644 index d07eead5..00000000 --- a/apps/couch/src/couch_js_functions.hrl +++ /dev/null @@ -1,238 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --define(AUTH_DB_DOC_VALIDATE_FUNCTION, <<" - function(newDoc, oldDoc, userCtx) { - if (newDoc._deleted === true) { - // allow deletes by admins and matching users - // without checking the other fields - if ((userCtx.roles.indexOf('_admin') !== -1) || - (userCtx.name == oldDoc.name)) { - return; - } else { - throw({forbidden: 'Only admins may delete other user docs.'}); - } - } - - if ((oldDoc && oldDoc.type !== 'user') || newDoc.type !== 'user') { - throw({forbidden : 'doc.type must be user'}); - } // we only allow user docs for now - - if (!newDoc.name) { - throw({forbidden: 'doc.name is required'}); - } - - if (newDoc.roles && !isArray(newDoc.roles)) { - throw({forbidden: 'doc.roles must be an array'}); - } - - if (newDoc._id !== ('org.couchdb.user:' + newDoc.name)) { - throw({ - forbidden: 'Doc ID must be of the form org.couchdb.user:name' - }); - } - - if (oldDoc) { // validate all updates - if (oldDoc.name !== newDoc.name) { - throw({forbidden: 'Usernames can not be changed.'}); - } - } - - if (newDoc.password_sha && !newDoc.salt) { - throw({ - forbidden: 'Users with password_sha must have a salt.' + - 'See /_utils/script/couch.js for example code.' 
- }); - } - - if (userCtx.roles.indexOf('_admin') === -1) { - if (oldDoc) { // validate non-admin updates - if (userCtx.name !== newDoc.name) { - throw({ - forbidden: 'You may only update your own user document.' - }); - } - // validate role updates - var oldRoles = oldDoc.roles.sort(); - var newRoles = newDoc.roles.sort(); - - if (oldRoles.length !== newRoles.length) { - throw({forbidden: 'Only _admin may edit roles'}); - } - - for (var i = 0; i < oldRoles.length; i++) { - if (oldRoles[i] !== newRoles[i]) { - throw({forbidden: 'Only _admin may edit roles'}); - } - } - } else if (newDoc.roles.length > 0) { - throw({forbidden: 'Only _admin may set roles'}); - } - } - - // no system roles in users db - for (var i = 0; i < newDoc.roles.length; i++) { - if (newDoc.roles[i][0] === '_') { - throw({ - forbidden: - 'No system roles (starting with underscore) in users db.' - }); - } - } - - // no system names as names - if (newDoc.name[0] === '_') { - throw({forbidden: 'Username may not start with underscore.'}); - } - } -">>). - - --define(REP_DB_DOC_VALIDATE_FUN, <<" - function(newDoc, oldDoc, userCtx) { - function reportError(error_msg) { - log('Error writing document `' + newDoc._id + - '\\' to the replicator database: ' + error_msg); - throw({forbidden: error_msg}); - } - - function validateEndpoint(endpoint, fieldName) { - if ((typeof endpoint !== 'string') && - ((typeof endpoint !== 'object') || (endpoint === null))) { - - reportError('The `' + fieldName + '\\' property must exist' + - ' and be either a string or an object.'); - } - - if (typeof endpoint === 'object') { - if ((typeof endpoint.url !== 'string') || !endpoint.url) { - reportError('The url property must exist in the `' + - fieldName + '\\' field and must be a non-empty string.'); - } - - if ((typeof endpoint.auth !== 'undefined') && - ((typeof endpoint.auth !== 'object') || - endpoint.auth === null)) { - - reportError('`' + fieldName + - '.auth\\' must be a non-null object.'); - } - - if ((typeof endpoint.headers !== 'undefined') && - ((typeof endpoint.headers !== 'object') || - endpoint.headers === null)) { - - reportError('`' + fieldName + - '.headers\\' must be a non-null object.'); - } - } - } - - var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0); - var isAdmin = (userCtx.roles.indexOf('_admin') >= 0); - - if (oldDoc && !newDoc._deleted && !isReplicator && - (oldDoc._replication_state === 'triggered')) { - reportError('Only the replicator can edit replication documents ' + - 'that are in the triggered state.'); - } - - if (!newDoc._deleted) { - validateEndpoint(newDoc.source, 'source'); - validateEndpoint(newDoc.target, 'target'); - - if ((typeof newDoc.create_target !== 'undefined') && - (typeof newDoc.create_target !== 'boolean')) { - - reportError('The `create_target\\' field must be a boolean.'); - } - - if ((typeof newDoc.continuous !== 'undefined') && - (typeof newDoc.continuous !== 'boolean')) { - - reportError('The `continuous\\' field must be a boolean.'); - } - - if ((typeof newDoc.doc_ids !== 'undefined') && - !isArray(newDoc.doc_ids)) { - - reportError('The `doc_ids\\' field must be an array of strings.'); - } - - if ((typeof newDoc.filter !== 'undefined') && - ((typeof newDoc.filter !== 'string') || !newDoc.filter)) { - - reportError('The `filter\\' field must be a non-empty string.'); - } - - if ((typeof newDoc.query_params !== 'undefined') && - ((typeof newDoc.query_params !== 'object') || - newDoc.query_params === null)) { - - reportError('The `query_params\\' field must be an object.'); - } - - 
if (newDoc.user_ctx) { - var user_ctx = newDoc.user_ctx; - - if ((typeof user_ctx !== 'object') || (user_ctx === null)) { - reportError('The `user_ctx\\' property must be a ' + - 'non-null object.'); - } - - if (!(user_ctx.name === null || - (typeof user_ctx.name === 'undefined') || - ((typeof user_ctx.name === 'string') && - user_ctx.name.length > 0))) { - - reportError('The `user_ctx.name\\' property must be a ' + - 'non-empty string or null.'); - } - - if (!isAdmin && (user_ctx.name !== userCtx.name)) { - reportError('The given `user_ctx.name\\' is not valid'); - } - - if (user_ctx.roles && !isArray(user_ctx.roles)) { - reportError('The `user_ctx.roles\\' property must be ' + - 'an array of strings.'); - } - - if (!isAdmin && user_ctx.roles) { - for (var i = 0; i < user_ctx.roles.length; i++) { - var role = user_ctx.roles[i]; - - if (typeof role !== 'string' || role.length === 0) { - reportError('Roles must be non-empty strings.'); - } - if (userCtx.roles.indexOf(role) === -1) { - reportError('Invalid role (`' + role + - '\\') in the `user_ctx\\''); - } - } - } - } else { - if (!isAdmin) { - reportError('The `user_ctx\\' property is missing (it is ' + - 'optional for admins only).'); - } - } - } else { - if (!isAdmin) { - if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) { - reportError('Replication documents can only be deleted by ' + - 'admins or by the users who created them.'); - } - } - } - } -">>). -- cgit v1.2.3 From faf9071260147275bbac1633b599e85b4a302e8b Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Thu, 14 Jul 2011 11:25:58 +0100 Subject: allow replication callback module to be chosen at runtime. --- apps/couch/src/couch_rep.erl | 31 +++++++++++++++------------- apps/couch/src/couch_replication_manager.erl | 2 +- 2 files changed, 18 insertions(+), 15 deletions(-) (limited to 'apps') diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index eabef37b..2d011aab 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -15,9 +15,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([replicate/2, checkpoint/1]). +-export([replicate/2, replicate/3, checkpoint/1]). -export([make_replication_id/2]). --export([start_replication/3, end_replication/1, get_result/4]). +-export([start_replication/4, end_replication/1, get_result/4]). -include("couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). @@ -65,13 +65,16 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{}); %% function handling POST to _replicate -replicate({Props}=PostBody, UserCtx) -> +replicate(PostBody, UserCtx) -> + replicate(PostBody, UserCtx, couch_replication_manager). + +replicate({Props}=PostBody, UserCtx, Module) -> RepId = make_replication_id(PostBody, UserCtx), case couch_util:get_value(<<"cancel">>, Props, false) of true -> end_replication(RepId); false -> - Server = start_replication(PostBody, RepId, UserCtx), + Server = start_replication(PostBody, RepId, UserCtx, Module), get_result(Server, RepId, PostBody, UserCtx) end. @@ -91,11 +94,11 @@ end_replication({BaseId, Extension}) -> end end. 
-start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) -> +start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx, Module) -> Replicator = { BaseId ++ Extension, {gen_server, start_link, - [?MODULE, [RepId, RepDoc, UserCtx], []]}, + [?MODULE, [RepId, RepDoc, UserCtx, Module], []]}, temporary, 1, worker, @@ -132,7 +135,7 @@ init(InitArgs) -> {stop, Error} end. -do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) -> +do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx, Module] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), @@ -173,7 +176,7 @@ do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) -> couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", [ShortId, dbname(Source), dbname(Target)]), "Starting"), - couch_replication_manager:replication_started(RepId), + Module:replication_started(RepId), State = #state{ changes_feed = ChangesFeed, @@ -268,24 +271,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) -> +terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId, _, _, Module]} = State) -> do_terminate(State), - couch_replication_manager:replication_completed(RepId); + Module:replication_completed(RepId); -terminate(normal, #state{init_args=[RepId | _]} = State) -> +terminate(normal, #state{init_args=[RepId, _, _, Module]} = State) -> timer:cancel(State#state.checkpoint_scheduled), do_terminate(do_checkpoint(State)), - couch_replication_manager:replication_completed(RepId); + Module:replication_completed(RepId); terminate(shutdown, #state{listeners = Listeners} = State) -> % continuous replication stopped [gen_server:reply(L, {ok, stopped}) || L <- Listeners], terminate_cleanup(State); -terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) -> +terminate(Reason, #state{listeners = Listeners, init_args=[RepId, _, _, Module]} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], terminate_cleanup(State), - couch_replication_manager:replication_error(RepId, Reason). + Module:replication_error(RepId, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl index b3c6db11..3f7cc27c 100644 --- a/apps/couch/src/couch_replication_manager.erl +++ b/apps/couch/src/couch_replication_manager.erl @@ -408,7 +408,7 @@ maybe_tag_rep_doc(DocId, {RepProps}, RepId) -> start_replication(Server, RepDoc, RepId, UserCtx, Wait) -> ok = timer:sleep(Wait * 1000), - case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of + case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx, ?MODULE)) of Pid when is_pid(Pid) -> ok = gen_server:call(Server, {rep_started, RepId}, infinity), couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); -- cgit v1.2.3 From c78b19296031168aef4142fbe409a0e7d2edda82 Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Mon, 25 Jul 2011 21:25:40 +0000 Subject: assert that calls to file functions actually succeed. 1) couch_file:sync could leave open fd's if close failed. Now we'll get a trace. 2) couch_file:append_term failing would be bad, so let's test that too. 
backported from trunk r1150915 git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1150918 13f79535-47bb-0310-9956-ffa450edef68 --- apps/couch/src/couch_db_updater.erl | 6 +++--- apps/couch/src/couch_file.erl | 2 +- apps/couch/src/couch_server_sup.erl | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) (limited to 'apps') diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl index 179e1f35..9bf52ee0 100644 --- a/apps/couch/src/couch_db_updater.erl +++ b/apps/couch/src/couch_db_updater.erl @@ -52,7 +52,7 @@ init({DbName, Filepath, Fd, Options}) -> terminate(_Reason, Db) -> - couch_file:close(Db#db.fd), + ok = couch_file:close(Db#db.fd), couch_util:shutdown_sync(Db#db.compactor_pid), couch_util:shutdown_sync(Db#db.fd), ok. @@ -513,9 +513,9 @@ flush_trees(#db{fd=Fd,header=Header}=Db, {ok, NewSummaryPointer} = case Header#db_header.disk_version < 4 of true -> - couch_file:append_term(Fd, {Doc#doc.body, DiskAtts}); + {ok, _} = couch_file:append_term(Fd, {Doc#doc.body, DiskAtts}); false -> - couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts}) + {ok, _} = couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts}) end, #leaf{ deleted = IsDeleted, diff --git a/apps/couch/src/couch_file.erl b/apps/couch/src/couch_file.erl index 0136e877..dfc1f822 100644 --- a/apps/couch/src/couch_file.erl +++ b/apps/couch/src/couch_file.erl @@ -164,7 +164,7 @@ truncate(Fd, Pos) -> sync(Filepath) when is_list(Filepath) -> {ok, Fd} = file:open(Filepath, [append, raw]), - try file:sync(Fd) after file:close(Fd) end; + try ok = file:sync(Fd) after ok = file:close(Fd) end; sync(Fd) -> gen_server:call(Fd, sync, infinity). diff --git a/apps/couch/src/couch_server_sup.erl b/apps/couch/src/couch_server_sup.erl index bc1e6036..726e397f 100644 --- a/apps/couch/src/couch_server_sup.erl +++ b/apps/couch/src/couch_server_sup.erl @@ -120,7 +120,7 @@ start_server(IniFiles) -> undefined -> []; Uri -> io_lib:format("~s~n", [Uri]) end end || Uri <- Uris], - file:write_file(UriFile, Lines) + ok = file:write_file(UriFile, Lines) end, {ok, Pid}. -- cgit v1.2.3
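As a closing illustration of the assertion style adopted in the last patch (a sketch based on the commit message, not code taken from the patch): matching the expected success term turns an ignored error into a badmatch exception, so the failure is logged with a stack trace instead of, for example, silently leaving a file descriptor open.

    %% Sketch only. file:sync/1 and file:close/1 return ok | {error, Reason};
    %% if either returns {error, Reason}, the 'ok =' match raises
    %% {badmatch, {error, Reason}} and the crash (with its stack trace) shows
    %% up in the logs.
    sync_file(Filepath) when is_list(Filepath) ->
        {ok, Fd} = file:open(Filepath, [append, raw]),
        try
            ok = file:sync(Fd)
        after
            ok = file:close(Fd)
        end.

The {ok, _} = couch_file:append_term(Fd, ...) matches in couch_db_updater follow the same idea: a failed append now fails the match at the call site itself.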