% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_rep_writer).

-export([start_link/4]).

-include("couch_db.hrl").

% Starts the writer as a process linked to the caller and returns {ok, Pid}.
% The spawned process runs writer_loop/3; the fourth argument is ignored.
start_link(Parent, Target, Reader, _PostProps) ->
    Loop = fun() -> writer_loop(Parent, Reader, Target) end,
    WriterPid = spawn_link(Loop),
    {ok, WriterPid}.

% Main loop of the writer process.
%
% Pulls the next item from Reader via couch_rep_reader:next/1:
%   {complete, nil}  - finish with ok;
%   {complete, Seq}  - send {writer_checkpoint, Seq} to Parent, then finish;
%   {HighSeq, Docs}  - write Docs to Target, report statistics to Parent,
%                      send a checkpoint for HighSeq unless it is nil, and
%                      loop again.
%
% A thrown {attachment_request_failed, Reason} from write_docs/2 terminates
% the process with exit reason {attachment_request_failed, Reason, Docs}.
writer_loop(Parent, Reader, Target) ->
    case couch_rep_reader:next(Reader) of
    {complete, nil} ->
        ok;
    {complete, LastSeq} ->
        Parent ! {writer_checkpoint, LastSeq},
        ok;
    {HighSeq, Docs} ->
        Total = length(Docs),
        try write_docs(Target, Docs) of
        {ok, []} ->
            Parent ! {update_stats, docs_written, Total};
        {ok, Failures} ->
            Failed = length(Failures),
            % failures are reported before the successful count
            Parent ! {update_stats, doc_write_failures, Failed},
            Parent ! {update_stats, docs_written, Total - Failed}
        catch
        throw:{attachment_request_failed, Reason} ->
            ?LOG_DEBUG("writer failed to write an attachment ~p", [Reason]),
            exit({attachment_request_failed, Reason, Docs})
        end,
        if
        HighSeq =:= nil ->
            ok;
        true ->
            Parent ! {writer_checkpoint, HighSeq}
        end,
        couch_rep_att:cleanup(),
        couch_util:should_flush(),
        writer_loop(Parent, Reader, Target)
    end.

