summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_changes_feed.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep_changes_feed.erl')
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl136
1 files changed, 72 insertions, 64 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index cdfed6a0..67cfabe4 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -53,11 +53,35 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) ->
true ->
continuous
end,
+ BaseQS = [
+ {"style", all_docs},
+ {"heartbeat", 10000},
+ {"since", Since},
+ {"feed", Feed}
+ ],
+ QS = case proplists:get_value(<<"filter">>, PostProps) of
+ undefined ->
+ BaseQS;
+ FilterName ->
+ {Params} = proplists:get_value(<<"query_params">>, PostProps, {[]}),
+ lists:foldr(
+ fun({K, V}, QSAcc) ->
+ Ks = couch_util:to_list(K),
+ case proplists:is_defined(Ks, QSAcc) of
+ true ->
+ QSAcc;
+ false ->
+ [{Ks, V} | QSAcc]
+ end
+ end,
+ [{"filter", FilterName} | BaseQS],
+ Params
+ )
+ end,
Pid = couch_rep_httpc:spawn_link_worker_process(Source),
Req = Source#http_db{
resource = "_changes",
- qs = [{style, all_docs}, {heartbeat, 10000}, {since, Since},
- {feed, Feed}],
+ qs = QS,
conn = Pid,
options = [{stream_to, {self(), once}}, {response_format, binary},
{inactivity_timeout, 31000}], % miss 3 heartbeats, assume death
@@ -94,20 +118,54 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) ->
init([_Parent, Source, Since, PostProps] = InitArgs) ->
process_flag(trap_exit, true),
Server = self(),
- ChangesPid =
- case proplists:get_value(<<"continuous">>, PostProps, false) of
- false ->
- spawn_link(fun() -> send_local_changes_once(Server, Source, Since) end);
- true ->
- spawn_link(fun() ->
- Self = self(),
- {ok, _} = couch_db_update_notifier:start_link(fun(Msg) ->
- local_update_notification(Self, Source#db.name, Msg) end),
- send_local_changes_forever(Server, Source, Since)
+ ChangesArgs = #changes_args{
+ style = all_docs,
+ since = Since,
+ filter = ?b2l(proplists:get_value(<<"filter">>, PostProps, <<>>)),
+ feed = case proplists:get_value(<<"continuous">>, PostProps, false) of
+ true ->
+ "continuous";
+ false ->
+ "normal"
+ end
+ },
+ ChangesPid = spawn_link(fun() ->
+ ChangesFeedFun = couch_changes:handle_changes(
+ ChangesArgs,
+ {json_req, filter_json_req(Source, PostProps)},
+ Source
+ ),
+ ChangesFeedFun(fun({change, Change, _}, _) ->
+ gen_server:call(Server, {add_change, Change}, infinity);
+ (_, _) ->
+ ok
end)
- end,
+ end),
{ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}.
+filter_json_req(Db, PostProps) ->
+ case proplists:get_value(<<"filter">>, PostProps) of
+ undefined ->
+ {[]};
+ FilterName ->
+ {Query} = proplists:get_value(<<"query_params">>, PostProps, {[]}),
+ {ok, Info} = couch_db:get_db_info(Db),
+ % simulate a request to db_name/_changes
+ {[
+ {<<"info">>, {Info}},
+ {<<"id">>, null},
+ {<<"method">>, 'GET'},
+ {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+ {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
+ {<<"headers">>, []},
+ {<<"body">>, []},
+ {<<"peer">>, <<"replicator">>},
+ {<<"form">>, []},
+ {<<"cookie">>, []},
+ {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+ ]}
+ end.
+
handle_call({add_change, Row}, From, State) ->
handle_add_change(Row, From, State);
@@ -302,23 +360,7 @@ by_seq_loop(Server, Source, StartSeq) ->
decode_row(<<",", Rest/binary>>) ->
decode_row(Rest);
decode_row(Row) ->
- {Props} = ?JSON_DECODE(Row),
- % [Seq, Id, {<<"changes">>,C}]
- Seq = proplists:get_value(<<"seq">>, Props),
- Id = proplists:get_value(<<"id">>, Props),
- C = proplists:get_value(<<"changes">>, Props),
- C2 = [{[{<<"rev">>,couch_doc:parse_rev(R)}]} || {[{<<"rev">>,R}]} <- C],
- {[{<<"seq">>, Seq}, {<<"id">>,Id}, {<<"changes">>,C2}]}.
-
-flush_updated_messages() ->
- receive updated -> flush_updated_messages()
- after 0 -> ok
- end.
-
-local_update_notification(Self, DbName, {updated, DbName}) ->
- Self ! updated;
-local_update_notification(_, _, _) ->
- ok.
+ ?JSON_DECODE(Row).
maybe_stream_next(#state{reqid=nil}) ->
ok;
@@ -327,35 +369,6 @@ maybe_stream_next(#state{complete=false, count=N} = S) when N < ?BUFFER_SIZE ->
maybe_stream_next(_) ->
ok.
-send_local_changes_forever(Server, Db, Since) ->
- #db{name = DbName, user_ctx = UserCtx} = Db,
- {ok, NewSeq} = send_local_changes_once(Server, Db, Since),
- couch_db:close(Db),
- ok = wait_db_updated(),
- {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
- send_local_changes_forever(Server, NewDb, NewSeq).
-
-send_local_changes_once(Server, Db, Since) ->
- FilterFun =
- fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) ->
- {[{<<"rev">>, Rev}]}
- end,
-
- ChangesFun =
- fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, _) ->
- Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos],
- Results = [Result || Result <- Results0, Result /= null],
- if Results /= [] ->
- Change = {[{<<"seq">>,Seq}, {<<"id">>,Id}, {<<"changes">>,Results}]},
- gen_server:call(Server, {add_change, Change}, infinity);
- true ->
- ok
- end,
- {ok, Seq}
- end,
-
- couch_db:changes_since(Db, all_docs, Since, ChangesFun, Since).
-
start_http_request(RawUrl) ->
Url = ibrowse_lib:parse_url(RawUrl),
{ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
@@ -367,8 +380,3 @@ start_http_request(RawUrl) ->
{ibrowse_req_id, Id} =
ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),
{Pid, Id}.
-
-wait_db_updated() ->
- receive updated ->
- flush_updated_messages()
- end.