diff options
author | John Christopher Anderson <jchris@apache.org> | 2009-03-13 22:15:34 +0000 |
---|---|---|
committer | John Christopher Anderson <jchris@apache.org> | 2009-03-13 22:15:34 +0000 |
commit | 9007e2d21dea8b0185c0096b30364a8ee40a3867 (patch) | |
tree | 7d8dacb2c8cd619f18dfab8fdb40d146ac28c85a /src/couchdb/couch_rep.erl | |
parent | 65608e14e8911b33c30178d717d745edc9f66c17 (diff) |
Commit Damien's rep_security branch to trunk.
Changes bulk_docs conflict checking.
Breaks file format, see mailing list for data upgrade procedure, or
http://wiki.apache.org/couchdb/Breaking_changes
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@753448 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r-- | src/couchdb/couch_rep.erl | 365 |
1 files changed, 202 insertions, 163 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 89d40be3..3647f6db 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -15,11 +15,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([replicate/3]). +-export([replicate/2]). -include_lib("couch_db.hrl"). -%% @spec replicate(Source::binary(), Target::binary(), Options::proplist()) -> +%% @spec replicate(Source::binary(), Target::binary()) -> %% {ok, Stats} | {error, Reason} %% @doc Triggers a replication. Stats is a JSON Object with the following %% keys: session_id (UUID), source_last_seq (integer), and history (array). @@ -30,26 +30,29 @@ %% The supervisor will try to restart the replication in case of any error %% other than shutdown. Just call this function again to listen for the %% result of the retry. -replicate(Source, Target, Options) -> - Id = <<Source/binary, ":", Target/binary>>, - Args = [?MODULE, [Source,Target,Options], []], +replicate(Source, Target) -> - Replicator = {Id, + {ok, HostName} = inet:gethostname(), + RepId = couch_util:to_hex( + erlang:md5(term_to_binary([HostName, Source, Target]))), + Args = [?MODULE, [RepId, Source,Target], []], + + Replicator = {RepId, {gen_server, start_link, Args}, transient, - 10000, + 1, worker, [?MODULE] }, Server = case supervisor:start_child(couch_rep_sup, Replicator) of {ok, Pid} -> - ?LOG_INFO("starting new replication ~p at ~p", [Id, Pid]), + ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), Pid; {error, already_present} -> - case supervisor:restart_child(couch_rep_sup, Id) of + case supervisor:restart_child(couch_rep_sup, RepId) of {ok, Pid} -> - ?LOG_INFO("starting replication ~p at ~p", [Id, Pid]), + ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), Pid; {error, running} -> %% this error occurs if multiple replicators are racing @@ -57,16 +60,16 @@ replicate(Source, Target, Options) -> %% the Pid by calling start_child again. {error, {already_started, Pid}} = supervisor:start_child(couch_rep_sup, Replicator), - ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]), + ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]), Pid end; {error, {already_started, Pid}} -> - ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]), + ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]), Pid end, case gen_server:call(Server, get_result, infinity) of - retry -> replicate(Source, Target, Options); + retry -> replicate(Source, Target); Else -> Else end. @@ -79,6 +82,7 @@ replicate(Source, Target, Options) -> headers }). + -record(state, { context, current_seq, @@ -90,18 +94,14 @@ replicate(Source, Target, Options) -> listeners = [] }). -init([Source, Target, Options]) -> + +init([RepId, Source, Target]) -> process_flag(trap_exit, true), - {ok, DbSrc} = - open_db(Source, proplists:get_value(source_options, Options, [])), - {ok, DbTgt} = - open_db(Target, proplists:get_value(target_options, Options, [])), + {ok, DbSrc, SrcName} = open_db(Source), + {ok, DbTgt, TgtName} = open_db(Target), - {ok, Host} = inet:gethostname(), - HostBin = list_to_binary(Host), - DocKey = <<?LOCAL_DOC_PREFIX, HostBin/binary, ":", Source/binary, ":", - Target/binary>>, + DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), {ok, InfoSrc} = get_db_info(DbSrc), {ok, InfoTgt} = get_db_info(DbTgt), @@ -110,49 +110,49 @@ init([Source, Target, Options]) -> SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc), TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt), - case proplists:get_value(full, Options, false) - orelse proplists:get_value("full", Options, false) of + RepRecDocSrc = + case open_doc(DbSrc, DocKey, []) of + {ok, SrcDoc} -> + ?LOG_DEBUG("Found existing replication record on source", []), + SrcDoc; + _ -> #doc{id=DocKey} + end, + + RepRecDocTgt = + case open_doc(DbTgt, DocKey, []) of + {ok, TgtDoc} -> + ?LOG_DEBUG("Found existing replication record on target", []), + TgtDoc; + _ -> #doc{id=DocKey} + end, + + #doc{body={RepRecProps}} = RepRecDocSrc, + #doc{body={RepRecPropsTgt}} = RepRecDocTgt, + + case proplists:get_value(<<"session_id">>, RepRecProps) == + proplists:get_value(<<"session_id">>, RepRecPropsTgt) of true -> - RepRecSrc = RepRecTgt = #doc{id=DocKey}; + % if the records have the same session id, + % then we have a valid replication history + OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0), + OldHistory = proplists:get_value(<<"history">>, RepRecProps, []); false -> - RepRecSrc = case open_doc(DbSrc, DocKey, []) of - {ok, SrcDoc} -> - ?LOG_DEBUG("Found existing replication record on source", []), - SrcDoc; - _ -> #doc{id=DocKey} - end, - - RepRecTgt = case open_doc(DbTgt, DocKey, []) of - {ok, TgtDoc} -> - ?LOG_DEBUG("Found existing replication record on target", []), - TgtDoc; - _ -> #doc{id=DocKey} - end - end, - - #doc{body={OldRepHistoryProps}} = RepRecSrc, - #doc{body={OldRepHistoryPropsTrg}} = RepRecTgt, - - SeqNum = case OldRepHistoryProps == OldRepHistoryPropsTrg of - true -> - % if the records are identical, then we have a valid replication history - proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0); - false -> - ?LOG_INFO("Replication records differ. " + ?LOG_INFO("Replication records differ. " "Performing full replication instead of incremental.", []), - ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", - [OldRepHistoryProps, OldRepHistoryPropsTrg]), - 0 + ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", + [RepRecProps, RepRecPropsTgt]), + OldSeqNum = 0, + OldHistory = [] end, Context = [ - {start_seq, SeqNum}, - {history, OldRepHistoryProps}, + {start_seq, OldSeqNum}, + {history, OldHistory}, {rep_starttime, ReplicationStartTime}, {src_starttime, SrcInstanceStartTime}, {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecSrc}, - {tgt_record, RepRecTgt} + {src_record, RepRecDocSrc}, + {tgt_record, RepRecDocTgt} ], Stats = ets:new(replication_stats, [set, private]), @@ -160,16 +160,17 @@ init([Source, Target, Options]) -> ets:insert(Stats, {missing_revs, 0}), ets:insert(Stats, {docs_read, 0}), ets:insert(Stats, {docs_written, 0}), + ets:insert(Stats, {doc_write_failures, 0}), - couch_task_status:add_task("Replication", <<Source/binary, " -> ", - Target/binary>>, "Starting"), + couch_task_status:add_task("Replication", <<SrcName/binary, " -> ", + TgtName/binary>>, "Starting"), Parent = self(), - Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{SeqNum,0}) end), + Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end), State = #state{ context = Context, - current_seq = SeqNum, + current_seq = OldSeqNum, enum_pid = Pid, source = DbSrc, target = DbTgt, @@ -178,7 +179,6 @@ init([Source, Target, Options]) -> {ok, State}. - handle_call(get_result, From, #state{listeners=L} = State) -> {noreply, State#state{listeners=[From|L]}}; @@ -191,7 +191,7 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) target = Target, stats = Stats } = State, - + ets:update_counter(Stats, missing_revs, length(Revs)), %% get document(s) @@ -203,8 +203,11 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) {NewBuffer, NewContext} = case couch_util:should_flush() of true -> Docs2 = lists:flatten([Docs|Buffer]), - ok = update_docs(Target, Docs2, [], false), - ets:update_counter(Stats, docs_written, length(Docs2)), + {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes), + dump_update_errors(Errors), + ets:update_counter(Stats, doc_write_failures, length(Errors)), + ets:update_counter(Stats, docs_written, length(Docs2) - + length(Errors)), {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats), {[], Ctxt}; false -> @@ -255,8 +258,11 @@ terminate(normal, State) -> stats = Stats } = State, - ok = update_docs(Target, lists:flatten(Buffer), [], false), - ets:update_counter(Stats, docs_written, lists:flatlength(Buffer)), + {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes), + dump_update_errors(Errors), + ets:update_counter(Stats, doc_write_failures, length(Errors)), + ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) - + length(Errors)), couch_task_status:update("Finishing"), @@ -264,9 +270,12 @@ terminate(normal, State) -> ets:delete(Stats), close_db(Target), - %% reply to original requester - [Original|Rest] = Listeners, - gen_server:reply(Original, {ok, NewRepHistory}), + case Listeners of + [Original|Rest] -> + %% reply to original requester + gen_server:reply(Original, {ok, NewRepHistory}); + Rest -> ok + end, %% maybe trigger another replication. If this replicator uses a local %% source Db, changes to that Db since we started will not be included in @@ -304,6 +313,16 @@ code_change(_OldVsn, State, _Extra) -> %% internal functions %%============================================================================= + +% we should probably write these to a special replication log +% or have a callback where the caller decides what to do with replication +% errors. +dump_update_errors([]) -> ok; +dump_update_errors([{{Id, Rev}, Error}|Rest]) -> + ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p", + [Id, couch_doc:rev_to_str(Rev), Error]), + dump_update_errors(Rest). + attachment_loop(ReqId) -> couch_util:should_flush(), receive @@ -354,6 +373,16 @@ attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) -> end, {Name, {Type, {RcvFun, Length}}}. + +open_db({remote, Url, Headers})-> + {ok, #http_db{uri=?b2l(Url), headers=Headers}, Url}; +open_db({local, DbName, UserCtx})-> + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> {ok, Db, DbName}; + Error -> Error + end. + + close_db(#http_db{})-> ok; close_db(Db)-> @@ -362,27 +391,38 @@ close_db(Db)-> do_checkpoint(Source, Target, Context, NewSeqNum, Stats) -> ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]), [ - {start_seq, SeqNum}, - {history, OldRepHistoryProps}, + {start_seq, StartSeqNum}, + {history, OldHistory}, {rep_starttime, ReplicationStartTime}, {src_starttime, SrcInstanceStartTime}, {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecSrc}, - {tgt_record, RepRecTgt} + {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc}, + {tgt_record, RepRecDocTgt} ] = Context, - NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of + case NewSeqNum == StartSeqNum andalso OldHistory /= [] of true -> % nothing changed, don't record results - {OldRepHistoryProps}; + {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context}; false -> + % something changed, record results for incremental replication, + % commit changes to both src and tgt. The src because if changes - % we replicated are lost, we'll record the a seq number of ahead - % of what was committed and therefore lose future changes with the - % same seq nums. - {ok, SrcInstanceStartTime2} = ensure_full_commit(Source), + % we replicated are lost, we'll record the a seq number ahead + % of what was committed. If those changes are lost and the seq number + % reverts to a previous committed value, we will skip future changes + % when new doc updates are given our already replicated seq nums. + + % commit the src async + ParentPid = self(), + SrcCommitPid = spawn_link(fun() -> + ParentPid ! {self(), ensure_full_commit(Source)} end), + + % commit tgt sync {ok, TgtInstanceStartTime2} = ensure_full_commit(Target), + receive {SrcCommitPid, {ok, SrcInstanceStartTime2}} -> ok end, + RecordSeqNum = if SrcInstanceStartTime2 == SrcInstanceStartTime andalso TgtInstanceStartTime2 == TgtInstanceStartTime -> @@ -391,60 +431,57 @@ do_checkpoint(Source, Target, Context, NewSeqNum, Stats) -> ?LOG_INFO("A server has restarted sinced replication start. " "Not recording the new sequence number to ensure the " "replication is redone and documents reexamined.", []), - SeqNum + StartSeqNum end, - %% format replication history - JsonStats = [ + NewHistoryEntry = { + [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, + {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_last_seq">>, StartSeqNum}, + {<<"end_last_seq">>, NewSeqNum}, {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)} + {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, + {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)} + ]}, + % limit history to 50 entries + HistEntries =lists:sublist([NewHistoryEntry | OldHistory], 50), + + NewRepHistory = + {[{<<"session_id">>, couch_util:new_uuid()}, + {<<"source_last_seq">>, RecordSeqNum}, + {<<"history">>, HistEntries}]}, + + {ok, {SrcRevPos,SrcRevId}} = update_doc(Source, + RepRecDocSrc#doc{body=NewRepHistory}, []), + {ok, {TgtRevPos,TgtRevId}} = update_doc(Target, + RepRecDocTgt#doc{body=NewRepHistory}, []), + + NewContext = [ + {start_seq, StartSeqNum}, + {history, OldHistory}, + {rep_starttime, ReplicationStartTime}, + {src_starttime, SrcInstanceStartTime}, + {tgt_starttime, TgtInstanceStartTime}, + {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}}, + {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}} ], - - HistEntries =[ - { - [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, - {<<"start_last_seq">>, SeqNum}, - {<<"end_last_seq">>, NewSeqNum} | JsonStats]} - | proplists:get_value(<<"history">>, OldRepHistoryProps, [])], - % something changed, record results - {[ - {<<"session_id">>, couch_util:new_uuid()}, - {<<"source_last_seq">>, RecordSeqNum}, - {<<"history">>, lists:sublist(HistEntries, 50)} - ]} - end, - %% update local documents - RepRecSrc = proplists:get_value(src_record, Context), - RepRecTgt = proplists:get_value(tgt_record, Context), - {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []), - {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []), + {ok, NewRepHistory, NewContext} - NewContext = [ - {start_seq, SeqNum}, - {history, OldRepHistoryProps}, - {rep_starttime, ReplicationStartTime}, - {src_starttime, SrcInstanceStartTime}, - {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecSrc#doc{revs=[SrcRev]}}, - {tgt_record, RepRecTgt#doc{revs=[TgtRev]}} - ], - - {ok, NewHistory, NewContext}. + end. do_http_request(Url, Action, Headers) -> do_http_request(Url, Action, Headers, []). do_http_request(Url, Action, Headers, JsonBody) -> - do_http_request(?b2l(?l2b(Url)), Action, Headers, JsonBody, 10). + do_http_request(Url, Action, Headers, JsonBody, 10). do_http_request(Url, Action, _Headers, _JsonBody, 0) -> ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s", [Action, Url]), - exit({http_request_failed, ?l2b(Url)}); + exit({http_request_failed, Url}); do_http_request(Url, Action, Headers, JsonBody, Retries) -> ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]), Body = @@ -498,7 +535,6 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) -> [] -> gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity); DocInfoList -> - % UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList], SrcRevsList = lists:map(fun(SrcDocInfo) -> #doc_info{id=Id, rev=Rev, @@ -521,13 +557,8 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) -> enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2}) end. -fix_url(UrlBin) -> - Url = binary_to_list(UrlBin), - case lists:last(Url) of - $/ -> Url; - _ -> Url ++ "/" - end. + get_db_info(#http_db{uri=DbUrl, headers=Headers}) -> {DbProps} = do_http_request(DbUrl, get, Headers), {ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]}; @@ -542,12 +573,12 @@ get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) -> {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList), #doc_info{ id=proplists:get_value(<<"id">>, RowInfoList), - rev=proplists:get_value(<<"rev">>, RowValueProps), + rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)), update_seq = proplists:get_value(<<"key">>, RowInfoList), conflict_revs = - proplists:get_value(<<"conflicts">>, RowValueProps, []), + couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, [])), deleted_conflict_revs = - proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []), + couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, [])), deleted = proplists:get_value(<<"deleted">>, RowValueProps, false) } end, proplists:get_value(<<"rows">>, Results)); @@ -561,25 +592,18 @@ get_doc_info_list(DbSource, StartSeq) -> lists:reverse(DocInfoList). get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) -> + DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList], {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, - {DocIdRevsList}), - {MissingRevs} = proplists:get_value(<<"missing_revs">>, ResponseMembers), - {ok, MissingRevs}; + {DocIdRevsList2}), + {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers), + DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList], + {ok, DocMissingRevsList2}; get_missing_revs(Db, DocId) -> couch_db:get_missing_revs(Db, DocId). -open_http_db(UrlBin, Options) -> - Headers = proplists:get_value(headers, Options, {[]}), - {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}. - -open_db(<<"http://", _/binary>>=Url, Options)-> - open_http_db(Url, Options); -open_db(<<"https://", _/binary>>=Url, Options)-> - open_http_db(Url, Options); -open_db(DbName, Options)-> - couch_db:open(DbName, Options). - -open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) -> + +open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) -> + [] = Options, case do_http_request(DbUrl ++ url_encode(DocId), get, Headers) of {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} -> {couch_util:to_existing_atom(ErrId), Reason}; @@ -589,7 +613,9 @@ open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) -> open_doc(Db, DocId, Options) -> couch_db:open_doc(Db, DocId, Options). -open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) -> +open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0, + [latest]) -> + Revs = couch_doc:rev_to_strs(Revs0), BaseUrl = DbUrl ++ url_encode(DocId) ++ "?revs=true&latest=true", %% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests @@ -612,39 +638,52 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) -> lists:flatten(?JSON_ENCODE(lists:reverse(Rest))), get, Headers) end, - Results = - lists:map(fun({[{<<"missing">>, Rev}]}) -> - {{not_found, missing}, Rev}; - ({[{<<"ok">>, JsonDoc}]}) -> + Results = + lists:map( + fun({[{<<"missing">>, Rev}]}) -> + {{not_found, missing}, couch_doc:parse_rev(Rev)}; + ({[{<<"ok">>, JsonDoc}]}) -> #doc{id=Id, attachments=Attach} = Doc = couch_doc:from_json_obj(JsonDoc), Attach2 = [attachment_stub_converter(DbS,Id,A) || A <- Attach], {ok, Doc#doc{attachments=Attach2}} - end, JsonResults), + end, JsonResults), {ok, Results}; open_doc_revs(Db, DocId, Revs, Options) -> couch_db:open_doc_revs(Db, DocId, Revs, Options). -update_docs(_, [], _, _) -> - ok; -update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) -> - JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], - {Returned} = - do_http_request(DbUrl ++ "_bulk_docs", post, Headers, - {[{new_edits, NewEdits}, {docs, JsonDocs}]}), - true = proplists:get_value(<<"ok">>, Returned), - ok; -update_docs(Db, Docs, Options, NewEdits) -> - couch_db:update_docs(Db, Docs, Options, NewEdits). -update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) -> +update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) -> + [] = Options, Url = DbUrl ++ url_encode(DocId), {ResponseMembers} = do_http_request(Url, put, Headers, - couch_doc:to_json_obj(Doc, [revs,attachments])), - RevId = proplists:get_value(<<"rev">>, ResponseMembers), - {ok, RevId}; -update_local_doc(Db, Doc, Options) -> + couch_doc:to_json_obj(Doc, [attachments])), + Rev = proplists:get_value(<<"rev">>, ResponseMembers), + {ok, couch_doc:parse_rev(Rev)}; +update_doc(Db, Doc, Options) -> couch_db:update_doc(Db, Doc, Options). +update_docs(_, [], _, _) -> + {ok, []}; +update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], replicated_changes) -> + JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], + ErrorsJson = + do_http_request(DbUrl ++ "_bulk_docs", post, Headers, + {[{new_edits, false}, {docs, JsonDocs}]}), + ErrorsList = + lists:map( + fun({Props}) -> + Id = proplists:get_value(<<"id">>, Props), + Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)), + ErrId = couch_util:to_existing_atom( + proplists:get_value(<<"error">>, Props)), + Reason = proplists:get_value(<<"reason">>, Props), + Error = {ErrId, Reason}, + {{Id, Rev}, Error} + end, ErrorsJson), + {ok, ErrorsList}; +update_docs(Db, Docs, Options, UpdateType) -> + couch_db:update_docs(Db, Docs, Options, UpdateType). + up_to_date(#http_db{}, _Seq) -> true; up_to_date(Source, Seq) -> |