diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2009-08-10 18:37:43 +0000 |
---|---|---|
committer | Adam Kocoloski <kocolosk@apache.org> | 2009-08-10 18:37:43 +0000 |
commit | 5dcbc2290ac780f1a625b5c9435cfb35eac4e1ef (patch) | |
tree | bc9e04c73807b9eb34e05d5c70026b2e951fc673 /src | |
parent | abcc5a35fda60c7124af7899939f09e59ae7968b (diff) |
new replicator using _changes feed for continuous replication
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802888 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/Makefile.am | 8 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_misc_handlers.erl | 27 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 1156 | ||||
-rw-r--r-- | src/couchdb/couch_rep_att.erl | 100 | ||||
-rw-r--r-- | src/couchdb/couch_rep_changes_feed.erl | 1 | ||||
-rw-r--r-- | src/couchdb/couch_rep_missing_revs.erl | 15 | ||||
-rw-r--r-- | src/couchdb/couch_rep_reader.erl | 268 | ||||
-rw-r--r-- | src/couchdb/couch_rep_writer.erl | 68 |
8 files changed, 843 insertions, 800 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index f32497fa..6691dbba 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -74,10 +74,13 @@ source_files = \ couch_query_servers.erl \ couch_ref_counter.erl \ couch_rep.erl \ + couch_rep_att.erl \ couch_rep_changes_feed.erl \ couch_rep_httpc.erl \ couch_rep_missing_revs.erl \ + couch_rep_reader.erl \ couch_rep_sup.erl \ + couch_rep_writer.erl \ couch_server.erl \ couch_server_sup.erl \ couch_stats_aggregator.erl \ @@ -123,10 +126,13 @@ compiled_files = \ couch_query_servers.beam \ couch_ref_counter.beam \ couch_rep.beam \ + couch_rep_att.beam \ couch_rep_changes_feed.beam \ - couch_rep_missing_revs.beam \ couch_rep_httpc.beam \ + couch_rep_missing_revs.beam \ + couch_rep_reader.beam \ couch_rep_sup.beam \ + couch_rep_writer.beam \ couch_server.beam \ couch_server_sup.beam \ couch_stats_aggregator.beam \ diff --git a/src/couchdb/couch_httpd_misc_handlers.erl b/src/couchdb/couch_httpd_misc_handlers.erl index eea353bd..c4f28308 100644 --- a/src/couchdb/couch_httpd_misc_handlers.erl +++ b/src/couchdb/couch_httpd_misc_handlers.erl @@ -77,32 +77,9 @@ handle_task_status_req(#httpd{method='GET'}=Req) -> handle_task_status_req(Req) -> send_method_not_allowed(Req, "GET,HEAD"). -% add trailing slash if missing -fix_db_url(UrlBin) -> - ?l2b(case lists:last(Url = ?b2l(UrlBin)) of - $/ -> Url; - _ -> Url ++ "/" - end). - - -get_rep_endpoint(_Req, {Props}) -> - Url = proplists:get_value(<<"url">>, Props), - {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}), - Auth = proplists:get_value(<<"auth">>, Props, undefined), - ?LOG_DEBUG("AUTH ~p", [Auth]), - {remote, fix_db_url(Url), [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], Auth}; -get_rep_endpoint(_Req, <<"http://",_/binary>>=Url) -> - {remote, fix_db_url(Url), [], []}; -get_rep_endpoint(_Req, <<"https://",_/binary>>=Url) -> - {remote, fix_db_url(Url), [], []}; -get_rep_endpoint(#httpd{user_ctx=UserCtx}, <<DbName/binary>>) -> - {local, DbName, UserCtx}. - handle_replicate_req(#httpd{method='POST'}=Req) -> - {Props} = couch_httpd:json_body_obj(Req), - Source = get_rep_endpoint(Req, proplists:get_value(<<"source">>, Props)), - Target = get_rep_endpoint(Req, proplists:get_value(<<"target">>, Props)), - case couch_rep:replicate(Source, Target) of + PostBody = couch_httpd:json_body_obj(Req), + case couch_rep:replicate(PostBody, Req#httpd.user_ctx) of {ok, {JsonResults}} -> send_json(Req, {[{ok, true} | JsonResults]}); {error, {Type, Details}} -> diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 1692943a..c5a07685 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -12,155 +12,97 @@ -module(couch_rep). -behaviour(gen_server). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([replicate/2]). --define(BUFFER_NDOCS, 1000). --define(BUFFER_NATTACHMENTS, 50). --define(BUFFER_MEMORY, 10000000). %% bytes - -include("couch_db.hrl"). --include("../ibrowse/ibrowse.hrl"). - -%% @spec replicate(Source::binary(), Target::binary()) -> -%% {ok, Stats} | {error, Reason} -%% @doc Triggers a replication. Stats is a JSON Object with the following -%% keys: session_id (UUID), source_last_seq (integer), and history (array). -%% Each element of the history is an Object with keys start_time, end_time, -%% start_last_seq, end_last_seq, missing_checked, missing_found, docs_read, -%% and docs_written. -%% -%% The supervisor will try to restart the replication in case of any error -%% other than shutdown. Just call this function again to listen for the -%% result of the retry. -replicate(Source, Target) -> - {ok, HostName} = inet:gethostname(), - RepId = couch_util:to_hex( - erlang:md5(term_to_binary([HostName, Source, Target]))), - Args = [?MODULE, [RepId, Source,Target], []], +-record(state, { + changes_feed, + missing_revs, + reader, + writer, + + source, + target, + init_args, + + start_seq, + history, + source_log, + target_log, + rep_starttime, + src_starttime, + tgt_starttime, + checkpoint_history = nil, + + listeners = [], + complete = false, + committed_seq = 0, + + stats = nil +}). +%% convenience function to do a simple replication from the shell +replicate(Source, Target) when is_list(Source) -> + replicate(?l2b(Source), Target); +replicate(Source, Target) when is_binary(Source), is_list(Target) -> + replicate(Source, ?l2b(Target)); +replicate(Source, Target) when is_binary(Source), is_binary(Target) -> + replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{}); + +%% function handling POST to _replicate +replicate(PostBody, UserCtx) -> + RepId = make_replication_id(PostBody, UserCtx), Replicator = {RepId, - {gen_server, start_link, Args}, + {gen_server, start_link, [?MODULE, [RepId, PostBody, UserCtx], []]}, transient, 1, worker, [?MODULE] }, - Server = case supervisor:start_child(couch_rep_sup, Replicator) of - {ok, Pid} -> - ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), - Pid; - {error, already_present} -> - case supervisor:restart_child(couch_rep_sup, RepId) of - {ok, Pid} -> - ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), - Pid; - {error, running} -> - %% this error occurs if multiple replicators are racing - %% each other to start and somebody else won. Just grab - %% the Pid by calling start_child again. - {error, {already_started, Pid}} = - supervisor:start_child(couch_rep_sup, Replicator), - ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]), - Pid - end; - {error, {already_started, Pid}} -> - ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]), - Pid - end, + Server = start_replication_server(Replicator), - case gen_server:call(Server, get_result, infinity) of - retry -> replicate(Source, Target); - Else -> Else + try gen_server:call(Server, get_result, infinity) of + retry -> replicate(PostBody, UserCtx); + Else -> Else + catch + exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} -> + %% oops, this replication just finished -- restart it. + replicate(PostBody, UserCtx); + exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> + %% we made the call during terminate + replicate(PostBody, UserCtx) end. -%%============================================================================= -%% gen_server callbacks -%%============================================================================= - --record(old_http_db, { - uri, - headers, - oauth -}). - - --record(state, { - context, - current_seq, - source, - target, - stats, - enum_pid, - docs_buffer = [], - listeners = [], - done = false -}). - - -init([RepId, Source, Target]) -> +init([RepId, {PostProps}, UserCtx] = InitArgs) -> process_flag(trap_exit, true), - {ok, DbSrc, SrcName} = open_db(Source), - {ok, DbTgt, TgtName} = open_db(Target), + SourceProps = proplists:get_value(<<"source">>, PostProps), + TargetProps = proplists:get_value(<<"target">>, PostProps), - DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), + Source = open_db(SourceProps, UserCtx), + Target = open_db(TargetProps, UserCtx), - {ok, InfoSrc} = get_db_info(DbSrc), - {ok, InfoTgt} = get_db_info(DbTgt), + SourceLog = open_replication_log(Source, RepId), + TargetLog = open_replication_log(Target, RepId), - ReplicationStartTime = httpd_util:rfc1123_date(), - SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc), - TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt), + SourceInfo = dbinfo(Source), + TargetInfo = dbinfo(Target), + + {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), - RepRecDocSrc = - case open_doc(DbSrc, DocKey, []) of - {ok, SrcDoc} -> - ?LOG_DEBUG("Found existing replication record on source", []), - SrcDoc; - _ -> #doc{id=DocKey} - end, - - RepRecDocTgt = - case open_doc(DbTgt, DocKey, []) of - {ok, TgtDoc} -> - ?LOG_DEBUG("Found existing replication record on target", []), - TgtDoc; - _ -> #doc{id=DocKey} - end, - - #doc{body={RepRecProps}} = RepRecDocSrc, - #doc{body={RepRecPropsTgt}} = RepRecDocTgt, - - case proplists:get_value(<<"session_id">>, RepRecProps) == - proplists:get_value(<<"session_id">>, RepRecPropsTgt) of - true -> - % if the records have the same session id, - % then we have a valid replication history - OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0), - OldHistory = proplists:get_value(<<"history">>, RepRecProps, []); - false -> - ?LOG_INFO("Replication records differ. " - "Performing full replication instead of incremental.", []), - ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", - [RepRecProps, RepRecPropsTgt]), - OldSeqNum = 0, - OldHistory = [] - end, - - Context = [ - {start_seq, OldSeqNum}, - {history, OldHistory}, - {rep_starttime, ReplicationStartTime}, - {src_starttime, SrcInstanceStartTime}, - {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecDocSrc}, - {tgt_record, RepRecDocTgt} - ], + {ok, ChangesFeed} = + couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps), + {ok, MissingRevs} = + couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), + {ok, Reader} = + couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps), + {ok, Writer} = + couch_rep_writer:start_link(self(), Target, Reader, PostProps), Stats = ets:new(replication_stats, [set, private]), ets:insert(Stats, {total_revs,0}), @@ -169,158 +111,116 @@ init([RepId, Source, Target]) -> ets:insert(Stats, {docs_written, 0}), ets:insert(Stats, {doc_write_failures, 0}), - couch_task_status:add_task("Replication", <<SrcName/binary, " -> ", - TgtName/binary>>, "Starting"), - - Parent = self(), - Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end), + {ShortId, _} = lists:split(6, RepId), + couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", + [ShortId, dbname(Source), dbname(Target)]), "Starting"), State = #state{ - context = Context, - current_seq = OldSeqNum, - enum_pid = Pid, - source = DbSrc, - target = DbTgt, - stats = Stats - }, + changes_feed = ChangesFeed, + missing_revs = MissingRevs, + reader = Reader, + writer = Writer, - {ok, State}. -handle_call(get_result, From, #state{listeners=L,done=true} = State) -> - {stop, normal, State#state{listeners=[From|L]}}; -handle_call(get_result, From, #state{listeners=L} = State) -> - {noreply, State#state{listeners=[From|L]}}; - -handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) -> - #state{ - context = Context, - current_seq = Seq, - docs_buffer = Buffer, source = Source, target = Target, - stats = Stats - } = State, + init_args = InitArgs, + stats = Stats, + + start_seq = StartSeq, + history = History, + source_log = SourceLog, + target_log = TargetLog, + rep_starttime = httpd_util:rfc1123_date(), + src_starttime = proplists:get_value(instance_start_time, SourceInfo), + tgt_starttime = proplists:get_value(instance_start_time, TargetInfo) + }, + {ok, State}. - ets:update_counter(Stats, missing_revs, length(Revs)), +handle_call(get_result, From, #state{complete=true, listeners=[]} = State) -> + {stop, normal, State#state{listeners=[From]}}; +handle_call(get_result, From, State) -> + Listeners = State#state.listeners, + {noreply, State#state{listeners=[From|Listeners]}}. - %% get document(s) - {ok, DocResults} = open_doc_revs(Source, Id, Revs, [latest]), - Docs = [RevDoc || {ok, RevDoc} <- DocResults], - ets:update_counter(Stats, docs_read, length(Docs)), +handle_cast(_Msg, State) -> + {noreply, State}. - %% save them (maybe in a buffer) - {NewBuffer, NewContext} = - case should_flush(lists:flatlength([Docs|Buffer])) of - true -> - Docs2 = lists:flatten([Docs|Buffer]), - try update_docs(Target, Docs2, [], replicated_changes) of - {ok, Errors} -> - dump_update_errors(Errors), - ets:update_counter(Stats, doc_write_failures, length(Errors)), - ets:update_counter(Stats, docs_written, length(Docs2) - - length(Errors)), - {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats), - {[], Ctxt} - catch - throw:attachment_write_failed -> - ?LOG_ERROR("attachment request failed during write to disk", []), - exit({internal_server_error, replication_link_failure}) - end; - false -> - {[Docs | Buffer], Context} - end, +handle_info({missing_revs_checkpoint, SourceSeq}, State) -> + couch_task_status:update("MR Processed source update #~p", [SourceSeq]), + {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})}; + +handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) + when SourceSeq > N -> + MissingRevs = State#state.missing_revs, + ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), + couch_task_status:update("W Processed source update #~p", [SourceSeq]), + {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})}; +handle_info({writer_checkpoint, _}, State) -> + {noreply, State}; - {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}}; +handle_info({update_stats, Key, N}, State) -> + ets:update_counter(State#state.stats, Key, N), + {noreply, State}; -handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) -> - ets:update_counter(State#state.stats, total_revs, RevsCount), +handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) -> case State#state.listeners of [] -> - % still waiting for the first listener to send a request - {noreply, State#state{current_seq=LastSeq,done=true}}; - _ -> - {stop, normal, ok, State#state{current_seq=LastSeq}} - end. - -handle_cast({increment_update_seq, Seq}, State) -> - couch_task_status:update("Processed source update #~p", [Seq]), - {noreply, State#state{current_seq=Seq}}. - -handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) -> - ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]), - #state{ - current_seq = Seq, - source = Src, - target = Tgt, - enum_pid = Pid - } = State, - Parent = self(), - NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end), - {noreply, State#state{enum_pid=NewPid}}; + {noreply, State#state{complete = true}}; + _Else -> + {stop, normal, State} + end; -%% if any linked process dies, respawn the enumerator to get things going again -handle_info({'EXIT', _From, normal}, State) -> - {noreply, State}; -handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) -> - ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]), - exit(EnumPid, pls_restart_kthxbye), +handle_info({'EXIT', _, normal}, State) -> {noreply, State}; - -handle_info(_Msg, State) -> - {noreply, State}. +handle_info({'EXIT', Pid, Reason}, State) -> + ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]), + {stop, Reason, State}. terminate(normal, State) -> + % ?LOG_DEBUG("replication terminating normally", []), #state{ - context = Context, - current_seq = Seq, - docs_buffer = Buffer, + checkpoint_history = CheckpointHistory, + committed_seq = NewSeq, listeners = Listeners, source = Source, target = Target, - stats = Stats + stats = Stats, + source_log = #doc{body={OldHistory}} } = State, - - try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of - {ok, Errors} -> - dump_update_errors(Errors), - ets:update_counter(Stats, doc_write_failures, length(Errors)), - ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) - - length(Errors)) - catch - throw:attachment_write_failed -> - ?LOG_ERROR("attachment request failed during final write", []), - exit({internal_server_error, replication_link_failure}) - end, - couch_task_status:update("Finishing"), - - {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats), ets:delete(Stats), close_db(Target), + + NewRepHistory = case CheckpointHistory of + nil -> + {[{<<"no_changes">>, true} | OldHistory]}; + _Else -> + CheckpointHistory + end, - [Original|Rest] = Listeners, + %% reply to original requester + [Original|OtherListeners] = lists:reverse(Listeners), gen_server:reply(Original, {ok, NewRepHistory}), %% maybe trigger another replication. If this replicator uses a local %% source Db, changes to that Db since we started will not be included in %% this pass. - case up_to_date(Source, Seq) of + case up_to_date(Source, NewSeq) of true -> - [gen_server:reply(R, {ok, NewRepHistory}) || R <- Rest]; + [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners]; false -> - [gen_server:reply(R, retry) || R <- Rest] + [gen_server:reply(R, retry) || R <- OtherListeners] end, close_db(Source); + terminate(Reason, State) -> - ?LOG_ERROR("replicator terminating with reason ~p", [Reason]), #state{ listeners = Listeners, source = Source, target = Target, stats = Stats } = State, - [gen_server:reply(L, {error, Reason}) || L <- Listeners], - ets:delete(Stats), close_db(Target), close_db(Source). @@ -328,560 +228,284 @@ terminate(Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%============================================================================= -%% internal functions -%%============================================================================= - +% internal funs -% we should probably write these to a special replication log -% or have a callback where the caller decides what to do with replication -% errors. -dump_update_errors([]) -> ok; -dump_update_errors([{{Id, Rev}, Error}|Rest]) -> - ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p", - [Id, couch_doc:rev_to_str(Rev), Error]), - dump_update_errors(Rest). - -attachment_loop(ReqId, Conn) -> - couch_util:should_flush(), - receive - {From, {set_req_id, NewId}} -> - %% we learn the ReqId to listen for - From ! {self(), {ok, NewId}}, - attachment_loop(NewId, Conn); - {ibrowse_async_headers, ReqId, Status, Headers} -> - %% we got header, give the controlling process a chance to react - receive - {From, gimme_status} -> - %% send status/headers to controller - From ! {self(), {status, Status, Headers}}, - receive - {From, continue} -> - %% normal case - attachment_loop(ReqId, Conn); - {From, fail} -> - %% error, failure code - ?LOG_ERROR( - "streaming attachment failed with status ~p", - [Status]), - catch ibrowse:stop_worker_process(Conn), - exit(attachment_request_failed); - {From, stop_ok} -> - %% stop looping, controller will start a new loop - catch ibrowse:stop_worker_process(Conn), - stop_ok - end - end, - attachment_loop(ReqId, Conn); - {ibrowse_async_response, ReqId, {chunk_start,_}} -> - attachment_loop(ReqId, Conn); - {ibrowse_async_response, ReqId, chunk_end} -> - attachment_loop(ReqId, Conn); - {ibrowse_async_response, ReqId, {error, Err}} -> - ?LOG_ERROR("streaming attachment failed with ~p", [Err]), - catch ibrowse:stop_worker_process(Conn), - exit(attachment_request_failed); - {ibrowse_async_response, ReqId, Data} -> - receive {From, gimme_data} -> From ! {self(), Data} end, - attachment_loop(ReqId, Conn); - {ibrowse_async_response_end, ReqId} -> - catch ibrowse:stop_worker_process(Conn), - exit(normal) +start_replication_server(Replicator) -> + RepId = element(1, Replicator), + case supervisor:start_child(couch_rep_sup, Replicator) of + {ok, Pid} -> + ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), + Pid; + {error, already_present} -> + case supervisor:restart_child(couch_rep_sup, RepId) of + {ok, Pid} -> + ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), + Pid; + {error, running} -> + %% this error occurs if multiple replicators are racing + %% each other to start and somebody else won. Just grab + %% the Pid by calling start_child again. + {error, {already_started, Pid}} = + supervisor:start_child(couch_rep_sup, Replicator), + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), + Pid + end; + {error, {already_started, Pid}} -> + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), + Pid end. -att_stub_converter(DbS, Id, Rev, - #att{name=Name,data=stub,type=Type,len=Length}=Att) -> - #old_http_db{uri=DbUrl, headers=Headers} = DbS, - {Pos, [RevId|_]} = Rev, - Url = lists:flatten([DbUrl, couch_util:url_encode(Id), "/", couch_util:url_encode(?b2l(Name)), - "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]), - ?LOG_DEBUG("Attachment URL ~s", [Url]), - {ok, RcvFun} = make_att_stub_receiver(Url, Headers, Name, - Type, Length), - Att#att{name=Name,type=Type,data=RcvFun,len=Length}. - -make_att_stub_receiver(Url, Headers, Name, Type, Length) -> - make_att_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000). - -make_att_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) -> - ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s", - [Url]), - exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])}); - -make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) -> - %% start the process that receives attachment data from ibrowse - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), - {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port), - Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end), - - %% make the async request - Opts = [{stream_to, Pid}, {response_format, binary}], - ReqId = - case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of - {ibrowse_req_id, X} -> - X; - {error, Reason} -> - ?LOG_INFO("retrying couch_rep attachment request in ~p " ++ - "seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]), - catch ibrowse:stop_worker_process(Conn), - timer:sleep(Pause), - make_att_stub_receiver(Url, Headers, Name, Type, Length, - Retries-1, 2*Pause) - end, - - %% tell our receiver about the ReqId it needs to look for - Pid ! {self(), {set_req_id, ReqId}}, - receive - {Pid, {ok, ReqId}} -> - ok; - {'EXIT', Pid, _Reason} -> - catch ibrowse:stop_worker_process(Conn), - timer:sleep(Pause), - make_att_stub_receiver(Url, Headers, Name, Type, Length, - Retries-1, 2*Pause) - end, - - %% wait for headers to ensure that we have a 200 status code - %% this is where we follow redirects etc - Pid ! {self(), gimme_status}, - receive - {'EXIT', Pid, attachment_request_failed} -> - catch ibrowse:stop_worker_process(Conn), - make_att_stub_receiver(Url, Headers, Name, Type, Length, - Retries-1, Pause); - {Pid, {status, StreamStatus, StreamHeaders}} -> - ?LOG_DEBUG("streaming attachment Status ~p Headers ~p", - [StreamStatus, StreamHeaders]), - - ResponseCode = list_to_integer(StreamStatus), - if - ResponseCode >= 200, ResponseCode < 300 -> - % the normal case - Pid ! {self(), continue}, - %% this function goes into the streaming attachment code. - %% It gets executed by the replication gen_server, so it can't - %% be the one to actually receive the ibrowse data. - {ok, fun() -> - Pid ! {self(), gimme_data}, - receive - {Pid, Data} -> - Data; - {'EXIT', Pid, attachment_request_failed} -> - throw(attachment_write_failed) - end - end}; - ResponseCode >= 300, ResponseCode < 400 -> - % follow the redirect - Pid ! {self(), stop_ok}, - RedirectUrl = mochiweb_headers:get_value("Location", - mochiweb_headers:make(StreamHeaders)), - catch ibrowse:stop_worker_process(Conn), - make_att_stub_receiver(RedirectUrl, Headers, Name, Type, - Length, Retries - 1, Pause); - ResponseCode >= 400, ResponseCode < 500 -> - % an error... log and fail - ?LOG_ERROR("streaming attachment failed with code ~p: ~s", - [ResponseCode, Url]), - Pid ! {self(), fail}, - exit(attachment_request_failed); - ResponseCode == 500 -> - % an error... log and retry - ?LOG_INFO("retrying couch_rep attachment request in ~p " ++ - "seconds due to 500 response: ~s", [Pause/1000, Url]), - Pid ! {self(), fail}, - catch ibrowse:stop_worker_process(Conn), - timer:sleep(Pause), - make_att_stub_receiver(Url, Headers, Name, Type, Length, - Retries - 1, 2*Pause) - end +compare_replication_logs(SrcDoc, TgtDoc) -> + #doc{body={RepRecProps}} = SrcDoc, + #doc{body={RepRecPropsTgt}} = TgtDoc, + case proplists:get_value(<<"session_id">>, RepRecProps) == + proplists:get_value(<<"session_id">>, RepRecPropsTgt) of + true -> + % if the records have the same session id, + % then we have a valid replication history + OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0), + OldHistory = proplists:get_value(<<"history">>, RepRecProps, []), + {OldSeqNum, OldHistory}; + false -> + SourceHistory = proplists:get_value(<<"history">>, RepRecProps, []), + TargetHistory = proplists:get_value(<<"history">>, RepRecPropsTgt, []), + ?LOG_INFO("Replication records differ. " + "Scanning histories to find a common ancestor.", []), + ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", + [RepRecProps, RepRecPropsTgt]), + compare_rep_history(SourceHistory, TargetHistory) end. - -open_db({remote, Url, Headers, Auth})-> - {ok, #old_http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url}; -open_db({local, DbName, UserCtx})-> - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> {ok, Db, DbName}; - Error -> Error +compare_rep_history(S, T) when length(S) =:= 0 orelse length(T) =:= 0 -> + ?LOG_INFO("no common ancestry -- performing full replication", []), + {0, []}; +compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) -> + SourceId = proplists:get_value(<<"session_id">>, S), + case has_session_id(SourceId, Target) of + true -> + RecordSeqNum = proplists:get_value(<<"recorded_seq">>, S, 0), + ?LOG_INFO("found a common replication record with source_seq ~p", + [RecordSeqNum]), + {RecordSeqNum, SourceRest}; + false -> + TargetId = proplists:get_value(<<"session_id">>, T), + case has_session_id(TargetId, SourceRest) of + true -> + RecordSeqNum = proplists:get_value(<<"recorded_seq">>, T, 0), + ?LOG_INFO("found a common replication record with source_seq ~p", + [RecordSeqNum]), + {RecordSeqNum, TargetRest}; + false -> + compare_rep_history(SourceRest, TargetRest) + end end. - -close_db(#old_http_db{})-> +close_db(#http_db{})-> ok; close_db(Db)-> couch_db:close(Db). -do_checkpoint(Source, Target, Context, NewSeqNum, Stats) -> - ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]), - [ - {start_seq, StartSeqNum}, - {history, OldHistory}, - {rep_starttime, ReplicationStartTime}, - {src_starttime, SrcInstanceStartTime}, - {tgt_starttime, TgtInstanceStartTime}, - {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc}, - {tgt_record, RepRecDocTgt} - ] = Context, - - case NewSeqNum == StartSeqNum andalso OldHistory /= [] of - true -> - % nothing changed, don't record results - {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context}; - false -> - % something changed, record results for incremental replication, - - % commit changes to both src and tgt. The src because if changes - % we replicated are lost, we'll record the a seq number ahead - % of what was committed. If those changes are lost and the seq number - % reverts to a previous committed value, we will skip future changes - % when new doc updates are given our already replicated seq nums. - - % commit the src async - ParentPid = self(), - SrcCommitPid = spawn_link(fun() -> - ParentPid ! {self(), ensure_full_commit(Source)} end), - - % commit tgt sync - {ok, TgtInstanceStartTime2} = ensure_full_commit(Target), - - SrcInstanceStartTime2 = - receive - {SrcCommitPid, {ok, Timestamp}} -> - Timestamp; - {'EXIT', SrcCommitPid, {http_request_failed, _}} -> - exit(replication_link_failure) - end, - - RecordSeqNum = - if SrcInstanceStartTime2 == SrcInstanceStartTime andalso - TgtInstanceStartTime2 == TgtInstanceStartTime -> - NewSeqNum; - true -> - ?LOG_INFO("A server has restarted sinced replication start. " - "Not recording the new sequence number to ensure the " - "replication is redone and documents reexamined.", []), - StartSeqNum - end, - - NewHistoryEntry = { - [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, - {<<"start_last_seq">>, StartSeqNum}, - {<<"end_last_seq">>, NewSeqNum}, - {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, - {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, - {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)} - ]}, - % limit history to 50 entries - HistEntries =lists:sublist([NewHistoryEntry | OldHistory], 50), - - NewRepHistory = - {[{<<"session_id">>, couch_util:new_uuid()}, - {<<"source_last_seq">>, RecordSeqNum}, - {<<"history">>, HistEntries}]}, - - {ok, {SrcRevPos,SrcRevId}} = update_doc(Source, - RepRecDocSrc#doc{body=NewRepHistory}, []), - {ok, {TgtRevPos,TgtRevId}} = update_doc(Target, - RepRecDocTgt#doc{body=NewRepHistory}, []), - - NewContext = [ - {start_seq, StartSeqNum}, - {history, OldHistory}, - {rep_starttime, ReplicationStartTime}, - {src_starttime, SrcInstanceStartTime}, - {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}}, - {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}} - ], - - {ok, NewRepHistory, NewContext} - +dbname(#http_db{} = Db) -> + Db#http_db.url; +dbname(Db) -> + Db#db.name. + +dbinfo(#http_db{} = Db) -> + {DbProps} = couch_rep_httpc:request(Db), + [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps]; +dbinfo(Db) -> + {ok, Info} = couch_db:get_db_info(Db), + Info. + +has_session_id(_SessionId, []) -> + false; +has_session_id(SessionId, [{Props} | Rest]) -> + case proplists:get_value(<<"session_id">>, Props, nil) of + SessionId -> + true; + _Else -> + has_session_id(SessionId, Rest) end. -do_http_request(Url, Action, Headers, Auth) -> - do_http_request(Url, Action, Headers, Auth, []). - -do_http_request(Url, Action, Headers, Auth, JsonBody) -> - Headers0 = case Auth of - {Props} -> - % Add OAuth header - {OAuth} = proplists:get_value(<<"oauth">>, Props), - ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, OAuth)), - Token = ?b2l(proplists:get_value(<<"token">>, OAuth)), - TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, OAuth)), - ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, OAuth)), - Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1}, - Method = case Action of - get -> "GET"; - post -> "POST"; - put -> "PUT" - end, - Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret), - [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)} | Headers]; - _Else -> - Headers - end, - do_http_request0(Url, Action, Headers0, JsonBody, 10, 1000). - -do_http_request0(Url, Action, Headers, Body, Retries, Pause) when is_binary(Url) -> - do_http_request0(?b2l(Url), Action, Headers, Body, Retries, Pause); -do_http_request0(Url, Action, _Headers, _JsonBody, 0, _Pause) -> - ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s", - [Action, Url]), - exit({http_request_failed, ?l2b(["failed to replicate ", Url])}); -do_http_request0(Url, Action, Headers, JsonBody, Retries, Pause) -> - ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]), - Body = - case JsonBody of - [] -> - <<>>; +make_replication_id({Props}, UserCtx) -> + %% funky algorithm to preserve backwards compatibility + {ok, HostName} = inet:gethostname(), + Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)), + Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)), + couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))). + +maybe_add_trailing_slash(Url) -> + re:replace(Url, "[^/]$", "&/", [{return, list}]). + +get_rep_endpoint(_UserCtx, {Props}) -> + Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)), + {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}), + {Auth} = proplists:get_value(<<"auth">>, Props, {[]}), + case proplists:get_value(<<"oauth">>, Auth) of + undefined -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; + {OAuth} -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth} + end; +get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) -> + {remote, maybe_add_trailing_slash(Url), []}; +get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) -> + {remote, maybe_add_trailing_slash(Url), []}; +get_rep_endpoint(UserCtx, <<DbName/binary>>) -> + {local, DbName, UserCtx}. + +open_replication_log(#http_db{}=Db, RepId) -> + DocId = ?LOCAL_DOC_PREFIX ++ RepId, + Req = Db#http_db{resource=couch_util:url_encode(DocId)}, + case couch_rep_httpc:request(Req) of + {[{<<"error">>, _}, {<<"reason">>, _}]} -> + ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), + #doc{id=?l2b(DocId)}; + Doc -> + ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), + couch_doc:from_json_obj(Doc) + end; +open_replication_log(Db, RepId) -> + DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), + case couch_db:open_doc(Db, DocId, []) of + {ok, Doc} -> + ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), + Doc; _ -> - iolist_to_binary(?JSON_ENCODE(JsonBody)) - end, - Options = case Action of - get -> []; - _ -> [{transfer_encoding, {chunked, 65535}}] - end ++ [ - {content_type, "application/json; charset=utf-8"}, - {max_pipeline_size, 101}, - {response_format, binary} - ], - case ibrowse:send_req(Url, Headers, Action, Body, Options, infinity) of - {ok, Status, ResponseHeaders, ResponseBody} -> - ResponseCode = list_to_integer(Status), - if - ResponseCode >= 200, ResponseCode < 300 -> - ?JSON_DECODE(ResponseBody); - ResponseCode >= 300, ResponseCode < 400 -> - RedirectUrl = mochiweb_headers:get_value("Location", - mochiweb_headers:make(ResponseHeaders)), - do_http_request0(RedirectUrl, Action, Headers, JsonBody, Retries-1, - Pause); - ResponseCode >= 400, ResponseCode < 500 -> - ?JSON_DECODE(ResponseBody); - ResponseCode == 500 -> - ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds " ++ - "due to 500 error: ~s", [Action, Pause/1000, Url]), - timer:sleep(Pause), - do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause) - end; - {error, Reason} -> - ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds due to " ++ - "{error, ~p}: ~s", [Action, Pause/1000, Reason, Url]), - timer:sleep(Pause), - do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause) + ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), + #doc{id=DocId} end. -ensure_full_commit(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) -> - {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post, - Headers, OAuth, true), - true = proplists:get_value(<<"ok">>, ResultProps), - {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)}; -ensure_full_commit(Db) -> - couch_db:ensure_full_commit(Db). - -enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) -> - case get_doc_info_list(DbSource, StartSeq) of - [] -> - gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity); - DocInfoList -> - SrcRevsList = lists:map(fun(#doc_info{id=Id,revs=RevInfos}) -> - SrcRevs = [Rev || #rev_info{rev=Rev} <- RevInfos], - {Id, SrcRevs} - end, DocInfoList), - {ok, MissingRevs} = get_missing_revs(DbTarget, SrcRevsList), - - %% do we need to check for success here? - [gen_server:call(Pid, {replicate_doc, Info}, infinity) - || Info <- MissingRevs ], - - #doc_info{high_seq=LastSeq} = lists:last(DocInfoList), - RevsCount2 = RevsCount + length(SrcRevsList), - gen_server:cast(Pid, {increment_update_seq, LastSeq}), - - enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2}) - end. - - - -get_db_info(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) -> - {DbProps} = do_http_request(DbUrl, get, Headers, OAuth), - {ok, [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps]}; -get_db_info(Db) -> - couch_db:get_db_info(Db). - -get_doc_info_list(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) -> - Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey=" - ++ integer_to_list(StartSeq), - {Results} = do_http_request(Url, get, Headers, OAuth), - lists:map(fun({RowInfoList}) -> - {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList), - Seq = proplists:get_value(<<"key">>, RowInfoList), - Revs = - [#rev_info{rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)), deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)} | - [#rev_info{rev=Rev,deleted=false} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, []))] ++ - [#rev_info{rev=Rev,deleted=true} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []))]], - #doc_info{ - id=proplists:get_value(<<"id">>, RowInfoList), - high_seq = Seq, - revs = Revs - } - end, proplists:get_value(<<"rows">>, Results)); -get_doc_info_list(DbSource, StartSeq) -> - {ok, {_Count, DocInfoList}} = couch_db:enum_docs_since(DbSource, StartSeq, - fun (_, _, {100, DocInfoList}) -> - {stop, {100, DocInfoList}}; - (DocInfo, _, {Count, DocInfoList}) -> - {ok, {Count+1, [DocInfo|DocInfoList]}} - end, {0, []}), - lists:reverse(DocInfoList). - -get_missing_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) -> - DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList], - {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, OAuth, - {DocIdRevsList2}), - {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers), - DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList], - {ok, DocMissingRevsList2}; -get_missing_revs(Db, DocId) -> - couch_db:get_missing_revs(Db, DocId). - - -open_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) -> - [] = Options, - case do_http_request(DbUrl ++ couch_util:url_encode(DocId), get, Headers, OAuth) of - {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} -> - {couch_util:to_existing_atom(ErrId), Reason}; - Doc -> - {ok, couch_doc:from_json_obj(Doc)} +open_db({Props}, _UserCtx) -> + Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)), + {AuthProps} = proplists:get_value(<<"auth">>, Props, {[]}), + {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}), + Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], + DefaultHeaders = (#http_db{})#http_db.headers, + Db = #http_db{ + url = Url, + auth = AuthProps, + headers = lists:ukeymerge(1, Headers, DefaultHeaders) + }, + case couch_rep_httpc:db_exists(Db) of + true -> Db; + false -> throw({db_not_found, Url}) end; -open_doc(Db, DocId, Options) -> - couch_db:open_doc(Db, DocId, Options). - -open_doc_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0, - [latest]) -> - Revs = couch_doc:rev_to_strs(Revs0), - BaseUrl = DbUrl ++ couch_util:url_encode(DocId) ++ "?revs=true&latest=true", - - %% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests - MaxN = trunc((8192 - length(BaseUrl))/14), - - JsonResults = case length(Revs) > MaxN of - false -> - Url = ?l2b(BaseUrl ++ "&open_revs=" ++ ?JSON_ENCODE(Revs)), - do_http_request(Url, get, Headers, OAuth); - true -> - {_, Rest, Acc} = lists:foldl( - fun(Rev, {Count, RevsAcc, AccResults}) when Count =:= MaxN -> - QSRevs = ?JSON_ENCODE(lists:reverse(RevsAcc)), - Url = ?l2b(BaseUrl ++ "&open_revs=" ++ QSRevs), - {1, [Rev], AccResults++do_http_request(Url, get, Headers, OAuth)}; - (Rev, {Count, RevsAcc, AccResults}) -> - {Count+1, [Rev|RevsAcc], AccResults} - end, {0, [], []}, Revs), - Acc ++ do_http_request(?l2b(BaseUrl ++ "&open_revs=" ++ - ?JSON_ENCODE(lists:reverse(Rest))), get, Headers, OAuth) - end, - - Results = - lists:map( - fun({[{<<"missing">>, Rev}]}) -> - {{not_found, missing}, couch_doc:parse_rev(Rev)}; - ({[{<<"ok">>, JsonDoc}]}) -> - #doc{id=Id, revs=Rev, atts=Atts} = Doc = - couch_doc:from_json_obj(JsonDoc), - {ok, Doc#doc{atts=[att_stub_converter(DbS,Id,Rev,A) || A <- Atts]}} - end, JsonResults), - {ok, Results}; -open_doc_revs(Db, DocId, Revs, Options) -> - couch_db:open_doc_revs(Db, DocId, Revs, Options). - -%% @spec should_flush() -> true | false -%% @doc Calculates whether it's time to flush the document buffer. Considers -%% - memory utilization -%% - number of pending document writes -%% - approximate number of pending attachment writes -should_flush(DocCount) when DocCount > ?BUFFER_NDOCS -> - true; -should_flush(_DocCount) -> - MeAndMyLinks = [self()| - [P || P <- element(2,process_info(self(),links)), is_pid(P)]], - - case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of - true -> true; - false -> - case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of - true -> - [garbage_collect(Pid) || Pid <- MeAndMyLinks], - memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY; - false -> false - end - end. - -%% @spec memory_footprint([pid()]) -> integer() -%% @doc Sum of process and binary memory utilization for all processes in list -memory_footprint(PidList) -> - memory_footprint(PidList, {0,0}). - -memory_footprint([], {ProcessMemory, BinaryMemory}) -> - ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]), - ProcessMemory + BinaryMemory; -memory_footprint([Pid|Rest], {ProcAcc, BinAcc}) -> - case is_process_alive(Pid) of - true -> - ProcMem = element(2,process_info(Pid, memory)), - BinMem = binary_memory(Pid), - memory_footprint(Rest, {ProcMem + ProcAcc, BinMem + BinAcc}); - false -> - memory_footprint(Rest, {ProcAcc, BinAcc}) +open_db(<<"http://",_/binary>>=Url, _) -> + open_db({[{<<"url">>,Url}]}, []); +open_db(<<"https://",_/binary>>=Url, _) -> + open_db({[{<<"url">>,Url}]}, []); +open_db(<<DbName/binary>>, UserCtx) -> + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> Db; + {not_found, no_db_file} -> throw({db_not_found, DbName}) end. -%% @spec binary_memory(pid()) -> integer() -%% @doc Memory utilization of all binaries referenced by this process. -binary_memory(Pid) -> - lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end, - 0, element(2,process_info(Pid, binary))). - -update_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) -> - [] = Options, - Url = DbUrl ++ couch_util:url_encode(DocId), - {ResponseMembers} = do_http_request(Url, put, Headers, OAuth, - couch_doc:to_json_obj(Doc, [attachments])), +do_checkpoint(State) -> + #state{ + source = Source, + target = Target, + committed_seq = NewSeqNum, + start_seq = StartSeqNum, + history = OldHistory, + source_log = SourceLog, + target_log = TargetLog, + rep_starttime = ReplicationStartTime, + src_starttime = SrcInstanceStartTime, + tgt_starttime = TgtInstanceStartTime, + stats = Stats + } = State, + ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]), + RecordSeqNum = case commit_to_both(Source, Target) of + {SrcInstanceStartTime, TgtInstanceStartTime} -> + NewSeqNum; + _Else -> + ?LOG_INFO("A server has restarted sinced replication start. " + "Not recording the new sequence number to ensure the " + "replication is redone and documents reexamined.", []), + StartSeqNum + end, + SessionId = couch_util:new_uuid(), + NewHistoryEntry = {[ + {<<"session_id">>, SessionId}, + {<<"start_time">>, list_to_binary(ReplicationStartTime)}, + {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_last_seq">>, StartSeqNum}, + {<<"end_last_seq">>, NewSeqNum}, + {<<"recorded_seq">>, RecordSeqNum}, + {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, + {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, + {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, + {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, + {<<"doc_write_failures">>, + ets:lookup_element(Stats, doc_write_failures, 2)} + ]}, + % limit history to 50 entries + NewRepHistory = {[ + {<<"session_id">>, SessionId}, + {<<"source_last_seq">>, RecordSeqNum}, + {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} + ]}, + % ?LOG_DEBUG("updating src doc ~p", [SourceLog]), + {SrcRevPos,SrcRevId} = + update_doc(Source, SourceLog#doc{body=NewRepHistory}, []), + % ?LOG_DEBUG("updating tgt doc ~p", [TargetLog]), + {TgtRevPos,TgtRevId} = + update_doc(Target, TargetLog#doc{body=NewRepHistory}, []), + State#state{ + checkpoint_history = NewRepHistory, + source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, + target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} + }. + +commit_to_both(Source, Target) -> + % commit the src async + ParentPid = self(), + SrcCommitPid = spawn_link(fun() -> + ParentPid ! {self(), ensure_full_commit(Source)} end), + + % commit tgt sync + TargetStartTime = ensure_full_commit(Target), + + SourceStartTime = + receive + {SrcCommitPid, Timestamp} -> + Timestamp; + {'EXIT', SrcCommitPid, {http_request_failed, _}} -> + exit(replication_link_failure) + end, + {SourceStartTime, TargetStartTime}. + +ensure_full_commit(#http_db{} = Db) -> + Req = Db#http_db{ + resource = "_ensure_full_commit", + method = post, + body = true + }, + {ResultProps} = couch_rep_httpc:request(Req), + true = proplists:get_value(<<"ok">>, ResultProps), + proplists:get_value(<<"instance_start_time">>, ResultProps); +ensure_full_commit(Db) -> + {ok, StartTime} = couch_db:ensure_full_commit(Db), + StartTime. + +update_doc(#http_db{} = Db, #doc{id=DocId} = Doc, []) -> + Req = Db#http_db{ + resource = couch_util:url_encode(DocId), + method = put, + body = couch_doc:to_json_obj(Doc, [attachments]) + }, + {ResponseMembers} = couch_rep_httpc:request(Req), Rev = proplists:get_value(<<"rev">>, ResponseMembers), - {ok, couch_doc:parse_rev(Rev)}; + couch_doc:parse_rev(Rev); update_doc(Db, Doc, Options) -> - couch_db:update_doc(Db, Doc, Options). - -update_docs(_, [], _, _) -> - {ok, []}; -update_docs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) -> - JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], - ErrorsJson = - do_http_request(DbUrl ++ "_bulk_docs", post, Headers, OAuth, - {[{new_edits, false}, {docs, JsonDocs}]}), - ErrorsList = - lists:map( - fun({Props}) -> - Id = proplists:get_value(<<"id">>, Props), - Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)), - ErrId = couch_util:to_existing_atom( - proplists:get_value(<<"error">>, Props)), - Reason = proplists:get_value(<<"reason">>, Props), - Error = {ErrId, Reason}, - {{Id, Rev}, Error} - end, ErrorsJson), - {ok, ErrorsList}; -update_docs(Db, Docs, Options, UpdateType) -> - couch_db:update_docs(Db, Docs, Options, UpdateType). - -up_to_date(#old_http_db{}, _Seq) -> + {ok, Result} = couch_db:update_doc(Db, Doc, Options), + Result. + +up_to_date(#http_db{}, _Seq) -> true; up_to_date(Source, Seq) -> {ok, NewDb} = couch_db:open(Source#db.name, []), T = NewDb#db.update_seq == Seq, couch_db:close(NewDb), T. - diff --git a/src/couchdb/couch_rep_att.erl b/src/couchdb/couch_rep_att.erl new file mode 100644 index 00000000..baeb6c65 --- /dev/null +++ b/src/couchdb/couch_rep_att.erl @@ -0,0 +1,100 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_att). + +-export([convert_stub/2, cleanup/0]). + +-include("couch_db.hrl"). + +convert_stub(#att{data=stub} = Attachment, {#http_db{} = Db, Id, Rev}) -> + {Pos, [RevId|_]} = Rev, + Name = Attachment#att.name, + Request = Db#http_db{ + resource = lists:flatten([couch_util:url_encode(Id), "/", + couch_util:url_encode(Name)]), + qs = [{rev, couch_doc:rev_to_str({Pos,RevId})}] + }, + Ref = make_ref(), + RcvFun = fun() -> attachment_receiver(Ref, Request) end, + Attachment#att{data=RcvFun}. + +cleanup() -> + receive + {ibrowse_async_response, _, _} -> + %% TODO maybe log, didn't expect to have data here + cleanup(); + {ibrowse_async_response_end, _} -> + cleanup() + after 0 -> + erase(), + ok + end. + +% internal funs + +attachment_receiver(Ref, Request) -> + case get(Ref) of + undefined -> + ReqId = start_http_request(Request), + put(Ref, ReqId), + receive_data(Ref, ReqId); + ReqId -> + receive_data(Ref, ReqId) + end. + +receive_data(Ref, ReqId) -> + receive + {ibrowse_async_response, ReqId, {chunk_start,_}} -> + receive_data(Ref, ReqId); + {ibrowse_async_response, ReqId, chunk_end} -> + receive_data(Ref, ReqId); + {ibrowse_async_response, ReqId, {error, Err}} -> + ?LOG_ERROR("streaming attachment ~p failed with ~p", [ReqId, Err]), + throw({attachment_request_failed, Err}); + {ibrowse_async_response, ReqId, Data} -> + % ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]), + Data; + {ibrowse_async_response_end, ReqId} -> + ?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]), + throw({attachment_request_failed, premature_end}) + end. + +start_http_request(Req) -> + %% set stream_to here because self() has changed + Req2 = Req#http_db{options = [{stream_to,self()} | Req#http_db.options]}, + {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req2), + receive {ibrowse_async_headers, ReqId, Code, Headers} -> + case validate_headers(Req2, list_to_integer(Code), Headers) of + ok -> + ReqId; + {ok, NewReqId} -> + NewReqId + end + end. + +validate_headers(_Req, 200, _Headers) -> + ok; +validate_headers(Req, Code, Headers) when Code > 299, Code < 400 -> + %% TODO check that the qs is actually included in the Location header + %% TODO this only supports one level of redirection + Url = mochiweb_headers:get_value("Location",mochiweb_headers:make(Headers)), + NewReq = Req#http_db{url=Url, resource="", qs=[]}, + {ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq), + receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} -> + ok = validate_headers(NewReq, list_to_integer(NewCode), NewHeaders) + end, + {ok, ReqId}; +validate_headers(Req, Code, _Headers) -> + #http_db{url=Url, resource=Resource} = Req, + ?LOG_ERROR("got ~p for ~s~s", [Code, Url, Resource]), + throw({attachment_request_failed, {bad_code, Code}}). diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index 7c2d05b0..ce9e4812 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -47,6 +47,7 @@ stop(Server) -> gen_server:call(Server, stop). init([_Parent, #http_db{}=Source, Since, PostProps]) -> + process_flag(trap_exit, true), Feed = case proplists:get_value(<<"continuous">>, PostProps, false) of false -> normal; diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl index bd7cda66..7e1dc16a 100644 --- a/src/couchdb/couch_rep_missing_revs.erl +++ b/src/couchdb/couch_rep_missing_revs.erl @@ -51,19 +51,17 @@ init([Parent, Target, ChangesFeed, _PostProps]) -> {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}. handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) -> + State#state.parent ! {update_stats, missing_revs, length(Revs)}, handle_add_missing_revs(HighSeq, Revs, From, State); handle_call(next_missing_revs, From, State) -> - handle_next_missing_revs(From, State); + handle_next_missing_revs(From, State). -handle_call({update_committed_seq, N}, _From, State) -> +handle_cast({update_committed_seq, N}, State) -> if State#state.high_committed_seq < N -> ?LOG_DEBUG("missing_revs updating committed seq to ~p", [N]); true -> ok end, - {reply, ok, State#state{high_committed_seq=N}}. - -handle_cast(_Msg, State) -> - {noreply, State}. + {noreply, State#state{high_committed_seq=N}}. handle_info({'EXIT', Pid, Reason}, #state{changes_loop=Pid} = State) -> handle_changes_loop_exit(Reason, State); @@ -84,8 +82,9 @@ code_change(_OldVsn, State, _Extra) -> %internal funs handle_add_missing_revs(HighSeq, [], _From, State) -> - maybe_checkpoint(State), - {reply, ok, State#state{high_source_seq=HighSeq}}; + NewState = State#state{high_source_seq=HighSeq}, + maybe_checkpoint(NewState), + {reply, ok, NewState}; handle_add_missing_revs(HighSeq, Revs, From, #state{reply_to=nil} = State) -> #state{rows=Rows, count=Count} = State, NewState = State#state{ diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl new file mode 100644 index 00000000..d17c6c59 --- /dev/null +++ b/src/couchdb/couch_rep_reader.erl @@ -0,0 +1,268 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_reader). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/4, next/1]). + +-import(couch_util, [url_encode/1]). + +-define (BUFFER_SIZE, 1000). +-define (MAX_CONCURRENT_REQUESTS, 100). +-define (MAX_CONNECTIONS, 20). +-define (MAX_PIPELINE_SIZE, 50). + +-include("couch_db.hrl"). +-include("../ibrowse/ibrowse.hrl"). + +-record (state, { + parent, + source, + missing_revs, + reader_loop, + reader_from = nil, + count = 0, + docs = queue:new(), + reply_to = nil, + complete = false, + monitor_count = 0, + monitor_count_by_seq = ets:new(monitor_count_by_seq, [set, private]), + monitors_by_ref = ets:new(monitors_by_ref, [set, private]), + pending_doc_request = nil, + high_missing_seq = 0 +}). + +start_link(Parent, Source, MissingRevs, PostProps) -> + gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []). + +next(Pid) -> + gen_server:call(Pid, next_docs, infinity). + +init([Parent, Source, MissingRevs, _PostProps]) -> + process_flag(trap_exit, true), + if is_record(Source, http_db) -> + #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url), + ibrowse:set_max_sessions(Host, Port, ?MAX_CONNECTIONS), + ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE); + true -> ok end, + Self = self(), + ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end), + State = #state{ + parent = Parent, + source = Source, + missing_revs = MissingRevs, + reader_loop = ReaderLoop + }, + {ok, State}. + +handle_call({add_docs, Docs}, From, State) -> + State#state.parent ! {update_stats, docs_read, length(Docs)}, + handle_add_docs(lists:flatten(Docs), From, State); + +handle_call(next_docs, From, State) -> + handle_next_docs(From, State); + +handle_call({open_doc_revs, Id, Revs, HighSeq}, From, State) -> + handle_open_doc_revs(Id, Revs, HighSeq, From, State); + +handle_call({set_monitor_count, Seq, Count}, _From, State) -> + ets:insert(State#state.monitor_count_by_seq, {Seq,Count}), + {reply, ok, State}; + +handle_call({update_high_seq, HighSeq}, _From, State) -> + {reply, ok, State#state{high_missing_seq=HighSeq}}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', Ref, _, _, Reason}, State) -> + handle_monitor_down(Reason, Ref, State); + +handle_info({'EXIT', Loop, complete}, #state{reader_loop=Loop} = State) -> + handle_reader_loop_complete(State). + +terminate(Reason, _State) -> + % ?LOG_INFO("rep reader terminating with reason ~p", [Reason]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%internal funs + +handle_add_docs(DocsToAdd, From, #state{reply_to=nil} = State) -> + NewState = State#state{ + docs = queue:join(State#state.docs, queue:from_list(DocsToAdd)), + count = State#state.count + length(DocsToAdd) + }, + if NewState#state.count < ?BUFFER_SIZE -> + {reply, ok, NewState}; + true -> + {noreply, NewState#state{reader_from=From}} + end; +handle_add_docs(DocsToAdd, _From, #state{count=0} = State) -> + HighSeq = State#state.high_missing_seq, + gen_server:reply(State#state.reply_to, {HighSeq, DocsToAdd}), + {reply, ok, State#state{reply_to=nil}}. + +handle_next_docs(From, #state{count=0} = State) -> + if State#state.complete -> + {stop, normal, {complete, State#state.high_missing_seq}, State}; + true -> + {noreply, State#state{reply_to=From}} + end; +handle_next_docs(_From, State) -> + #state{ + reader_from = ReaderFrom, + docs = Docs, + high_missing_seq = HighSeq + } = State, + if ReaderFrom =/= nil -> + gen_server:reply(ReaderFrom, ok); + true -> ok end, + NewState = State#state{count=0, reader_from=nil, docs=queue:new()}, + {reply, {HighSeq, queue:to_list(Docs)}, NewState}. + +handle_open_doc_revs(Id, Revs, Seq, From, #state{monitor_count=N} = State) + when N > ?MAX_CONCURRENT_REQUESTS -> + {noreply, State#state{pending_doc_request={From,Id,Revs,Seq}}}; +handle_open_doc_revs(Id, Revs, Seq, _From, #state{source=#http_db{}} = State) -> + #state{ + monitor_count = Count, + monitors_by_ref = MonitorsByRef, + source = Source + } = State, + {_, Ref} = spawn_document_request(Source, Id, Revs), + ets:insert(MonitorsByRef, {Ref, Seq}), + {reply, ok, State#state{monitor_count = Count+1}}. + +handle_monitor_down(normal, Ref, #state{pending_doc_request=nil, + monitor_count=1, complete=waiting_on_monitors} = State) -> + N = calculate_new_high_seq(State, Ref), + {noreply, State#state{complete=true, monitor_count=0, high_missing_seq=N}}; +handle_monitor_down(normal, Ref, #state{pending_doc_request=nil} = State) -> + #state{monitor_count = Count} = State, + HighSeq = calculate_new_high_seq(State, Ref), + {noreply, State#state{monitor_count = Count-1, high_missing_seq=HighSeq}}; +handle_monitor_down(normal, Ref, State) -> + #state{ + source = Source, + monitors_by_ref = MonitorsByRef, + pending_doc_request = {From, Id, Revs, Seq} + } = State, + HighSeq = calculate_new_high_seq(State, Ref), + gen_server:reply(From, ok), + {_, NewRef} = spawn_document_request(Source, Id, Revs), + ets:insert(MonitorsByRef, {NewRef, Seq}), + {noreply, State#state{pending_doc_request=nil, high_missing_seq=HighSeq}}; +handle_monitor_down(Reason, _, State) -> + {stop, Reason, State}. + +handle_reader_loop_complete(#state{reply_to=nil, monitor_count=0} = State) -> + {noreply, State#state{complete = true}}; +handle_reader_loop_complete(#state{monitor_count=0} = State) -> + HighSeq = State#state.high_missing_seq, + gen_server:reply(State#state.reply_to, {complete, HighSeq}), + {stop, normal, State}; +handle_reader_loop_complete(State) -> + {noreply, State#state{complete = waiting_on_monitors}}. + +split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> + case Length+size(Rev) > 8192 of + false -> + {[[Rev|CurrentAcc] | Rest], BaseLength, Length+size(Rev)}; + true -> + {[[Rev],CurrentAcc|Rest], BaseLength, BaseLength} + end. + +open_doc_revs(#http_db{} = DbS, DocId, Revs) -> + %% all this logic just splits up revision lists that are too long for + %% MochiWeb into multiple requests + BaseQS = [{revs,true}, {latest,true}], + BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS}, + BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs= + + {RevLists, _, _} = lists:foldl(fun split_revlist/2, + {[[]], BaseLength, BaseLength}, couch_doc:rev_to_strs(Revs)), + + Requests = [BaseReq#http_db{ + qs = [{open_revs, ?JSON_ENCODE(RevList)} | BaseQS] + } || RevList <- RevLists], + JsonResults = lists:flatten([couch_rep_httpc:request(R) || R <- Requests]), + + Transform = + fun({[{<<"missing">>, Rev}]}) -> + {{not_found, missing}, couch_doc:parse_rev(Rev)}; + ({[{<<"ok">>, Json}]}) -> + #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), + Doc#doc{atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]} + end, + [Transform(Result) || Result <- JsonResults]. + +reader_loop(ReaderServer, Source, MissingRevsServer) -> + case couch_rep_missing_revs:next(MissingRevsServer) of + complete -> + % ?LOG_INFO("reader_loop terminating with complete", []), + exit(complete); + {HighSeq, IdsRevs} -> + % ?LOG_DEBUG("got IdsRevs ~p", [IdsRevs]), + case Source of + #http_db{} -> + N = length(IdsRevs), + gen_server:call(ReaderServer, {set_monitor_count, HighSeq, N}), + [gen_server:call(ReaderServer, {open_doc_revs, Id, Revs, HighSeq}) + || {Id,Revs} <- IdsRevs]; + _Local -> + lists:foreach(fun({Id,Revs}) -> + {ok, Docs} = couch_db:open_doc_revs(Source, Id, Revs, [latest]), + JustTheDocs = [Doc || {ok, Doc} <- Docs], + gen_server:call(ReaderServer, {add_docs, JustTheDocs}) + end, IdsRevs), + gen_server:call(ReaderServer, {update_high_seq, HighSeq}) + end + end, + reader_loop(ReaderServer, Source, MissingRevsServer). + +spawn_document_request(Source, Id, Revs) -> + Server = self(), + SpawnFun = fun() -> + Results = open_doc_revs(Source, Id, Revs), + gen_server:call(Server, {add_docs, Results}) + end, + spawn_monitor(SpawnFun). + +%% check if any more HTTP requests are pending for this update sequence +calculate_new_high_seq(State, Ref) -> + #state{ + monitors_by_ref = MonitorsByRef, + monitor_count_by_seq = MonitorCountBySeq, + high_missing_seq = OldSeq + } = State, + Seq = ets:lookup_element(MonitorsByRef, Ref, 2), + ets:delete(MonitorsByRef, Ref), + case ets:update_counter(MonitorCountBySeq, Seq, -1) of + 0 -> + ets:delete(MonitorCountBySeq, Seq), + case ets:first(MonitorCountBySeq) of + Key when Key > Seq -> + Seq; + '$end_of_table' -> + Seq; + _Else -> + OldSeq + end; + _Else -> + OldSeq + end. diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl new file mode 100644 index 00000000..8bea63fe --- /dev/null +++ b/src/couchdb/couch_rep_writer.erl @@ -0,0 +1,68 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_writer). + +-export([start_link/4]). + +-include("couch_db.hrl"). + +start_link(Parent, Target, Reader, _PostProps) -> + {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}. + +writer_loop(Parent, Reader, Target) -> + % ?LOG_DEBUG("writer loop begin", []), + case couch_rep_reader:next(Reader) of + {complete, FinalSeq} -> + % ?LOG_INFO("writer terminating normally", []), + Parent ! {writer_checkpoint, FinalSeq}, + ok; + {HighSeq, Docs} -> + % ?LOG_DEBUG("writer loop trying to write ~p", [Docs]), + DocCount = length(Docs), + try write_docs(Target, Docs) of + {ok, []} -> + Parent ! {update_stats, docs_written, DocCount}; + {ok, Errors} -> + ErrorCount = length(Errors), + Parent ! {update_stats, doc_write_failures, ErrorCount}, + Parent ! {update_stats, docs_written, DocCount - ErrorCount} + catch + {attachment_request_failed, Err} -> + ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]), + exit({attachment_request_failed, Err, Docs}) + end, + Parent ! {writer_checkpoint, HighSeq}, + couch_rep_att:cleanup(), + writer_loop(Parent, Reader, Target) + end. + +write_docs(#http_db{} = Db, Docs) -> + JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], + ErrorsJson = couch_rep_httpc:request(Db#http_db{ + resource = "_bulk_docs", + method = post, + body = {[{new_edits, false}, {docs, JsonDocs}]} + }), + ErrorsList = + lists:map( + fun({Props}) -> + Id = proplists:get_value(<<"id">>, Props), + Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)), + ErrId = couch_util:to_existing_atom( + proplists:get_value(<<"error">>, Props)), + Reason = proplists:get_value(<<"reason">>, Props), + {{Id, Rev}, {ErrId, Reason}} + end, ErrorsJson), + {ok, ErrorsList}; +write_docs(Db, Docs) -> + couch_db:update_docs(Db, Docs, [], replicated_changes). |