path: root/src/couchdb/couch_rep.erl
diff options
authorAdam Kocoloski <>2009-03-08 18:18:08 +0000
committerAdam Kocoloski <>2009-03-08 18:18:08 +0000
commitdd79e85bc9e4849df904498ca25ec56304440b5f (patch)
tree6196200629dd9ee5e4acc4655f1b52a8fa22116d /src/couchdb/couch_rep.erl
parentb4d54393f6a549b0d2c057b6646727fb99b15904 (diff)
beefier fault tolerance in the replicator
- trap exits (enumerator and attachment streamers are linked)
- retry by respawning enumerator with last known good source seq
- checkpoint replication record on every flush of document buffer
- reformat nicer error messages to listeners if we need to exit
git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep.erl')
1 files changed, 121 insertions, 70 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 3df2d821..89d40be3 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -91,6 +91,8 @@ replicate(Source, Target, Options) ->
init([Source, Target, Options]) ->
+ process_flag(trap_exit, true),
{ok, DbSrc} =
open_db(Source, proplists:get_value(source_options, Options, [])),
{ok, DbTgt} =
@@ -182,6 +184,8 @@ handle_call(get_result, From, #state{listeners=L} = State) ->
handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
+ context = Context,
+ current_seq = Seq,
docs_buffer = Buffer,
source = Source,
target = Target,
@@ -196,17 +200,18 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State)
ets:update_counter(Stats, docs_read, length(Docs)),
%% save them (maybe in a buffer)
- NewBuffer = case couch_util:should_flush() of
+ {NewBuffer, NewContext} = case couch_util:should_flush() of
true ->
Docs2 = lists:flatten([Docs|Buffer]),
ok = update_docs(Target, Docs2, [], false),
ets:update_counter(Stats, docs_written, length(Docs2)),
- [];
+ {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
+ {[], Ctxt};
false ->
- [Docs | Buffer]
+ {[Docs | Buffer], Context}
- {reply, ok, State#state{docs_buffer=NewBuffer}};
+ {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}};
handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
ets:update_counter(State#state.stats, total_revs, RevsCount),
@@ -216,6 +221,26 @@ handle_cast({increment_update_seq, Seq}, State) ->
couch_task_status:update("Processed source update #~p", [Seq]),
{noreply, State#state{current_seq=Seq}}.
+handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) ->
+ ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]),
+ #state{
+ current_seq = Seq,
+ source = Src,
+ target = Tgt,
+ enum_pid = Pid
+ } = State,
+ Parent = self(),
+ NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end),
+ {noreply, State#state{enum_pid=NewPid}};
+%% if any linked process dies, respawn the enumerator to get things going again
+handle_info({'EXIT', _From, normal}, State) ->
+ {noreply, State};
+handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) ->
+ ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]),
+ exit(EnumPid, pls_restart_kthxbye),
+ {noreply, State};
handle_info(_Msg, State) ->
{noreply, State}.
@@ -235,22 +260,8 @@ terminate(normal, State) ->
- %% format replication history
- JsonStats = [
- {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
- {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
- ],
+ {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
- {ok, NewRepHistory} = finalize_response(Source, Target, Context, Seq, JsonStats),
- %% update local documents
- RepRecSrc = proplists:get_value(src_record, Context),
- RepRecTgt = proplists:get_value(tgt_record, Context),
- {ok, _} = update_local_doc(Source, RepRecSrc#doc{body=NewRepHistory}, []),
- {ok, _} = update_local_doc(Target, RepRecTgt#doc{body=NewRepHistory}, []),
%% reply to original requester
@@ -268,7 +279,10 @@ terminate(normal, State) ->
terminate(Reason, State) ->
+ ?LOG_ERROR("replicator terminating with reason ~p", [Reason]),
+ context = Context,
+ current_seq = Seq,
listeners = Listeners,
source = Source,
target = Target,
@@ -277,6 +291,8 @@ terminate(Reason, State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
+ {ok, _, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
@@ -300,6 +316,9 @@ attachment_loop(ReqId) ->
{ibrowse_async_response, ReqId, chunk_end} ->
+ {ibrowse_async_response, ReqId, {error, Err}} ->
+ ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
+ exit(attachment_request_failed);
{ibrowse_async_response, ReqId, Data} ->
receive {From, gimme_data} -> From ! {self(), Data} end,
@@ -317,7 +336,10 @@ attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) ->
%% make the async request
Options = [{stream_to, Pid}, {response_format, binary}],
- {ibrowse_req_id, ReqId} = ibrowse:send_req(Url, Headers, get, [], Options),
+ ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of
+ {ibrowse_req_id, X} -> X;
+ {error, _Reason} -> exit(attachment_request_failed)
+ end,
%% tell our receiver about the ReqId it needs to look for
Pid ! {self(), {set_req_id, ReqId}},
@@ -337,6 +359,82 @@ close_db(#http_db{})->
+do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
+ ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
+ [
+ {start_seq, SeqNum},
+ {history, OldRepHistoryProps},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime},
+ {src_record, RepRecSrc},
+ {tgt_record, RepRecTgt}
+ ] = Context,
+ NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+ true ->
+ % nothing changed, don't record results
+ {OldRepHistoryProps};
+ false ->
+ % commit changes to both src and tgt. The src because if changes
+ % we replicated are lost, we'll record a seq number ahead
+ % of what was committed and therefore lose future changes with the
+ % same seq nums.
+ {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
+ {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
+ RecordSeqNum =
+ if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
+ TgtInstanceStartTime2 == TgtInstanceStartTime ->
+ NewSeqNum;
+ true ->
+ ?LOG_INFO("A server has restarted sinced replication start. "
+ "Not recording the new sequence number to ensure the "
+ "replication is redone and documents reexamined.", []),
+ SeqNum
+ end,
+ %% format replication history
+ JsonStats = [
+ {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+ {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+ {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
+ ],
+ HistEntries =[
+ {
+ [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, SeqNum},
+ {<<"end_last_seq">>, NewSeqNum} | JsonStats]}
+ | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
+ % something changed, record results
+ {[
+ {<<"session_id">>, couch_util:new_uuid()},
+ {<<"source_last_seq">>, RecordSeqNum},
+ {<<"history">>, lists:sublist(HistEntries, 50)}
+ ]}
+ end,
+ %% update local documents
+ RepRecSrc = proplists:get_value(src_record, Context),
+ RepRecTgt = proplists:get_value(tgt_record, Context),
+ {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []),
+ {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []),
+ NewContext = [
+ {start_seq, SeqNum},
+ {history, OldRepHistoryProps},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime},
+ {src_record, RepRecSrc#doc{revs=[SrcRev]}},
+ {tgt_record, RepRecTgt#doc{revs=[TgtRev]}}
+ ],
+ {ok, NewHistory, NewContext}.
do_http_request(Url, Action, Headers) ->
do_http_request(Url, Action, Headers, []).
@@ -345,7 +443,8 @@ do_http_request(Url, Action, Headers, JsonBody) ->
do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
- [Action, Url]);
+ [Action, Url]),
+ exit({http_request_failed, ?l2b(Url)});
do_http_request(Url, Action, Headers, JsonBody, Retries) ->
?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
Body =
@@ -422,54 +521,6 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
-finalize_response(Source, Target, Context, NewSeqNum, Stats) ->
- [
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime}
- |_] = Context,
- case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
- true ->
- % nothing changed, don't record results
- {ok, {OldRepHistoryProps}};
- false ->
- % commit changes to both src and tgt. The src because if changes
- % we replicated are lost, we'll record a seq number ahead
- % of what was committed and therefore lose future changes with the
- % same seq nums.
- {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
- {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
- RecordSeqNum =
- if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
- TgtInstanceStartTime2 == TgtInstanceStartTime ->
- NewSeqNum;
- true ->
- ?LOG_INFO("A server has restarted sinced replication start. "
- "Not recording the new sequence number to ensure the "
- "replication is redone and documents reexamined.", []),
- SeqNum
- end,
- HistEntries =[
- {
- [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, SeqNum},
- {<<"end_last_seq">>, NewSeqNum} | Stats]}
- | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
- % something changed, record results
- NewRepHistory =
- {
- [{<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, lists:sublist(HistEntries, 50)}]},
- {ok, NewRepHistory}
- end.
fix_url(UrlBin) ->
Url = binary_to_list(UrlBin),
case lists:last(Url) of
@@ -589,7 +640,7 @@ update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) -
Url = DbUrl ++ url_encode(DocId),
{ResponseMembers} = do_http_request(Url, put, Headers,
couch_doc:to_json_obj(Doc, [revs,attachments])),
- RevId = proplists:get_value(<<"_rev">>, ResponseMembers),
+ RevId = proplists:get_value(<<"rev">>, ResponseMembers),
{ok, RevId};
update_local_doc(Db, Doc, Options) ->
couch_db:update_doc(Db, Doc, Options).