diff options
Diffstat (limited to 'src/couchdb/couch_rep_changes_feed.erl')
-rw-r--r-- | src/couchdb/couch_rep_changes_feed.erl | 136 |
1 files changed, 72 insertions, 64 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index cdfed6a0..67cfabe4 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -53,11 +53,35 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) -> true -> continuous end, + BaseQS = [ + {"style", all_docs}, + {"heartbeat", 10000}, + {"since", Since}, + {"feed", Feed} + ], + QS = case proplists:get_value(<<"filter">>, PostProps) of + undefined -> + BaseQS; + FilterName -> + {Params} = proplists:get_value(<<"query_params">>, PostProps, {[]}), + lists:foldr( + fun({K, V}, QSAcc) -> + Ks = couch_util:to_list(K), + case proplists:is_defined(Ks, QSAcc) of + true -> + QSAcc; + false -> + [{Ks, V} | QSAcc] + end + end, + [{"filter", FilterName} | BaseQS], + Params + ) + end, Pid = couch_rep_httpc:spawn_link_worker_process(Source), Req = Source#http_db{ resource = "_changes", - qs = [{style, all_docs}, {heartbeat, 10000}, {since, Since}, - {feed, Feed}], + qs = QS, conn = Pid, options = [{stream_to, {self(), once}}, {response_format, binary}, {inactivity_timeout, 31000}], % miss 3 heartbeats, assume death @@ -94,20 +118,54 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) -> init([_Parent, Source, Since, PostProps] = InitArgs) -> process_flag(trap_exit, true), Server = self(), - ChangesPid = - case proplists:get_value(<<"continuous">>, PostProps, false) of - false -> - spawn_link(fun() -> send_local_changes_once(Server, Source, Since) end); - true -> - spawn_link(fun() -> - Self = self(), - {ok, _} = couch_db_update_notifier:start_link(fun(Msg) -> - local_update_notification(Self, Source#db.name, Msg) end), - send_local_changes_forever(Server, Source, Since) + ChangesArgs = #changes_args{ + style = all_docs, + since = Since, + filter = ?b2l(proplists:get_value(<<"filter">>, PostProps, <<>>)), + feed = case proplists:get_value(<<"continuous">>, PostProps, false) of + true -> + "continuous"; + false -> + "normal" + end + }, + ChangesPid = spawn_link(fun() -> + ChangesFeedFun = couch_changes:handle_changes( + ChangesArgs, + {json_req, filter_json_req(Source, PostProps)}, + Source + ), + ChangesFeedFun(fun({change, Change, _}, _) -> + gen_server:call(Server, {add_change, Change}, infinity); + (_, _) -> + ok end) - end, + end), {ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}. +filter_json_req(Db, PostProps) -> + case proplists:get_value(<<"filter">>, PostProps) of + undefined -> + {[]}; + FilterName -> + {Query} = proplists:get_value(<<"query_params">>, PostProps, {[]}), + {ok, Info} = couch_db:get_db_info(Db), + % simulate a request to db_name/_changes + {[ + {<<"info">>, {Info}}, + {<<"id">>, null}, + {<<"method">>, 'GET'}, + {<<"path">>, [couch_db:name(Db), <<"_changes">>]}, + {<<"query">>, {[{<<"filter">>, FilterName} | Query]}}, + {<<"headers">>, []}, + {<<"body">>, []}, + {<<"peer">>, <<"replicator">>}, + {<<"form">>, []}, + {<<"cookie">>, []}, + {<<"userCtx">>, couch_util:json_user_ctx(Db)} + ]} + end. + handle_call({add_change, Row}, From, State) -> handle_add_change(Row, From, State); @@ -302,23 +360,7 @@ by_seq_loop(Server, Source, StartSeq) -> decode_row(<<",", Rest/binary>>) -> decode_row(Rest); decode_row(Row) -> - {Props} = ?JSON_DECODE(Row), - % [Seq, Id, {<<"changes">>,C}] - Seq = proplists:get_value(<<"seq">>, Props), - Id = proplists:get_value(<<"id">>, Props), - C = proplists:get_value(<<"changes">>, Props), - C2 = [{[{<<"rev">>,couch_doc:parse_rev(R)}]} || {[{<<"rev">>,R}]} <- C], - {[{<<"seq">>, Seq}, {<<"id">>,Id}, {<<"changes">>,C2}]}. - -flush_updated_messages() -> - receive updated -> flush_updated_messages() - after 0 -> ok - end. - -local_update_notification(Self, DbName, {updated, DbName}) -> - Self ! updated; -local_update_notification(_, _, _) -> - ok. + ?JSON_DECODE(Row). maybe_stream_next(#state{reqid=nil}) -> ok; @@ -327,35 +369,6 @@ maybe_stream_next(#state{complete=false, count=N} = S) when N < ?BUFFER_SIZE -> maybe_stream_next(_) -> ok. -send_local_changes_forever(Server, Db, Since) -> - #db{name = DbName, user_ctx = UserCtx} = Db, - {ok, NewSeq} = send_local_changes_once(Server, Db, Since), - couch_db:close(Db), - ok = wait_db_updated(), - {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]), - send_local_changes_forever(Server, NewDb, NewSeq). - -send_local_changes_once(Server, Db, Since) -> - FilterFun = - fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) -> - {[{<<"rev">>, Rev}]} - end, - - ChangesFun = - fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, _) -> - Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], - Results = [Result || Result <- Results0, Result /= null], - if Results /= [] -> - Change = {[{<<"seq">>,Seq}, {<<"id">>,Id}, {<<"changes">>,Results}]}, - gen_server:call(Server, {add_change, Change}, infinity); - true -> - ok - end, - {ok, Seq} - end, - - couch_db:changes_since(Db, all_docs, Since, ChangesFun, Since). - start_http_request(RawUrl) -> Url = ibrowse_lib:parse_url(RawUrl), {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port), @@ -367,8 +380,3 @@ start_http_request(RawUrl) -> {ibrowse_req_id, Id} = ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity), {Pid, Id}. - -wait_db_updated() -> - receive updated -> - flush_updated_messages() - end. |