summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-03-07 18:48:47 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-03-07 18:48:47 +0000
commitf7c2f1f59ef95d4c4976c56c1bbf718f8036ca87 (patch)
tree00c7c16650d31701746f6b944ae3e4ab070c3823 /src/couchdb
parent5b9b9823e091b6e8720d3930785f59c424239daa (diff)
rewrite replicator using OTP behaviours
- only one instance of given source->target runs at a time - supervisor restarts replications that terminate abnormally - pull repl. streams attachments directly to disk - improved memory utilization - temporarily rollback parallel async doc GETs during pull rep. - replication updates show up in Futon Status window git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@751305 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/Makefile.am3
-rw-r--r--src/couchdb/couch_doc.erl30
-rw-r--r--src/couchdb/couch_rep.erl769
-rw-r--r--src/couchdb/couch_server_sup.erl10
-rw-r--r--src/couchdb/couch_util.erl21
5 files changed, 499 insertions, 334 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 51e4878c..8bea5290 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -64,6 +64,7 @@ source_files = \
couch_query_servers.erl \
couch_ref_counter.erl \
couch_rep.erl \
+ couch_rep_sup.erl \
couch_server.erl \
couch_server_sup.erl \
couch_stats_aggregator.erl \
@@ -104,6 +105,7 @@ compiled_files = \
couch_query_servers.beam \
couch_ref_counter.beam \
couch_rep.beam \
+ couch_rep_sup.beam \
couch_server.beam \
couch_server_sup.beam \
couch_stats_aggregator.beam \
@@ -139,6 +141,7 @@ compiled_files = \
# couch_log.html \
# couch_query_servers.html \
# couch_rep.html \
+# couch_rep_sup.html \
# couch_server.html \
# couch_server_sup.html \
# couch_stream.html \
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 1eb1575a..9860ac0c 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -51,7 +51,13 @@ to_json_revs_info(Meta) ->
to_json_attachment_stubs(Attachments) ->
BinProps = lists:map(
- fun({Name, {Type, BinValue}}) ->
+ fun({Name, {Type, {_RcvFun, Length}}}) ->
+ {Name, {[
+ {<<"stub">>, true},
+ {<<"content_type">>, Type},
+ {<<"length">>, Length}
+ ]}};
+ ({Name, {Type, BinValue}}) ->
{Name, {[
{<<"stub">>, true},
{<<"content_type">>, Type},
@@ -66,7 +72,13 @@ to_json_attachment_stubs(Attachments) ->
to_json_attachments(Attachments) ->
BinProps = lists:map(
- fun({Name, {Type, BinValue}}) ->
+ fun({Name, {Type, {RcvFun, Length}}}) ->
+ Data = read_streamed_attachment(RcvFun, Length, _Acc = []),
+ {Name, {[
+ {<<"content_type">>, Type},
+ {<<"data">>, couch_util:encodeBase64(Data)}
+ ]}};
+ ({Name, {Type, BinValue}}) ->
{Name, {[
{<<"content_type">>, Type},
{<<"data">>, couch_util:encodeBase64(bin_to_binary(BinValue))}
@@ -100,7 +112,9 @@ from_json_obj({Props}) ->
Bins = lists:flatmap(fun({Name, {BinProps}}) ->
case proplists:get_value(<<"stub">>, BinProps) of
true ->
- [{Name, stub}];
+ Type = proplists:get_value(<<"content_type">>, BinProps),
+ Length = proplists:get_value(<<"length">>, BinProps),
+ [{Name, {stub, Type, Length}}];
_ ->
Value = proplists:get_value(<<"data">>, BinProps),
Type = proplists:get_value(<<"content_type">>, BinProps,
@@ -225,7 +239,7 @@ has_stubs(#doc{attachments=Bins}) ->
has_stubs(Bins);
has_stubs([]) ->
false;
-has_stubs([{_Name, stub}|_]) ->
+has_stubs([{_Name, {stub, _, _}}|_]) ->
true;
has_stubs([_Bin|Rest]) ->
has_stubs(Rest).
@@ -233,9 +247,15 @@ has_stubs([_Bin|Rest]) ->
merge_stubs(#doc{attachments=MemBins}=StubsDoc, #doc{attachments=DiskBins}) ->
BinDict = dict:from_list(DiskBins),
MergedBins = lists:map(
- fun({Name, stub}) ->
+ fun({Name, {stub, _, _}}) ->
{Name, dict:fetch(Name, BinDict)};
({Name, Value}) ->
{Name, Value}
end, MemBins),
StubsDoc#doc{attachments= MergedBins}.
+
+read_streamed_attachment(_RcvFun, 0, Acc) ->
+ list_to_binary(lists:reverse(Acc));
+read_streamed_attachment(RcvFun, LenLeft, Acc) ->
+ Bin = RcvFun(),
+ read_streamed_attachment(RcvFun, LenLeft - size(Bin), [Bin|Acc]).
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index e0c4d470..3df2d821 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -11,170 +11,343 @@
% the License.
-module(couch_rep).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([replicate/3]).
+
+-include_lib("couch_db.hrl").
+
+%% @spec replicate(Source::binary(), Target::binary(), Options::proplist()) ->
+%% {ok, Stats} | {error, Reason}
+%% @doc Triggers a replication. Stats is a JSON Object with the following
+%% keys: session_id (UUID), source_last_seq (integer), and history (array).
+%% Each element of the history is an Object with keys start_time, end_time,
+%% start_last_seq, end_last_seq, missing_checked, missing_found, docs_read,
+%% and docs_written.
+%%
+%% The supervisor will try to restart the replication in case of any error
+%% other than shutdown. Just call this function again to listen for the
+%% result of the retry.
+replicate(Source, Target, Options) ->
+ Id = <<Source/binary, ":", Target/binary>>,
+ Args = [?MODULE, [Source,Target,Options], []],
+
+ Replicator = {Id,
+ {gen_server, start_link, Args},
+ transient,
+ 10000,
+ worker,
+ [?MODULE]
+ },
+
+ Server = case supervisor:start_child(couch_rep_sup, Replicator) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting new replication ~p at ~p", [Id, Pid]),
+ Pid;
+ {error, already_present} ->
+ case supervisor:restart_child(couch_rep_sup, Id) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting replication ~p at ~p", [Id, Pid]),
+ Pid;
+ {error, running} ->
+ %% this error occurs if multiple replicators are racing
+ %% each other to start and somebody else won. Just grab
+ %% the Pid by calling start_child again.
+ {error, {already_started, Pid}} =
+ supervisor:start_child(couch_rep_sup, Replicator),
+ ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ Pid
+ end;
+ {error, {already_started, Pid}} ->
+ ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ Pid
+ end,
+
+ case gen_server:call(Server, get_result, infinity) of
+ retry -> replicate(Source, Target, Options);
+ Else -> Else
+ end.
--include("couch_db.hrl").
+%%=============================================================================
+%% gen_server callbacks
+%%=============================================================================
-record(http_db, {
uri,
headers
}).
--export([replicate/2, replicate/3]).
-
-url_encode(Bin) when is_binary(Bin) ->
- url_encode(binary_to_list(Bin));
-url_encode([H|T]) ->
- if
- H >= $a, $z >= H ->
- [H|url_encode(T)];
- H >= $A, $Z >= H ->
- [H|url_encode(T)];
- H >= $0, $9 >= H ->
- [H|url_encode(T)];
- H == $_; H == $.; H == $-; H == $: ->
- [H|url_encode(T)];
- true ->
- case lists:flatten(io_lib:format("~.16.0B", [H])) of
- [X, Y] ->
- [$%, X, Y | url_encode(T)];
- [X] ->
- [$%, $0, X | url_encode(T)]
- end
- end;
-url_encode([]) ->
- [].
-
-
-replicate(DbNameA, DbNameB) ->
- replicate(DbNameA, DbNameB, []).
+-record(state, {
+ context,
+ current_seq,
+ source,
+ target,
+ stats,
+ enum_pid,
+ docs_buffer = [],
+ listeners = []
+}).
-replicate(Source, Target, Options) ->
- {ok, DbSrc} = open_db(Source,
- proplists:get_value(source_options, Options, [])),
- try
- {ok, DbTgt} = open_db(Target,
- proplists:get_value(target_options, Options, [])),
- try
- replicate2(Source, DbSrc, Target, DbTgt, Options)
- after
- close_db(DbTgt)
- end
- after
- close_db(DbSrc)
- end.
+init([Source, Target, Options]) ->
+ {ok, DbSrc} =
+ open_db(Source, proplists:get_value(source_options, Options, [])),
+ {ok, DbTgt} =
+ open_db(Target, proplists:get_value(target_options, Options, [])),
-replicate2(Source, DbSrc, Target, DbTgt, Options) ->
- {ok, HostName} = inet:gethostname(),
- HostNameBin = list_to_binary(HostName),
- RepRecKey = <<?LOCAL_DOC_PREFIX, HostNameBin/binary,
- ":", Source/binary, ":", Target/binary>>,
-
- ReplicationStartTime = httpd_util:rfc1123_date(),
+ {ok, Host} = inet:gethostname(),
+ HostBin = list_to_binary(Host),
+ DocKey = <<?LOCAL_DOC_PREFIX, HostBin/binary, ":", Source/binary, ":",
+ Target/binary>>,
{ok, InfoSrc} = get_db_info(DbSrc),
{ok, InfoTgt} = get_db_info(DbTgt),
+ ReplicationStartTime = httpd_util:rfc1123_date(),
SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),
case proplists:get_value(full, Options, false)
orelse proplists:get_value("full", Options, false) of
true ->
- RepRecSrc = RepRecTgt = #doc{id=RepRecKey};
+ RepRecSrc = RepRecTgt = #doc{id=DocKey};
false ->
- RepRecSrc =
- case open_doc(DbSrc, RepRecKey, []) of
- {ok, SrcDoc} ->
- ?LOG_DEBUG("Found existing replication record on source", []),
- SrcDoc;
- _ -> #doc{id=RepRecKey}
+ RepRecSrc = case open_doc(DbSrc, DocKey, []) of
+ {ok, SrcDoc} ->
+ ?LOG_DEBUG("Found existing replication record on source", []),
+ SrcDoc;
+ _ -> #doc{id=DocKey}
end,
- RepRecTgt =
- case open_doc(DbTgt, RepRecKey, []) of
- {ok, TgtDoc} ->
- ?LOG_DEBUG("Found existing replication record on target", []),
- TgtDoc;
- _ -> #doc{id=RepRecKey}
+ RepRecTgt = case open_doc(DbTgt, DocKey, []) of
+ {ok, TgtDoc} ->
+ ?LOG_DEBUG("Found existing replication record on target", []),
+ TgtDoc;
+ _ -> #doc{id=DocKey}
end
end,
#doc{body={OldRepHistoryProps}} = RepRecSrc,
#doc{body={OldRepHistoryPropsTrg}} = RepRecTgt,
- SeqNum =
- case OldRepHistoryProps == OldRepHistoryPropsTrg of
- true ->
- % if the records are identical, then we have a valid replication history
- proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0);
- false ->
- ?LOG_INFO("Replication records differ. "
+ SeqNum = case OldRepHistoryProps == OldRepHistoryPropsTrg of
+ true ->
+ % if the records are identical, then we have a valid replication history
+ proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0);
+ false ->
+ ?LOG_INFO("Replication records differ. "
"Performing full replication instead of incremental.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", [OldRepHistoryProps, OldRepHistoryPropsTrg]),
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [OldRepHistoryProps, OldRepHistoryPropsTrg]),
0
end,
+
+ Context = [
+ {start_seq, SeqNum},
+ {history, OldRepHistoryProps},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime},
+ {src_record, RepRecSrc},
+ {tgt_record, RepRecTgt}
+ ],
+
+ Stats = ets:new(replication_stats, [set, private]),
+ ets:insert(Stats, {total_revs,0}),
+ ets:insert(Stats, {missing_revs, 0}),
+ ets:insert(Stats, {docs_read, 0}),
+ ets:insert(Stats, {docs_written, 0}),
+
+ couch_task_status:add_task("Replication", <<Source/binary, " -> ",
+ Target/binary>>, "Starting"),
+
+ Parent = self(),
+ Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{SeqNum,0}) end),
+
+ State = #state{
+ context = Context,
+ current_seq = SeqNum,
+ enum_pid = Pid,
+ source = DbSrc,
+ target = DbTgt,
+ stats = Stats
+ },
+
+ {ok, State}.
+
- {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum),
+handle_call(get_result, From, #state{listeners=L} = State) ->
+ {noreply, State#state{listeners=[From|L]}};
+
+handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
+ #state{
+ docs_buffer = Buffer,
+ source = Source,
+ target = Target,
+ stats = Stats
+ } = State,
- case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
- true ->
- % nothing changed, don't record results
- {ok, {OldRepHistoryProps}};
- false ->
- % commit changes to both src and tgt. The src because if changes
- % we replicated are lost, we'll record the a seq number of ahead
- % of what was committed and therefore lose future changes with the
- % same seq nums.
-
- {ok, SrcInstanceStartTime2} = ensure_full_commit(DbSrc),
- {ok, TgtInstanceStartTime2} = ensure_full_commit(DbTgt),
-
- RecordSeqNum =
- if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
- TgtInstanceStartTime2 == TgtInstanceStartTime ->
- NewSeqNum;
+ ets:update_counter(Stats, missing_revs, length(Revs)),
+
+ %% get document(s)
+ {ok, DocResults} = open_doc_revs(Source, Id, Revs, [latest]),
+ Docs = [RevDoc || {ok, RevDoc} <- DocResults],
+ ets:update_counter(Stats, docs_read, length(Docs)),
+
+ %% save them (maybe in a buffer)
+ NewBuffer = case couch_util:should_flush() of
true ->
- ?LOG_INFO("A server has restarted sinced replication start. "
- "Not recording the new sequence number to ensure the "
- "replication is redone and documents reexamined.", []),
- SeqNum
- end,
-
- HistEntries =[
- {
- [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, SeqNum},
- {<<"end_last_seq">>, NewSeqNum} | Stats]}
- | proplists:get_value("history", OldRepHistoryProps, [])],
- % something changed, record results
- NewRepHistory =
- {
- [{<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, lists:sublist(HistEntries, 50)}]},
-
- {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []),
- {ok, _} = update_doc(DbTgt, RepRecTgt#doc{body=NewRepHistory}, []),
- {ok, NewRepHistory}
+ Docs2 = lists:flatten([Docs|Buffer]),
+ ok = update_docs(Target, Docs2, [], false),
+ ets:update_counter(Stats, docs_written, length(Docs2)),
+ [];
+ false ->
+ [Docs | Buffer]
+ end,
+
+ {reply, ok, State#state{docs_buffer=NewBuffer}};
+
+handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
+ ets:update_counter(State#state.stats, total_revs, RevsCount),
+ {stop, normal, ok, State#state{current_seq=LastSeq}}.
+
+handle_cast({increment_update_seq, Seq}, State) ->
+ couch_task_status:update("Processed source update #~p", [Seq]),
+ {noreply, State#state{current_seq=Seq}}.
+
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(normal, State) ->
+ #state{
+ context = Context,
+ current_seq = Seq,
+ docs_buffer = Buffer,
+ listeners = Listeners,
+ source = Source,
+ target = Target,
+ stats = Stats
+ } = State,
+
+ ok = update_docs(Target, lists:flatten(Buffer), [], false),
+ ets:update_counter(Stats, docs_written, lists:flatlength(Buffer)),
+
+ couch_task_status:update("Finishing"),
+
+ %% format replication history
+ JsonStats = [
+ {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+ {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+ {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
+ ],
+ ets:delete(Stats),
+ {ok, NewRepHistory} = finalize_response(Source, Target, Context, Seq, JsonStats),
+
+ %% update local documents
+ RepRecSrc = proplists:get_value(src_record, Context),
+ RepRecTgt = proplists:get_value(tgt_record, Context),
+ {ok, _} = update_local_doc(Source, RepRecSrc#doc{body=NewRepHistory}, []),
+ {ok, _} = update_local_doc(Target, RepRecTgt#doc{body=NewRepHistory}, []),
+
+ close_db(Target),
+
+ %% reply to original requester
+ [Original|Rest] = Listeners,
+ gen_server:reply(Original, {ok, NewRepHistory}),
+
+ %% maybe trigger another replication. If this replicator uses a local
+ %% source Db, changes to that Db since we started will not be included in
+ %% this pass.
+ case up_to_date(Source, Seq) of
+ true ->
+ [gen_server:reply(R, {ok, NewRepHistory}) || R <- Rest];
+ false ->
+ [gen_server:reply(R, retry) || R <- Rest]
+ end,
+ close_db(Source);
+terminate(Reason, State) ->
+ #state{
+ listeners = Listeners,
+ source = Source,
+ target = Target,
+ stats = Stats
+ } = State,
+
+ [gen_server:reply(L, {error, Reason}) || L <- Listeners],
+
+ ets:delete(Stats),
+ close_db(Target),
+ close_db(Source).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%=============================================================================
+%% internal functions
+%%=============================================================================
+
+attachment_loop(ReqId) ->
+ couch_util:should_flush(),
+ receive
+ {From, {set_req_id, NewId}} ->
+ From ! {self(), {ok, NewId}},
+ attachment_loop(NewId);
+ {ibrowse_async_headers, ReqId, _Status, _Headers} ->
+ attachment_loop(ReqId);
+ {ibrowse_async_response, ReqId, {chunk_start,_}} ->
+ attachment_loop(ReqId);
+ {ibrowse_async_response, ReqId, chunk_end} ->
+ attachment_loop(ReqId);
+ {ibrowse_async_response, ReqId, Data} ->
+ receive {From, gimme_data} -> From ! {self(), Data} end,
+ attachment_loop(ReqId);
+ {ibrowse_async_response_end, ReqId} -> ok
end.
-pull_rep(DbTarget, DbSource, SourceSeqNum) ->
- {ok, {NewSeq, Stats}} =
- enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}),
- {NewSeq, Stats}.
+attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) ->
+ #http_db{uri=DbUrl, headers=Headers} = DbS,
+ % TODO worry about revisions
+ Url = DbUrl ++ url_encode(Id) ++ "/" ++ ?b2l(Name),
+ ?LOG_DEBUG("Attachment URL ~p", [Url]),
+
+ %% start the process that receives attachment data from ibrowse
+ Pid = spawn_link(fun() -> attachment_loop(nil) end),
+
+ %% make the async request
+ Options = [{stream_to, Pid}, {response_format, binary}],
+ {ibrowse_req_id, ReqId} = ibrowse:send_req(Url, Headers, get, [], Options),
+
+ %% tell our receiver about the ReqId it needs to look for
+ Pid ! {self(), {set_req_id, ReqId}},
+ receive {Pid, {ok, ReqId}} -> ok end,
+
+ %% this is the function that goes into the streaming attachment code.
+ %% It gets executed by the replication gen_server, so it can't
+ %% be the one to actually receive the ibrowse data.
+ RcvFun = fun() ->
+ Pid ! {self(), gimme_data},
+ receive {Pid, Data} -> Data end
+ end,
+ {Name, {Type, {RcvFun, Length}}}.
+
+close_db(#http_db{})->
+ ok;
+close_db(Db)->
+ couch_db:close(Db).
do_http_request(Url, Action, Headers) ->
do_http_request(Url, Action, Headers, []).
do_http_request(Url, Action, Headers, JsonBody) ->
- do_http_request(Url, Action, Headers, JsonBody, 10).
+ do_http_request(?b2l(?l2b(Url)), Action, Headers, JsonBody, 10).
do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
- ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~p",
+ ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
[Action, Url]);
do_http_request(Url, Action, Headers, JsonBody, Retries) ->
- ?LOG_DEBUG("couch_rep HTTP ~p request: ~p", [Action, Url]),
+ ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
Body =
case JsonBody of
[] ->
@@ -187,9 +360,10 @@ do_http_request(Url, Action, Headers, JsonBody, Retries) ->
_ -> [{transfer_encoding, {chunked, 65535}}]
end ++ [
{content_type, "application/json; charset=utf-8"},
- {max_pipeline_size, 101}
+ {max_pipeline_size, 101},
+ {response_format, binary}
],
- case ibrowse:send_req(Url, Headers, Action, Body, Options) of
+ case ibrowse:send_req(Url, Headers, Action, Body, Options, infinity) of
{ok, Status, ResponseHeaders, ResponseBody} ->
ResponseCode = list_to_integer(Status),
if
@@ -202,139 +376,113 @@ do_http_request(Url, Action, Headers, JsonBody, Retries) ->
ResponseCode >= 400, ResponseCode < 500 ->
?JSON_DECODE(ResponseBody);
ResponseCode == 500 ->
- ?LOG_INFO("retrying couch_rep HTTP ~p request due to 500 error: ~p",
+ ?LOG_INFO("retrying couch_rep HTTP ~p request due to 500 error: ~s",
[Action, Url]),
do_http_request(Url, Action, Headers, JsonBody, Retries - 1)
end;
{error, Reason} ->
- ?LOG_INFO("retrying couch_rep HTTP ~p request due to {error, ~p}: ~p",
+ ?LOG_INFO("retrying couch_rep HTTP ~p request due to {error, ~p}: ~s",
[Action, Reason, Url]),
do_http_request(Url, Action, Headers, JsonBody, Retries - 1)
end.
-save_docs_buffer(DbTarget, DocsBuffer, []) ->
- receive
- {Src, shutdown} ->
- ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false),
- Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
- end;
-save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences) ->
- [NextSeq|Rest] = UpdateSequences,
- receive
- {Src, skip, NextSeq} ->
- Src ! got_it,
- save_docs_buffer(DbTarget, DocsBuffer, Rest);
- {Src, docs, {NextSeq, Docs}} ->
- Src ! got_it,
- case couch_util:should_flush() of
- true ->
- ok = update_docs(DbTarget, lists:reverse(Docs++DocsBuffer), [],
- false),
- save_docs_buffer(DbTarget, [], Rest);
- false ->
- save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest)
- end;
- {Src, shutdown} ->
- ?LOG_ERROR("received shutdown while waiting for more update_seqs", []),
- ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false),
- Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
+ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) ->
+ {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post,
+ Headers, true),
+ true = proplists:get_value(<<"ok">>, ResultProps),
+ {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
+ensure_full_commit(Db) ->
+ couch_db:ensure_full_commit(Db).
+
+enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
+ case get_doc_info_list(DbSource, StartSeq) of
+ [] ->
+ gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
+ DocInfoList ->
+ % UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
+ SrcRevsList = lists:map(fun(SrcDocInfo) ->
+ #doc_info{id=Id,
+ rev=Rev,
+ conflict_revs=Conflicts,
+ deleted_conflict_revs=DelConflicts
+ } = SrcDocInfo,
+ SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+ {Id, SrcRevs}
+ end, DocInfoList),
+ {ok, MissingRevs} = get_missing_revs(DbTarget, SrcRevsList),
+
+ %% do we need to check for success here?
+ [ gen_server:call(Pid, {replicate_doc, Info}, infinity)
+ || Info <- MissingRevs ],
+
+ #doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
+ RevsCount2 = RevsCount + length(SrcRevsList),
+ gen_server:cast(Pid, {increment_update_seq, LastSeq}),
+
+ enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
end.
-pmap(F,List) ->
- [wait_result(Worker) || Worker <- [spawn_worker(self(),F,E) || E <- List]].
-
-spawn_worker(Parent, F, E) ->
- erlang:spawn_monitor(fun() -> Parent ! {self(), F(E)} end).
-
-wait_result({Pid,Ref}) ->
- receive
- {'DOWN', Ref, _, _, normal} -> receive {Pid,Result} -> Result end;
- {'DOWN', Ref, _, _, Reason} -> exit(Reason)
-end.
-
-enum_docs_parallel(DbS, DbT, InfoList) ->
- UpdateSeqs = [Seq || {_, Seq, _, _} <- InfoList],
- SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end),
-
- Stats = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) ->
- case MissingRevs of
- [] ->
- SaveDocsPid ! {self(), skip, Seq},
- receive got_it -> ok end,
- [{missing_checked, length(SrcRevs)}];
- _ ->
- {ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]),
-
- % only save successful reads
- Docs = [RevDoc || {ok, RevDoc} <- DocResults],
-
- % include update_seq so we save docs in order
- SaveDocsPid ! {self(), docs, {Seq, Docs}},
- receive got_it -> ok end,
- [{missing_checked, length(SrcRevs)},
- {missing_found, length(MissingRevs)},
- {docs_read, length(Docs)}]
- end
- end, InfoList),
+finalize_response(Source, Target, Context, NewSeqNum, Stats) ->
+ [
+ {start_seq, SeqNum},
+ {history, OldRepHistoryProps},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime}
+ |_] = Context,
- SaveDocsPid ! {self(), shutdown},
-
- {MissingChecked, MissingFound, DocsRead} = lists:foldl(fun(S, {C, F, R}) ->
- C1 = C + proplists:get_value(missing_checked, S, 0),
- F1 = F + proplists:get_value(missing_found, S, 0),
- R1 = R + proplists:get_value(docs_read, S, 0),
- {C1, F1, R1}
- end, {0, 0, 0}, Stats),
-
- receive
- {done, SaveDocsPid, [{<<"docs_written">>, DocsWritten}]} -> ok
- end,
-
- [ {<<"missing_checked">>, MissingChecked},
- {<<"missing_found">>, MissingFound},
- {<<"docs_read">>, DocsRead},
- {<<"docs_written">>, DocsWritten} ].
+ case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+ true ->
+ % nothing changed, don't record results
+ {ok, {OldRepHistoryProps}};
+ false ->
+ % commit changes to both src and tgt. The src because if changes
+ % we replicated are lost, we'll record the a seq number of ahead
+ % of what was committed and therefore lose future changes with the
+ % same seq nums.
+ {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
+ {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
+
+ RecordSeqNum =
+ if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
+ TgtInstanceStartTime2 == TgtInstanceStartTime ->
+ NewSeqNum;
+ true ->
+ ?LOG_INFO("A server has restarted sinced replication start. "
+ "Not recording the new sequence number to ensure the "
+ "replication is redone and documents reexamined.", []),
+ SeqNum
+ end,
+
+ HistEntries =[
+ {
+ [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, SeqNum},
+ {<<"end_last_seq">>, NewSeqNum} | Stats]}
+ | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
+ % something changed, record results
+ NewRepHistory =
+ {
+ [{<<"session_id">>, couch_util:new_uuid()},
+ {<<"source_last_seq">>, RecordSeqNum},
+ {<<"history">>, lists:sublist(HistEntries, 50)}]},
+ {ok, NewRepHistory}
+ end.
fix_url(UrlBin) ->
Url = binary_to_list(UrlBin),
case lists:last(Url) of
- $/ ->
- Url;
- _ ->
- Url ++ "/"
+ $/ -> Url;
+ _ -> Url ++ "/"
end.
-open_http_db(UrlBin, Options) ->
- Headers = proplists:get_value(headers, Options, {[]}),
- {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}.
-
-open_db(<<"http://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(<<"https://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(DbName, Options)->
- couch_db:open(DbName, Options).
-
-close_db(#http_db{})->
- ok;
-close_db(Db)->
- couch_db:close(Db).
-
get_db_info(#http_db{uri=DbUrl, headers=Headers}) ->
{DbProps} = do_http_request(DbUrl, get, Headers),
{ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]};
get_db_info(Db) ->
couch_db:get_db_info(Db).
-
-ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) ->
- {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post, Headers, true),
- true = proplists:get_value(<<"ok">>, ResultProps),
- {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
-ensure_full_commit(Db) ->
- couch_db:ensure_full_commit(Db).
-
-
get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) ->
Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
++ integer_to_list(StartSeq),
@@ -361,82 +509,26 @@ get_doc_info_list(DbSource, StartSeq) ->
end, {0, []}),
lists:reverse(DocInfoList).
-enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) ->
- DocInfoList = get_doc_info_list(DbSource, StartSeq),
- case DocInfoList of
- [] ->
- {ok, InAcc};
- _ ->
- UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
- SrcRevsList = lists:map(fun(SrcDocInfo) ->
- #doc_info{id=Id,
- rev=Rev,
- conflict_revs=Conflicts,
- deleted_conflict_revs=DelConflicts
- } = SrcDocInfo,
- SrcRevs = [Rev | Conflicts] ++ DelConflicts,
- {Id, SrcRevs}
- end, DocInfoList),
- {ok, MissingRevsList} = get_missing_revs(DbTarget, SrcRevsList),
- InfoList = lists:map(fun({{Id, SrcRevs}, Seq}) ->
- MissingRevs = proplists:get_value(Id, MissingRevsList, []),
- {Id, Seq, SrcRevs, MissingRevs}
- end, lists:zip(SrcRevsList, UpdateSeqs)),
- Stats = enum_docs_parallel(DbSource, DbTarget, InfoList),
- OldStats = element(2, InAcc),
- TotalStats = [
- {<<"missing_checked">>,
- proplists:get_value(<<"missing_checked">>, OldStats, 0) +
- proplists:get_value(<<"missing_checked">>, Stats, 0)},
- {<<"missing_found">>,
- proplists:get_value(<<"missing_found">>, OldStats, 0) +
- proplists:get_value(<<"missing_found">>, Stats, 0)},
- {<<"docs_read">>,
- proplists:get_value(<<"docs_read">>, OldStats, 0) +
- proplists:get_value(<<"docs_read">>, Stats, 0)},
- {<<"docs_written">>,
- proplists:get_value(<<"docs_written">>, OldStats, 0) +
- proplists:get_value(<<"docs_written">>, Stats, 0)}
- ],
-
- #doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
- enum_docs_since(DbSource, DbTarget, LastSeq, {LastSeq, TotalStats})
- end.
-
get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) ->
{ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers,
{DocIdRevsList}),
- {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
- {ok, DocMissingRevsList};
+ {MissingRevs} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
+ {ok, MissingRevs};
get_missing_revs(Db, DocId) ->
couch_db:get_missing_revs(Db, DocId).
+open_http_db(UrlBin, Options) ->
+ Headers = proplists:get_value(headers, Options, {[]}),
+ {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}.
+
+open_db(<<"http://", _/binary>>=Url, Options)->
+ open_http_db(Url, Options);
+open_db(<<"https://", _/binary>>=Url, Options)->
+ open_http_db(Url, Options);
+open_db(DbName, Options)->
+ couch_db:open(DbName, Options).
-update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
- [] = Options,
- Url = DbUrl ++ url_encode(DocId),
- {ResponseMembers} = do_http_request(Url, put, Headers,
- couch_doc:to_json_obj(Doc, [revs,attachments])),
- RevId = proplists:get_value(<<"_rev">>, ResponseMembers),
- {ok, RevId};
-update_doc(Db, Doc, Options) ->
- couch_db:update_doc(Db, Doc, Options).
-
-update_docs(_, [], _, _) ->
- ok;
-update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
- {Returned} =
- do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
- {[{new_edits, NewEdits}, {docs, JsonDocs}]}),
- true = proplists:get_value(<<"ok">>, Returned),
- ok;
-update_docs(Db, Docs, Options, NewEdits) ->
- couch_db:update_docs(Db, Docs, Options, NewEdits).
-
-
-open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) ->
- [] = Options,
+open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) ->
case do_http_request(DbUrl ++ url_encode(DocId), get, Headers) of
{[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
{couch_util:to_existing_atom(ErrId), Reason};
@@ -446,16 +538,8 @@ open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) ->
open_doc(Db, DocId, Options) ->
couch_db:open_doc(Db, DocId, Options).
-
-open_doc_revs(#http_db{uri=DbUrl, headers=Headers}, DocId, Revs, Options) ->
- QueryOptionStrs =
- lists:map(fun(latest) ->
- % latest is only option right now
- "latest=true"
- end, Options),
-
- BaseUrl = DbUrl ++ url_encode(DocId) ++ "?" ++ couch_util:implode(
- ["revs=true", "attachments=true"] ++ QueryOptionStrs, "&"),
+open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) ->
+ BaseUrl = DbUrl ++ url_encode(DocId) ++ "?revs=true&latest=true",
%% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests
MaxN = trunc((8192 - length(BaseUrl))/14),
@@ -477,15 +561,66 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers}, DocId, Revs, Options) ->
lists:flatten(?JSON_ENCODE(lists:reverse(Rest))), get, Headers)
end,
- Results =
- lists:map(
- fun({[{<<"missing">>, Rev}]}) ->
- {{not_found, missing}, Rev};
- ({[{<<"ok">>, JsonDoc}]}) ->
- {ok, couch_doc:from_json_obj(JsonDoc)}
- end, JsonResults),
+ Results =
+ lists:map(fun({[{<<"missing">>, Rev}]}) ->
+ {{not_found, missing}, Rev};
+ ({[{<<"ok">>, JsonDoc}]}) ->
+ #doc{id=Id, attachments=Attach} = Doc = couch_doc:from_json_obj(JsonDoc),
+ Attach2 = [attachment_stub_converter(DbS,Id,A) || A <- Attach],
+ {ok, Doc#doc{attachments=Attach2}}
+ end, JsonResults),
{ok, Results};
open_doc_revs(Db, DocId, Revs, Options) ->
couch_db:open_doc_revs(Db, DocId, Revs, Options).
+update_docs(_, [], _, _) ->
+ ok;
+update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) ->
+ JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+ {Returned} =
+ do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
+ {[{new_edits, NewEdits}, {docs, JsonDocs}]}),
+ true = proplists:get_value(<<"ok">>, Returned),
+ ok;
+update_docs(Db, Docs, Options, NewEdits) ->
+ couch_db:update_docs(Db, Docs, Options, NewEdits).
+
+update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) ->
+ Url = DbUrl ++ url_encode(DocId),
+ {ResponseMembers} = do_http_request(Url, put, Headers,
+ couch_doc:to_json_obj(Doc, [revs,attachments])),
+ RevId = proplists:get_value(<<"_rev">>, ResponseMembers),
+ {ok, RevId};
+update_local_doc(Db, Doc, Options) ->
+ couch_db:update_doc(Db, Doc, Options).
+up_to_date(#http_db{}, _Seq) ->
+ true;
+up_to_date(Source, Seq) ->
+ {ok, NewDb} = couch_db:open(Source#db.name, []),
+ T = NewDb#db.update_seq == Seq,
+ couch_db:close(NewDb),
+ T.
+
+url_encode(Bin) when is_binary(Bin) ->
+ url_encode(binary_to_list(Bin));
+url_encode([H|T]) ->
+ if
+ H >= $a, $z >= H ->
+ [H|url_encode(T)];
+ H >= $A, $Z >= H ->
+ [H|url_encode(T)];
+ H >= $0, $9 >= H ->
+ [H|url_encode(T)];
+ H == $_; H == $.; H == $-; H == $: ->
+ [H|url_encode(T)];
+ true ->
+ case lists:flatten(io_lib:format("~.16.0B", [H])) of
+ [X, Y] ->
+ [$%, X, Y | url_encode(T)];
+ [X] ->
+ [$%, $0, X | url_encode(T)]
+ end
+ end;
+url_encode([]) ->
+ [].
diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl
index 942de092..15867ad7 100644
--- a/src/couchdb/couch_server_sup.erl
+++ b/src/couchdb/couch_server_sup.erl
@@ -133,6 +133,12 @@ start_primary_services() ->
brutal_kill,
worker,
[couch_log]},
+ {couch_replication_supervisor,
+ {couch_rep_sup, start_link, []},
+ permanent,
+ infinity,
+ supervisor,
+ [couch_rep_sup]},
{couch_task_status,
{couch_task_status, start_link, []},
permanent,
@@ -150,7 +156,9 @@ start_primary_services() ->
permanent,
brutal_kill,
supervisor,
- dynamic}]}).
+ dynamic}
+ ]
+ }).
start_secondary_services() ->
DaemonChildSpecs = [
diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl
index d0f0c0b6..21d6eb4c 100644
--- a/src/couchdb/couch_util.erl
+++ b/src/couchdb/couch_util.erl
@@ -184,19 +184,18 @@ should_flush() ->
should_flush(?FLUSH_MAX_MEM).
should_flush(MemThreshHold) ->
- case process_info(self(), memory) of
- {memory, Mem} when Mem > 2*MemThreshHold ->
+ {memory, ProcMem} = process_info(self(), memory),
+ BinMem = lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
+ 0, element(2,process_info(self(), binary))),
+ if ProcMem+BinMem > 2*MemThreshHold ->
garbage_collect(),
- case process_info(self(), memory) of
- {memory, Mem} when Mem > MemThreshHold ->
+ {memory, ProcMem2} = process_info(self(), memory),
+ BinMem2 = lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
+ 0, element(2,process_info(self(), binary))),
+ if ProcMem2+BinMem2 > MemThreshHold ->
true;
- _ ->
- false
- end;
- _ ->
- false
- end.
-
+ true -> false end;
+ true -> false end.
%%% Purpose : Base 64 encoding and decoding.