summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2009-03-13 22:15:34 +0000
committerJohn Christopher Anderson <jchris@apache.org>2009-03-13 22:15:34 +0000
commit9007e2d21dea8b0185c0096b30364a8ee40a3867 (patch)
tree7d8dacb2c8cd619f18dfab8fdb40d146ac28c85a /src/couchdb/couch_rep.erl
parent65608e14e8911b33c30178d717d745edc9f66c17 (diff)
Commit Damien's rep_security branch to trunk.
Changes bulk_docs conflict checking. Breaks file format, see mailing list for data upgrade procedure, or http://wiki.apache.org/couchdb/Breaking_changes git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@753448 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl365
1 files changed, 202 insertions, 163 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 89d40be3..3647f6db 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -15,11 +15,11 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/3]).
+-export([replicate/2]).
-include_lib("couch_db.hrl").
-%% @spec replicate(Source::binary(), Target::binary(), Options::proplist()) ->
+%% @spec replicate(Source::binary(), Target::binary()) ->
%% {ok, Stats} | {error, Reason}
%% @doc Triggers a replication. Stats is a JSON Object with the following
%% keys: session_id (UUID), source_last_seq (integer), and history (array).
@@ -30,26 +30,29 @@
%% The supervisor will try to restart the replication in case of any error
%% other than shutdown. Just call this function again to listen for the
%% result of the retry.
-replicate(Source, Target, Options) ->
- Id = <<Source/binary, ":", Target/binary>>,
- Args = [?MODULE, [Source,Target,Options], []],
+replicate(Source, Target) ->
- Replicator = {Id,
+ {ok, HostName} = inet:gethostname(),
+ RepId = couch_util:to_hex(
+ erlang:md5(term_to_binary([HostName, Source, Target]))),
+ Args = [?MODULE, [RepId, Source,Target], []],
+
+ Replicator = {RepId,
{gen_server, start_link, Args},
transient,
- 10000,
+ 1,
worker,
[?MODULE]
},
Server = case supervisor:start_child(couch_rep_sup, Replicator) of
{ok, Pid} ->
- ?LOG_INFO("starting new replication ~p at ~p", [Id, Pid]),
+ ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
Pid;
{error, already_present} ->
- case supervisor:restart_child(couch_rep_sup, Id) of
+ case supervisor:restart_child(couch_rep_sup, RepId) of
{ok, Pid} ->
- ?LOG_INFO("starting replication ~p at ~p", [Id, Pid]),
+ ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
Pid;
{error, running} ->
%% this error occurs if multiple replicators are racing
@@ -57,16 +60,16 @@ replicate(Source, Target, Options) ->
%% the Pid by calling start_child again.
{error, {already_started, Pid}} =
supervisor:start_child(couch_rep_sup, Replicator),
- ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
Pid
end;
{error, {already_started, Pid}} ->
- ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
Pid
end,
case gen_server:call(Server, get_result, infinity) of
- retry -> replicate(Source, Target, Options);
+ retry -> replicate(Source, Target);
Else -> Else
end.
@@ -79,6 +82,7 @@ replicate(Source, Target, Options) ->
headers
}).
+
-record(state, {
context,
current_seq,
@@ -90,18 +94,14 @@ replicate(Source, Target, Options) ->
listeners = []
}).
-init([Source, Target, Options]) ->
+
+init([RepId, Source, Target]) ->
process_flag(trap_exit, true),
- {ok, DbSrc} =
- open_db(Source, proplists:get_value(source_options, Options, [])),
- {ok, DbTgt} =
- open_db(Target, proplists:get_value(target_options, Options, [])),
+ {ok, DbSrc, SrcName} = open_db(Source),
+ {ok, DbTgt, TgtName} = open_db(Target),
- {ok, Host} = inet:gethostname(),
- HostBin = list_to_binary(Host),
- DocKey = <<?LOCAL_DOC_PREFIX, HostBin/binary, ":", Source/binary, ":",
- Target/binary>>,
+ DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
{ok, InfoSrc} = get_db_info(DbSrc),
{ok, InfoTgt} = get_db_info(DbTgt),
@@ -110,49 +110,49 @@ init([Source, Target, Options]) ->
SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),
- case proplists:get_value(full, Options, false)
- orelse proplists:get_value("full", Options, false) of
+ RepRecDocSrc =
+ case open_doc(DbSrc, DocKey, []) of
+ {ok, SrcDoc} ->
+ ?LOG_DEBUG("Found existing replication record on source", []),
+ SrcDoc;
+ _ -> #doc{id=DocKey}
+ end,
+
+ RepRecDocTgt =
+ case open_doc(DbTgt, DocKey, []) of
+ {ok, TgtDoc} ->
+ ?LOG_DEBUG("Found existing replication record on target", []),
+ TgtDoc;
+ _ -> #doc{id=DocKey}
+ end,
+
+ #doc{body={RepRecProps}} = RepRecDocSrc,
+ #doc{body={RepRecPropsTgt}} = RepRecDocTgt,
+
+ case proplists:get_value(<<"session_id">>, RepRecProps) ==
+ proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
true ->
- RepRecSrc = RepRecTgt = #doc{id=DocKey};
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
+ OldHistory = proplists:get_value(<<"history">>, RepRecProps, []);
false ->
- RepRecSrc = case open_doc(DbSrc, DocKey, []) of
- {ok, SrcDoc} ->
- ?LOG_DEBUG("Found existing replication record on source", []),
- SrcDoc;
- _ -> #doc{id=DocKey}
- end,
-
- RepRecTgt = case open_doc(DbTgt, DocKey, []) of
- {ok, TgtDoc} ->
- ?LOG_DEBUG("Found existing replication record on target", []),
- TgtDoc;
- _ -> #doc{id=DocKey}
- end
- end,
-
- #doc{body={OldRepHistoryProps}} = RepRecSrc,
- #doc{body={OldRepHistoryPropsTrg}} = RepRecTgt,
-
- SeqNum = case OldRepHistoryProps == OldRepHistoryPropsTrg of
- true ->
- % if the records are identical, then we have a valid replication history
- proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0);
- false ->
- ?LOG_INFO("Replication records differ. "
+ ?LOG_INFO("Replication records differ. "
"Performing full replication instead of incremental.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
- [OldRepHistoryProps, OldRepHistoryPropsTrg]),
- 0
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ OldSeqNum = 0,
+ OldHistory = []
end,
Context = [
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
+ {start_seq, OldSeqNum},
+ {history, OldHistory},
{rep_starttime, ReplicationStartTime},
{src_starttime, SrcInstanceStartTime},
{tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecSrc},
- {tgt_record, RepRecTgt}
+ {src_record, RepRecDocSrc},
+ {tgt_record, RepRecDocTgt}
],
Stats = ets:new(replication_stats, [set, private]),
@@ -160,16 +160,17 @@ init([Source, Target, Options]) ->
ets:insert(Stats, {missing_revs, 0}),
ets:insert(Stats, {docs_read, 0}),
ets:insert(Stats, {docs_written, 0}),
+ ets:insert(Stats, {doc_write_failures, 0}),
- couch_task_status:add_task("Replication", <<Source/binary, " -> ",
- Target/binary>>, "Starting"),
+ couch_task_status:add_task("Replication", <<SrcName/binary, " -> ",
+ TgtName/binary>>, "Starting"),
Parent = self(),
- Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{SeqNum,0}) end),
+ Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end),
State = #state{
context = Context,
- current_seq = SeqNum,
+ current_seq = OldSeqNum,
enum_pid = Pid,
source = DbSrc,
target = DbTgt,
@@ -178,7 +179,6 @@ init([Source, Target, Options]) ->
{ok, State}.
-
handle_call(get_result, From, #state{listeners=L} = State) ->
{noreply, State#state{listeners=[From|L]}};
@@ -191,7 +191,7 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State)
target = Target,
stats = Stats
} = State,
-
+
ets:update_counter(Stats, missing_revs, length(Revs)),
%% get document(s)
@@ -203,8 +203,11 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State)
{NewBuffer, NewContext} = case couch_util:should_flush() of
true ->
Docs2 = lists:flatten([Docs|Buffer]),
- ok = update_docs(Target, Docs2, [], false),
- ets:update_counter(Stats, docs_written, length(Docs2)),
+ {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, length(Docs2) -
+ length(Errors)),
{ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
{[], Ctxt};
false ->
@@ -255,8 +258,11 @@ terminate(normal, State) ->
stats = Stats
} = State,
- ok = update_docs(Target, lists:flatten(Buffer), [], false),
- ets:update_counter(Stats, docs_written, lists:flatlength(Buffer)),
+ {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes),
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
+ length(Errors)),
couch_task_status:update("Finishing"),
@@ -264,9 +270,12 @@ terminate(normal, State) ->
ets:delete(Stats),
close_db(Target),
- %% reply to original requester
- [Original|Rest] = Listeners,
- gen_server:reply(Original, {ok, NewRepHistory}),
+ case Listeners of
+ [Original|Rest] ->
+ %% reply to original requester
+ gen_server:reply(Original, {ok, NewRepHistory});
+ Rest -> ok
+ end,
%% maybe trigger another replication. If this replicator uses a local
%% source Db, changes to that Db since we started will not be included in
@@ -304,6 +313,16 @@ code_change(_OldVsn, State, _Extra) ->
%% internal functions
%%=============================================================================
+
+% we should probably write these to a special replication log
+% or have a callback where the caller decides what to do with replication
+% errors.
+dump_update_errors([]) -> ok;
+dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
+ ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
+ [Id, couch_doc:rev_to_str(Rev), Error]),
+ dump_update_errors(Rest).
+
attachment_loop(ReqId) ->
couch_util:should_flush(),
receive
@@ -354,6 +373,16 @@ attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) ->
end,
{Name, {Type, {RcvFun, Length}}}.
+
+open_db({remote, Url, Headers})->
+ {ok, #http_db{uri=?b2l(Url), headers=Headers}, Url};
+open_db({local, DbName, UserCtx})->
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} -> {ok, Db, DbName};
+ Error -> Error
+ end.
+
+
close_db(#http_db{})->
ok;
close_db(Db)->
@@ -362,27 +391,38 @@ close_db(Db)->
do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
[
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
+ {start_seq, StartSeqNum},
+ {history, OldHistory},
{rep_starttime, ReplicationStartTime},
{src_starttime, SrcInstanceStartTime},
{tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecSrc},
- {tgt_record, RepRecTgt}
+ {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc},
+ {tgt_record, RepRecDocTgt}
] = Context,
- NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+ case NewSeqNum == StartSeqNum andalso OldHistory /= [] of
true ->
% nothing changed, don't record results
- {OldRepHistoryProps};
+ {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context};
false ->
+ % something changed, record results for incremental replication,
+
% commit changes to both src and tgt. The src because if changes
- % we replicated are lost, we'll record the a seq number of ahead
- % of what was committed and therefore lose future changes with the
- % same seq nums.
- {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
+ % we replicated are lost, we'll record the a seq number ahead
+ % of what was committed. If those changes are lost and the seq number
+ % reverts to a previous committed value, we will skip future changes
+ % when new doc updates are given our already replicated seq nums.
+
+ % commit the src async
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(fun() ->
+ ParentPid ! {self(), ensure_full_commit(Source)} end),
+
+ % commit tgt sync
{ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
+ receive {SrcCommitPid, {ok, SrcInstanceStartTime2}} -> ok end,
+
RecordSeqNum =
if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
TgtInstanceStartTime2 == TgtInstanceStartTime ->
@@ -391,60 +431,57 @@ do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
?LOG_INFO("A server has restarted sinced replication start. "
"Not recording the new sequence number to ensure the "
"replication is redone and documents reexamined.", []),
- SeqNum
+ StartSeqNum
end,
- %% format replication history
- JsonStats = [
+ NewHistoryEntry = {
+ [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, StartSeqNum},
+ {<<"end_last_seq">>, NewSeqNum},
{<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
{<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
{<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+ {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)}
+ ]},
+ % limit history to 50 entries
+ HistEntries =lists:sublist([NewHistoryEntry | OldHistory], 50),
+
+ NewRepHistory =
+ {[{<<"session_id">>, couch_util:new_uuid()},
+ {<<"source_last_seq">>, RecordSeqNum},
+ {<<"history">>, HistEntries}]},
+
+ {ok, {SrcRevPos,SrcRevId}} = update_doc(Source,
+ RepRecDocSrc#doc{body=NewRepHistory}, []),
+ {ok, {TgtRevPos,TgtRevId}} = update_doc(Target,
+ RepRecDocTgt#doc{body=NewRepHistory}, []),
+
+ NewContext = [
+ {start_seq, StartSeqNum},
+ {history, OldHistory},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime},
+ {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}},
+ {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}}
],
-
- HistEntries =[
- {
- [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, SeqNum},
- {<<"end_last_seq">>, NewSeqNum} | JsonStats]}
- | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
- % something changed, record results
- {[
- {<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, lists:sublist(HistEntries, 50)}
- ]}
- end,
- %% update local documents
- RepRecSrc = proplists:get_value(src_record, Context),
- RepRecTgt = proplists:get_value(tgt_record, Context),
- {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []),
- {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []),
+ {ok, NewRepHistory, NewContext}
- NewContext = [
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecSrc#doc{revs=[SrcRev]}},
- {tgt_record, RepRecTgt#doc{revs=[TgtRev]}}
- ],
-
- {ok, NewHistory, NewContext}.
+ end.
do_http_request(Url, Action, Headers) ->
do_http_request(Url, Action, Headers, []).
do_http_request(Url, Action, Headers, JsonBody) ->
- do_http_request(?b2l(?l2b(Url)), Action, Headers, JsonBody, 10).
+ do_http_request(Url, Action, Headers, JsonBody, 10).
do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
[Action, Url]),
- exit({http_request_failed, ?l2b(Url)});
+ exit({http_request_failed, Url});
do_http_request(Url, Action, Headers, JsonBody, Retries) ->
?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
Body =
@@ -498,7 +535,6 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
[] ->
gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
DocInfoList ->
- % UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
SrcRevsList = lists:map(fun(SrcDocInfo) ->
#doc_info{id=Id,
rev=Rev,
@@ -521,13 +557,8 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
end.
-fix_url(UrlBin) ->
- Url = binary_to_list(UrlBin),
- case lists:last(Url) of
- $/ -> Url;
- _ -> Url ++ "/"
- end.
+
get_db_info(#http_db{uri=DbUrl, headers=Headers}) ->
{DbProps} = do_http_request(DbUrl, get, Headers),
{ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]};
@@ -542,12 +573,12 @@ get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) ->
{RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
#doc_info{
id=proplists:get_value(<<"id">>, RowInfoList),
- rev=proplists:get_value(<<"rev">>, RowValueProps),
+ rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)),
update_seq = proplists:get_value(<<"key">>, RowInfoList),
conflict_revs =
- proplists:get_value(<<"conflicts">>, RowValueProps, []),
+ couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, [])),
deleted_conflict_revs =
- proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []),
+ couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, [])),
deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)
}
end, proplists:get_value(<<"rows">>, Results));
@@ -561,25 +592,18 @@ get_doc_info_list(DbSource, StartSeq) ->
lists:reverse(DocInfoList).
get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) ->
+ DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList],
{ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers,
- {DocIdRevsList}),
- {MissingRevs} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
- {ok, MissingRevs};
+ {DocIdRevsList2}),
+ {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
+ DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList],
+ {ok, DocMissingRevsList2};
get_missing_revs(Db, DocId) ->
couch_db:get_missing_revs(Db, DocId).
-open_http_db(UrlBin, Options) ->
- Headers = proplists:get_value(headers, Options, {[]}),
- {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}.
-
-open_db(<<"http://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(<<"https://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(DbName, Options)->
- couch_db:open(DbName, Options).
-
-open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) ->
+
+open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) ->
+ [] = Options,
case do_http_request(DbUrl ++ url_encode(DocId), get, Headers) of
{[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
{couch_util:to_existing_atom(ErrId), Reason};
@@ -589,7 +613,9 @@ open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) ->
open_doc(Db, DocId, Options) ->
couch_db:open_doc(Db, DocId, Options).
-open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) ->
+open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0,
+ [latest]) ->
+ Revs = couch_doc:rev_to_strs(Revs0),
BaseUrl = DbUrl ++ url_encode(DocId) ++ "?revs=true&latest=true",
%% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests
@@ -612,39 +638,52 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) ->
lists:flatten(?JSON_ENCODE(lists:reverse(Rest))), get, Headers)
end,
- Results =
- lists:map(fun({[{<<"missing">>, Rev}]}) ->
- {{not_found, missing}, Rev};
- ({[{<<"ok">>, JsonDoc}]}) ->
+ Results =
+ lists:map(
+ fun({[{<<"missing">>, Rev}]}) ->
+ {{not_found, missing}, couch_doc:parse_rev(Rev)};
+ ({[{<<"ok">>, JsonDoc}]}) ->
#doc{id=Id, attachments=Attach} = Doc = couch_doc:from_json_obj(JsonDoc),
Attach2 = [attachment_stub_converter(DbS,Id,A) || A <- Attach],
{ok, Doc#doc{attachments=Attach2}}
- end, JsonResults),
+ end, JsonResults),
{ok, Results};
open_doc_revs(Db, DocId, Revs, Options) ->
couch_db:open_doc_revs(Db, DocId, Revs, Options).
-update_docs(_, [], _, _) ->
- ok;
-update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
- {Returned} =
- do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
- {[{new_edits, NewEdits}, {docs, JsonDocs}]}),
- true = proplists:get_value(<<"ok">>, Returned),
- ok;
-update_docs(Db, Docs, Options, NewEdits) ->
- couch_db:update_docs(Db, Docs, Options, NewEdits).
-update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) ->
+update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
+ [] = Options,
Url = DbUrl ++ url_encode(DocId),
{ResponseMembers} = do_http_request(Url, put, Headers,
- couch_doc:to_json_obj(Doc, [revs,attachments])),
- RevId = proplists:get_value(<<"rev">>, ResponseMembers),
- {ok, RevId};
-update_local_doc(Db, Doc, Options) ->
+ couch_doc:to_json_obj(Doc, [attachments])),
+ Rev = proplists:get_value(<<"rev">>, ResponseMembers),
+ {ok, couch_doc:parse_rev(Rev)};
+update_doc(Db, Doc, Options) ->
couch_db:update_doc(Db, Doc, Options).
+update_docs(_, [], _, _) ->
+ {ok, []};
+update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], replicated_changes) ->
+ JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+ ErrorsJson =
+ do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
+ {[{new_edits, false}, {docs, JsonDocs}]}),
+ ErrorsList =
+ lists:map(
+ fun({Props}) ->
+ Id = proplists:get_value(<<"id">>, Props),
+ Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)),
+ ErrId = couch_util:to_existing_atom(
+ proplists:get_value(<<"error">>, Props)),
+ Reason = proplists:get_value(<<"reason">>, Props),
+ Error = {ErrId, Reason},
+ {{Id, Rev}, Error}
+ end, ErrorsJson),
+ {ok, ErrorsList};
+update_docs(Db, Docs, Options, UpdateType) ->
+ couch_db:update_docs(Db, Docs, Options, UpdateType).
+
up_to_date(#http_db{}, _Seq) ->
true;
up_to_date(Source, Seq) ->