% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% couch_rep_writer is the writer half of the replicator: it pulls batches of
% documents from couch_rep_reader, writes them to the target database, and
% reports stats and checkpoints back to the parent replication process.

-module(couch_rep_writer).

-export([start_link/4]).

-include("couch_db.hrl").

start_link(Parent, _Target, Reader, _PostProps) ->
    {ok, spawn_link(fun() -> writer_loop(Parent, Reader) end)}.

% Pull the next batch from the reader, write it to the target, report stats
% and a checkpoint to the parent, then loop. {complete, FinalSeq} ends the loop.
writer_loop(Parent, Reader) ->
    case couch_rep_reader:next(Reader) of
    {complete, FinalSeq} ->
        Parent ! {writer_checkpoint, FinalSeq},
        ok;
    {HighSeq, Docs} ->
        DocCount = length(Docs),
        {ok, Target0} = gen_server:call(Parent, get_target_db, infinity),
        Target = open_db(Target0),
        try write_docs(Target, Docs) of
        {ok, []} ->
            Parent ! {update_stats, docs_written, DocCount};
        {ok, Errors} ->
            ErrorCount = length(Errors),
            Parent ! {update_stats, doc_write_failures, ErrorCount},
            Parent ! {update_stats, docs_written, DocCount - ErrorCount}
        catch
        {attachment_request_failed, Err} ->
            ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]),
            exit({attachment_request_failed, Err, Docs})
        after
            close_db(Target)
        end,
        Parent ! {writer_checkpoint, HighSeq},
        couch_rep_att:cleanup(),
        couch_util:should_flush(),
        writer_loop(Parent, Reader)
    end.

% Remote target: docs without attachments go through _bulk_docs, docs with
% attachments are each sent as a separate multipart PUT. Local target: a
% single couch_db:update_docs call.
write_docs(#http_db{} = Db, Docs) ->
    {DocsAtts, DocsNoAtts} = lists:partition(
        fun(#doc{atts=[]}) -> false; (_) -> true end,
        Docs
    ),
    ErrorsJson0 = write_bulk_docs(Db, DocsNoAtts),
    ErrorsJson = lists:foldl(
        fun(Doc, Acc) -> write_multi_part_doc(Db, Doc) ++ Acc end,
        ErrorsJson0,
        DocsAtts
    ),
    {ok, ErrorsJson};
write_docs(Db, Docs) ->
    couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).

write_bulk_docs(_Db, []) ->
    [];
write_bulk_docs(#http_db{headers = Headers} = Db, Docs) ->
    JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs],
    Request = Db#http_db{
        resource = "_bulk_docs",
        method = post,
        body = {[{new_edits, false}, {docs, JsonDocs}]},
        headers = couch_util:proplist_apply_field({"Content-Type",
            "application/json"}, [{"X-Couch-Full-Commit", "false"} | Headers])
    },
    ErrorsJson = case couch_rep_httpc:request(Request) of
    {FailProps} ->
        exit({target_error, couch_util:get_value(<<"error">>, FailProps)});
    List when is_list(List) ->
        List
    end,
    [write_docs_1(V) || V <- ErrorsJson].

% PUT a single doc with attachments as a multipart stream. A helper process
% feeds the body through a couch_work_queue so it is produced incrementally
% rather than built up in memory.
write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
    JsonBytes = ?JSON_ENCODE(
        couch_doc:to_json_obj(
            Doc,
            [follows, att_encoding_info, attachments, revs]
        )
    ),
    Boundary = couch_uuids:random(),
    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
        Boundary, JsonBytes, Atts, true
    ),
    StreamerPid = spawn_link(
        fun() -> streamer_fun(Boundary, JsonBytes, Atts) end
    ),
    BodyFun = fun(Acc) ->
        DataQueue = case Acc of
        nil ->
            StreamerPid ! {start, self()},
            receive
            {queue, Q} ->
                Q
            end;
        Queue ->
            Queue
        end,
        case couch_work_queue:dequeue(DataQueue) of
        closed ->
            eof;
        {ok, Data} ->
            {ok, iolist_to_binary(Data), DataQueue}
        end
    end,
    Request = Db#http_db{
        resource = couch_util:encode_doc_id(Doc),
        method = put,
        qs = [{new_edits, false}],
        body = {BodyFun, nil},
        headers = [
            {"x-couch-full-commit", "false"},
            {"Content-Type", ?b2l(ContentType)},
            {"Content-Length", Len} | Headers
        ]
    },
    Conn = couch_rep_httpc:spawn_link_worker_process(Request),
    Result = try
        case couch_rep_httpc:request(Request#http_db{conn = Conn}) of
        {[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
            {Pos, [RevId | _]} = Doc#doc.revs,
            ErrId = couch_util:to_existing_atom(Error),
            [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}];
        _ ->
            []
        end
    after
        ibrowse:stop_worker_process(Conn)
    end,
    StreamerPid ! stop,
    Result.

streamer_fun(Boundary, JsonBytes, Atts) ->
    receive
    stop ->
        ok;
    {start, From} ->
        % better to use a brand new queue, to ensure there's no garbage from
        % a previous (failed) iteration
        {ok, DataQueue} = couch_work_queue:new(
            [{max_size, 1024 * 1024}, {max_items, 1000}]),
        From ! {queue, DataQueue},
        couch_doc:doc_to_multi_part_stream(
            Boundary,
            JsonBytes,
            Atts,
            fun(Data) ->
                couch_work_queue:queue(DataQueue, Data)
            end,
            true
        ),
        couch_work_queue:close(DataQueue),
        streamer_fun(Boundary, JsonBytes, Atts)
    end.

% Convert a _bulk_docs error object into {{Id, Rev}, {Error, Reason}}.
write_docs_1({Props}) ->
    Id = couch_util:get_value(<<"id">>, Props),
    Rev = couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props)),
    ErrId = couch_util:to_existing_atom(
        couch_util:get_value(<<"error">>, Props)),
    Reason = couch_util:get_value(<<"reason">>, Props),
    {{Id, Rev}, {ErrId, Reason}}.

open_db(#db{name = Name, user_ctx = UserCtx}) ->
    {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx}]),
    Db;
open_db(HttpDb) ->
    HttpDb.

close_db(#db{} = Db) ->
    couch_db:close(Db);
close_db(_HttpDb) ->
    ok.