summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_changes_feed.erl
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2010-11-16 20:12:05 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2010-11-16 20:12:05 +0000
commit5afece2131f4ef4000d768b671bddbbf714303d9 (patch)
tree921522f0085d0de787e7857a73769d76863087e4 /src/couchdb/couch_rep_changes_feed.erl
parent2bc2dc1ad85943214a21597723ab4b357fbe1766 (diff)
Replicator: use the new builtin _doc_ids filter for the by doc IDs replication.
This reduces code complexity and allows for continuous by doc IDs replication. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1035780 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep_changes_feed.erl')
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl107
1 files changed, 65 insertions, 42 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index 98e929fe..246e82c0 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -18,6 +18,7 @@
-export([start_link/4, next/1, stop/1]).
-define(BUFFER_SIZE, 1000).
+-define(DOC_IDS_FILTER_NAME, "_doc_ids").
-include("couch_db.hrl").
-include("../ibrowse/ibrowse.hrl").
@@ -36,6 +37,11 @@
rows = queue:new()
}).
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
start_link(Parent, Source, StartSeq, PostProps) ->
gen_server:start_link(?MODULE, [Parent, Source, StartSeq, PostProps], []).
@@ -46,9 +52,9 @@ stop(Server) ->
catch gen_server:call(Server, stop),
ok.
-init([Parent, #http_db{}=Source, Since, PostProps]) ->
+init([Parent, #http_db{headers = Headers0} = Source, Since, PostProps]) ->
process_flag(trap_exit, true),
- Feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of
+ Feed = case get_value(<<"continuous">>, PostProps, false) of
false ->
normal;
true ->
@@ -60,33 +66,24 @@ init([Parent, #http_db{}=Source, Since, PostProps]) ->
{"since", Since},
{"feed", Feed}
],
- QS = case couch_util:get_value(<<"filter">>, PostProps) of
+ {QS, Method, Body, Headers} = case get_value(<<"doc_ids">>, PostProps) of
undefined ->
- BaseQS;
- FilterName ->
- {Params} = couch_util:get_value(<<"query_params">>, PostProps, {[]}),
- lists:foldr(
- fun({K, V}, QSAcc) ->
- Ks = couch_util:to_list(K),
- case proplists:is_defined(Ks, QSAcc) of
- true ->
- QSAcc;
- false ->
- [{Ks, V} | QSAcc]
- end
- end,
- [{"filter", FilterName} | BaseQS],
- Params
- )
+ {maybe_add_filter_qs_params(PostProps, BaseQS), get, nil, Headers0};
+ DocIds when is_list(DocIds) ->
+ Headers1 = [{"Content-Type", "application/json"} | Headers0],
+ QS1 = [{"filter", ?l2b(?DOC_IDS_FILTER_NAME)} | BaseQS],
+ {QS1, post, {[{<<"doc_ids">>, DocIds}]}, Headers1}
end,
Pid = couch_rep_httpc:spawn_link_worker_process(Source),
Req = Source#http_db{
+ method = Method,
+ body = Body,
resource = "_changes",
qs = QS,
conn = Pid,
options = [{stream_to, {self(), once}}] ++
lists:keydelete(inactivity_timeout, 1, Source#http_db.options),
- headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
+ headers = Headers -- [{"Accept-Encoding", "gzip"}]
},
{ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
Args = [Parent, Req, Since, PostProps],
@@ -123,11 +120,17 @@ init([Parent, #http_db{}=Source, Since, PostProps]) ->
init([_Parent, Source, Since, PostProps] = InitArgs) ->
process_flag(trap_exit, true),
Server = self(),
+ Filter = case get_value(<<"doc_ids">>, PostProps) of
+ undefined ->
+ ?b2l(get_value(<<"filter">>, PostProps, <<>>));
+ DocIds when is_list(DocIds) ->
+ ?DOC_IDS_FILTER_NAME
+ end,
ChangesArgs = #changes_args{
style = all_docs,
since = Since,
- filter = ?b2l(couch_util:get_value(<<"filter">>, PostProps, <<>>)),
- feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of
+ filter = Filter,
+ feed = case get_value(<<"continuous">>, PostProps, false) of
true ->
"continuous";
false ->
@@ -138,7 +141,7 @@ init([_Parent, Source, Since, PostProps] = InitArgs) ->
ChangesPid = spawn_link(fun() ->
ChangesFeedFun = couch_changes:handle_changes(
ChangesArgs,
- {json_req, filter_json_req(Source, PostProps)},
+ {json_req, filter_json_req(Filter, Source, PostProps)},
Source
),
ChangesFeedFun(fun({change, Change, _}, _) ->
@@ -149,29 +152,49 @@ init([_Parent, Source, Since, PostProps] = InitArgs) ->
end),
{ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}.
-filter_json_req(Db, PostProps) ->
- case couch_util:get_value(<<"filter">>, PostProps) of
+maybe_add_filter_qs_params(PostProps, BaseQS) ->
+ case get_value(<<"filter">>, PostProps) of
undefined ->
- {[]};
+ BaseQS;
FilterName ->
- {Query} = couch_util:get_value(<<"query_params">>, PostProps, {[]}),
- {ok, Info} = couch_db:get_db_info(Db),
- % simulate a request to db_name/_changes
- {[
- {<<"info">>, {Info}},
- {<<"id">>, null},
- {<<"method">>, 'GET'},
- {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
- {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
- {<<"headers">>, []},
- {<<"body">>, []},
- {<<"peer">>, <<"replicator">>},
- {<<"form">>, []},
- {<<"cookie">>, []},
- {<<"userCtx">>, couch_util:json_user_ctx(Db)}
- ]}
+ {Params} = get_value(<<"query_params">>, PostProps, {[]}),
+ lists:foldr(
+ fun({K, V}, QSAcc) ->
+ Ks = couch_util:to_list(K),
+ case proplists:is_defined(Ks, QSAcc) of
+ true ->
+ QSAcc;
+ false ->
+ [{Ks, V} | QSAcc]
+ end
+ end,
+ [{"filter", FilterName} | BaseQS],
+ Params
+ )
end.
+filter_json_req([], _Db, _PostProps) ->
+ {[]};
+filter_json_req(?DOC_IDS_FILTER_NAME, _Db, PostProps) ->
+ {[{<<"doc_ids">>, get_value(<<"doc_ids">>, PostProps)}]};
+filter_json_req(FilterName, Db, PostProps) ->
+ {Query} = get_value(<<"query_params">>, PostProps, {[]}),
+ {ok, Info} = couch_db:get_db_info(Db),
+ % simulate a request to db_name/_changes
+ {[
+ {<<"info">>, {Info}},
+ {<<"id">>, null},
+ {<<"method">>, 'GET'},
+ {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+ {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
+ {<<"headers">>, []},
+ {<<"body">>, []},
+ {<<"peer">>, <<"replicator">>},
+ {<<"form">>, []},
+ {<<"cookie">>, []},
+ {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+ ]}.
+
handle_call({add_change, Row}, From, State) ->
handle_add_change(Row, From, State);