diff options
Diffstat (limited to 'apps/fabric/src/fabric_view_changes.erl')
-rw-r--r-- | apps/fabric/src/fabric_view_changes.erl | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl new file mode 100644 index 00000000..6030df1d --- /dev/null +++ b/apps/fabric/src/fabric_view_changes.erl @@ -0,0 +1,251 @@ +-module(fabric_view_changes). + +-export([go/5, start_update_notifier/1]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse + Feed == "longpoll" -> + Args = make_changes_args(Options), + {ok, Acc} = Callback(start, Acc0), + Notifiers = start_update_notifiers(DbName), + {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback), + try + keep_sending_changes( + DbName, + Args, + Callback, + get_start_seq(DbName, Args), + Acc, + Timeout, + TimeoutFun + ) + after + stop_update_notifiers(Notifiers), + couch_changes:get_rest_db_updated() + end; + +go(DbName, "normal", Options, Callback, Acc0) -> + Args = make_changes_args(Options), + {ok, Acc} = Callback(start, Acc0), + {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes( + DbName, + Args, + Callback, + get_start_seq(DbName, Args), + Acc + ), + Callback({stop, pack_seqs(Seqs)}, AccOut). + +keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) -> + #changes_args{limit=Limit, feed=Feed} = Args, + {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn), + #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector, + LastSeq = pack_seqs(NewSeqs), + if Limit > Limit2, Feed == "longpoll" -> + Callback({stop, LastSeq}, AccOut); + true -> + case couch_changes:wait_db_updated(Timeout, TFun) of + updated -> + keep_sending_changes( + DbName, + Args#changes_args{limit=Limit2}, + Callback, + LastSeq, + AccIn, + Timeout, + TFun + ); + stop -> + Callback({stop, LastSeq}, AccOut) + end + end. + +send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) -> + AllShards = mem3:shards(DbName), + Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> + case lists:member(Shard, AllShards) of + true -> + Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), + [{Shard#shard{ref = Ref}, Seq}]; + false -> + % Find some replacement shards to cover the missing range + % TODO It's possible in rare cases of shard merging to end up + % with overlapping shard ranges from this technique + lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> + Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), + {NewShard#shard{ref = Ref}, 0} + end, find_replacement_shards(Shard, AllShards)) + end + end, unpack_seqs(PackedSeqs, DbName)), + {Workers, _} = lists:unzip(Seqs), + State = #collector{ + query_args = ChangesArgs, + callback = Callback, + counters = fabric_dict:init(Workers, 0), + user_acc = AccIn, + limit = ChangesArgs#changes_args.limit, + rows = Seqs % store sequence positions instead + }, + try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + State, infinity, 5000) + after + fabric_util:cleanup(Workers) + end. + +handle_message({rexi_DOWN, _, _, _}, nil, State) -> + % TODO see if progress can be made here, possibly by removing all shards + % from that node and checking is_progress_possible + {ok, State}; + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), + #collector{ + callback=Callback, + counters=Counters0, + rows = Seqs0, + user_acc=Acc + } = State, + Counters = fabric_dict:erase(Worker, Counters0), + Seqs = fabric_dict:erase(Worker, Seqs0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters, rows=Seqs}}; + false -> + Callback({error, dead_shards}, Acc), + {error, dead_shards} + end; + +handle_message(_, _, #collector{limit=0} = State) -> + {stop, State}; + +handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) -> + #collector{ + query_args = #changes_args{include_docs=IncludeDocs}, + callback = Callback, + counters = S0, + limit = Limit, + user_acc = AccIn + } = St, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + % this worker lost the race with other partition copies, terminate it + gen_server:reply(From, stop), + {ok, St}; + _ -> + S1 = fabric_dict:store(Worker, Seq, S0), + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + Row = Row0#view_row{key = pack_seqs(S2)}, + {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn), + gen_server:reply(From, Go), + {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}} + end; + +handle_message({complete, EndSeq}, Worker, State) -> + #collector{ + counters = S0, + total_rows = Completed % override + } = State, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + {ok, State}; + _ -> + S1 = fabric_dict:store(Worker, EndSeq, S0), + % unlikely to have overlaps here, but possible w/ filters + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + NewState = State#collector{counters=S2, total_rows=Completed+1}, + case fabric_dict:size(S2) =:= (Completed+1) of + true -> + {stop, NewState}; + false -> + {ok, NewState} + end + end. + +make_changes_args(#changes_args{style=main_only, filter=undefined}=Args) -> + Args#changes_args{filter = fun couch_changes:main_only_filter/1}; +make_changes_args(#changes_args{style=all_docs, filter=undefined}=Args) -> + Args#changes_args{filter = fun couch_changes:all_docs_filter/1}; +make_changes_args(Args) -> + Args. + +get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> + Since; +get_start_seq(DbName, #changes_args{dir=rev}) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, get_update_seq, []), + {ok, Since} = fabric_util:recv(Workers, #shard.ref, + fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), + Since. + +collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, Counters}; + -1 -> + C1 = fabric_dict:store(Shard, Seq, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(-1, C2) of + true -> + {ok, C2}; + false -> + {stop, pack_seqs(C2)} + end + end. + +pack_seqs(Workers) -> + SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], + SeqSum = lists:sum(element(2, lists:unzip(Workers))), + Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])), + list_to_binary([integer_to_list(SeqSum), $-, Opaque]). + +unpack_seqs(0, DbName) -> + fabric_dict:init(mem3:shards(DbName), 0); + +unpack_seqs("0", DbName) -> + fabric_dict:init(mem3:shards(DbName), 0); + +unpack_seqs(Packed, DbName) -> + {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?<opaque>.*)", [{capture, + [opaque], binary}]), + % TODO relies on internal structure of fabric_dict as keylist + lists:map(fun({Node, [A,B], Seq}) -> + Shard = #shard{node=Node, range=[A,B], dbname=DbName}, + {mem3_util:name_shard(Shard), Seq} + end, binary_to_term(couch_util:decodeBase64Url(Opaque))). + +start_update_notifiers(DbName) -> + lists:map(fun(#shard{node=Node, name=Name}) -> + {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})} + end, mem3:shards(DbName)). + +% rexi endpoint +start_update_notifier(DbName) -> + {Caller, _} = get(rexi_from), + Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end, + Id = {couch_db_update_notifier, make_ref()}, + ok = gen_event:add_sup_handler(couch_db_update, Id, Fun), + receive {gen_event_EXIT, Id, Reason} -> + rexi:reply({gen_event_EXIT, DbName, Reason}) + end. + +stop_update_notifiers(Notifiers) -> + [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers]. + +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}. + +find_replacement_shards(#shard{range=Range}, AllShards) -> + % TODO make this moar betta -- we might have split or merged the partition + [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. |