% Writes a batch of documents to the target and returns {ok, Errors}.
%
% For an #http_db{} target the batch is split in two: documents whose atts
% list is empty are sent together through write_bulk_docs/2, every other
% document is sent on its own through write_multi_part_doc/2. The errors of
% each individually sent document are prepended to the accumulated list.
%
% For any other target the call is delegated to couch_db:update_docs/4 with
% the delay_commit option and replicated_changes.
write_docs(#http_db{} = Db, Docs) ->
    HasNoAtts = fun(#doc{atts=[]}) -> true; (_) -> false end,
    {Plain, WithAtts} = lists:partition(HasNoAtts, Docs),
    BulkErrors = write_bulk_docs(Db, Plain),
    AllErrors = lists:foldl(
        fun(Doc, Errors) -> write_multi_part_doc(Db, Doc) ++ Errors end,
        BulkErrors,
        WithAtts
    ),
    {ok, AllErrors};
write_docs(Db, Docs) ->
    Options = [delay_commit],
    couch_db:update_docs(Db, Docs, Options, replicated_changes).

% Posts Docs to the target's _bulk_docs resource with new_edits=false.
%
% Returns [] for an empty batch without issuing a request. Otherwise the
% response must be either a list, whose elements are converted with
% write_docs_1/1 and returned, or an object {Props}, in which case the
% process exits with {target_error, Error} taken from its <<"error">> field.
write_bulk_docs(_Db, []) ->
    [];
write_bulk_docs(#http_db{headers = Headers} = Db, Docs) ->
    JsonDocs = [
        couch_doc:to_json_obj(Doc, [revs, att_gzip_length]) || Doc <- Docs
    ],
    ReqHeaders = couch_util:proplist_apply_field(
        {"Content-Type", "application/json"},
        [{"X-Couch-Full-Commit", "false"} | Headers]
    ),
    Request = Db#http_db{
        resource = "_bulk_docs",
        method = post,
        body = {[{new_edits, false}, {docs, JsonDocs}]},
        headers = ReqHeaders
    },
    case couch_rep_httpc:request(Request) of
    {FailProps} ->
        exit({target_error, couch_util:get_value(<<"error">>, FailProps)});
    Failures when is_list(Failures) ->
        [write_docs_1(Failure) || Failure <- Failures]
    end.

% Sends one document together with its attachments to an #http_db{} target
% as a single multipart PUT with new_edits=false.
%
% The request body is produced lazily: a linked streamer process
% (streamer_fun/3) pushes the multipart chunks into a work queue and BodyFun
% hands them out one at a time.
%
% Returns [] when the response is not an error object, or a ONE-element
% list [{{DocId, RevStr}, {ErrId, Reason}}] when the target answered with
% {"error": ..., "reason": ...}. The result is concatenated onto the error
% list built by write_docs/2, whose length is used as the failure count, so
% a failed document must contribute exactly one entry.
write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
    JsonBytes = ?JSON_ENCODE(
        couch_doc:to_json_obj(
            Doc,
            [follows, att_encoding_info, attachments]
        )
    ),
    Boundary = couch_uuids:random(),
    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
        Boundary, JsonBytes, Atts, true
    ),
    StreamerPid = spawn_link(
        fun() -> streamer_fun(Boundary, JsonBytes, Atts) end
    ),
    % Body generator. The initial accumulator is nil (see the request body
    % below): in that case the streamer is asked to start and to hand over
    % its queue. Otherwise the accumulator is the queue returned by the
    % previous call.
    BodyFun = fun(Acc) ->
        DataQueue = case Acc of
        nil ->
            StreamerPid ! {start, self()},
            receive
            {queue, Q} ->
                Q
            end;
        Queue ->
            Queue
        end,
        case couch_work_queue:dequeue(DataQueue) of
        closed ->
            eof;
        {ok, Data} ->
            {ok, iolist_to_binary(Data), DataQueue}
        end
    end,
    Request = Db#http_db{
        resource = couch_util:encode_doc_id(Doc),
        method = put,
        qs = [{new_edits, false}],
        body = {BodyFun, nil},
        headers = [
            {"x-couch-full-commit", "false"},
            {"Content-Type", ?b2l(ContentType)},
            {"Content-Length", Len} | Headers
        ]
    },
    Result = case couch_rep_httpc:request(Request) of
    {[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
        {Pos, [RevId | _]} = Doc#doc.revs,
        ErrId = couch_util:to_existing_atom(Error),
        RevStr = couch_doc:rev_to_str({Pos, RevId}),
        % A single {{Id, Rev}, {ErrId, Reason}} tuple, the same shape that
        % write_docs_1/1 produces. Returning the two pairs as separate list
        % elements would make this document count as two failures.
        [{{Doc#doc.id, RevStr}, {ErrId, Reason}}];
    _ ->
        []
    end,
    % the streamer loops waiting for messages; tell it to terminate
    StreamerPid ! stop,
    Result.

% Loop of the helper process that produces the multipart body of a document.
%
% On {start, From} it creates a work queue, sends {queue, Queue} to From,
% pushes every chunk emitted by couch_doc:doc_to_multi_part_stream/5 into
% the queue, closes the queue and waits for the next message. On stop it
% returns ok.
streamer_fun(Boundary, JsonBytes, Atts) ->
    receive
    {start, Requester} ->
        % a new queue is created for every start request so that nothing
        % left over from a previous (failed) iteration can be read back
        QueueOpts = [{max_size, 1024 * 1024}, {max_items, 1000}],
        {ok, Queue} = couch_work_queue:new(QueueOpts),
        Requester ! {queue, Queue},
        Enqueue = fun(Chunk) -> couch_work_queue:queue(Queue, Chunk) end,
        couch_doc:doc_to_multi_part_stream(
            Boundary, JsonBytes, Atts, Enqueue, true
        ),
        couch_work_queue:close(Queue),
        streamer_fun(Boundary, JsonBytes, Atts);
    stop ->
        ok
    end.

% Converts one error object {Props} into {{Id, Rev}, {ErrId, Reason}}.
% The <<"rev">> value is passed through couch_doc:parse_rev/1 and the
% <<"error">> value through couch_util:to_existing_atom/1.
write_docs_1({Props}) ->
    Field = fun(Key) -> couch_util:get_value(Key, Props) end,
    Id = Field(<<"id">>),
    Rev = couch_doc:parse_rev(Field(<<"rev">>)),
    ErrId = couch_util:to_existing_atom(Field(<<"error">>)),
    Reason = Field(<<"reason">>),
    {{Id, Rev}, {ErrId, Reason}}.