summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/couch_rep.erl45
-rw-r--r--src/couchdb/couch_replication_manager.erl41
2 files changed, 75 insertions, 11 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 6e4295ea..9d86e7a5 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -486,7 +486,17 @@ make_replication_id(RepProps, UserCtx) ->
% add a new clause and increase ?REP_ID_VERSION at the top
make_replication_id({Props}, UserCtx, 2) ->
{ok, HostName} = inet:gethostname(),
- Port = mochiweb_socket_server:get(couch_httpd, port),
+ Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
+ P when is_number(P) ->
+ P;
+ _ ->
+ % On restart we might be called before the couch_httpd process is
+ % started.
+ % TODO: we might be under an SSL socket server only, or both under
+ % SSL and a non-SSL socket.
+ % ... mochiweb_socket_server:get(https, port)
+ list_to_integer(couch_config:get("httpd", "port", "5984"))
+ end,
Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx);
@@ -513,16 +523,37 @@ maybe_append_filters({Props}, Base, UserCtx) ->
couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
filter_code(Filter, Props, UserCtx) ->
- {match, [DDocName, FilterName]} =
- re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]),
+ {DDocName, FilterName} =
+ case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
+ {match, [DDocName0, FilterName0]} ->
+ {DDocName0, FilterName0};
+ _ ->
+ throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
+ end,
ProxyParams = parse_proxy_params(
couch_util:get_value(<<"proxy">>, Props, [])),
- Source = open_db(
- couch_util:get_value(<<"source">>, Props), UserCtx, ProxyParams),
+ DbName = couch_util:get_value(<<"source">>, Props),
+ Source = try
+ open_db(DbName, UserCtx, ProxyParams)
+ catch
+ _Tag:DbError ->
+ DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
+ [couch_util:url_strip_password(DbName), couch_util:to_binary(DbError)]),
+ throw({error, iolist_to_binary(DbErrorMsg)})
+ end,
try
- {ok, DDoc} = open_doc(Source, <<"_design/", DDocName/binary>>),
+ Body = case (catch open_doc(Source, <<"_design/", DDocName/binary>>)) of
+ {ok, #doc{body = Body0}} ->
+ Body0;
+ DocError ->
+ DocErrorMsg = io_lib:format(
+ "Couldn't open document `_design/~s` from source "
+ "database `~s`: ~s",
+ [dbname(Source), DDocName, couch_util:to_binary(DocError)]),
+ throw({error, iolist_to_binary(DocErrorMsg)})
+ end,
Code = couch_util:get_nested_json_value(
- DDoc#doc.body, [<<"filters">>, FilterName]),
+ Body, [<<"filters">>, FilterName]),
re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}])
after
close_db(Source)
diff --git a/src/couchdb/couch_replication_manager.erl b/src/couchdb/couch_replication_manager.erl
index 6537c8b2..e3d97c37 100644
--- a/src/couchdb/couch_replication_manager.erl
+++ b/src/couchdb/couch_replication_manager.erl
@@ -32,7 +32,8 @@
-import(couch_util, [
get_value/2,
- get_value/3
+ get_value/3,
+ to_binary/1
]).
@@ -62,8 +63,16 @@ init(_) ->
}}.
-handle_call({rep_db_update, Change}, _From, State) ->
- {reply, ok, process_update(State, Change)};
+handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
+ NewState = try
+ process_update(State, Change)
+ catch
+ _Tag:Error ->
+ JsonRepDoc = get_value(doc, ChangeProps),
+ rep_db_update_error(Error, JsonRepDoc),
+ State
+ end,
+ {reply, ok, NewState};
handle_call({triggered, {BaseId, _}}, _From, State) ->
[{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
@@ -250,6 +259,19 @@ process_update(State, {Change}) ->
end.
+rep_db_update_error(Error, {Props} = JsonRepDoc) ->
+ case Error of
+ {bad_rep_doc, Reason} ->
+ ok;
+ _ ->
+ Reason = to_binary(Error)
+ end,
+ ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
+ [get_value(<<"_id">>, Props), Reason]),
+ couch_rep:update_rep_doc(
+ JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]).
+
+
rep_user_ctx({RepDoc}) ->
case get_value(<<"user_ctx">>, RepDoc) of
undefined ->
@@ -265,7 +287,7 @@ rep_user_ctx({RepDoc}) ->
maybe_start_replication(#state{max_retries = MaxRetries} = State,
DocId, JsonRepDoc) ->
UserCtx = rep_user_ctx(JsonRepDoc),
- {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
+ {BaseId, _} = RepId = make_rep_id(JsonRepDoc, UserCtx),
case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
@@ -290,6 +312,17 @@ maybe_start_replication(#state{max_retries = MaxRetries} = State,
end.
+make_rep_id(RepDoc, UserCtx) ->
+ try
+ couch_rep:make_replication_id(RepDoc, UserCtx)
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end.
+
+
maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
case get_value(<<"_replication_id">>, Props) of
RepId ->