% Copyright 2010 Cloudant % % Licensed under the Apache License, Version 2.0 (the "License"); you may not % use this file except in compliance with the License. You may obtain a copy of % the License at % % http://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the % License for the specific language governing permissions and limitations under % the License. -module(fabric_view_changes). -export([go/5, start_update_notifier/1]). -include("fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse Feed == "longpoll" -> Args = make_changes_args(Options), {ok, Acc} = Callback(start, Acc0), Notifiers = start_update_notifiers(DbName), {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback), try keep_sending_changes( DbName, Args, Callback, get_start_seq(DbName, Args), Acc, Timeout, TimeoutFun ) after stop_update_notifiers(Notifiers), couch_changes:get_rest_db_updated() end; go(DbName, "normal", Options, Callback, Acc0) -> Args = make_changes_args(Options), {ok, Acc} = Callback(start, Acc0), {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes( DbName, Args, Callback, get_start_seq(DbName, Args), Acc ), Callback({stop, pack_seqs(Seqs)}, AccOut). keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) -> #changes_args{limit=Limit, feed=Feed} = Args, {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn), #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector, LastSeq = pack_seqs(NewSeqs), if Limit > Limit2, Feed == "longpoll" -> Callback({stop, LastSeq}, AccOut); true -> case couch_changes:wait_db_updated(Timeout, TFun) of updated -> keep_sending_changes( DbName, Args#changes_args{limit=Limit2}, Callback, LastSeq, AccIn, Timeout, TFun ); stop -> Callback({stop, LastSeq}, AccOut) end end. send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) -> AllShards = mem3:shards(DbName), Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> case lists:member(Shard, AllShards) of true -> Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), [{Shard#shard{ref = Ref}, Seq}]; false -> % Find some replacement shards to cover the missing range % TODO It's possible in rare cases of shard merging to end up % with overlapping shard ranges from this technique lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), {NewShard#shard{ref = Ref}, 0} end, find_replacement_shards(Shard, AllShards)) end end, unpack_seqs(PackedSeqs, DbName)), {Workers, _} = lists:unzip(Seqs), State = #collector{ query_args = ChangesArgs, callback = Callback, counters = fabric_dict:init(Workers, 0), user_acc = AccIn, limit = ChangesArgs#changes_args.limit, rows = Seqs % store sequence positions instead }, try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, State, infinity, 5000) after fabric_util:cleanup(Workers) end. handle_message({rexi_DOWN, _, _, _}, nil, State) -> % TODO see if progress can be made here, possibly by removing all shards % from that node and checking is_progress_possible {ok, State}; handle_message({rexi_EXIT, Reason}, Worker, State) -> ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), #collector{ callback=Callback, counters=Counters0, rows = Seqs0, user_acc=Acc } = State, Counters = fabric_dict:erase(Worker, Counters0), Seqs = fabric_dict:erase(Worker, Seqs0), case fabric_view:is_progress_possible(Counters) of true -> {ok, State#collector{counters = Counters, rows=Seqs}}; false -> Callback({error, dead_shards}, Acc), {error, dead_shards} end; handle_message(_, _, #collector{limit=0} = State) -> {stop, State}; handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) -> #collector{ query_args = #changes_args{include_docs=IncludeDocs}, callback = Callback, counters = S0, limit = Limit, user_acc = AccIn } = St, case fabric_dict:lookup_element(Worker, S0) of undefined -> % this worker lost the race with other partition copies, terminate it gen_server:reply(From, stop), {ok, St}; _ -> S1 = fabric_dict:store(Worker, Seq, S0), S2 = fabric_view:remove_overlapping_shards(Worker, S1), Row = Row0#view_row{key = pack_seqs(S2)}, {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn), gen_server:reply(From, Go), {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}} end; handle_message({complete, EndSeq}, Worker, State) -> #collector{ counters = S0, total_rows = Completed % override } = State, case fabric_dict:lookup_element(Worker, S0) of undefined -> {ok, State}; _ -> S1 = fabric_dict:store(Worker, EndSeq, S0), % unlikely to have overlaps here, but possible w/ filters S2 = fabric_view:remove_overlapping_shards(Worker, S1), NewState = State#collector{counters=S2, total_rows=Completed+1}, case fabric_dict:size(S2) =:= (Completed+1) of true -> {stop, NewState}; false -> {ok, NewState} end end. make_changes_args(#changes_args{style=main_only, filter=undefined}=Args) -> Args#changes_args{filter = fun couch_changes:main_only_filter/1}; make_changes_args(#changes_args{style=all_docs, filter=undefined}=Args) -> Args#changes_args{filter = fun couch_changes:all_docs_filter/1}; make_changes_args(Args) -> Args. get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> Since; get_start_seq(DbName, #changes_args{dir=rev}) -> Shards = mem3:shards(DbName), Workers = fabric_util:submit_jobs(Shards, get_update_seq, []), {ok, Since} = fabric_util:recv(Workers, #shard.ref, fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), Since. collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) -> case fabric_dict:lookup_element(Shard, Counters) of undefined -> % already heard from someone else in this range {ok, Counters}; -1 -> C1 = fabric_dict:store(Shard, Seq, Counters), C2 = fabric_view:remove_overlapping_shards(Shard, C1), case fabric_dict:any(-1, C2) of true -> {ok, C2}; false -> {stop, pack_seqs(C2)} end end. pack_seqs(Workers) -> SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], SeqSum = lists:sum(element(2, lists:unzip(Workers))), Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])), list_to_binary([integer_to_list(SeqSum), $-, Opaque]). unpack_seqs(0, DbName) -> fabric_dict:init(mem3:shards(DbName), 0); unpack_seqs("0", DbName) -> fabric_dict:init(mem3:shards(DbName), 0); unpack_seqs(Packed, DbName) -> {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?.*)", [{capture, [opaque], binary}]), % TODO relies on internal structure of fabric_dict as keylist lists:map(fun({Node, [A,B], Seq}) -> Shard = #shard{node=Node, range=[A,B], dbname=DbName}, {mem3_util:name_shard(Shard), Seq} end, binary_to_term(couch_util:decodeBase64Url(Opaque))). start_update_notifiers(DbName) -> lists:map(fun(#shard{node=Node, name=Name}) -> {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})} end, mem3:shards(DbName)). % rexi endpoint start_update_notifier(DbName) -> {Caller, _} = get(rexi_from), Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end, Id = {couch_db_update_notifier, make_ref()}, ok = gen_event:add_sup_handler(couch_db_update, Id, Fun), receive {gen_event_EXIT, Id, Reason} -> rexi:reply({gen_event_EXIT, DbName, Reason}) end. stop_update_notifiers(Notifiers) -> [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers]. changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) -> {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}}; changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) -> {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}}; changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) -> {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}}; changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) -> {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}}; changes_row(#view_row{key=Seq, id=Id, value=Value}, false) -> {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}. find_replacement_shards(#shard{range=Range}, AllShards) -> % TODO make this moar betta -- we might have split or merged the partition [Shard || Shard <- AllShards, Shard#shard.range =:= Range].