diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_httpd_misc_handlers.erl | 10 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 191 |
2 files changed, 129 insertions, 72 deletions
diff --git a/src/couchdb/couch_httpd_misc_handlers.erl b/src/couchdb/couch_httpd_misc_handlers.erl index be9e0033..583a87c2 100644 --- a/src/couchdb/couch_httpd_misc_handlers.erl +++ b/src/couchdb/couch_httpd_misc_handlers.erl @@ -92,8 +92,14 @@ handle_replicate_req(#httpd{user_ctx=UserCtx,method='POST'}=Req) -> [{headers, TgtHeaders}, {user_ctx, UserCtx}]} | Options], - {ok, {JsonResults}} = couch_rep:replicate(Source, Target, Options2), - send_json(Req, {[{ok, true} | JsonResults]}); + case couch_rep:replicate(Source, Target, Options2) of + {ok, {JsonResults}} -> + send_json(Req, {[{ok, true} | JsonResults]}); + {error, {Type, Details}} -> + send_json(Req, 500, {[{error, Type}, {reason, Details}]}); + {error, Reason} -> + send_json(Req, 500, {[{error, Reason}]}) + end; handle_replicate_req(Req) -> send_method_not_allowed(Req, "POST"). diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 3df2d821..89d40be3 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -91,6 +91,8 @@ replicate(Source, Target, Options) -> }). init([Source, Target, Options]) -> + process_flag(trap_exit, true), + {ok, DbSrc} = open_db(Source, proplists:get_value(source_options, Options, [])), {ok, DbTgt} = @@ -182,6 +184,8 @@ handle_call(get_result, From, #state{listeners=L} = State) -> handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) -> #state{ + context = Context, + current_seq = Seq, docs_buffer = Buffer, source = Source, target = Target, @@ -196,17 +200,18 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ets:update_counter(Stats, docs_read, length(Docs)), %% save them (maybe in a buffer) - NewBuffer = case couch_util:should_flush() of + {NewBuffer, NewContext} = case couch_util:should_flush() of true -> Docs2 = lists:flatten([Docs|Buffer]), ok = update_docs(Target, Docs2, [], false), ets:update_counter(Stats, docs_written, length(Docs2)), - []; + {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats), + {[], Ctxt}; false -> - [Docs | Buffer] + {[Docs | Buffer], Context} end, - {reply, ok, State#state{docs_buffer=NewBuffer}}; + {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}}; handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) -> ets:update_counter(State#state.stats, total_revs, RevsCount), @@ -216,6 +221,26 @@ handle_cast({increment_update_seq, Seq}, State) -> couch_task_status:update("Processed source update #~p", [Seq]), {noreply, State#state{current_seq=Seq}}. +handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) -> + ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]), + #state{ + current_seq = Seq, + source = Src, + target = Tgt, + enum_pid = Pid + } = State, + Parent = self(), + NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end), + {noreply, State#state{enum_pid=NewPid}}; + +%% if any linked process dies, respawn the enumerator to get things going again +handle_info({'EXIT', _From, normal}, State) -> + {noreply, State}; +handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) -> + ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]), + exit(EnumPid, pls_restart_kthxbye), + {noreply, State}; + handle_info(_Msg, State) -> {noreply, State}. @@ -235,22 +260,8 @@ terminate(normal, State) -> couch_task_status:update("Finishing"), - %% format replication history - JsonStats = [ - {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, - {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)} - ], + {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats), ets:delete(Stats), - {ok, NewRepHistory} = finalize_response(Source, Target, Context, Seq, JsonStats), - - %% update local documents - RepRecSrc = proplists:get_value(src_record, Context), - RepRecTgt = proplists:get_value(tgt_record, Context), - {ok, _} = update_local_doc(Source, RepRecSrc#doc{body=NewRepHistory}, []), - {ok, _} = update_local_doc(Target, RepRecTgt#doc{body=NewRepHistory}, []), - close_db(Target), %% reply to original requester @@ -268,7 +279,10 @@ terminate(normal, State) -> end, close_db(Source); terminate(Reason, State) -> + ?LOG_ERROR("replicator terminating with reason ~p", [Reason]), #state{ + context = Context, + current_seq = Seq, listeners = Listeners, source = Source, target = Target, @@ -277,6 +291,8 @@ terminate(Reason, State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], + {ok, _, _} = do_checkpoint(Source, Target, Context, Seq, Stats), + ets:delete(Stats), close_db(Target), close_db(Source). @@ -300,6 +316,9 @@ attachment_loop(ReqId) -> attachment_loop(ReqId); {ibrowse_async_response, ReqId, chunk_end} -> attachment_loop(ReqId); + {ibrowse_async_response, ReqId, {error, Err}} -> + ?LOG_ERROR("streaming attachment failed with ~p", [Err]), + exit(attachment_request_failed); {ibrowse_async_response, ReqId, Data} -> receive {From, gimme_data} -> From ! {self(), Data} end, attachment_loop(ReqId); @@ -317,7 +336,10 @@ attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) -> %% make the async request Options = [{stream_to, Pid}, {response_format, binary}], - {ibrowse_req_id, ReqId} = ibrowse:send_req(Url, Headers, get, [], Options), + ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of + {ibrowse_req_id, X} -> X; + {error, _Reason} -> exit(attachment_request_failed) + end, %% tell our receiver about the ReqId it needs to look for Pid ! {self(), {set_req_id, ReqId}}, @@ -337,6 +359,82 @@ close_db(#http_db{})-> close_db(Db)-> couch_db:close(Db). +do_checkpoint(Source, Target, Context, NewSeqNum, Stats) -> + ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]), + [ + {start_seq, SeqNum}, + {history, OldRepHistoryProps}, + {rep_starttime, ReplicationStartTime}, + {src_starttime, SrcInstanceStartTime}, + {tgt_starttime, TgtInstanceStartTime}, + {src_record, RepRecSrc}, + {tgt_record, RepRecTgt} + ] = Context, + + NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of + true -> + % nothing changed, don't record results + {OldRepHistoryProps}; + false -> + % commit changes to both src and tgt. The src because if changes + % we replicated are lost, we'll record the a seq number of ahead + % of what was committed and therefore lose future changes with the + % same seq nums. + {ok, SrcInstanceStartTime2} = ensure_full_commit(Source), + {ok, TgtInstanceStartTime2} = ensure_full_commit(Target), + + RecordSeqNum = + if SrcInstanceStartTime2 == SrcInstanceStartTime andalso + TgtInstanceStartTime2 == TgtInstanceStartTime -> + NewSeqNum; + true -> + ?LOG_INFO("A server has restarted sinced replication start. " + "Not recording the new sequence number to ensure the " + "replication is redone and documents reexamined.", []), + SeqNum + end, + + %% format replication history + JsonStats = [ + {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, + {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, + {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, + {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)} + ], + + HistEntries =[ + { + [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, + {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_last_seq">>, SeqNum}, + {<<"end_last_seq">>, NewSeqNum} | JsonStats]} + | proplists:get_value(<<"history">>, OldRepHistoryProps, [])], + % something changed, record results + {[ + {<<"session_id">>, couch_util:new_uuid()}, + {<<"source_last_seq">>, RecordSeqNum}, + {<<"history">>, lists:sublist(HistEntries, 50)} + ]} + end, + + %% update local documents + RepRecSrc = proplists:get_value(src_record, Context), + RepRecTgt = proplists:get_value(tgt_record, Context), + {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []), + {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []), + + NewContext = [ + {start_seq, SeqNum}, + {history, OldRepHistoryProps}, + {rep_starttime, ReplicationStartTime}, + {src_starttime, SrcInstanceStartTime}, + {tgt_starttime, TgtInstanceStartTime}, + {src_record, RepRecSrc#doc{revs=[SrcRev]}}, + {tgt_record, RepRecTgt#doc{revs=[TgtRev]}} + ], + + {ok, NewHistory, NewContext}. + do_http_request(Url, Action, Headers) -> do_http_request(Url, Action, Headers, []). @@ -345,7 +443,8 @@ do_http_request(Url, Action, Headers, JsonBody) -> do_http_request(Url, Action, _Headers, _JsonBody, 0) -> ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s", - [Action, Url]); + [Action, Url]), + exit({http_request_failed, ?l2b(Url)}); do_http_request(Url, Action, Headers, JsonBody, Retries) -> ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]), Body = @@ -422,54 +521,6 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) -> enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2}) end. -finalize_response(Source, Target, Context, NewSeqNum, Stats) -> - [ - {start_seq, SeqNum}, - {history, OldRepHistoryProps}, - {rep_starttime, ReplicationStartTime}, - {src_starttime, SrcInstanceStartTime}, - {tgt_starttime, TgtInstanceStartTime} - |_] = Context, - - case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of - true -> - % nothing changed, don't record results - {ok, {OldRepHistoryProps}}; - false -> - % commit changes to both src and tgt. The src because if changes - % we replicated are lost, we'll record the a seq number of ahead - % of what was committed and therefore lose future changes with the - % same seq nums. - {ok, SrcInstanceStartTime2} = ensure_full_commit(Source), - {ok, TgtInstanceStartTime2} = ensure_full_commit(Target), - - RecordSeqNum = - if SrcInstanceStartTime2 == SrcInstanceStartTime andalso - TgtInstanceStartTime2 == TgtInstanceStartTime -> - NewSeqNum; - true -> - ?LOG_INFO("A server has restarted sinced replication start. " - "Not recording the new sequence number to ensure the " - "replication is redone and documents reexamined.", []), - SeqNum - end, - - HistEntries =[ - { - [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, - {<<"start_last_seq">>, SeqNum}, - {<<"end_last_seq">>, NewSeqNum} | Stats]} - | proplists:get_value(<<"history">>, OldRepHistoryProps, [])], - % something changed, record results - NewRepHistory = - { - [{<<"session_id">>, couch_util:new_uuid()}, - {<<"source_last_seq">>, RecordSeqNum}, - {<<"history">>, lists:sublist(HistEntries, 50)}]}, - {ok, NewRepHistory} - end. - fix_url(UrlBin) -> Url = binary_to_list(UrlBin), case lists:last(Url) of @@ -589,7 +640,7 @@ update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) - Url = DbUrl ++ url_encode(DocId), {ResponseMembers} = do_http_request(Url, put, Headers, couch_doc:to_json_obj(Doc, [revs,attachments])), - RevId = proplists:get_value(<<"_rev">>, ResponseMembers), + RevId = proplists:get_value(<<"rev">>, ResponseMembers), {ok, RevId}; update_local_doc(Db, Doc, Options) -> couch_db:update_doc(Db, Doc, Options). |