summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_replication_manager.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/couch/src/couch_replication_manager.erl')
-rw-r--r--apps/couch/src/couch_replication_manager.erl41
1 files changed, 37 insertions, 4 deletions
diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl
index 6537c8b2..e3d97c37 100644
--- a/apps/couch/src/couch_replication_manager.erl
+++ b/apps/couch/src/couch_replication_manager.erl
@@ -32,7 +32,8 @@
-import(couch_util, [
get_value/2,
- get_value/3
+ get_value/3,
+ to_binary/1
]).
@@ -62,8 +63,16 @@ init(_) ->
}}.
-handle_call({rep_db_update, Change}, _From, State) ->
- {reply, ok, process_update(State, Change)};
+handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
+ NewState = try
+ process_update(State, Change)
+ catch
+ _Tag:Error ->
+ JsonRepDoc = get_value(doc, ChangeProps),
+ rep_db_update_error(Error, JsonRepDoc),
+ State
+ end,
+ {reply, ok, NewState};
handle_call({triggered, {BaseId, _}}, _From, State) ->
[{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
@@ -250,6 +259,19 @@ process_update(State, {Change}) ->
end.
+rep_db_update_error(Error, {Props} = JsonRepDoc) ->
+ case Error of
+ {bad_rep_doc, Reason} ->
+ ok;
+ _ ->
+ Reason = to_binary(Error)
+ end,
+ ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
+ [get_value(<<"_id">>, Props), Reason]),
+ couch_rep:update_rep_doc(
+ JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]).
+
+
rep_user_ctx({RepDoc}) ->
case get_value(<<"user_ctx">>, RepDoc) of
undefined ->
@@ -265,7 +287,7 @@ rep_user_ctx({RepDoc}) ->
maybe_start_replication(#state{max_retries = MaxRetries} = State,
DocId, JsonRepDoc) ->
UserCtx = rep_user_ctx(JsonRepDoc),
- {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
+ {BaseId, _} = RepId = make_rep_id(JsonRepDoc, UserCtx),
case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
@@ -290,6 +312,17 @@ maybe_start_replication(#state{max_retries = MaxRetries} = State,
end.
+make_rep_id(RepDoc, UserCtx) ->
+ try
+ couch_rep:make_replication_id(RepDoc, UserCtx)
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end.
+
+
maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
case get_value(<<"_replication_id">>, Props) of
RepId ->