summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--  src/couchdb/couch_rep.erl | 1156
1 files changed, 390 insertions, 766 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 1692943a..c5a07685 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -12,155 +12,97 @@
-module(couch_rep).
-behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([replicate/2]).
--define(BUFFER_NDOCS, 1000).
--define(BUFFER_NATTACHMENTS, 50).
--define(BUFFER_MEMORY, 10000000). %% bytes
-
-include("couch_db.hrl").
--include("../ibrowse/ibrowse.hrl").
-
-%% @spec replicate(Source::binary(), Target::binary()) ->
-%% {ok, Stats} | {error, Reason}
-%% @doc Triggers a replication. Stats is a JSON Object with the following
-%% keys: session_id (UUID), source_last_seq (integer), and history (array).
-%% Each element of the history is an Object with keys start_time, end_time,
-%% start_last_seq, end_last_seq, missing_checked, missing_found, docs_read,
-%% and docs_written.
-%%
-%% The supervisor will try to restart the replication in case of any error
-%% other than shutdown. Just call this function again to listen for the
-%% result of the retry.
-replicate(Source, Target) ->
- {ok, HostName} = inet:gethostname(),
- RepId = couch_util:to_hex(
- erlang:md5(term_to_binary([HostName, Source, Target]))),
- Args = [?MODULE, [RepId, Source,Target], []],
+-record(state, {
+ changes_feed,
+ missing_revs,
+ reader,
+ writer,
+
+ source,
+ target,
+ init_args,
+
+ start_seq,
+ history,
+ source_log,
+ target_log,
+ rep_starttime,
+ src_starttime,
+ tgt_starttime,
+ checkpoint_history = nil,
+
+ listeners = [],
+ complete = false,
+ committed_seq = 0,
+
+ stats = nil
+}).
+%% convenience function to do a simple replication from the shell
+replicate(Source, Target) when is_list(Source) ->
+ replicate(?l2b(Source), Target);
+replicate(Source, Target) when is_binary(Source), is_list(Target) ->
+ replicate(Source, ?l2b(Target));
+replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
+ replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{});
+
+%% function handling POST to _replicate
+replicate(PostBody, UserCtx) ->
+ RepId = make_replication_id(PostBody, UserCtx),
Replicator = {RepId,
- {gen_server, start_link, Args},
+ {gen_server, start_link, [?MODULE, [RepId, PostBody, UserCtx], []]},
transient,
1,
worker,
[?MODULE]
},
- Server = case supervisor:start_child(couch_rep_sup, Replicator) of
- {ok, Pid} ->
- ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
- Pid;
- {error, already_present} ->
- case supervisor:restart_child(couch_rep_sup, RepId) of
- {ok, Pid} ->
- ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
- Pid;
- {error, running} ->
- %% this error occurs if multiple replicators are racing
- %% each other to start and somebody else won. Just grab
- %% the Pid by calling start_child again.
- {error, {already_started, Pid}} =
- supervisor:start_child(couch_rep_sup, Replicator),
- ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
- Pid
- end;
- {error, {already_started, Pid}} ->
- ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
- Pid
- end,
+ Server = start_replication_server(Replicator),
- case gen_server:call(Server, get_result, infinity) of
- retry -> replicate(Source, Target);
- Else -> Else
+ try gen_server:call(Server, get_result, infinity) of
+ retry -> replicate(PostBody, UserCtx);
+ Else -> Else
+ catch
+ exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} ->
+ %% oops, this replication just finished -- restart it.
+ replicate(PostBody, UserCtx);
+ exit:{normal, {gen_server, call, [Server, get_result , infinity]}} ->
+ %% we made the call during terminate
+ replicate(PostBody, UserCtx)
end.
-%%=============================================================================
-%% gen_server callbacks
-%%=============================================================================
-
--record(old_http_db, {
- uri,
- headers,
- oauth
-}).
-
-
--record(state, {
- context,
- current_seq,
- source,
- target,
- stats,
- enum_pid,
- docs_buffer = [],
- listeners = [],
- done = false
-}).
-
-
-init([RepId, Source, Target]) ->
+init([RepId, {PostProps}, UserCtx] = InitArgs) ->
process_flag(trap_exit, true),
- {ok, DbSrc, SrcName} = open_db(Source),
- {ok, DbTgt, TgtName} = open_db(Target),
+ SourceProps = proplists:get_value(<<"source">>, PostProps),
+ TargetProps = proplists:get_value(<<"target">>, PostProps),
- DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+ Source = open_db(SourceProps, UserCtx),
+ Target = open_db(TargetProps, UserCtx),
- {ok, InfoSrc} = get_db_info(DbSrc),
- {ok, InfoTgt} = get_db_info(DbTgt),
+ SourceLog = open_replication_log(Source, RepId),
+ TargetLog = open_replication_log(Target, RepId),
- ReplicationStartTime = httpd_util:rfc1123_date(),
- SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
- TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),
+ SourceInfo = dbinfo(Source),
+ TargetInfo = dbinfo(Target),
+
+ {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
- RepRecDocSrc =
- case open_doc(DbSrc, DocKey, []) of
- {ok, SrcDoc} ->
- ?LOG_DEBUG("Found existing replication record on source", []),
- SrcDoc;
- _ -> #doc{id=DocKey}
- end,
-
- RepRecDocTgt =
- case open_doc(DbTgt, DocKey, []) of
- {ok, TgtDoc} ->
- ?LOG_DEBUG("Found existing replication record on target", []),
- TgtDoc;
- _ -> #doc{id=DocKey}
- end,
-
- #doc{body={RepRecProps}} = RepRecDocSrc,
- #doc{body={RepRecPropsTgt}} = RepRecDocTgt,
-
- case proplists:get_value(<<"session_id">>, RepRecProps) ==
- proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
- true ->
- % if the records have the same session id,
- % then we have a valid replication history
- OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
- OldHistory = proplists:get_value(<<"history">>, RepRecProps, []);
- false ->
- ?LOG_INFO("Replication records differ. "
- "Performing full replication instead of incremental.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
- [RepRecProps, RepRecPropsTgt]),
- OldSeqNum = 0,
- OldHistory = []
- end,
-
- Context = [
- {start_seq, OldSeqNum},
- {history, OldHistory},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecDocSrc},
- {tgt_record, RepRecDocTgt}
- ],
+ {ok, ChangesFeed} =
+ couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
+ {ok, MissingRevs} =
+ couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
+ {ok, Reader} =
+ couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
+ {ok, Writer} =
+ couch_rep_writer:start_link(self(), Target, Reader, PostProps),
Stats = ets:new(replication_stats, [set, private]),
ets:insert(Stats, {total_revs,0}),
@@ -169,158 +111,116 @@ init([RepId, Source, Target]) ->
ets:insert(Stats, {docs_written, 0}),
ets:insert(Stats, {doc_write_failures, 0}),
- couch_task_status:add_task("Replication", <<SrcName/binary, " -> ",
- TgtName/binary>>, "Starting"),
-
- Parent = self(),
- Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end),
+ {ShortId, _} = lists:split(6, RepId),
+ couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
+ [ShortId, dbname(Source), dbname(Target)]), "Starting"),
State = #state{
- context = Context,
- current_seq = OldSeqNum,
- enum_pid = Pid,
- source = DbSrc,
- target = DbTgt,
- stats = Stats
- },
+ changes_feed = ChangesFeed,
+ missing_revs = MissingRevs,
+ reader = Reader,
+ writer = Writer,
- {ok, State}.
-handle_call(get_result, From, #state{listeners=L,done=true} = State) ->
- {stop, normal, State#state{listeners=[From|L]}};
-handle_call(get_result, From, #state{listeners=L} = State) ->
- {noreply, State#state{listeners=[From|L]}};
-
-handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
- #state{
- context = Context,
- current_seq = Seq,
- docs_buffer = Buffer,
source = Source,
target = Target,
- stats = Stats
- } = State,
+ init_args = InitArgs,
+ stats = Stats,
+
+ start_seq = StartSeq,
+ history = History,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = httpd_util:rfc1123_date(),
+ src_starttime = proplists:get_value(instance_start_time, SourceInfo),
+ tgt_starttime = proplists:get_value(instance_start_time, TargetInfo)
+ },
+ {ok, State}.
- ets:update_counter(Stats, missing_revs, length(Revs)),
+handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
+ {stop, normal, State#state{listeners=[From]}};
+handle_call(get_result, From, State) ->
+ Listeners = State#state.listeners,
+ {noreply, State#state{listeners=[From|Listeners]}}.
- %% get document(s)
- {ok, DocResults} = open_doc_revs(Source, Id, Revs, [latest]),
- Docs = [RevDoc || {ok, RevDoc} <- DocResults],
- ets:update_counter(Stats, docs_read, length(Docs)),
+handle_cast(_Msg, State) ->
+ {noreply, State}.
- %% save them (maybe in a buffer)
- {NewBuffer, NewContext} =
- case should_flush(lists:flatlength([Docs|Buffer])) of
- true ->
- Docs2 = lists:flatten([Docs|Buffer]),
- try update_docs(Target, Docs2, [], replicated_changes) of
- {ok, Errors} ->
- dump_update_errors(Errors),
- ets:update_counter(Stats, doc_write_failures, length(Errors)),
- ets:update_counter(Stats, docs_written, length(Docs2) -
- length(Errors)),
- {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
- {[], Ctxt}
- catch
- throw:attachment_write_failed ->
- ?LOG_ERROR("attachment request failed during write to disk", []),
- exit({internal_server_error, replication_link_failure})
- end;
- false ->
- {[Docs | Buffer], Context}
- end,
+handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
+ couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
+ {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+
+handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
+ when SourceSeq > N ->
+ MissingRevs = State#state.missing_revs,
+ ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
+ couch_task_status:update("W Processed source update #~p", [SourceSeq]),
+ {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+handle_info({writer_checkpoint, _}, State) ->
+ {noreply, State};
- {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}};
+handle_info({update_stats, Key, N}, State) ->
+ ets:update_counter(State#state.stats, Key, N),
+ {noreply, State};
-handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
- ets:update_counter(State#state.stats, total_revs, RevsCount),
+handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
case State#state.listeners of
[] ->
- % still waiting for the first listener to send a request
- {noreply, State#state{current_seq=LastSeq,done=true}};
- _ ->
- {stop, normal, ok, State#state{current_seq=LastSeq}}
- end.
-
-handle_cast({increment_update_seq, Seq}, State) ->
- couch_task_status:update("Processed source update #~p", [Seq]),
- {noreply, State#state{current_seq=Seq}}.
-
-handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) ->
- ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]),
- #state{
- current_seq = Seq,
- source = Src,
- target = Tgt,
- enum_pid = Pid
- } = State,
- Parent = self(),
- NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end),
- {noreply, State#state{enum_pid=NewPid}};
+ {noreply, State#state{complete = true}};
+ _Else ->
+ {stop, normal, State}
+ end;
-%% if any linked process dies, respawn the enumerator to get things going again
-handle_info({'EXIT', _From, normal}, State) ->
- {noreply, State};
-handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) ->
- ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]),
- exit(EnumPid, pls_restart_kthxbye),
+handle_info({'EXIT', _, normal}, State) ->
{noreply, State};
-
-handle_info(_Msg, State) ->
- {noreply, State}.
+handle_info({'EXIT', Pid, Reason}, State) ->
+ ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]),
+ {stop, Reason, State}.
terminate(normal, State) ->
+ % ?LOG_DEBUG("replication terminating normally", []),
#state{
- context = Context,
- current_seq = Seq,
- docs_buffer = Buffer,
+ checkpoint_history = CheckpointHistory,
+ committed_seq = NewSeq,
listeners = Listeners,
source = Source,
target = Target,
- stats = Stats
+ stats = Stats,
+ source_log = #doc{body={OldHistory}}
} = State,
-
- try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of
- {ok, Errors} ->
- dump_update_errors(Errors),
- ets:update_counter(Stats, doc_write_failures, length(Errors)),
- ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
- length(Errors))
- catch
- throw:attachment_write_failed ->
- ?LOG_ERROR("attachment request failed during final write", []),
- exit({internal_server_error, replication_link_failure})
- end,
-
couch_task_status:update("Finishing"),
-
- {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
ets:delete(Stats),
close_db(Target),
+
+ NewRepHistory = case CheckpointHistory of
+ nil ->
+ {[{<<"no_changes">>, true} | OldHistory]};
+ _Else ->
+ CheckpointHistory
+ end,
- [Original|Rest] = Listeners,
+ %% reply to original requester
+ [Original|OtherListeners] = lists:reverse(Listeners),
gen_server:reply(Original, {ok, NewRepHistory}),
%% maybe trigger another replication. If this replicator uses a local
%% source Db, changes to that Db since we started will not be included in
%% this pass.
- case up_to_date(Source, Seq) of
+ case up_to_date(Source, NewSeq) of
true ->
- [gen_server:reply(R, {ok, NewRepHistory}) || R <- Rest];
+ [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
false ->
- [gen_server:reply(R, retry) || R <- Rest]
+ [gen_server:reply(R, retry) || R <- OtherListeners]
end,
close_db(Source);
+
terminate(Reason, State) ->
- ?LOG_ERROR("replicator terminating with reason ~p", [Reason]),
#state{
listeners = Listeners,
source = Source,
target = Target,
stats = Stats
} = State,
-
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
-
ets:delete(Stats),
close_db(Target),
close_db(Source).
@@ -328,560 +228,284 @@ terminate(Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%=============================================================================
-%% internal functions
-%%=============================================================================
-
+% internal funs
-% we should probably write these to a special replication log
-% or have a callback where the caller decides what to do with replication
-% errors.
-dump_update_errors([]) -> ok;
-dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
- ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
- [Id, couch_doc:rev_to_str(Rev), Error]),
- dump_update_errors(Rest).
-
-attachment_loop(ReqId, Conn) ->
- couch_util:should_flush(),
- receive
- {From, {set_req_id, NewId}} ->
- %% we learn the ReqId to listen for
- From ! {self(), {ok, NewId}},
- attachment_loop(NewId, Conn);
- {ibrowse_async_headers, ReqId, Status, Headers} ->
- %% we got header, give the controlling process a chance to react
- receive
- {From, gimme_status} ->
- %% send status/headers to controller
- From ! {self(), {status, Status, Headers}},
- receive
- {From, continue} ->
- %% normal case
- attachment_loop(ReqId, Conn);
- {From, fail} ->
- %% error, failure code
- ?LOG_ERROR(
- "streaming attachment failed with status ~p",
- [Status]),
- catch ibrowse:stop_worker_process(Conn),
- exit(attachment_request_failed);
- {From, stop_ok} ->
- %% stop looping, controller will start a new loop
- catch ibrowse:stop_worker_process(Conn),
- stop_ok
- end
- end,
- attachment_loop(ReqId, Conn);
- {ibrowse_async_response, ReqId, {chunk_start,_}} ->
- attachment_loop(ReqId, Conn);
- {ibrowse_async_response, ReqId, chunk_end} ->
- attachment_loop(ReqId, Conn);
- {ibrowse_async_response, ReqId, {error, Err}} ->
- ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
- catch ibrowse:stop_worker_process(Conn),
- exit(attachment_request_failed);
- {ibrowse_async_response, ReqId, Data} ->
- receive {From, gimme_data} -> From ! {self(), Data} end,
- attachment_loop(ReqId, Conn);
- {ibrowse_async_response_end, ReqId} ->
- catch ibrowse:stop_worker_process(Conn),
- exit(normal)
+start_replication_server(Replicator) ->
+ RepId = element(1, Replicator),
+ case supervisor:start_child(couch_rep_sup, Replicator) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
+ Pid;
+ {error, already_present} ->
+ case supervisor:restart_child(couch_rep_sup, RepId) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
+ Pid;
+ {error, running} ->
+ %% this error occurs if multiple replicators are racing
+ %% each other to start and somebody else won. Just grab
+ %% the Pid by calling start_child again.
+ {error, {already_started, Pid}} =
+ supervisor:start_child(couch_rep_sup, Replicator),
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+ Pid
+ end;
+ {error, {already_started, Pid}} ->
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+ Pid
end.
-att_stub_converter(DbS, Id, Rev,
- #att{name=Name,data=stub,type=Type,len=Length}=Att) ->
- #old_http_db{uri=DbUrl, headers=Headers} = DbS,
- {Pos, [RevId|_]} = Rev,
- Url = lists:flatten([DbUrl, couch_util:url_encode(Id), "/", couch_util:url_encode(?b2l(Name)),
- "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
- ?LOG_DEBUG("Attachment URL ~s", [Url]),
- {ok, RcvFun} = make_att_stub_receiver(Url, Headers, Name,
- Type, Length),
- Att#att{name=Name,type=Type,data=RcvFun,len=Length}.
-
-make_att_stub_receiver(Url, Headers, Name, Type, Length) ->
- make_att_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000).
-
-make_att_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) ->
- ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s",
- [Url]),
- exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])});
-
-make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) ->
- %% start the process that receives attachment data from ibrowse
- #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
- {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
- Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),
-
- %% make the async request
- Opts = [{stream_to, Pid}, {response_format, binary}],
- ReqId =
- case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
- {ibrowse_req_id, X} ->
- X;
- {error, Reason} ->
- ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
- "seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]),
- catch ibrowse:stop_worker_process(Conn),
- timer:sleep(Pause),
- make_att_stub_receiver(Url, Headers, Name, Type, Length,
- Retries-1, 2*Pause)
- end,
-
- %% tell our receiver about the ReqId it needs to look for
- Pid ! {self(), {set_req_id, ReqId}},
- receive
- {Pid, {ok, ReqId}} ->
- ok;
- {'EXIT', Pid, _Reason} ->
- catch ibrowse:stop_worker_process(Conn),
- timer:sleep(Pause),
- make_att_stub_receiver(Url, Headers, Name, Type, Length,
- Retries-1, 2*Pause)
- end,
-
- %% wait for headers to ensure that we have a 200 status code
- %% this is where we follow redirects etc
- Pid ! {self(), gimme_status},
- receive
- {'EXIT', Pid, attachment_request_failed} ->
- catch ibrowse:stop_worker_process(Conn),
- make_att_stub_receiver(Url, Headers, Name, Type, Length,
- Retries-1, Pause);
- {Pid, {status, StreamStatus, StreamHeaders}} ->
- ?LOG_DEBUG("streaming attachment Status ~p Headers ~p",
- [StreamStatus, StreamHeaders]),
-
- ResponseCode = list_to_integer(StreamStatus),
- if
- ResponseCode >= 200, ResponseCode < 300 ->
- % the normal case
- Pid ! {self(), continue},
- %% this function goes into the streaming attachment code.
- %% It gets executed by the replication gen_server, so it can't
- %% be the one to actually receive the ibrowse data.
- {ok, fun() ->
- Pid ! {self(), gimme_data},
- receive
- {Pid, Data} ->
- Data;
- {'EXIT', Pid, attachment_request_failed} ->
- throw(attachment_write_failed)
- end
- end};
- ResponseCode >= 300, ResponseCode < 400 ->
- % follow the redirect
- Pid ! {self(), stop_ok},
- RedirectUrl = mochiweb_headers:get_value("Location",
- mochiweb_headers:make(StreamHeaders)),
- catch ibrowse:stop_worker_process(Conn),
- make_att_stub_receiver(RedirectUrl, Headers, Name, Type,
- Length, Retries - 1, Pause);
- ResponseCode >= 400, ResponseCode < 500 ->
- % an error... log and fail
- ?LOG_ERROR("streaming attachment failed with code ~p: ~s",
- [ResponseCode, Url]),
- Pid ! {self(), fail},
- exit(attachment_request_failed);
- ResponseCode == 500 ->
- % an error... log and retry
- ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
- "seconds due to 500 response: ~s", [Pause/1000, Url]),
- Pid ! {self(), fail},
- catch ibrowse:stop_worker_process(Conn),
- timer:sleep(Pause),
- make_att_stub_receiver(Url, Headers, Name, Type, Length,
- Retries - 1, 2*Pause)
- end
+compare_replication_logs(SrcDoc, TgtDoc) ->
+ #doc{body={RepRecProps}} = SrcDoc,
+ #doc{body={RepRecPropsTgt}} = TgtDoc,
+ case proplists:get_value(<<"session_id">>, RepRecProps) ==
+ proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
+ true ->
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
+ OldHistory = proplists:get_value(<<"history">>, RepRecProps, []),
+ {OldSeqNum, OldHistory};
+ false ->
+ SourceHistory = proplists:get_value(<<"history">>, RepRecProps, []),
+ TargetHistory = proplists:get_value(<<"history">>, RepRecPropsTgt, []),
+ ?LOG_INFO("Replication records differ. "
+ "Scanning histories to find a common ancestor.", []),
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ compare_rep_history(SourceHistory, TargetHistory)
end.
-
-open_db({remote, Url, Headers, Auth})->
- {ok, #old_http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url};
-open_db({local, DbName, UserCtx})->
- case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db} -> {ok, Db, DbName};
- Error -> Error
+compare_rep_history(S, T) when length(S) =:= 0 orelse length(T) =:= 0 ->
+ ?LOG_INFO("no common ancestry -- performing full replication", []),
+ {0, []};
+compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) ->
+ SourceId = proplists:get_value(<<"session_id">>, S),
+ case has_session_id(SourceId, Target) of
+ true ->
+ RecordSeqNum = proplists:get_value(<<"recorded_seq">>, S, 0),
+ ?LOG_INFO("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, SourceRest};
+ false ->
+ TargetId = proplists:get_value(<<"session_id">>, T),
+ case has_session_id(TargetId, SourceRest) of
+ true ->
+ RecordSeqNum = proplists:get_value(<<"recorded_seq">>, T, 0),
+ ?LOG_INFO("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, TargetRest};
+ false ->
+ compare_rep_history(SourceRest, TargetRest)
+ end
end.
-
-close_db(#old_http_db{})->
+close_db(#http_db{})->
ok;
close_db(Db)->
couch_db:close(Db).
-do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
- ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
- [
- {start_seq, StartSeqNum},
- {history, OldHistory},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime},
- {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc},
- {tgt_record, RepRecDocTgt}
- ] = Context,
-
- case NewSeqNum == StartSeqNum andalso OldHistory /= [] of
- true ->
- % nothing changed, don't record results
- {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context};
- false ->
- % something changed, record results for incremental replication,
-
- % commit changes to both src and tgt. The src because if changes
- % we replicated are lost, we'll record the a seq number ahead
- % of what was committed. If those changes are lost and the seq number
- % reverts to a previous committed value, we will skip future changes
- % when new doc updates are given our already replicated seq nums.
-
- % commit the src async
- ParentPid = self(),
- SrcCommitPid = spawn_link(fun() ->
- ParentPid ! {self(), ensure_full_commit(Source)} end),
-
- % commit tgt sync
- {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
-
- SrcInstanceStartTime2 =
- receive
- {SrcCommitPid, {ok, Timestamp}} ->
- Timestamp;
- {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
- exit(replication_link_failure)
- end,
-
- RecordSeqNum =
- if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
- TgtInstanceStartTime2 == TgtInstanceStartTime ->
- NewSeqNum;
- true ->
- ?LOG_INFO("A server has restarted sinced replication start. "
- "Not recording the new sequence number to ensure the "
- "replication is redone and documents reexamined.", []),
- StartSeqNum
- end,
-
- NewHistoryEntry = {
- [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, StartSeqNum},
- {<<"end_last_seq">>, NewSeqNum},
- {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
- {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)}
- ]},
- % limit history to 50 entries
- HistEntries =lists:sublist([NewHistoryEntry | OldHistory], 50),
-
- NewRepHistory =
- {[{<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, HistEntries}]},
-
- {ok, {SrcRevPos,SrcRevId}} = update_doc(Source,
- RepRecDocSrc#doc{body=NewRepHistory}, []),
- {ok, {TgtRevPos,TgtRevId}} = update_doc(Target,
- RepRecDocTgt#doc{body=NewRepHistory}, []),
-
- NewContext = [
- {start_seq, StartSeqNum},
- {history, OldHistory},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}},
- {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}}
- ],
-
- {ok, NewRepHistory, NewContext}
-
+dbname(#http_db{} = Db) ->
+ Db#http_db.url;
+dbname(Db) ->
+ Db#db.name.
+
+dbinfo(#http_db{} = Db) ->
+ {DbProps} = couch_rep_httpc:request(Db),
+ [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps];
+dbinfo(Db) ->
+ {ok, Info} = couch_db:get_db_info(Db),
+ Info.
+
+has_session_id(_SessionId, []) ->
+ false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+ case proplists:get_value(<<"session_id">>, Props, nil) of
+ SessionId ->
+ true;
+ _Else ->
+ has_session_id(SessionId, Rest)
end.
-do_http_request(Url, Action, Headers, Auth) ->
- do_http_request(Url, Action, Headers, Auth, []).
-
-do_http_request(Url, Action, Headers, Auth, JsonBody) ->
- Headers0 = case Auth of
- {Props} ->
- % Add OAuth header
- {OAuth} = proplists:get_value(<<"oauth">>, Props),
- ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, OAuth)),
- Token = ?b2l(proplists:get_value(<<"token">>, OAuth)),
- TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, OAuth)),
- ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, OAuth)),
- Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
- Method = case Action of
- get -> "GET";
- post -> "POST";
- put -> "PUT"
- end,
- Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret),
- [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)} | Headers];
- _Else ->
- Headers
- end,
- do_http_request0(Url, Action, Headers0, JsonBody, 10, 1000).
-
-do_http_request0(Url, Action, Headers, Body, Retries, Pause) when is_binary(Url) ->
- do_http_request0(?b2l(Url), Action, Headers, Body, Retries, Pause);
-do_http_request0(Url, Action, _Headers, _JsonBody, 0, _Pause) ->
- ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
- [Action, Url]),
- exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
-do_http_request0(Url, Action, Headers, JsonBody, Retries, Pause) ->
- ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
- Body =
- case JsonBody of
- [] ->
- <<>>;
+make_replication_id({Props}, UserCtx) ->
+ %% funky algorithm to preserve backwards compatibility
+ {ok, HostName} = inet:gethostname(),
+ Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)),
+ Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)),
+ couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))).
+
+maybe_add_trailing_slash(Url) ->
+ re:replace(Url, "[^/]$", "&/", [{return, list}]).
+
+get_rep_endpoint(_UserCtx, {Props}) ->
+ Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)),
+ {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
+ {Auth} = proplists:get_value(<<"auth">>, Props, {[]}),
+ case proplists:get_value(<<"oauth">>, Auth) of
+ undefined ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+ {OAuth} ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+ end;
+get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
+ {remote, maybe_add_trailing_slash(Url), []};
+get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) ->
+ {remote, maybe_add_trailing_slash(Url), []};
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+ {local, DbName, UserCtx}.
+
+open_replication_log(#http_db{}=Db, RepId) ->
+ DocId = ?LOCAL_DOC_PREFIX ++ RepId,
+ Req = Db#http_db{resource=couch_util:url_encode(DocId)},
+ case couch_rep_httpc:request(Req) of
+ {[{<<"error">>, _}, {<<"reason">>, _}]} ->
+ ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
+ #doc{id=?l2b(DocId)};
+ Doc ->
+ ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
+ couch_doc:from_json_obj(Doc)
+ end;
+open_replication_log(Db, RepId) ->
+ DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+ case couch_db:open_doc(Db, DocId, []) of
+ {ok, Doc} ->
+ ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
+ Doc;
_ ->
- iolist_to_binary(?JSON_ENCODE(JsonBody))
- end,
- Options = case Action of
- get -> [];
- _ -> [{transfer_encoding, {chunked, 65535}}]
- end ++ [
- {content_type, "application/json; charset=utf-8"},
- {max_pipeline_size, 101},
- {response_format, binary}
- ],
- case ibrowse:send_req(Url, Headers, Action, Body, Options, infinity) of
- {ok, Status, ResponseHeaders, ResponseBody} ->
- ResponseCode = list_to_integer(Status),
- if
- ResponseCode >= 200, ResponseCode < 300 ->
- ?JSON_DECODE(ResponseBody);
- ResponseCode >= 300, ResponseCode < 400 ->
- RedirectUrl = mochiweb_headers:get_value("Location",
- mochiweb_headers:make(ResponseHeaders)),
- do_http_request0(RedirectUrl, Action, Headers, JsonBody, Retries-1,
- Pause);
- ResponseCode >= 400, ResponseCode < 500 ->
- ?JSON_DECODE(ResponseBody);
- ResponseCode == 500 ->
- ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds " ++
- "due to 500 error: ~s", [Action, Pause/1000, Url]),
- timer:sleep(Pause),
- do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
- end;
- {error, Reason} ->
- ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds due to " ++
- "{error, ~p}: ~s", [Action, Pause/1000, Reason, Url]),
- timer:sleep(Pause),
- do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
+ ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
+ #doc{id=DocId}
end.
-ensure_full_commit(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
- {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post,
- Headers, OAuth, true),
- true = proplists:get_value(<<"ok">>, ResultProps),
- {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
-ensure_full_commit(Db) ->
- couch_db:ensure_full_commit(Db).
-
-enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
- case get_doc_info_list(DbSource, StartSeq) of
- [] ->
- gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
- DocInfoList ->
- SrcRevsList = lists:map(fun(#doc_info{id=Id,revs=RevInfos}) ->
- SrcRevs = [Rev || #rev_info{rev=Rev} <- RevInfos],
- {Id, SrcRevs}
- end, DocInfoList),
- {ok, MissingRevs} = get_missing_revs(DbTarget, SrcRevsList),
-
- %% do we need to check for success here?
- [gen_server:call(Pid, {replicate_doc, Info}, infinity)
- || Info <- MissingRevs ],
-
- #doc_info{high_seq=LastSeq} = lists:last(DocInfoList),
- RevsCount2 = RevsCount + length(SrcRevsList),
- gen_server:cast(Pid, {increment_update_seq, LastSeq}),
-
- enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
- end.
-
-
-
-get_db_info(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
- {DbProps} = do_http_request(DbUrl, get, Headers, OAuth),
- {ok, [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps]};
-get_db_info(Db) ->
- couch_db:get_db_info(Db).
-
-get_doc_info_list(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) ->
- Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
- ++ integer_to_list(StartSeq),
- {Results} = do_http_request(Url, get, Headers, OAuth),
- lists:map(fun({RowInfoList}) ->
- {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
- Seq = proplists:get_value(<<"key">>, RowInfoList),
- Revs =
- [#rev_info{rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)), deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)} |
- [#rev_info{rev=Rev,deleted=false} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, []))] ++
- [#rev_info{rev=Rev,deleted=true} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []))]],
- #doc_info{
- id=proplists:get_value(<<"id">>, RowInfoList),
- high_seq = Seq,
- revs = Revs
- }
- end, proplists:get_value(<<"rows">>, Results));
-get_doc_info_list(DbSource, StartSeq) ->
- {ok, {_Count, DocInfoList}} = couch_db:enum_docs_since(DbSource, StartSeq,
- fun (_, _, {100, DocInfoList}) ->
- {stop, {100, DocInfoList}};
- (DocInfo, _, {Count, DocInfoList}) ->
- {ok, {Count+1, [DocInfo|DocInfoList]}}
- end, {0, []}),
- lists:reverse(DocInfoList).
-
-get_missing_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) ->
- DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList],
- {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, OAuth,
- {DocIdRevsList2}),
- {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
- DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList],
- {ok, DocMissingRevsList2};
-get_missing_revs(Db, DocId) ->
- couch_db:get_missing_revs(Db, DocId).
-
-
-open_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
- [] = Options,
- case do_http_request(DbUrl ++ couch_util:url_encode(DocId), get, Headers, OAuth) of
- {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
- {couch_util:to_existing_atom(ErrId), Reason};
- Doc ->
- {ok, couch_doc:from_json_obj(Doc)}
+open_db({Props}, _UserCtx) ->
+ Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)),
+ {AuthProps} = proplists:get_value(<<"auth">>, Props, {[]}),
+ {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
+ Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders],
+ DefaultHeaders = (#http_db{})#http_db.headers,
+ Db = #http_db{
+ url = Url,
+ auth = AuthProps,
+ headers = lists:ukeymerge(1, Headers, DefaultHeaders)
+ },
+ case couch_rep_httpc:db_exists(Db) of
+ true -> Db;
+ false -> throw({db_not_found, Url})
end;
-open_doc(Db, DocId, Options) ->
- couch_db:open_doc(Db, DocId, Options).
-
-open_doc_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0,
- [latest]) ->
- Revs = couch_doc:rev_to_strs(Revs0),
- BaseUrl = DbUrl ++ couch_util:url_encode(DocId) ++ "?revs=true&latest=true",
-
- %% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests
- MaxN = trunc((8192 - length(BaseUrl))/14),
-
- JsonResults = case length(Revs) > MaxN of
- false ->
- Url = ?l2b(BaseUrl ++ "&open_revs=" ++ ?JSON_ENCODE(Revs)),
- do_http_request(Url, get, Headers, OAuth);
- true ->
- {_, Rest, Acc} = lists:foldl(
- fun(Rev, {Count, RevsAcc, AccResults}) when Count =:= MaxN ->
- QSRevs = ?JSON_ENCODE(lists:reverse(RevsAcc)),
- Url = ?l2b(BaseUrl ++ "&open_revs=" ++ QSRevs),
- {1, [Rev], AccResults++do_http_request(Url, get, Headers, OAuth)};
- (Rev, {Count, RevsAcc, AccResults}) ->
- {Count+1, [Rev|RevsAcc], AccResults}
- end, {0, [], []}, Revs),
- Acc ++ do_http_request(?l2b(BaseUrl ++ "&open_revs=" ++
- ?JSON_ENCODE(lists:reverse(Rest))), get, Headers, OAuth)
- end,
-
- Results =
- lists:map(
- fun({[{<<"missing">>, Rev}]}) ->
- {{not_found, missing}, couch_doc:parse_rev(Rev)};
- ({[{<<"ok">>, JsonDoc}]}) ->
- #doc{id=Id, revs=Rev, atts=Atts} = Doc =
- couch_doc:from_json_obj(JsonDoc),
- {ok, Doc#doc{atts=[att_stub_converter(DbS,Id,Rev,A) || A <- Atts]}}
- end, JsonResults),
- {ok, Results};
-open_doc_revs(Db, DocId, Revs, Options) ->
- couch_db:open_doc_revs(Db, DocId, Revs, Options).
-
-%% @spec should_flush() -> true | false
-%% @doc Calculates whether it's time to flush the document buffer. Considers
-%% - memory utilization
-%% - number of pending document writes
-%% - approximate number of pending attachment writes
-should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
- true;
-should_flush(_DocCount) ->
- MeAndMyLinks = [self()|
- [P || P <- element(2,process_info(self(),links)), is_pid(P)]],
-
- case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
- true -> true;
- false ->
- case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
- true ->
- [garbage_collect(Pid) || Pid <- MeAndMyLinks],
- memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
- false -> false
- end
- end.
-
-%% @spec memory_footprint([pid()]) -> integer()
-%% @doc Sum of process and binary memory utilization for all processes in list
-memory_footprint(PidList) ->
- memory_footprint(PidList, {0,0}).
-
-memory_footprint([], {ProcessMemory, BinaryMemory}) ->
- ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
- ProcessMemory + BinaryMemory;
-memory_footprint([Pid|Rest], {ProcAcc, BinAcc}) ->
- case is_process_alive(Pid) of
- true ->
- ProcMem = element(2,process_info(Pid, memory)),
- BinMem = binary_memory(Pid),
- memory_footprint(Rest, {ProcMem + ProcAcc, BinMem + BinAcc});
- false ->
- memory_footprint(Rest, {ProcAcc, BinAcc})
+open_db(<<"http://",_/binary>>=Url, _) ->
+ open_db({[{<<"url">>,Url}]}, []);
+open_db(<<"https://",_/binary>>=Url, _) ->
+ open_db({[{<<"url">>,Url}]}, []);
+open_db(<<DbName/binary>>, UserCtx) ->
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} -> Db;
+ {not_found, no_db_file} -> throw({db_not_found, DbName})
end.
-%% @spec binary_memory(pid()) -> integer()
-%% @doc Memory utilization of all binaries referenced by this process.
-binary_memory(Pid) ->
- lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
- 0, element(2,process_info(Pid, binary))).
-
-update_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) ->
- [] = Options,
- Url = DbUrl ++ couch_util:url_encode(DocId),
- {ResponseMembers} = do_http_request(Url, put, Headers, OAuth,
- couch_doc:to_json_obj(Doc, [attachments])),
+do_checkpoint(State) ->
+ #state{
+ source = Source,
+ target = Target,
+ committed_seq = NewSeqNum,
+ start_seq = StartSeqNum,
+ history = OldHistory,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = ReplicationStartTime,
+ src_starttime = SrcInstanceStartTime,
+ tgt_starttime = TgtInstanceStartTime,
+ stats = Stats
+ } = State,
+ ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
+ RecordSeqNum = case commit_to_both(Source, Target) of
+ {SrcInstanceStartTime, TgtInstanceStartTime} ->
+ NewSeqNum;
+ _Else ->
+ ?LOG_INFO("A server has restarted since replication start. "
+ "Not recording the new sequence number to ensure the "
+ "replication is redone and documents reexamined.", []),
+ StartSeqNum
+ end,
+ SessionId = couch_util:new_uuid(),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, StartSeqNum},
+ {<<"end_last_seq">>, NewSeqNum},
+ {<<"recorded_seq">>, RecordSeqNum},
+ {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+ {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+ {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+ {<<"doc_write_failures">>,
+ ets:lookup_element(Stats, doc_write_failures, 2)}
+ ]},
+ % limit history to 50 entries
+ NewRepHistory = {[
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, RecordSeqNum},
+ {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
+ ]},
+ % ?LOG_DEBUG("updating src doc ~p", [SourceLog]),
+ {SrcRevPos,SrcRevId} =
+ update_doc(Source, SourceLog#doc{body=NewRepHistory}, []),
+ % ?LOG_DEBUG("updating tgt doc ~p", [TargetLog]),
+ {TgtRevPos,TgtRevId} =
+ update_doc(Target, TargetLog#doc{body=NewRepHistory}, []),
+ State#state{
+ checkpoint_history = NewRepHistory,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ }.
+
+commit_to_both(Source, Target) ->
+ % commit the src async
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(fun() ->
+ ParentPid ! {self(), ensure_full_commit(Source)} end),
+
+ % commit tgt sync
+ TargetStartTime = ensure_full_commit(Target),
+
+ SourceStartTime =
+ receive
+ {SrcCommitPid, Timestamp} ->
+ Timestamp;
+ {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
+ exit(replication_link_failure)
+ end,
+ {SourceStartTime, TargetStartTime}.
+
+ensure_full_commit(#http_db{} = Db) ->
+ Req = Db#http_db{
+ resource = "_ensure_full_commit",
+ method = post,
+ body = true
+ },
+ {ResultProps} = couch_rep_httpc:request(Req),
+ true = proplists:get_value(<<"ok">>, ResultProps),
+ proplists:get_value(<<"instance_start_time">>, ResultProps);
+ensure_full_commit(Db) ->
+ {ok, StartTime} = couch_db:ensure_full_commit(Db),
+ StartTime.
+
+update_doc(#http_db{} = Db, #doc{id=DocId} = Doc, []) ->
+ Req = Db#http_db{
+ resource = couch_util:url_encode(DocId),
+ method = put,
+ body = couch_doc:to_json_obj(Doc, [attachments])
+ },
+ {ResponseMembers} = couch_rep_httpc:request(Req),
Rev = proplists:get_value(<<"rev">>, ResponseMembers),
- {ok, couch_doc:parse_rev(Rev)};
+ couch_doc:parse_rev(Rev);
update_doc(Db, Doc, Options) ->
- couch_db:update_doc(Db, Doc, Options).
-
-update_docs(_, [], _, _) ->
- {ok, []};
-update_docs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
- ErrorsJson =
- do_http_request(DbUrl ++ "_bulk_docs", post, Headers, OAuth,
- {[{new_edits, false}, {docs, JsonDocs}]}),
- ErrorsList =
- lists:map(
- fun({Props}) ->
- Id = proplists:get_value(<<"id">>, Props),
- Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)),
- ErrId = couch_util:to_existing_atom(
- proplists:get_value(<<"error">>, Props)),
- Reason = proplists:get_value(<<"reason">>, Props),
- Error = {ErrId, Reason},
- {{Id, Rev}, Error}
- end, ErrorsJson),
- {ok, ErrorsList};
-update_docs(Db, Docs, Options, UpdateType) ->
- couch_db:update_docs(Db, Docs, Options, UpdateType).
-
-up_to_date(#old_http_db{}, _Seq) ->
+ {ok, Result} = couch_db:update_doc(Db, Doc, Options),
+ Result.
+
+up_to_date(#http_db{}, _Seq) ->
true;
up_to_date(Source, Seq) ->
{ok, NewDb} = couch_db:open(Source#db.name, []),
T = NewDb#db.update_seq == Seq,
couch_db:close(NewDb),
T.
-