-rw-r--r--   src/couchdb/couch_changes.erl            2
-rw-r--r--   src/couchdb/couch_rep.erl              105
-rw-r--r--   src/couchdb/couch_rep_changes_feed.erl 107
-rw-r--r--   src/couchdb/couch_rep_reader.erl        70
-rw-r--r--   src/couchdb/couch_rep_writer.erl         9
5 files changed, 112 insertions(+), 181 deletions(-)
diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl
index ec0beb57..1e53161b 100644
--- a/src/couchdb/couch_changes.erl
+++ b/src/couchdb/couch_changes.erl
@@ -119,6 +119,8 @@ os_filter_fun(FilterName, Style, Req, Db) ->
             "filter parameter must be of the form `designname/filtername`"})
     end.
 
+builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) ->
+    filter_docids(couch_util:get_value(<<"doc_ids">>, Props), Style);
 builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) ->
     {Props} = couch_httpd:json_body_obj(Req),
     DocIds =  couch_util:get_value(<<"doc_ids">>, Props, nil),
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 88912fbf..d35471c5 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -53,7 +53,6 @@
     committed_seq = 0,
 
     stats = nil,
-    doc_ids = nil,
     rep_doc = nil
 }).
 
@@ -129,7 +128,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
     SourceProps = couch_util:get_value(<<"source">>, PostProps),
     TargetProps = couch_util:get_value(<<"target">>, PostProps),
 
-    DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil),
     Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
     CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),
 
@@ -143,40 +141,16 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
 
     maybe_set_triggered(RepDoc, RepId),
 
-    case DocIds of
-    List when is_list(List) ->
-        % Fast replication using only a list of doc IDs to replicate.
-        % Replication sessions, checkpoints and logs are not created
-        % since the update sequence number of the source DB is not used
-        % for determining which documents are copied into the target DB.
-        SourceLog = nil,
-        TargetLog = nil,
+    [SourceLog, TargetLog] = find_replication_logs(
+        [Source, Target], RepId, {PostProps}, UserCtx),
+    {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
 
-        StartSeq = nil,
-        History = nil,
-
-        ChangesFeed = nil,
-        MissingRevs = nil,
-
-        {ok, Reader} =
-        couch_rep_reader:start_link(self(), Source, DocIds, PostProps);
-
-    _ ->
-        % Replication using the _changes API (DB sequence update numbers).
-
-        [SourceLog, TargetLog] = find_replication_logs(
-            [Source, Target], RepId, {PostProps}, UserCtx),
-
-        {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
-
-        {ok, ChangesFeed} =
+    {ok, ChangesFeed} =
         couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
-        {ok, MissingRevs} =
+    {ok, MissingRevs} =
         couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
-        {ok, Reader} =
-        couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps)
-    end,
-
+    {ok, Reader} =
+        couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
     {ok, Writer} =
     couch_rep_writer:start_link(self(), Target, Reader, PostProps),
 
@@ -213,7 +187,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
         rep_starttime = httpd_util:rfc1123_date(),
         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
-        doc_ids = DocIds,
         rep_doc = RepDoc
     },
     {ok, State}.
@@ -390,25 +363,6 @@ dbinfo(Db) ->
     {ok, Info} = couch_db:get_db_info(Db),
     Info.
 
-do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) ->
-    #state{
-        listeners = Listeners,
-        rep_starttime = ReplicationStartTime,
-        stats = Stats
-    } = State,
-
-    RepByDocsJson = {[
-        {<<"start_time">>, ?l2b(ReplicationStartTime)},
-        {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
-        {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
-        {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
-        {<<"doc_write_failures">>,
-            ets:lookup_element(Stats, doc_write_failures, 2)}
-    ]},
-
-    terminate_cleanup(State),
-    [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)];
-
 do_terminate(State) ->
     #state{
         checkpoint_history = CheckpointHistory,
@@ -659,32 +613,53 @@ do_checkpoint(State) ->
         rep_starttime = ReplicationStartTime,
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
-        stats = Stats
+        stats = Stats,
+        rep_doc = {RepDoc}
     } = State,
     case commit_to_both(Source, Target, NewSeqNum) of
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
         ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
            [dbname(Source), dbname(Target), NewSeqNum]),
+        EndTime = ?l2b(httpd_util:rfc1123_date()),
+        StartTime = ?l2b(ReplicationStartTime),
+        DocsRead = ets:lookup_element(Stats, docs_read, 2),
+        DocsWritten = ets:lookup_element(Stats, docs_written, 2),
+        DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2),
         NewHistoryEntry = {[
             {<<"session_id">>, SessionId},
-            {<<"start_time">>, list_to_binary(ReplicationStartTime)},
-            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+            {<<"start_time">>, StartTime},
+            {<<"end_time">>, EndTime},
             {<<"start_last_seq">>, StartSeqNum},
             {<<"end_last_seq">>, NewSeqNum},
             {<<"recorded_seq">>, NewSeqNum},
             {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
             {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
-            {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
-            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
-            {<<"doc_write_failures">>,
-                ets:lookup_element(Stats, doc_write_failures, 2)}
+            {<<"docs_read">>, DocsRead},
+            {<<"docs_written">>, DocsWritten},
+            {<<"doc_write_failures">>, DocWriteFailures}
         ]},
-        % limit history to 50 entries
-        NewRepHistory = {[
+        BaseHistory = [
             {<<"session_id">>, SessionId},
-            {<<"source_last_seq">>, NewSeqNum},
-            {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
-        ]},
+            {<<"source_last_seq">>, NewSeqNum}
+        ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of
+        undefined ->
+            [];
+        DocIds when is_list(DocIds) ->
+            % backwards compatibility with the result of a replication by
+            % doc IDs in versions 0.11.x and 1.0.x
+            [
+                {<<"start_time">>, StartTime},
+                {<<"end_time">>, EndTime},
+                {<<"docs_read">>, DocsRead},
+                {<<"docs_written">>, DocsWritten},
+                {<<"doc_write_failures">>, DocWriteFailures}
+            ]
+        end,
+        % limit history to 50 entries
+        NewRepHistory = {
+            BaseHistory ++
+            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+        },
 
         try
         {SrcRevPos,SrcRevId} =
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index 98e929fe..246e82c0 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -18,6 +18,7 @@
 -export([start_link/4, next/1, stop/1]).
 
 -define(BUFFER_SIZE, 1000).
+-define(DOC_IDS_FILTER_NAME, "_doc_ids").
 
 -include("couch_db.hrl").
 -include("../ibrowse/ibrowse.hrl").
@@ -36,6 +37,11 @@
     rows = queue:new()
 }).
 
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
 start_link(Parent, Source, StartSeq, PostProps) ->
     gen_server:start_link(?MODULE, [Parent, Source, StartSeq, PostProps], []).
 
@@ -46,9 +52,9 @@ stop(Server) ->
     catch gen_server:call(Server, stop),
     ok.
 
-init([Parent, #http_db{}=Source, Since, PostProps]) ->
+init([Parent, #http_db{headers = Headers0} = Source, Since, PostProps]) ->
     process_flag(trap_exit, true),
-    Feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of
+    Feed = case get_value(<<"continuous">>, PostProps, false) of
     false ->
         normal;
     true ->
@@ -60,33 +66,24 @@ init([Parent, #http_db{}=Source, Since, PostProps]) ->
         {"since", Since},
         {"feed", Feed}
     ],
-    QS = case couch_util:get_value(<<"filter">>, PostProps) of
+    {QS, Method, Body, Headers} = case get_value(<<"doc_ids">>, PostProps) of
     undefined ->
-        BaseQS;
-    FilterName ->
-        {Params} = couch_util:get_value(<<"query_params">>, PostProps, {[]}),
-        lists:foldr(
-            fun({K, V}, QSAcc) ->
-                Ks = couch_util:to_list(K),
-                case proplists:is_defined(Ks, QSAcc) of
-                true ->
-                    QSAcc;
-                false ->
-                    [{Ks, V} | QSAcc]
-                end
-            end,
-            [{"filter", FilterName} | BaseQS],
-            Params
-        )
+        {maybe_add_filter_qs_params(PostProps, BaseQS), get, nil, Headers0};
+    DocIds when is_list(DocIds) ->
+        Headers1 = [{"Content-Type", "application/json"} | Headers0],
+        QS1 = [{"filter", ?l2b(?DOC_IDS_FILTER_NAME)} | BaseQS],
+        {QS1, post, {[{<<"doc_ids">>, DocIds}]}, Headers1}
     end,
     Pid = couch_rep_httpc:spawn_link_worker_process(Source),
     Req = Source#http_db{
+        method = Method,
+        body = Body,
         resource = "_changes",
         qs = QS,
         conn = Pid,
         options = [{stream_to, {self(), once}}] ++
             lists:keydelete(inactivity_timeout, 1, Source#http_db.options),
-        headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
+        headers = Headers -- [{"Accept-Encoding", "gzip"}]
    },
     {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
     Args = [Parent, Req, Since, PostProps],
@@ -123,11 +120,17 @@ init([Parent, #http_db{}=Source, Since, PostProps]) ->
 init([_Parent, Source, Since, PostProps] = InitArgs) ->
     process_flag(trap_exit, true),
     Server = self(),
+    Filter = case get_value(<<"doc_ids">>, PostProps) of
+    undefined ->
+        ?b2l(get_value(<<"filter">>, PostProps, <<>>));
+    DocIds when is_list(DocIds) ->
+        ?DOC_IDS_FILTER_NAME
+    end,
     ChangesArgs = #changes_args{
         style = all_docs,
         since = Since,
-        filter = ?b2l(couch_util:get_value(<<"filter">>, PostProps, <<>>)),
-        feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of
+        filter = Filter,
+        feed = case get_value(<<"continuous">>, PostProps, false) of
             true ->
                 "continuous";
             false ->
@@ -138,7 +141,7 @@ init([_Parent, Source, Since, PostProps] = InitArgs) ->
     ChangesPid = spawn_link(fun() ->
         ChangesFeedFun = couch_changes:handle_changes(
             ChangesArgs,
-            {json_req, filter_json_req(Source, PostProps)},
+            {json_req, filter_json_req(Filter, Source, PostProps)},
             Source
         ),
         ChangesFeedFun(fun({change, Change, _}, _) ->
@@ -149,29 +152,49 @@ init([_Parent, Source, Since, PostProps] = InitArgs) ->
     end),
     {ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}.
 
-filter_json_req(Db, PostProps) ->
-    case couch_util:get_value(<<"filter">>, PostProps) of
+maybe_add_filter_qs_params(PostProps, BaseQS) ->
+    case get_value(<<"filter">>, PostProps) of
     undefined ->
-        {[]};
+        BaseQS;
     FilterName ->
-        {Query} = couch_util:get_value(<<"query_params">>, PostProps, {[]}),
-        {ok, Info} = couch_db:get_db_info(Db),
-        % simulate a request to db_name/_changes
-        {[
-            {<<"info">>, {Info}},
-            {<<"id">>, null},
-            {<<"method">>, 'GET'},
-            {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
-            {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
-            {<<"headers">>, []},
-            {<<"body">>, []},
-            {<<"peer">>, <<"replicator">>},
-            {<<"form">>, []},
-            {<<"cookie">>, []},
-            {<<"userCtx">>, couch_util:json_user_ctx(Db)}
-       ]}
+        {Params} = get_value(<<"query_params">>, PostProps, {[]}),
+        lists:foldr(
+            fun({K, V}, QSAcc) ->
+                Ks = couch_util:to_list(K),
+                case proplists:is_defined(Ks, QSAcc) of
+                true ->
+                    QSAcc;
+                false ->
+                    [{Ks, V} | QSAcc]
+                end
+            end,
+            [{"filter", FilterName} | BaseQS],
+            Params
+        )
     end.
 
+filter_json_req([], _Db, _PostProps) ->
+    {[]};
+filter_json_req(?DOC_IDS_FILTER_NAME, _Db, PostProps) ->
+    {[{<<"doc_ids">>, get_value(<<"doc_ids">>, PostProps)}]};
+filter_json_req(FilterName, Db, PostProps) ->
+    {Query} = get_value(<<"query_params">>, PostProps, {[]}),
+    {ok, Info} = couch_db:get_db_info(Db),
+    % simulate a request to db_name/_changes
+    {[
+        {<<"info">>, {Info}},
+        {<<"id">>, null},
+        {<<"method">>, 'GET'},
+        {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+        {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
+        {<<"headers">>, []},
+        {<<"body">>, []},
+        {<<"peer">>, <<"replicator">>},
+        {<<"form">>, []},
+        {<<"cookie">>, []},
+        {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+    ]}.
+
 handle_call({add_change, Row}, From, State) ->
     handle_add_change(Row, From, State);
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index 4f81c8e4..bdef3dfc 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -43,15 +43,13 @@
     opened_seqs = []
 }).
 
-start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) ->
-    gen_server:start_link(
-        ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], []
-    ).
+start_link(Parent, Source, MissingRevs, PostProps) ->
+    gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
 
 next(Pid) ->
     gen_server:call(Pid, next_docs, infinity).
 
-init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
+init([Parent, Source, MissingRevs, _PostProps]) ->
     process_flag(trap_exit, true),
     if is_record(Source, http_db) ->
         #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
@@ -59,15 +57,7 @@ init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
         ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
     true -> ok end,
     Self = self(),
-    ReaderLoop = spawn_link(
-        fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
-    ),
-    MissingRevs = case MissingRevs_or_DocIds of
-    Pid when is_pid(Pid) ->
-        Pid;
-    _ListDocIds ->
-        nil
-    end,
+    ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
     State = #state{
         parent = Parent,
         source = Source,
@@ -183,8 +173,6 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) ->
 handle_reader_loop_complete(State) ->
     {noreply, State#state{complete = waiting_on_monitors}}.
 
-calculate_new_high_seq(#state{missing_revs=nil}) ->
-    nil;
 calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
     Open;
 calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
@@ -209,8 +197,6 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
 % opened seqs greater than the smallest outstanding request.  I believe its the
 % minimal set of info needed to correctly calculate which seqs have been
 % replicated (because remote docs can be opened out-of-order) -- APK
-update_sequence_lists(_Seq, #state{missing_revs=nil} = State) ->
-    State;
 update_sequence_lists(Seq, State) ->
     Requested = lists:delete(Seq, State#state.requested_seqs),
     AllOpened = lists:merge([Seq], State#state.opened_seqs),
@@ -261,44 +247,6 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) ->
     end,
     lists:reverse(lists:foldl(Transform, [], JsonResults)).
 
-open_doc(#http_db{url = Url} = DbS, DocId) ->
-    % get latest rev of the doc
-    Req = DbS#http_db{
-        resource=encode_doc_id(DocId),
-        qs=[{att_encoding_info, true}]
-    },
-    {Props} = Json = couch_rep_httpc:request(Req),
-    case couch_util:get_value(<<"_id">>, Props) of
-    Id when is_binary(Id) ->
-        #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
-        [Doc#doc{
-            atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
-        }];
-    undefined ->
-        Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)),
-        ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s",
-            [DocId, couch_util:url_strip_password(Url), Err]),
-        []
-    end.
-
-reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
-    case Source of
-    #http_db{} ->
-        [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
-            infinity) || Id <- DocIds];
-    _LocalDb ->
-        Docs = lists:foldr(fun(Id, Acc) ->
-            case couch_db:open_doc(Source, Id) of
-            {ok, Doc} ->
-                [Doc | Acc];
-            _ ->
-                Acc
-            end
-        end, [], DocIds),
-        gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity)
-    end,
-    exit(complete);
-
 reader_loop(ReaderServer, Source, MissingRevsServer) ->
     case couch_rep_missing_revs:next(MissingRevsServer) of
     complete ->
@@ -332,8 +280,6 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
 maybe_reopen_db(Db, _HighSeq) ->
     Db.
 
-spawn_document_request(Source, Id, nil, nil) ->
-    spawn_document_request(Source, Id);
 spawn_document_request(Source, Id, Seq, Revs) ->
     Server = self(),
     SpawnFun = fun() ->
@@ -341,11 +287,3 @@ spawn_document_request(Source, Id, Seq, Revs) ->
         gen_server:call(Server, {add_docs, Seq, Results}, infinity)
     end,
     spawn_monitor(SpawnFun).
-
-spawn_document_request(Source, Id) ->
-    Server = self(),
-    SpawnFun = fun() ->
-        Results = open_doc(Source, Id),
-        gen_server:call(Server, {add_docs, nil, Results}, infinity)
-    end,
-    spawn_monitor(SpawnFun).
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index f7bc9a72..622bfb27 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -21,8 +21,6 @@ start_link(Parent, Target, Reader, _PostProps) ->
 
 writer_loop(Parent, Reader, Target) ->
     case couch_rep_reader:next(Reader) of
-    {complete, nil} ->
-        ok;
     {complete, FinalSeq} ->
         Parent ! {writer_checkpoint, FinalSeq},
         ok;
@@ -40,12 +38,7 @@ writer_loop(Parent, Reader, Target) ->
             ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]),
             exit({attachment_request_failed, Err, Docs})
         end,
-        case HighSeq of
-        nil ->
-            ok;
-        _SeqNumber ->
-            Parent ! {writer_checkpoint, HighSeq}
-        end,
+        Parent ! {writer_checkpoint, HighSeq},
        couch_rep_att:cleanup(),
        couch_util:should_flush(),
        writer_loop(Parent, Reader, Target)
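
For reference: after this change a replication restricted to a list of document IDs no longer takes a special-cased reader path; couch_rep_changes_feed POSTs the IDs to the source's _changes endpoint with the built-in _doc_ids filter. A minimal sketch of the equivalent requests from an Erlang shell, using ibrowse as the codebase does — the host, database names and doc IDs are illustrative assumptions, not values from this commit:

    %% What the feed process now sends to a remote source: POST to _changes
    %% with filter=_doc_ids and the doc IDs in a JSON body (see the #http_db{}
    %% request built in couch_rep_changes_feed:init/1 above).
    ibrowse:start(),
    {ok, _Status, _Hdrs, _ChangesBody} = ibrowse:send_req(
        "http://localhost:5984/source_db/_changes?filter=_doc_ids&style=all_docs",
        [{"Content-Type", "application/json"}],
        post,
        "{\"doc_ids\": [\"foo\", \"bar\"]}"),

    %% Triggering such a replication end to end: a doc_ids field in the
    %% replication document selects the _doc_ids filter in
    %% couch_rep_changes_feed and the compatibility fields in do_checkpoint.
    {ok, _, _, _RepResult} = ibrowse:send_req(
        "http://localhost:5984/_replicate",
        [{"Content-Type", "application/json"}],
        post,
        "{\"source\": \"source_db\", \"target\": \"target_db\", "
        "\"doc_ids\": [\"foo\", \"bar\"]}").

Because checkpoints are now recorded for doc-ID replications as well, the response carries the usual session history, with the 0.11.x/1.0.x-style docs_read/docs_written/doc_write_failures fields kept at the top level whenever doc_ids is present.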
