summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_httpd_misc_handlers.erl10
-rw-r--r--src/couchdb/couch_rep.erl191
2 files changed, 129 insertions, 72 deletions
diff --git a/src/couchdb/couch_httpd_misc_handlers.erl b/src/couchdb/couch_httpd_misc_handlers.erl
index be9e0033..583a87c2 100644
--- a/src/couchdb/couch_httpd_misc_handlers.erl
+++ b/src/couchdb/couch_httpd_misc_handlers.erl
@@ -92,8 +92,14 @@ handle_replicate_req(#httpd{user_ctx=UserCtx,method='POST'}=Req) ->
[{headers, TgtHeaders},
{user_ctx, UserCtx}]}
| Options],
- {ok, {JsonResults}} = couch_rep:replicate(Source, Target, Options2),
- send_json(Req, {[{ok, true} | JsonResults]});
+ case couch_rep:replicate(Source, Target, Options2) of
+ {ok, {JsonResults}} ->
+ send_json(Req, {[{ok, true} | JsonResults]});
+ {error, {Type, Details}} ->
+ send_json(Req, 500, {[{error, Type}, {reason, Details}]});
+ {error, Reason} ->
+ send_json(Req, 500, {[{error, Reason}]})
+ end;
handle_replicate_req(Req) ->
send_method_not_allowed(Req, "POST").
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 3df2d821..89d40be3 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -91,6 +91,8 @@ replicate(Source, Target, Options) ->
}).
init([Source, Target, Options]) ->
+ process_flag(trap_exit, true),
+
{ok, DbSrc} =
open_db(Source, proplists:get_value(source_options, Options, [])),
{ok, DbTgt} =
@@ -182,6 +184,8 @@ handle_call(get_result, From, #state{listeners=L} = State) ->
handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
#state{
+ context = Context,
+ current_seq = Seq,
docs_buffer = Buffer,
source = Source,
target = Target,
@@ -196,17 +200,18 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State)
ets:update_counter(Stats, docs_read, length(Docs)),
%% save them (maybe in a buffer)
- NewBuffer = case couch_util:should_flush() of
+ {NewBuffer, NewContext} = case couch_util:should_flush() of
true ->
Docs2 = lists:flatten([Docs|Buffer]),
ok = update_docs(Target, Docs2, [], false),
ets:update_counter(Stats, docs_written, length(Docs2)),
- [];
+ {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
+ {[], Ctxt};
false ->
- [Docs | Buffer]
+ {[Docs | Buffer], Context}
end,
- {reply, ok, State#state{docs_buffer=NewBuffer}};
+ {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}};
handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
ets:update_counter(State#state.stats, total_revs, RevsCount),
@@ -216,6 +221,26 @@ handle_cast({increment_update_seq, Seq}, State) ->
couch_task_status:update("Processed source update #~p", [Seq]),
{noreply, State#state{current_seq=Seq}}.
+handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) ->
+ ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]),
+ #state{
+ current_seq = Seq,
+ source = Src,
+ target = Tgt,
+ enum_pid = Pid
+ } = State,
+ Parent = self(),
+ NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end),
+ {noreply, State#state{enum_pid=NewPid}};
+
+%% if any linked process dies, respawn the enumerator to get things going again
+handle_info({'EXIT', _From, normal}, State) ->
+ {noreply, State};
+handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) ->
+ ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]),
+ exit(EnumPid, pls_restart_kthxbye),
+ {noreply, State};
+
handle_info(_Msg, State) ->
{noreply, State}.
@@ -235,22 +260,8 @@ terminate(normal, State) ->
couch_task_status:update("Finishing"),
- %% format replication history
- JsonStats = [
- {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
- {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
- ],
+ {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
ets:delete(Stats),
- {ok, NewRepHistory} = finalize_response(Source, Target, Context, Seq, JsonStats),
-
- %% update local documents
- RepRecSrc = proplists:get_value(src_record, Context),
- RepRecTgt = proplists:get_value(tgt_record, Context),
- {ok, _} = update_local_doc(Source, RepRecSrc#doc{body=NewRepHistory}, []),
- {ok, _} = update_local_doc(Target, RepRecTgt#doc{body=NewRepHistory}, []),
-
close_db(Target),
%% reply to original requester
@@ -268,7 +279,10 @@ terminate(normal, State) ->
end,
close_db(Source);
terminate(Reason, State) ->
+ ?LOG_ERROR("replicator terminating with reason ~p", [Reason]),
#state{
+ context = Context,
+ current_seq = Seq,
listeners = Listeners,
source = Source,
target = Target,
@@ -277,6 +291,8 @@ terminate(Reason, State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
+ {ok, _, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
+
ets:delete(Stats),
close_db(Target),
close_db(Source).
@@ -300,6 +316,9 @@ attachment_loop(ReqId) ->
attachment_loop(ReqId);
{ibrowse_async_response, ReqId, chunk_end} ->
attachment_loop(ReqId);
+ {ibrowse_async_response, ReqId, {error, Err}} ->
+ ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
+ exit(attachment_request_failed);
{ibrowse_async_response, ReqId, Data} ->
receive {From, gimme_data} -> From ! {self(), Data} end,
attachment_loop(ReqId);
@@ -317,7 +336,10 @@ attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) ->
%% make the async request
Options = [{stream_to, Pid}, {response_format, binary}],
- {ibrowse_req_id, ReqId} = ibrowse:send_req(Url, Headers, get, [], Options),
+ ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of
+ {ibrowse_req_id, X} -> X;
+ {error, _Reason} -> exit(attachment_request_failed)
+ end,
%% tell our receiver about the ReqId it needs to look for
Pid ! {self(), {set_req_id, ReqId}},
@@ -337,6 +359,82 @@ close_db(#http_db{})->
close_db(Db)->
couch_db:close(Db).
+do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
+ ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
+ [
+ {start_seq, SeqNum},
+ {history, OldRepHistoryProps},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime},
+ {src_record, RepRecSrc},
+ {tgt_record, RepRecTgt}
+ ] = Context,
+
+ NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+ true ->
+ % nothing changed, don't record results
+ {OldRepHistoryProps};
+ false ->
+ % commit changes to both src and tgt. The src because if changes
+ % we replicated are lost, we'll record the a seq number of ahead
+ % of what was committed and therefore lose future changes with the
+ % same seq nums.
+ {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
+ {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
+
+ RecordSeqNum =
+ if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
+ TgtInstanceStartTime2 == TgtInstanceStartTime ->
+ NewSeqNum;
+ true ->
+ ?LOG_INFO("A server has restarted sinced replication start. "
+ "Not recording the new sequence number to ensure the "
+ "replication is redone and documents reexamined.", []),
+ SeqNum
+ end,
+
+ %% format replication history
+ JsonStats = [
+ {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+ {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+ {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
+ ],
+
+ HistEntries =[
+ {
+ [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, SeqNum},
+ {<<"end_last_seq">>, NewSeqNum} | JsonStats]}
+ | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
+ % something changed, record results
+ {[
+ {<<"session_id">>, couch_util:new_uuid()},
+ {<<"source_last_seq">>, RecordSeqNum},
+ {<<"history">>, lists:sublist(HistEntries, 50)}
+ ]}
+ end,
+
+ %% update local documents
+ RepRecSrc = proplists:get_value(src_record, Context),
+ RepRecTgt = proplists:get_value(tgt_record, Context),
+ {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []),
+ {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []),
+
+ NewContext = [
+ {start_seq, SeqNum},
+ {history, OldRepHistoryProps},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime},
+ {src_record, RepRecSrc#doc{revs=[SrcRev]}},
+ {tgt_record, RepRecTgt#doc{revs=[TgtRev]}}
+ ],
+
+ {ok, NewHistory, NewContext}.
+
do_http_request(Url, Action, Headers) ->
do_http_request(Url, Action, Headers, []).
@@ -345,7 +443,8 @@ do_http_request(Url, Action, Headers, JsonBody) ->
do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
- [Action, Url]);
+ [Action, Url]),
+ exit({http_request_failed, ?l2b(Url)});
do_http_request(Url, Action, Headers, JsonBody, Retries) ->
?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
Body =
@@ -422,54 +521,6 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
end.
-finalize_response(Source, Target, Context, NewSeqNum, Stats) ->
- [
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime}
- |_] = Context,
-
- case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
- true ->
- % nothing changed, don't record results
- {ok, {OldRepHistoryProps}};
- false ->
- % commit changes to both src and tgt. The src because if changes
- % we replicated are lost, we'll record the a seq number of ahead
- % of what was committed and therefore lose future changes with the
- % same seq nums.
- {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
- {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
-
- RecordSeqNum =
- if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
- TgtInstanceStartTime2 == TgtInstanceStartTime ->
- NewSeqNum;
- true ->
- ?LOG_INFO("A server has restarted sinced replication start. "
- "Not recording the new sequence number to ensure the "
- "replication is redone and documents reexamined.", []),
- SeqNum
- end,
-
- HistEntries =[
- {
- [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, SeqNum},
- {<<"end_last_seq">>, NewSeqNum} | Stats]}
- | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
- % something changed, record results
- NewRepHistory =
- {
- [{<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, lists:sublist(HistEntries, 50)}]},
- {ok, NewRepHistory}
- end.
-
fix_url(UrlBin) ->
Url = binary_to_list(UrlBin),
case lists:last(Url) of
@@ -589,7 +640,7 @@ update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) -
Url = DbUrl ++ url_encode(DocId),
{ResponseMembers} = do_http_request(Url, put, Headers,
couch_doc:to_json_obj(Doc, [revs,attachments])),
- RevId = proplists:get_value(<<"_rev">>, ResponseMembers),
+ RevId = proplists:get_value(<<"rev">>, ResponseMembers),
{ok, RevId};
update_local_doc(Db, Doc, Options) ->
couch_db:update_doc(Db, Doc, Options).