summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-08-10 18:37:43 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-08-10 18:37:43 +0000
commit5dcbc2290ac780f1a625b5c9435cfb35eac4e1ef (patch)
treebc9e04c73807b9eb34e05d5c70026b2e951fc673
parentabcc5a35fda60c7124af7899939f09e59ae7968b (diff)
new replicator using _changes feed for continuous replication
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802888 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/Makefile.am8
-rw-r--r--src/couchdb/couch_httpd_misc_handlers.erl27
-rw-r--r--src/couchdb/couch_rep.erl1156
-rw-r--r--src/couchdb/couch_rep_att.erl100
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl1
-rw-r--r--src/couchdb/couch_rep_missing_revs.erl15
-rw-r--r--src/couchdb/couch_rep_reader.erl268
-rw-r--r--src/couchdb/couch_rep_writer.erl68
8 files changed, 843 insertions, 800 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index f32497fa..6691dbba 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -74,10 +74,13 @@ source_files = \
couch_query_servers.erl \
couch_ref_counter.erl \
couch_rep.erl \
+ couch_rep_att.erl \
couch_rep_changes_feed.erl \
couch_rep_httpc.erl \
couch_rep_missing_revs.erl \
+ couch_rep_reader.erl \
couch_rep_sup.erl \
+ couch_rep_writer.erl \
couch_server.erl \
couch_server_sup.erl \
couch_stats_aggregator.erl \
@@ -123,10 +126,13 @@ compiled_files = \
couch_query_servers.beam \
couch_ref_counter.beam \
couch_rep.beam \
+ couch_rep_att.beam \
couch_rep_changes_feed.beam \
- couch_rep_missing_revs.beam \
couch_rep_httpc.beam \
+ couch_rep_missing_revs.beam \
+ couch_rep_reader.beam \
couch_rep_sup.beam \
+ couch_rep_writer.beam \
couch_server.beam \
couch_server_sup.beam \
couch_stats_aggregator.beam \
diff --git a/src/couchdb/couch_httpd_misc_handlers.erl b/src/couchdb/couch_httpd_misc_handlers.erl
index eea353bd..c4f28308 100644
--- a/src/couchdb/couch_httpd_misc_handlers.erl
+++ b/src/couchdb/couch_httpd_misc_handlers.erl
@@ -77,32 +77,9 @@ handle_task_status_req(#httpd{method='GET'}=Req) ->
handle_task_status_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
-% add trailing slash if missing
-fix_db_url(UrlBin) ->
- ?l2b(case lists:last(Url = ?b2l(UrlBin)) of
- $/ -> Url;
- _ -> Url ++ "/"
- end).
-
-
-get_rep_endpoint(_Req, {Props}) ->
- Url = proplists:get_value(<<"url">>, Props),
- {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
- Auth = proplists:get_value(<<"auth">>, Props, undefined),
- ?LOG_DEBUG("AUTH ~p", [Auth]),
- {remote, fix_db_url(Url), [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], Auth};
-get_rep_endpoint(_Req, <<"http://",_/binary>>=Url) ->
- {remote, fix_db_url(Url), [], []};
-get_rep_endpoint(_Req, <<"https://",_/binary>>=Url) ->
- {remote, fix_db_url(Url), [], []};
-get_rep_endpoint(#httpd{user_ctx=UserCtx}, <<DbName/binary>>) ->
- {local, DbName, UserCtx}.
-
handle_replicate_req(#httpd{method='POST'}=Req) ->
- {Props} = couch_httpd:json_body_obj(Req),
- Source = get_rep_endpoint(Req, proplists:get_value(<<"source">>, Props)),
- Target = get_rep_endpoint(Req, proplists:get_value(<<"target">>, Props)),
- case couch_rep:replicate(Source, Target) of
+ PostBody = couch_httpd:json_body_obj(Req),
+ case couch_rep:replicate(PostBody, Req#httpd.user_ctx) of
{ok, {JsonResults}} ->
send_json(Req, {[{ok, true} | JsonResults]});
{error, {Type, Details}} ->
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 1692943a..c5a07685 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -12,155 +12,97 @@
-module(couch_rep).
-behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([replicate/2]).
--define(BUFFER_NDOCS, 1000).
--define(BUFFER_NATTACHMENTS, 50).
--define(BUFFER_MEMORY, 10000000). %% bytes
-
-include("couch_db.hrl").
--include("../ibrowse/ibrowse.hrl").
-
-%% @spec replicate(Source::binary(), Target::binary()) ->
-%% {ok, Stats} | {error, Reason}
-%% @doc Triggers a replication. Stats is a JSON Object with the following
-%% keys: session_id (UUID), source_last_seq (integer), and history (array).
-%% Each element of the history is an Object with keys start_time, end_time,
-%% start_last_seq, end_last_seq, missing_checked, missing_found, docs_read,
-%% and docs_written.
-%%
-%% The supervisor will try to restart the replication in case of any error
-%% other than shutdown. Just call this function again to listen for the
-%% result of the retry.
-replicate(Source, Target) ->
- {ok, HostName} = inet:gethostname(),
- RepId = couch_util:to_hex(
- erlang:md5(term_to_binary([HostName, Source, Target]))),
- Args = [?MODULE, [RepId, Source,Target], []],
+-record(state, {
+ changes_feed,
+ missing_revs,
+ reader,
+ writer,
+
+ source,
+ target,
+ init_args,
+
+ start_seq,
+ history,
+ source_log,
+ target_log,
+ rep_starttime,
+ src_starttime,
+ tgt_starttime,
+ checkpoint_history = nil,
+
+ listeners = [],
+ complete = false,
+ committed_seq = 0,
+
+ stats = nil
+}).
+%% convenience function to do a simple replication from the shell
+replicate(Source, Target) when is_list(Source) ->
+ replicate(?l2b(Source), Target);
+replicate(Source, Target) when is_binary(Source), is_list(Target) ->
+ replicate(Source, ?l2b(Target));
+replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
+ replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{});
+
+%% function handling POST to _replicate
+replicate(PostBody, UserCtx) ->
+ RepId = make_replication_id(PostBody, UserCtx),
Replicator = {RepId,
- {gen_server, start_link, Args},
+ {gen_server, start_link, [?MODULE, [RepId, PostBody, UserCtx], []]},
transient,
1,
worker,
[?MODULE]
},
- Server = case supervisor:start_child(couch_rep_sup, Replicator) of
- {ok, Pid} ->
- ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
- Pid;
- {error, already_present} ->
- case supervisor:restart_child(couch_rep_sup, RepId) of
- {ok, Pid} ->
- ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
- Pid;
- {error, running} ->
- %% this error occurs if multiple replicators are racing
- %% each other to start and somebody else won. Just grab
- %% the Pid by calling start_child again.
- {error, {already_started, Pid}} =
- supervisor:start_child(couch_rep_sup, Replicator),
- ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
- Pid
- end;
- {error, {already_started, Pid}} ->
- ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
- Pid
- end,
+ Server = start_replication_server(Replicator),
- case gen_server:call(Server, get_result, infinity) of
- retry -> replicate(Source, Target);
- Else -> Else
+ try gen_server:call(Server, get_result, infinity) of
+ retry -> replicate(PostBody, UserCtx);
+ Else -> Else
+ catch
+ exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} ->
+ %% oops, this replication just finished -- restart it.
+ replicate(PostBody, UserCtx);
+ exit:{normal, {gen_server, call, [Server, get_result , infinity]}} ->
+ %% we made the call during terminate
+ replicate(PostBody, UserCtx)
end.
-%%=============================================================================
-%% gen_server callbacks
-%%=============================================================================
-
--record(old_http_db, {
- uri,
- headers,
- oauth
-}).
-
-
--record(state, {
- context,
- current_seq,
- source,
- target,
- stats,
- enum_pid,
- docs_buffer = [],
- listeners = [],
- done = false
-}).
-
-
-init([RepId, Source, Target]) ->
+init([RepId, {PostProps}, UserCtx] = InitArgs) ->
process_flag(trap_exit, true),
- {ok, DbSrc, SrcName} = open_db(Source),
- {ok, DbTgt, TgtName} = open_db(Target),
+ SourceProps = proplists:get_value(<<"source">>, PostProps),
+ TargetProps = proplists:get_value(<<"target">>, PostProps),
- DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+ Source = open_db(SourceProps, UserCtx),
+ Target = open_db(TargetProps, UserCtx),
- {ok, InfoSrc} = get_db_info(DbSrc),
- {ok, InfoTgt} = get_db_info(DbTgt),
+ SourceLog = open_replication_log(Source, RepId),
+ TargetLog = open_replication_log(Target, RepId),
- ReplicationStartTime = httpd_util:rfc1123_date(),
- SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
- TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),
+ SourceInfo = dbinfo(Source),
+ TargetInfo = dbinfo(Target),
+
+ {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
- RepRecDocSrc =
- case open_doc(DbSrc, DocKey, []) of
- {ok, SrcDoc} ->
- ?LOG_DEBUG("Found existing replication record on source", []),
- SrcDoc;
- _ -> #doc{id=DocKey}
- end,
-
- RepRecDocTgt =
- case open_doc(DbTgt, DocKey, []) of
- {ok, TgtDoc} ->
- ?LOG_DEBUG("Found existing replication record on target", []),
- TgtDoc;
- _ -> #doc{id=DocKey}
- end,
-
- #doc{body={RepRecProps}} = RepRecDocSrc,
- #doc{body={RepRecPropsTgt}} = RepRecDocTgt,
-
- case proplists:get_value(<<"session_id">>, RepRecProps) ==
- proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
- true ->
- % if the records have the same session id,
- % then we have a valid replication history
- OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
- OldHistory = proplists:get_value(<<"history">>, RepRecProps, []);
- false ->
- ?LOG_INFO("Replication records differ. "
- "Performing full replication instead of incremental.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
- [RepRecProps, RepRecPropsTgt]),
- OldSeqNum = 0,
- OldHistory = []
- end,
-
- Context = [
- {start_seq, OldSeqNum},
- {history, OldHistory},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecDocSrc},
- {tgt_record, RepRecDocTgt}
- ],
+ {ok, ChangesFeed} =
+ couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
+ {ok, MissingRevs} =
+ couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
+ {ok, Reader} =
+ couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
+ {ok, Writer} =
+ couch_rep_writer:start_link(self(), Target, Reader, PostProps),
Stats = ets:new(replication_stats, [set, private]),
ets:insert(Stats, {total_revs,0}),
@@ -169,158 +111,116 @@ init([RepId, Source, Target]) ->
ets:insert(Stats, {docs_written, 0}),
ets:insert(Stats, {doc_write_failures, 0}),
- couch_task_status:add_task("Replication", <<SrcName/binary, " -> ",
- TgtName/binary>>, "Starting"),
-
- Parent = self(),
- Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end),
+ {ShortId, _} = lists:split(6, RepId),
+ couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
+ [ShortId, dbname(Source), dbname(Target)]), "Starting"),
State = #state{
- context = Context,
- current_seq = OldSeqNum,
- enum_pid = Pid,
- source = DbSrc,
- target = DbTgt,
- stats = Stats
- },
+ changes_feed = ChangesFeed,
+ missing_revs = MissingRevs,
+ reader = Reader,
+ writer = Writer,
- {ok, State}.
-handle_call(get_result, From, #state{listeners=L,done=true} = State) ->
- {stop, normal, State#state{listeners=[From|L]}};
-handle_call(get_result, From, #state{listeners=L} = State) ->
- {noreply, State#state{listeners=[From|L]}};
-
-handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
- #state{
- context = Context,
- current_seq = Seq,
- docs_buffer = Buffer,
source = Source,
target = Target,
- stats = Stats
- } = State,
+ init_args = InitArgs,
+ stats = Stats,
+
+ start_seq = StartSeq,
+ history = History,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = httpd_util:rfc1123_date(),
+ src_starttime = proplists:get_value(instance_start_time, SourceInfo),
+ tgt_starttime = proplists:get_value(instance_start_time, TargetInfo)
+ },
+ {ok, State}.
- ets:update_counter(Stats, missing_revs, length(Revs)),
+handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
+ {stop, normal, State#state{listeners=[From]}};
+handle_call(get_result, From, State) ->
+ Listeners = State#state.listeners,
+ {noreply, State#state{listeners=[From|Listeners]}}.
- %% get document(s)
- {ok, DocResults} = open_doc_revs(Source, Id, Revs, [latest]),
- Docs = [RevDoc || {ok, RevDoc} <- DocResults],
- ets:update_counter(Stats, docs_read, length(Docs)),
+handle_cast(_Msg, State) ->
+ {noreply, State}.
- %% save them (maybe in a buffer)
- {NewBuffer, NewContext} =
- case should_flush(lists:flatlength([Docs|Buffer])) of
- true ->
- Docs2 = lists:flatten([Docs|Buffer]),
- try update_docs(Target, Docs2, [], replicated_changes) of
- {ok, Errors} ->
- dump_update_errors(Errors),
- ets:update_counter(Stats, doc_write_failures, length(Errors)),
- ets:update_counter(Stats, docs_written, length(Docs2) -
- length(Errors)),
- {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
- {[], Ctxt}
- catch
- throw:attachment_write_failed ->
- ?LOG_ERROR("attachment request failed during write to disk", []),
- exit({internal_server_error, replication_link_failure})
- end;
- false ->
- {[Docs | Buffer], Context}
- end,
+handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
+ couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
+ {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+
+handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
+ when SourceSeq > N ->
+ MissingRevs = State#state.missing_revs,
+ ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
+ couch_task_status:update("W Processed source update #~p", [SourceSeq]),
+ {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+handle_info({writer_checkpoint, _}, State) ->
+ {noreply, State};
- {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}};
+handle_info({update_stats, Key, N}, State) ->
+ ets:update_counter(State#state.stats, Key, N),
+ {noreply, State};
-handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
- ets:update_counter(State#state.stats, total_revs, RevsCount),
+handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
case State#state.listeners of
[] ->
- % still waiting for the first listener to send a request
- {noreply, State#state{current_seq=LastSeq,done=true}};
- _ ->
- {stop, normal, ok, State#state{current_seq=LastSeq}}
- end.
-
-handle_cast({increment_update_seq, Seq}, State) ->
- couch_task_status:update("Processed source update #~p", [Seq]),
- {noreply, State#state{current_seq=Seq}}.
-
-handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) ->
- ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]),
- #state{
- current_seq = Seq,
- source = Src,
- target = Tgt,
- enum_pid = Pid
- } = State,
- Parent = self(),
- NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end),
- {noreply, State#state{enum_pid=NewPid}};
+ {noreply, State#state{complete = true}};
+ _Else ->
+ {stop, normal, State}
+ end;
-%% if any linked process dies, respawn the enumerator to get things going again
-handle_info({'EXIT', _From, normal}, State) ->
- {noreply, State};
-handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) ->
- ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]),
- exit(EnumPid, pls_restart_kthxbye),
+handle_info({'EXIT', _, normal}, State) ->
{noreply, State};
-
-handle_info(_Msg, State) ->
- {noreply, State}.
+handle_info({'EXIT', Pid, Reason}, State) ->
+ ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]),
+ {stop, Reason, State}.
terminate(normal, State) ->
+ % ?LOG_DEBUG("replication terminating normally", []),
#state{
- context = Context,
- current_seq = Seq,
- docs_buffer = Buffer,
+ checkpoint_history = CheckpointHistory,
+ committed_seq = NewSeq,
listeners = Listeners,
source = Source,
target = Target,
- stats = Stats
+ stats = Stats,
+ source_log = #doc{body={OldHistory}}
} = State,
-
- try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of
- {ok, Errors} ->
- dump_update_errors(Errors),
- ets:update_counter(Stats, doc_write_failures, length(Errors)),
- ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
- length(Errors))
- catch
- throw:attachment_write_failed ->
- ?LOG_ERROR("attachment request failed during final write", []),
- exit({internal_server_error, replication_link_failure})
- end,
-
couch_task_status:update("Finishing"),
-
- {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
ets:delete(Stats),
close_db(Target),
+
+ NewRepHistory = case CheckpointHistory of
+ nil ->
+ {[{<<"no_changes">>, true} | OldHistory]};
+ _Else ->
+ CheckpointHistory
+ end,
- [Original|Rest] = Listeners,
+ %% reply to original requester
+ [Original|OtherListeners] = lists:reverse(Listeners),
gen_server:reply(Original, {ok, NewRepHistory}),
%% maybe trigger another replication. If this replicator uses a local
%% source Db, changes to that Db since we started will not be included in
%% this pass.
- case up_to_date(Source, Seq) of
+ case up_to_date(Source, NewSeq) of
true ->
- [gen_server:reply(R, {ok, NewRepHistory}) || R <- Rest];
+ [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
false ->
- [gen_server:reply(R, retry) || R <- Rest]
+ [gen_server:reply(R, retry) || R <- OtherListeners]
end,
close_db(Source);
+
terminate(Reason, State) ->
- ?LOG_ERROR("replicator terminating with reason ~p", [Reason]),
#state{
listeners = Listeners,
source = Source,
target = Target,
stats = Stats
} = State,
-
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
-
ets:delete(Stats),
close_db(Target),
close_db(Source).
@@ -328,560 +228,284 @@ terminate(Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%=============================================================================
-%% internal functions
-%%=============================================================================
-
+% internal funs
-% we should probably write these to a special replication log
-% or have a callback where the caller decides what to do with replication
-% errors.
-dump_update_errors([]) -> ok;
-dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
- ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
- [Id, couch_doc:rev_to_str(Rev), Error]),
- dump_update_errors(Rest).
-
-attachment_loop(ReqId, Conn) ->
- couch_util:should_flush(),
- receive
- {From, {set_req_id, NewId}} ->
- %% we learn the ReqId to listen for
- From ! {self(), {ok, NewId}},
- attachment_loop(NewId, Conn);
- {ibrowse_async_headers, ReqId, Status, Headers} ->
- %% we got header, give the controlling process a chance to react
- receive
- {From, gimme_status} ->
- %% send status/headers to controller
- From ! {self(), {status, Status, Headers}},
- receive
- {From, continue} ->
- %% normal case
- attachment_loop(ReqId, Conn);
- {From, fail} ->
- %% error, failure code
- ?LOG_ERROR(
- "streaming attachment failed with status ~p",
- [Status]),
- catch ibrowse:stop_worker_process(Conn),
- exit(attachment_request_failed);
- {From, stop_ok} ->
- %% stop looping, controller will start a new loop
- catch ibrowse:stop_worker_process(Conn),
- stop_ok
- end
- end,
- attachment_loop(ReqId, Conn);
- {ibrowse_async_response, ReqId, {chunk_start,_}} ->
- attachment_loop(ReqId, Conn);
- {ibrowse_async_response, ReqId, chunk_end} ->
- attachment_loop(ReqId, Conn);
- {ibrowse_async_response, ReqId, {error, Err}} ->
- ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
- catch ibrowse:stop_worker_process(Conn),
- exit(attachment_request_failed);
- {ibrowse_async_response, ReqId, Data} ->
- receive {From, gimme_data} -> From ! {self(), Data} end,
- attachment_loop(ReqId, Conn);
- {ibrowse_async_response_end, ReqId} ->
- catch ibrowse:stop_worker_process(Conn),
- exit(normal)
+start_replication_server(Replicator) ->
+ RepId = element(1, Replicator),
+ case supervisor:start_child(couch_rep_sup, Replicator) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
+ Pid;
+ {error, already_present} ->
+ case supervisor:restart_child(couch_rep_sup, RepId) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
+ Pid;
+ {error, running} ->
+ %% this error occurs if multiple replicators are racing
+ %% each other to start and somebody else won. Just grab
+ %% the Pid by calling start_child again.
+ {error, {already_started, Pid}} =
+ supervisor:start_child(couch_rep_sup, Replicator),
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+ Pid
+ end;
+ {error, {already_started, Pid}} ->
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+ Pid
end.
-att_stub_converter(DbS, Id, Rev,
- #att{name=Name,data=stub,type=Type,len=Length}=Att) ->
- #old_http_db{uri=DbUrl, headers=Headers} = DbS,
- {Pos, [RevId|_]} = Rev,
- Url = lists:flatten([DbUrl, couch_util:url_encode(Id), "/", couch_util:url_encode(?b2l(Name)),
- "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
- ?LOG_DEBUG("Attachment URL ~s", [Url]),
- {ok, RcvFun} = make_att_stub_receiver(Url, Headers, Name,
- Type, Length),
- Att#att{name=Name,type=Type,data=RcvFun,len=Length}.
-
-make_att_stub_receiver(Url, Headers, Name, Type, Length) ->
- make_att_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000).
-
-make_att_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) ->
- ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s",
- [Url]),
- exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])});
-
-make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) ->
- %% start the process that receives attachment data from ibrowse
- #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
- {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
- Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),
-
- %% make the async request
- Opts = [{stream_to, Pid}, {response_format, binary}],
- ReqId =
- case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
- {ibrowse_req_id, X} ->
- X;
- {error, Reason} ->
- ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
- "seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]),
- catch ibrowse:stop_worker_process(Conn),
- timer:sleep(Pause),
- make_att_stub_receiver(Url, Headers, Name, Type, Length,
- Retries-1, 2*Pause)
- end,
-
- %% tell our receiver about the ReqId it needs to look for
- Pid ! {self(), {set_req_id, ReqId}},
- receive
- {Pid, {ok, ReqId}} ->
- ok;
- {'EXIT', Pid, _Reason} ->
- catch ibrowse:stop_worker_process(Conn),
- timer:sleep(Pause),
- make_att_stub_receiver(Url, Headers, Name, Type, Length,
- Retries-1, 2*Pause)
- end,
-
- %% wait for headers to ensure that we have a 200 status code
- %% this is where we follow redirects etc
- Pid ! {self(), gimme_status},
- receive
- {'EXIT', Pid, attachment_request_failed} ->
- catch ibrowse:stop_worker_process(Conn),
- make_att_stub_receiver(Url, Headers, Name, Type, Length,
- Retries-1, Pause);
- {Pid, {status, StreamStatus, StreamHeaders}} ->
- ?LOG_DEBUG("streaming attachment Status ~p Headers ~p",
- [StreamStatus, StreamHeaders]),
-
- ResponseCode = list_to_integer(StreamStatus),
- if
- ResponseCode >= 200, ResponseCode < 300 ->
- % the normal case
- Pid ! {self(), continue},
- %% this function goes into the streaming attachment code.
- %% It gets executed by the replication gen_server, so it can't
- %% be the one to actually receive the ibrowse data.
- {ok, fun() ->
- Pid ! {self(), gimme_data},
- receive
- {Pid, Data} ->
- Data;
- {'EXIT', Pid, attachment_request_failed} ->
- throw(attachment_write_failed)
- end
- end};
- ResponseCode >= 300, ResponseCode < 400 ->
- % follow the redirect
- Pid ! {self(), stop_ok},
- RedirectUrl = mochiweb_headers:get_value("Location",
- mochiweb_headers:make(StreamHeaders)),
- catch ibrowse:stop_worker_process(Conn),
- make_att_stub_receiver(RedirectUrl, Headers, Name, Type,
- Length, Retries - 1, Pause);
- ResponseCode >= 400, ResponseCode < 500 ->
- % an error... log and fail
- ?LOG_ERROR("streaming attachment failed with code ~p: ~s",
- [ResponseCode, Url]),
- Pid ! {self(), fail},
- exit(attachment_request_failed);
- ResponseCode == 500 ->
- % an error... log and retry
- ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
- "seconds due to 500 response: ~s", [Pause/1000, Url]),
- Pid ! {self(), fail},
- catch ibrowse:stop_worker_process(Conn),
- timer:sleep(Pause),
- make_att_stub_receiver(Url, Headers, Name, Type, Length,
- Retries - 1, 2*Pause)
- end
+compare_replication_logs(SrcDoc, TgtDoc) ->
+ #doc{body={RepRecProps}} = SrcDoc,
+ #doc{body={RepRecPropsTgt}} = TgtDoc,
+ case proplists:get_value(<<"session_id">>, RepRecProps) ==
+ proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
+ true ->
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
+ OldHistory = proplists:get_value(<<"history">>, RepRecProps, []),
+ {OldSeqNum, OldHistory};
+ false ->
+ SourceHistory = proplists:get_value(<<"history">>, RepRecProps, []),
+ TargetHistory = proplists:get_value(<<"history">>, RepRecPropsTgt, []),
+ ?LOG_INFO("Replication records differ. "
+ "Scanning histories to find a common ancestor.", []),
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ compare_rep_history(SourceHistory, TargetHistory)
end.
-
-open_db({remote, Url, Headers, Auth})->
- {ok, #old_http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url};
-open_db({local, DbName, UserCtx})->
- case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db} -> {ok, Db, DbName};
- Error -> Error
+compare_rep_history(S, T) when length(S) =:= 0 orelse length(T) =:= 0 ->
+ ?LOG_INFO("no common ancestry -- performing full replication", []),
+ {0, []};
+compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) ->
+ SourceId = proplists:get_value(<<"session_id">>, S),
+ case has_session_id(SourceId, Target) of
+ true ->
+ RecordSeqNum = proplists:get_value(<<"recorded_seq">>, S, 0),
+ ?LOG_INFO("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, SourceRest};
+ false ->
+ TargetId = proplists:get_value(<<"session_id">>, T),
+ case has_session_id(TargetId, SourceRest) of
+ true ->
+ RecordSeqNum = proplists:get_value(<<"recorded_seq">>, T, 0),
+ ?LOG_INFO("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, TargetRest};
+ false ->
+ compare_rep_history(SourceRest, TargetRest)
+ end
end.
-
-close_db(#old_http_db{})->
+close_db(#http_db{})->
ok;
close_db(Db)->
couch_db:close(Db).
-do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
- ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
- [
- {start_seq, StartSeqNum},
- {history, OldHistory},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime},
- {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc},
- {tgt_record, RepRecDocTgt}
- ] = Context,
-
- case NewSeqNum == StartSeqNum andalso OldHistory /= [] of
- true ->
- % nothing changed, don't record results
- {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context};
- false ->
- % something changed, record results for incremental replication,
-
- % commit changes to both src and tgt. The src because if changes
- % we replicated are lost, we'll record the a seq number ahead
- % of what was committed. If those changes are lost and the seq number
- % reverts to a previous committed value, we will skip future changes
- % when new doc updates are given our already replicated seq nums.
-
- % commit the src async
- ParentPid = self(),
- SrcCommitPid = spawn_link(fun() ->
- ParentPid ! {self(), ensure_full_commit(Source)} end),
-
- % commit tgt sync
- {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
-
- SrcInstanceStartTime2 =
- receive
- {SrcCommitPid, {ok, Timestamp}} ->
- Timestamp;
- {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
- exit(replication_link_failure)
- end,
-
- RecordSeqNum =
- if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
- TgtInstanceStartTime2 == TgtInstanceStartTime ->
- NewSeqNum;
- true ->
- ?LOG_INFO("A server has restarted sinced replication start. "
- "Not recording the new sequence number to ensure the "
- "replication is redone and documents reexamined.", []),
- StartSeqNum
- end,
-
- NewHistoryEntry = {
- [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, StartSeqNum},
- {<<"end_last_seq">>, NewSeqNum},
- {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
- {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)}
- ]},
- % limit history to 50 entries
- HistEntries =lists:sublist([NewHistoryEntry | OldHistory], 50),
-
- NewRepHistory =
- {[{<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, HistEntries}]},
-
- {ok, {SrcRevPos,SrcRevId}} = update_doc(Source,
- RepRecDocSrc#doc{body=NewRepHistory}, []),
- {ok, {TgtRevPos,TgtRevId}} = update_doc(Target,
- RepRecDocTgt#doc{body=NewRepHistory}, []),
-
- NewContext = [
- {start_seq, StartSeqNum},
- {history, OldHistory},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}},
- {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}}
- ],
-
- {ok, NewRepHistory, NewContext}
-
+dbname(#http_db{} = Db) ->
+ Db#http_db.url;
+dbname(Db) ->
+ Db#db.name.
+
+dbinfo(#http_db{} = Db) ->
+ {DbProps} = couch_rep_httpc:request(Db),
+ [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps];
+dbinfo(Db) ->
+ {ok, Info} = couch_db:get_db_info(Db),
+ Info.
+
+has_session_id(_SessionId, []) ->
+ false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+ case proplists:get_value(<<"session_id">>, Props, nil) of
+ SessionId ->
+ true;
+ _Else ->
+ has_session_id(SessionId, Rest)
end.
-do_http_request(Url, Action, Headers, Auth) ->
- do_http_request(Url, Action, Headers, Auth, []).
-
-do_http_request(Url, Action, Headers, Auth, JsonBody) ->
- Headers0 = case Auth of
- {Props} ->
- % Add OAuth header
- {OAuth} = proplists:get_value(<<"oauth">>, Props),
- ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, OAuth)),
- Token = ?b2l(proplists:get_value(<<"token">>, OAuth)),
- TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, OAuth)),
- ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, OAuth)),
- Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
- Method = case Action of
- get -> "GET";
- post -> "POST";
- put -> "PUT"
- end,
- Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret),
- [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)} | Headers];
- _Else ->
- Headers
- end,
- do_http_request0(Url, Action, Headers0, JsonBody, 10, 1000).
-
-do_http_request0(Url, Action, Headers, Body, Retries, Pause) when is_binary(Url) ->
- do_http_request0(?b2l(Url), Action, Headers, Body, Retries, Pause);
-do_http_request0(Url, Action, _Headers, _JsonBody, 0, _Pause) ->
- ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
- [Action, Url]),
- exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
-do_http_request0(Url, Action, Headers, JsonBody, Retries, Pause) ->
- ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
- Body =
- case JsonBody of
- [] ->
- <<>>;
+make_replication_id({Props}, UserCtx) ->
+ %% funky algorithm to preserve backwards compatibility
+ {ok, HostName} = inet:gethostname(),
+ Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)),
+ Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)),
+ couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))).
+
+maybe_add_trailing_slash(Url) ->
+ re:replace(Url, "[^/]$", "&/", [{return, list}]).
+
+get_rep_endpoint(_UserCtx, {Props}) ->
+ Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)),
+ {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
+ {Auth} = proplists:get_value(<<"auth">>, Props, {[]}),
+ case proplists:get_value(<<"oauth">>, Auth) of
+ undefined ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+ {OAuth} ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+ end;
+get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
+ {remote, maybe_add_trailing_slash(Url), []};
+get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) ->
+ {remote, maybe_add_trailing_slash(Url), []};
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+ {local, DbName, UserCtx}.
+
+open_replication_log(#http_db{}=Db, RepId) ->
+ DocId = ?LOCAL_DOC_PREFIX ++ RepId,
+ Req = Db#http_db{resource=couch_util:url_encode(DocId)},
+ case couch_rep_httpc:request(Req) of
+ {[{<<"error">>, _}, {<<"reason">>, _}]} ->
+ ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
+ #doc{id=?l2b(DocId)};
+ Doc ->
+ ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
+ couch_doc:from_json_obj(Doc)
+ end;
+open_replication_log(Db, RepId) ->
+ DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+ case couch_db:open_doc(Db, DocId, []) of
+ {ok, Doc} ->
+ ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
+ Doc;
_ ->
- iolist_to_binary(?JSON_ENCODE(JsonBody))
- end,
- Options = case Action of
- get -> [];
- _ -> [{transfer_encoding, {chunked, 65535}}]
- end ++ [
- {content_type, "application/json; charset=utf-8"},
- {max_pipeline_size, 101},
- {response_format, binary}
- ],
- case ibrowse:send_req(Url, Headers, Action, Body, Options, infinity) of
- {ok, Status, ResponseHeaders, ResponseBody} ->
- ResponseCode = list_to_integer(Status),
- if
- ResponseCode >= 200, ResponseCode < 300 ->
- ?JSON_DECODE(ResponseBody);
- ResponseCode >= 300, ResponseCode < 400 ->
- RedirectUrl = mochiweb_headers:get_value("Location",
- mochiweb_headers:make(ResponseHeaders)),
- do_http_request0(RedirectUrl, Action, Headers, JsonBody, Retries-1,
- Pause);
- ResponseCode >= 400, ResponseCode < 500 ->
- ?JSON_DECODE(ResponseBody);
- ResponseCode == 500 ->
- ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds " ++
- "due to 500 error: ~s", [Action, Pause/1000, Url]),
- timer:sleep(Pause),
- do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
- end;
- {error, Reason} ->
- ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds due to " ++
- "{error, ~p}: ~s", [Action, Pause/1000, Reason, Url]),
- timer:sleep(Pause),
- do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
+ ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
+ #doc{id=DocId}
end.
-ensure_full_commit(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
- {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post,
- Headers, OAuth, true),
- true = proplists:get_value(<<"ok">>, ResultProps),
- {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
-ensure_full_commit(Db) ->
- couch_db:ensure_full_commit(Db).
-
-enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
- case get_doc_info_list(DbSource, StartSeq) of
- [] ->
- gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
- DocInfoList ->
- SrcRevsList = lists:map(fun(#doc_info{id=Id,revs=RevInfos}) ->
- SrcRevs = [Rev || #rev_info{rev=Rev} <- RevInfos],
- {Id, SrcRevs}
- end, DocInfoList),
- {ok, MissingRevs} = get_missing_revs(DbTarget, SrcRevsList),
-
- %% do we need to check for success here?
- [gen_server:call(Pid, {replicate_doc, Info}, infinity)
- || Info <- MissingRevs ],
-
- #doc_info{high_seq=LastSeq} = lists:last(DocInfoList),
- RevsCount2 = RevsCount + length(SrcRevsList),
- gen_server:cast(Pid, {increment_update_seq, LastSeq}),
-
- enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
- end.
-
-
-
-get_db_info(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
- {DbProps} = do_http_request(DbUrl, get, Headers, OAuth),
- {ok, [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps]};
-get_db_info(Db) ->
- couch_db:get_db_info(Db).
-
-get_doc_info_list(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) ->
- Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
- ++ integer_to_list(StartSeq),
- {Results} = do_http_request(Url, get, Headers, OAuth),
- lists:map(fun({RowInfoList}) ->
- {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
- Seq = proplists:get_value(<<"key">>, RowInfoList),
- Revs =
- [#rev_info{rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)), deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)} |
- [#rev_info{rev=Rev,deleted=false} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, []))] ++
- [#rev_info{rev=Rev,deleted=true} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []))]],
- #doc_info{
- id=proplists:get_value(<<"id">>, RowInfoList),
- high_seq = Seq,
- revs = Revs
- }
- end, proplists:get_value(<<"rows">>, Results));
-get_doc_info_list(DbSource, StartSeq) ->
- {ok, {_Count, DocInfoList}} = couch_db:enum_docs_since(DbSource, StartSeq,
- fun (_, _, {100, DocInfoList}) ->
- {stop, {100, DocInfoList}};
- (DocInfo, _, {Count, DocInfoList}) ->
- {ok, {Count+1, [DocInfo|DocInfoList]}}
- end, {0, []}),
- lists:reverse(DocInfoList).
-
-get_missing_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) ->
- DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList],
- {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, OAuth,
- {DocIdRevsList2}),
- {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
- DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList],
- {ok, DocMissingRevsList2};
-get_missing_revs(Db, DocId) ->
- couch_db:get_missing_revs(Db, DocId).
-
-
-open_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
- [] = Options,
- case do_http_request(DbUrl ++ couch_util:url_encode(DocId), get, Headers, OAuth) of
- {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
- {couch_util:to_existing_atom(ErrId), Reason};
- Doc ->
- {ok, couch_doc:from_json_obj(Doc)}
+open_db({Props}, _UserCtx) ->
+ Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)),
+ {AuthProps} = proplists:get_value(<<"auth">>, Props, {[]}),
+ {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
+ Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders],
+ DefaultHeaders = (#http_db{})#http_db.headers,
+ Db = #http_db{
+ url = Url,
+ auth = AuthProps,
+ headers = lists:ukeymerge(1, Headers, DefaultHeaders)
+ },
+ case couch_rep_httpc:db_exists(Db) of
+ true -> Db;
+ false -> throw({db_not_found, Url})
end;
-open_doc(Db, DocId, Options) ->
- couch_db:open_doc(Db, DocId, Options).
-
-open_doc_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0,
- [latest]) ->
- Revs = couch_doc:rev_to_strs(Revs0),
- BaseUrl = DbUrl ++ couch_util:url_encode(DocId) ++ "?revs=true&latest=true",
-
- %% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests
- MaxN = trunc((8192 - length(BaseUrl))/14),
-
- JsonResults = case length(Revs) > MaxN of
- false ->
- Url = ?l2b(BaseUrl ++ "&open_revs=" ++ ?JSON_ENCODE(Revs)),
- do_http_request(Url, get, Headers, OAuth);
- true ->
- {_, Rest, Acc} = lists:foldl(
- fun(Rev, {Count, RevsAcc, AccResults}) when Count =:= MaxN ->
- QSRevs = ?JSON_ENCODE(lists:reverse(RevsAcc)),
- Url = ?l2b(BaseUrl ++ "&open_revs=" ++ QSRevs),
- {1, [Rev], AccResults++do_http_request(Url, get, Headers, OAuth)};
- (Rev, {Count, RevsAcc, AccResults}) ->
- {Count+1, [Rev|RevsAcc], AccResults}
- end, {0, [], []}, Revs),
- Acc ++ do_http_request(?l2b(BaseUrl ++ "&open_revs=" ++
- ?JSON_ENCODE(lists:reverse(Rest))), get, Headers, OAuth)
- end,
-
- Results =
- lists:map(
- fun({[{<<"missing">>, Rev}]}) ->
- {{not_found, missing}, couch_doc:parse_rev(Rev)};
- ({[{<<"ok">>, JsonDoc}]}) ->
- #doc{id=Id, revs=Rev, atts=Atts} = Doc =
- couch_doc:from_json_obj(JsonDoc),
- {ok, Doc#doc{atts=[att_stub_converter(DbS,Id,Rev,A) || A <- Atts]}}
- end, JsonResults),
- {ok, Results};
-open_doc_revs(Db, DocId, Revs, Options) ->
- couch_db:open_doc_revs(Db, DocId, Revs, Options).
-
-%% @spec should_flush() -> true | false
-%% @doc Calculates whether it's time to flush the document buffer. Considers
-%% - memory utilization
-%% - number of pending document writes
-%% - approximate number of pending attachment writes
-should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
- true;
-should_flush(_DocCount) ->
- MeAndMyLinks = [self()|
- [P || P <- element(2,process_info(self(),links)), is_pid(P)]],
-
- case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
- true -> true;
- false ->
- case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
- true ->
- [garbage_collect(Pid) || Pid <- MeAndMyLinks],
- memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
- false -> false
- end
- end.
-
-%% @spec memory_footprint([pid()]) -> integer()
-%% @doc Sum of process and binary memory utilization for all processes in list
-memory_footprint(PidList) ->
- memory_footprint(PidList, {0,0}).
-
-memory_footprint([], {ProcessMemory, BinaryMemory}) ->
- ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
- ProcessMemory + BinaryMemory;
-memory_footprint([Pid|Rest], {ProcAcc, BinAcc}) ->
- case is_process_alive(Pid) of
- true ->
- ProcMem = element(2,process_info(Pid, memory)),
- BinMem = binary_memory(Pid),
- memory_footprint(Rest, {ProcMem + ProcAcc, BinMem + BinAcc});
- false ->
- memory_footprint(Rest, {ProcAcc, BinAcc})
+open_db(<<"http://",_/binary>>=Url, _) ->
+ open_db({[{<<"url">>,Url}]}, []);
+open_db(<<"https://",_/binary>>=Url, _) ->
+ open_db({[{<<"url">>,Url}]}, []);
+open_db(<<DbName/binary>>, UserCtx) ->
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} -> Db;
+ {not_found, no_db_file} -> throw({db_not_found, DbName})
end.
-%% @spec binary_memory(pid()) -> integer()
-%% @doc Memory utilization of all binaries referenced by this process.
-binary_memory(Pid) ->
- lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
- 0, element(2,process_info(Pid, binary))).
-
-update_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) ->
- [] = Options,
- Url = DbUrl ++ couch_util:url_encode(DocId),
- {ResponseMembers} = do_http_request(Url, put, Headers, OAuth,
- couch_doc:to_json_obj(Doc, [attachments])),
+do_checkpoint(State) ->
+ #state{
+ source = Source,
+ target = Target,
+ committed_seq = NewSeqNum,
+ start_seq = StartSeqNum,
+ history = OldHistory,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = ReplicationStartTime,
+ src_starttime = SrcInstanceStartTime,
+ tgt_starttime = TgtInstanceStartTime,
+ stats = Stats
+ } = State,
+ ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
+ RecordSeqNum = case commit_to_both(Source, Target) of
+ {SrcInstanceStartTime, TgtInstanceStartTime} ->
+ NewSeqNum;
+ _Else ->
+ ?LOG_INFO("A server has restarted sinced replication start. "
+ "Not recording the new sequence number to ensure the "
+ "replication is redone and documents reexamined.", []),
+ StartSeqNum
+ end,
+ SessionId = couch_util:new_uuid(),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, StartSeqNum},
+ {<<"end_last_seq">>, NewSeqNum},
+ {<<"recorded_seq">>, RecordSeqNum},
+ {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+ {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+ {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+ {<<"doc_write_failures">>,
+ ets:lookup_element(Stats, doc_write_failures, 2)}
+ ]},
+ % limit history to 50 entries
+ NewRepHistory = {[
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, RecordSeqNum},
+ {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
+ ]},
+ % ?LOG_DEBUG("updating src doc ~p", [SourceLog]),
+ {SrcRevPos,SrcRevId} =
+ update_doc(Source, SourceLog#doc{body=NewRepHistory}, []),
+ % ?LOG_DEBUG("updating tgt doc ~p", [TargetLog]),
+ {TgtRevPos,TgtRevId} =
+ update_doc(Target, TargetLog#doc{body=NewRepHistory}, []),
+ State#state{
+ checkpoint_history = NewRepHistory,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ }.
+
+commit_to_both(Source, Target) ->
+ % commit the src async
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(fun() ->
+ ParentPid ! {self(), ensure_full_commit(Source)} end),
+
+ % commit tgt sync
+ TargetStartTime = ensure_full_commit(Target),
+
+ SourceStartTime =
+ receive
+ {SrcCommitPid, Timestamp} ->
+ Timestamp;
+ {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
+ exit(replication_link_failure)
+ end,
+ {SourceStartTime, TargetStartTime}.
+
+ensure_full_commit(#http_db{} = Db) ->
+ Req = Db#http_db{
+ resource = "_ensure_full_commit",
+ method = post,
+ body = true
+ },
+ {ResultProps} = couch_rep_httpc:request(Req),
+ true = proplists:get_value(<<"ok">>, ResultProps),
+ proplists:get_value(<<"instance_start_time">>, ResultProps);
+ensure_full_commit(Db) ->
+ {ok, StartTime} = couch_db:ensure_full_commit(Db),
+ StartTime.
+
+update_doc(#http_db{} = Db, #doc{id=DocId} = Doc, []) ->
+ Req = Db#http_db{
+ resource = couch_util:url_encode(DocId),
+ method = put,
+ body = couch_doc:to_json_obj(Doc, [attachments])
+ },
+ {ResponseMembers} = couch_rep_httpc:request(Req),
Rev = proplists:get_value(<<"rev">>, ResponseMembers),
- {ok, couch_doc:parse_rev(Rev)};
+ couch_doc:parse_rev(Rev);
update_doc(Db, Doc, Options) ->
- couch_db:update_doc(Db, Doc, Options).
-
-update_docs(_, [], _, _) ->
- {ok, []};
-update_docs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
- ErrorsJson =
- do_http_request(DbUrl ++ "_bulk_docs", post, Headers, OAuth,
- {[{new_edits, false}, {docs, JsonDocs}]}),
- ErrorsList =
- lists:map(
- fun({Props}) ->
- Id = proplists:get_value(<<"id">>, Props),
- Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)),
- ErrId = couch_util:to_existing_atom(
- proplists:get_value(<<"error">>, Props)),
- Reason = proplists:get_value(<<"reason">>, Props),
- Error = {ErrId, Reason},
- {{Id, Rev}, Error}
- end, ErrorsJson),
- {ok, ErrorsList};
-update_docs(Db, Docs, Options, UpdateType) ->
- couch_db:update_docs(Db, Docs, Options, UpdateType).
-
-up_to_date(#old_http_db{}, _Seq) ->
+ {ok, Result} = couch_db:update_doc(Db, Doc, Options),
+ Result.
+
+up_to_date(#http_db{}, _Seq) ->
true;
up_to_date(Source, Seq) ->
{ok, NewDb} = couch_db:open(Source#db.name, []),
T = NewDb#db.update_seq == Seq,
couch_db:close(NewDb),
T.
-
diff --git a/src/couchdb/couch_rep_att.erl b/src/couchdb/couch_rep_att.erl
new file mode 100644
index 00000000..baeb6c65
--- /dev/null
+++ b/src/couchdb/couch_rep_att.erl
@@ -0,0 +1,100 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_att).
+
+-export([convert_stub/2, cleanup/0]).
+
+-include("couch_db.hrl").
+
+convert_stub(#att{data=stub} = Attachment, {#http_db{} = Db, Id, Rev}) ->
+ {Pos, [RevId|_]} = Rev,
+ Name = Attachment#att.name,
+ Request = Db#http_db{
+ resource = lists:flatten([couch_util:url_encode(Id), "/",
+ couch_util:url_encode(Name)]),
+ qs = [{rev, couch_doc:rev_to_str({Pos,RevId})}]
+ },
+ Ref = make_ref(),
+ RcvFun = fun() -> attachment_receiver(Ref, Request) end,
+ Attachment#att{data=RcvFun}.
+
+cleanup() ->
+ receive
+ {ibrowse_async_response, _, _} ->
+ %% TODO maybe log, didn't expect to have data here
+ cleanup();
+ {ibrowse_async_response_end, _} ->
+ cleanup()
+ after 0 ->
+ erase(),
+ ok
+ end.
+
+% internal funs
+
+attachment_receiver(Ref, Request) ->
+ case get(Ref) of
+ undefined ->
+ ReqId = start_http_request(Request),
+ put(Ref, ReqId),
+ receive_data(Ref, ReqId);
+ ReqId ->
+ receive_data(Ref, ReqId)
+ end.
+
+receive_data(Ref, ReqId) ->
+ receive
+ {ibrowse_async_response, ReqId, {chunk_start,_}} ->
+ receive_data(Ref, ReqId);
+ {ibrowse_async_response, ReqId, chunk_end} ->
+ receive_data(Ref, ReqId);
+ {ibrowse_async_response, ReqId, {error, Err}} ->
+ ?LOG_ERROR("streaming attachment ~p failed with ~p", [ReqId, Err]),
+ throw({attachment_request_failed, Err});
+ {ibrowse_async_response, ReqId, Data} ->
+ % ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]),
+ Data;
+ {ibrowse_async_response_end, ReqId} ->
+ ?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]),
+ throw({attachment_request_failed, premature_end})
+ end.
+
+start_http_request(Req) ->
+ %% set stream_to here because self() has changed
+ Req2 = Req#http_db{options = [{stream_to,self()} | Req#http_db.options]},
+ {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req2),
+ receive {ibrowse_async_headers, ReqId, Code, Headers} ->
+ case validate_headers(Req2, list_to_integer(Code), Headers) of
+ ok ->
+ ReqId;
+ {ok, NewReqId} ->
+ NewReqId
+ end
+ end.
+
+validate_headers(_Req, 200, _Headers) ->
+ ok;
+validate_headers(Req, Code, Headers) when Code > 299, Code < 400 ->
+ %% TODO check that the qs is actually included in the Location header
+ %% TODO this only supports one level of redirection
+ Url = mochiweb_headers:get_value("Location",mochiweb_headers:make(Headers)),
+ NewReq = Req#http_db{url=Url, resource="", qs=[]},
+ {ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq),
+ receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} ->
+ ok = validate_headers(NewReq, list_to_integer(NewCode), NewHeaders)
+ end,
+ {ok, ReqId};
+validate_headers(Req, Code, _Headers) ->
+ #http_db{url=Url, resource=Resource} = Req,
+ ?LOG_ERROR("got ~p for ~s~s", [Code, Url, Resource]),
+ throw({attachment_request_failed, {bad_code, Code}}).
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index 7c2d05b0..ce9e4812 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -47,6 +47,7 @@ stop(Server) ->
gen_server:call(Server, stop).
init([_Parent, #http_db{}=Source, Since, PostProps]) ->
+ process_flag(trap_exit, true),
Feed = case proplists:get_value(<<"continuous">>, PostProps, false) of
false ->
normal;
diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl
index bd7cda66..7e1dc16a 100644
--- a/src/couchdb/couch_rep_missing_revs.erl
+++ b/src/couchdb/couch_rep_missing_revs.erl
@@ -51,19 +51,17 @@ init([Parent, Target, ChangesFeed, _PostProps]) ->
{ok, #state{changes_loop=Pid, target=Target, parent=Parent}}.
handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) ->
+ State#state.parent ! {update_stats, missing_revs, length(Revs)},
handle_add_missing_revs(HighSeq, Revs, From, State);
handle_call(next_missing_revs, From, State) ->
- handle_next_missing_revs(From, State);
+ handle_next_missing_revs(From, State).
-handle_call({update_committed_seq, N}, _From, State) ->
+handle_cast({update_committed_seq, N}, State) ->
if State#state.high_committed_seq < N ->
?LOG_DEBUG("missing_revs updating committed seq to ~p", [N]);
true -> ok end,
- {reply, ok, State#state{high_committed_seq=N}}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
+ {noreply, State#state{high_committed_seq=N}}.
handle_info({'EXIT', Pid, Reason}, #state{changes_loop=Pid} = State) ->
handle_changes_loop_exit(Reason, State);
@@ -84,8 +82,9 @@ code_change(_OldVsn, State, _Extra) ->
%internal funs
handle_add_missing_revs(HighSeq, [], _From, State) ->
- maybe_checkpoint(State),
- {reply, ok, State#state{high_source_seq=HighSeq}};
+ NewState = State#state{high_source_seq=HighSeq},
+ maybe_checkpoint(NewState),
+ {reply, ok, NewState};
handle_add_missing_revs(HighSeq, Revs, From, #state{reply_to=nil} = State) ->
#state{rows=Rows, count=Count} = State,
NewState = State#state{
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
new file mode 100644
index 00000000..d17c6c59
--- /dev/null
+++ b/src/couchdb/couch_rep_reader.erl
@@ -0,0 +1,268 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_reader).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/4, next/1]).
+
+-import(couch_util, [url_encode/1]).
+
+-define (BUFFER_SIZE, 1000).
+-define (MAX_CONCURRENT_REQUESTS, 100).
+-define (MAX_CONNECTIONS, 20).
+-define (MAX_PIPELINE_SIZE, 50).
+
+-include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-record (state, {
+ parent,
+ source,
+ missing_revs,
+ reader_loop,
+ reader_from = nil,
+ count = 0,
+ docs = queue:new(),
+ reply_to = nil,
+ complete = false,
+ monitor_count = 0,
+ monitor_count_by_seq = ets:new(monitor_count_by_seq, [set, private]),
+ monitors_by_ref = ets:new(monitors_by_ref, [set, private]),
+ pending_doc_request = nil,
+ high_missing_seq = 0
+}).
+
+start_link(Parent, Source, MissingRevs, PostProps) ->
+ gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
+
+next(Pid) ->
+ gen_server:call(Pid, next_docs, infinity).
+
+init([Parent, Source, MissingRevs, _PostProps]) ->
+ process_flag(trap_exit, true),
+ if is_record(Source, http_db) ->
+ #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
+ ibrowse:set_max_sessions(Host, Port, ?MAX_CONNECTIONS),
+ ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
+ true -> ok end,
+ Self = self(),
+ ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
+ State = #state{
+ parent = Parent,
+ source = Source,
+ missing_revs = MissingRevs,
+ reader_loop = ReaderLoop
+ },
+ {ok, State}.
+
+handle_call({add_docs, Docs}, From, State) ->
+ State#state.parent ! {update_stats, docs_read, length(Docs)},
+ handle_add_docs(lists:flatten(Docs), From, State);
+
+handle_call(next_docs, From, State) ->
+ handle_next_docs(From, State);
+
+handle_call({open_doc_revs, Id, Revs, HighSeq}, From, State) ->
+ handle_open_doc_revs(Id, Revs, HighSeq, From, State);
+
+handle_call({set_monitor_count, Seq, Count}, _From, State) ->
+ ets:insert(State#state.monitor_count_by_seq, {Seq,Count}),
+ {reply, ok, State};
+
+handle_call({update_high_seq, HighSeq}, _From, State) ->
+ {reply, ok, State#state{high_missing_seq=HighSeq}}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', Ref, _, _, Reason}, State) ->
+ handle_monitor_down(Reason, Ref, State);
+
+handle_info({'EXIT', Loop, complete}, #state{reader_loop=Loop} = State) ->
+ handle_reader_loop_complete(State).
+
+terminate(Reason, _State) ->
+ % ?LOG_INFO("rep reader terminating with reason ~p", [Reason]),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%internal funs
+
+handle_add_docs(DocsToAdd, From, #state{reply_to=nil} = State) ->
+ NewState = State#state{
+ docs = queue:join(State#state.docs, queue:from_list(DocsToAdd)),
+ count = State#state.count + length(DocsToAdd)
+ },
+ if NewState#state.count < ?BUFFER_SIZE ->
+ {reply, ok, NewState};
+ true ->
+ {noreply, NewState#state{reader_from=From}}
+ end;
+handle_add_docs(DocsToAdd, _From, #state{count=0} = State) ->
+ HighSeq = State#state.high_missing_seq,
+ gen_server:reply(State#state.reply_to, {HighSeq, DocsToAdd}),
+ {reply, ok, State#state{reply_to=nil}}.
+
+handle_next_docs(From, #state{count=0} = State) ->
+ if State#state.complete ->
+ {stop, normal, {complete, State#state.high_missing_seq}, State};
+ true ->
+ {noreply, State#state{reply_to=From}}
+ end;
+handle_next_docs(_From, State) ->
+ #state{
+ reader_from = ReaderFrom,
+ docs = Docs,
+ high_missing_seq = HighSeq
+ } = State,
+ if ReaderFrom =/= nil ->
+ gen_server:reply(ReaderFrom, ok);
+ true -> ok end,
+ NewState = State#state{count=0, reader_from=nil, docs=queue:new()},
+ {reply, {HighSeq, queue:to_list(Docs)}, NewState}.
+
+handle_open_doc_revs(Id, Revs, Seq, From, #state{monitor_count=N} = State)
+ when N > ?MAX_CONCURRENT_REQUESTS ->
+ {noreply, State#state{pending_doc_request={From,Id,Revs,Seq}}};
+handle_open_doc_revs(Id, Revs, Seq, _From, #state{source=#http_db{}} = State) ->
+ #state{
+ monitor_count = Count,
+ monitors_by_ref = MonitorsByRef,
+ source = Source
+ } = State,
+ {_, Ref} = spawn_document_request(Source, Id, Revs),
+ ets:insert(MonitorsByRef, {Ref, Seq}),
+ {reply, ok, State#state{monitor_count = Count+1}}.
+
+handle_monitor_down(normal, Ref, #state{pending_doc_request=nil,
+ monitor_count=1, complete=waiting_on_monitors} = State) ->
+ N = calculate_new_high_seq(State, Ref),
+ {noreply, State#state{complete=true, monitor_count=0, high_missing_seq=N}};
+handle_monitor_down(normal, Ref, #state{pending_doc_request=nil} = State) ->
+ #state{monitor_count = Count} = State,
+ HighSeq = calculate_new_high_seq(State, Ref),
+ {noreply, State#state{monitor_count = Count-1, high_missing_seq=HighSeq}};
+handle_monitor_down(normal, Ref, State) ->
+ #state{
+ source = Source,
+ monitors_by_ref = MonitorsByRef,
+ pending_doc_request = {From, Id, Revs, Seq}
+ } = State,
+ HighSeq = calculate_new_high_seq(State, Ref),
+ gen_server:reply(From, ok),
+ {_, NewRef} = spawn_document_request(Source, Id, Revs),
+ ets:insert(MonitorsByRef, {NewRef, Seq}),
+ {noreply, State#state{pending_doc_request=nil, high_missing_seq=HighSeq}};
+handle_monitor_down(Reason, _, State) ->
+ {stop, Reason, State}.
+
+handle_reader_loop_complete(#state{reply_to=nil, monitor_count=0} = State) ->
+ {noreply, State#state{complete = true}};
+handle_reader_loop_complete(#state{monitor_count=0} = State) ->
+ HighSeq = State#state.high_missing_seq,
+ gen_server:reply(State#state.reply_to, {complete, HighSeq}),
+ {stop, normal, State};
+handle_reader_loop_complete(State) ->
+ {noreply, State#state{complete = waiting_on_monitors}}.
+
+split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
+ case Length+size(Rev) > 8192 of
+ false ->
+ {[[Rev|CurrentAcc] | Rest], BaseLength, Length+size(Rev)};
+ true ->
+ {[[Rev],CurrentAcc|Rest], BaseLength, BaseLength}
+ end.
+
+open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
+ %% all this logic just splits up revision lists that are too long for
+ %% MochiWeb into multiple requests
+ BaseQS = [{revs,true}, {latest,true}],
+ BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS},
+ BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs=
+
+ {RevLists, _, _} = lists:foldl(fun split_revlist/2,
+ {[[]], BaseLength, BaseLength}, couch_doc:rev_to_strs(Revs)),
+
+ Requests = [BaseReq#http_db{
+ qs = [{open_revs, ?JSON_ENCODE(RevList)} | BaseQS]
+ } || RevList <- RevLists],
+ JsonResults = lists:flatten([couch_rep_httpc:request(R) || R <- Requests]),
+
+ Transform =
+ fun({[{<<"missing">>, Rev}]}) ->
+ {{not_found, missing}, couch_doc:parse_rev(Rev)};
+ ({[{<<"ok">>, Json}]}) ->
+ #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
+ Doc#doc{atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]}
+ end,
+ [Transform(Result) || Result <- JsonResults].
+
+reader_loop(ReaderServer, Source, MissingRevsServer) ->
+ case couch_rep_missing_revs:next(MissingRevsServer) of
+ complete ->
+ % ?LOG_INFO("reader_loop terminating with complete", []),
+ exit(complete);
+ {HighSeq, IdsRevs} ->
+ % ?LOG_DEBUG("got IdsRevs ~p", [IdsRevs]),
+ case Source of
+ #http_db{} ->
+ N = length(IdsRevs),
+ gen_server:call(ReaderServer, {set_monitor_count, HighSeq, N}),
+ [gen_server:call(ReaderServer, {open_doc_revs, Id, Revs, HighSeq})
+ || {Id,Revs} <- IdsRevs];
+ _Local ->
+ lists:foreach(fun({Id,Revs}) ->
+ {ok, Docs} = couch_db:open_doc_revs(Source, Id, Revs, [latest]),
+ JustTheDocs = [Doc || {ok, Doc} <- Docs],
+ gen_server:call(ReaderServer, {add_docs, JustTheDocs})
+ end, IdsRevs),
+ gen_server:call(ReaderServer, {update_high_seq, HighSeq})
+ end
+ end,
+ reader_loop(ReaderServer, Source, MissingRevsServer).
+
+spawn_document_request(Source, Id, Revs) ->
+ Server = self(),
+ SpawnFun = fun() ->
+ Results = open_doc_revs(Source, Id, Revs),
+ gen_server:call(Server, {add_docs, Results})
+ end,
+ spawn_monitor(SpawnFun).
+
+%% check if any more HTTP requests are pending for this update sequence
+calculate_new_high_seq(State, Ref) ->
+ #state{
+ monitors_by_ref = MonitorsByRef,
+ monitor_count_by_seq = MonitorCountBySeq,
+ high_missing_seq = OldSeq
+ } = State,
+ Seq = ets:lookup_element(MonitorsByRef, Ref, 2),
+ ets:delete(MonitorsByRef, Ref),
+ case ets:update_counter(MonitorCountBySeq, Seq, -1) of
+ 0 ->
+ ets:delete(MonitorCountBySeq, Seq),
+ case ets:first(MonitorCountBySeq) of
+ Key when Key > Seq ->
+ Seq;
+ '$end_of_table' ->
+ Seq;
+ _Else ->
+ OldSeq
+ end;
+ _Else ->
+ OldSeq
+ end.
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
new file mode 100644
index 00000000..8bea63fe
--- /dev/null
+++ b/src/couchdb/couch_rep_writer.erl
@@ -0,0 +1,68 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_writer).
+
+-export([start_link/4]).
+
+-include("couch_db.hrl").
+
+start_link(Parent, Target, Reader, _PostProps) ->
+ {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
+
+writer_loop(Parent, Reader, Target) ->
+ % ?LOG_DEBUG("writer loop begin", []),
+ case couch_rep_reader:next(Reader) of
+ {complete, FinalSeq} ->
+ % ?LOG_INFO("writer terminating normally", []),
+ Parent ! {writer_checkpoint, FinalSeq},
+ ok;
+ {HighSeq, Docs} ->
+ % ?LOG_DEBUG("writer loop trying to write ~p", [Docs]),
+ DocCount = length(Docs),
+ try write_docs(Target, Docs) of
+ {ok, []} ->
+ Parent ! {update_stats, docs_written, DocCount};
+ {ok, Errors} ->
+ ErrorCount = length(Errors),
+ Parent ! {update_stats, doc_write_failures, ErrorCount},
+ Parent ! {update_stats, docs_written, DocCount - ErrorCount}
+ catch
+ {attachment_request_failed, Err} ->
+ ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]),
+ exit({attachment_request_failed, Err, Docs})
+ end,
+ Parent ! {writer_checkpoint, HighSeq},
+ couch_rep_att:cleanup(),
+ writer_loop(Parent, Reader, Target)
+ end.
+
+write_docs(#http_db{} = Db, Docs) ->
+ JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+ ErrorsJson = couch_rep_httpc:request(Db#http_db{
+ resource = "_bulk_docs",
+ method = post,
+ body = {[{new_edits, false}, {docs, JsonDocs}]}
+ }),
+ ErrorsList =
+ lists:map(
+ fun({Props}) ->
+ Id = proplists:get_value(<<"id">>, Props),
+ Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)),
+ ErrId = couch_util:to_existing_atom(
+ proplists:get_value(<<"error">>, Props)),
+ Reason = proplists:get_value(<<"reason">>, Props),
+ {{Id, Rev}, {ErrId, Reason}}
+ end, ErrorsJson),
+ {ok, ErrorsList};
+write_docs(Db, Docs) ->
+ couch_db:update_docs(Db, Docs, [], replicated_changes).