summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_writer.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep_writer.erl')
-rw-r--r--src/couchdb/couch_rep_writer.erl171
1 files changed, 0 insertions, 171 deletions
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
deleted file mode 100644
index cf98ccfb..00000000
--- a/src/couchdb/couch_rep_writer.erl
+++ /dev/null
@@ -1,171 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_rep_writer).
-
--export([start_link/4]).
-
--include("couch_db.hrl").
-
-start_link(Parent, _Target, Reader, _PostProps) ->
- {ok, spawn_link(fun() -> writer_loop(Parent, Reader) end)}.
-
-writer_loop(Parent, Reader) ->
- case couch_rep_reader:next(Reader) of
- {complete, nil} ->
- ok;
- {complete, FinalSeq} ->
- Parent ! {writer_checkpoint, FinalSeq},
- ok;
- {HighSeq, Docs} ->
- DocCount = length(Docs),
- {ok, Target} = gen_server:call(Parent, get_target_db, infinity),
- try write_docs(Target, Docs) of
- {ok, []} ->
- Parent ! {update_stats, docs_written, DocCount};
- {ok, Errors} ->
- ErrorCount = length(Errors),
- Parent ! {update_stats, doc_write_failures, ErrorCount},
- Parent ! {update_stats, docs_written, DocCount - ErrorCount}
- catch
- {attachment_request_failed, Err} ->
- ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]),
- exit({attachment_request_failed, Err, Docs})
- end,
- case HighSeq of
- nil ->
- ok;
- _SeqNumber ->
- Parent ! {writer_checkpoint, HighSeq}
- end,
- couch_rep_att:cleanup(),
- couch_util:should_flush(),
- writer_loop(Parent, Reader)
- end.
-
-write_docs(#http_db{} = Db, Docs) ->
- {DocsAtts, DocsNoAtts} = lists:partition(
- fun(#doc{atts=[]}) -> false; (_) -> true end,
- Docs
- ),
- ErrorsJson0 = write_bulk_docs(Db, DocsNoAtts),
- ErrorsJson = lists:foldl(
- fun(Doc, Acc) -> write_multi_part_doc(Db, Doc) ++ Acc end,
- ErrorsJson0,
- DocsAtts
- ),
- {ok, ErrorsJson};
-write_docs(Db, Docs) ->
- couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).
-
-write_bulk_docs(_Db, []) ->
- [];
-write_bulk_docs(#http_db{headers = Headers} = Db, Docs) ->
- JsonDocs = [
- couch_doc:to_json_obj(Doc, [revs, att_gzip_length]) || Doc <- Docs
- ],
- Request = Db#http_db{
- resource = "_bulk_docs",
- method = post,
- body = {[{new_edits, false}, {docs, JsonDocs}]},
- headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, [{"X-Couch-Full-Commit", "false"} | Headers])
- },
- ErrorsJson = case couch_rep_httpc:request(Request) of
- {FailProps} ->
- exit({target_error, couch_util:get_value(<<"error">>, FailProps)});
- List when is_list(List) ->
- List
- end,
- [write_docs_1(V) || V <- ErrorsJson].
-
-write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
- JsonBytes = ?JSON_ENCODE(
- couch_doc:to_json_obj(
- Doc,
- [follows, att_encoding_info, attachments]
- )
- ),
- Boundary = couch_uuids:random(),
- {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
- Boundary, JsonBytes, Atts, true
- ),
- StreamerPid = spawn_link(
- fun() -> streamer_fun(Boundary, JsonBytes, Atts) end
- ),
- BodyFun = fun(Acc) ->
- DataQueue = case Acc of
- nil ->
- StreamerPid ! {start, self()},
- receive
- {queue, Q} ->
- Q
- end;
- Queue ->
- Queue
- end,
- case couch_work_queue:dequeue(DataQueue) of
- closed ->
- eof;
- {ok, Data} ->
- {ok, iolist_to_binary(Data), DataQueue}
- end
- end,
- Request = Db#http_db{
- resource = couch_util:url_encode(Doc#doc.id),
- method = put,
- qs = [{new_edits, false}],
- body = {BodyFun, nil},
- headers = [
- {"x-couch-full-commit", "false"},
- {"Content-Type", ?b2l(ContentType)},
- {"Content-Length", Len} | Headers
- ]
- },
- Result = case couch_rep_httpc:request(Request) of
- {[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
- {Pos, [RevId | _]} = Doc#doc.revs,
- ErrId = couch_util:to_existing_atom(Error),
- [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}];
- _ ->
- []
- end,
- StreamerPid ! stop,
- Result.
-
-streamer_fun(Boundary, JsonBytes, Atts) ->
- receive
- stop ->
- ok;
- {start, From} ->
- % better use a brand new queue, to ensure there's no garbage from
- % a previous (failed) iteration
- {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000),
- From ! {queue, DataQueue},
- couch_doc:doc_to_multi_part_stream(
- Boundary,
- JsonBytes,
- Atts,
- fun(Data) ->
- couch_work_queue:queue(DataQueue, Data)
- end,
- true
- ),
- couch_work_queue:close(DataQueue),
- streamer_fun(Boundary, JsonBytes, Atts)
- end.
-
-write_docs_1({Props}) ->
- Id = couch_util:get_value(<<"id">>, Props),
- Rev = couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props)),
- ErrId = couch_util:to_existing_atom(couch_util:get_value(<<"error">>, Props)),
- Reason = couch_util:get_value(<<"reason">>, Props),
- {{Id, Rev}, {ErrId, Reason}}.