From 5dcbc2290ac780f1a625b5c9435cfb35eac4e1ef Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Mon, 10 Aug 2009 18:37:43 +0000 Subject: new replicator using _changes feed for continuous replication git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802888 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_writer.erl | 68 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 src/couchdb/couch_rep_writer.erl (limited to 'src/couchdb/couch_rep_writer.erl') diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl new file mode 100644 index 00000000..8bea63fe --- /dev/null +++ b/src/couchdb/couch_rep_writer.erl @@ -0,0 +1,68 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_writer). + +-export([start_link/4]). + +-include("couch_db.hrl"). + +start_link(Parent, Target, Reader, _PostProps) -> + {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}. + +writer_loop(Parent, Reader, Target) -> + % ?LOG_DEBUG("writer loop begin", []), + case couch_rep_reader:next(Reader) of + {complete, FinalSeq} -> + % ?LOG_INFO("writer terminating normally", []), + Parent ! {writer_checkpoint, FinalSeq}, + ok; + {HighSeq, Docs} -> + % ?LOG_DEBUG("writer loop trying to write ~p", [Docs]), + DocCount = length(Docs), + try write_docs(Target, Docs) of + {ok, []} -> + Parent ! {update_stats, docs_written, DocCount}; + {ok, Errors} -> + ErrorCount = length(Errors), + Parent ! {update_stats, doc_write_failures, ErrorCount}, + Parent ! {update_stats, docs_written, DocCount - ErrorCount} + catch + {attachment_request_failed, Err} -> + ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]), + exit({attachment_request_failed, Err, Docs}) + end, + Parent ! {writer_checkpoint, HighSeq}, + couch_rep_att:cleanup(), + writer_loop(Parent, Reader, Target) + end. + +write_docs(#http_db{} = Db, Docs) -> + JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], + ErrorsJson = couch_rep_httpc:request(Db#http_db{ + resource = "_bulk_docs", + method = post, + body = {[{new_edits, false}, {docs, JsonDocs}]} + }), + ErrorsList = + lists:map( + fun({Props}) -> + Id = proplists:get_value(<<"id">>, Props), + Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)), + ErrId = couch_util:to_existing_atom( + proplists:get_value(<<"error">>, Props)), + Reason = proplists:get_value(<<"reason">>, Props), + {{Id, Rev}, {ErrId, Reason}} + end, ErrorsJson), + {ok, ErrorsList}; +write_docs(Db, Docs) -> + couch_db:update_docs(Db, Docs, [], replicated_changes). -- cgit v1.2.3