diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-06-08 16:20:31 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-06-08 16:20:31 -0400 |
commit | 44380d7ac0b135fa2d6107cf784a99a1efd393e6 (patch) | |
tree | d440c0e586eeb3cec3713e0d7fdc93f811935068 /src/fabric_view_changes.erl | |
parent | d7a6bd635ce60d0fc6a6a40dc3027cee308701f6 (diff) |
fabric handler for _changes, BugzID 10219
Diffstat (limited to 'src/fabric_view_changes.erl')
-rw-r--r-- | src/fabric_view_changes.erl | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl new file mode 100644 index 00000000..666a85c6 --- /dev/null +++ b/src/fabric_view_changes.erl @@ -0,0 +1,242 @@ +-module(fabric_view_changes). + +-export([go/4, start_update_notifier/1]). + +-include("fabric.hrl"). + +go(DbName, Feed, Options, Callback) when Feed == "continuous" orelse + Feed == "longpoll" -> + Args = make_changes_args(Options), + {ok, Acc0} = Callback(start, Feed), + Notifiers = start_update_notifiers(DbName), + {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback), + try + keep_sending_changes( + DbName, + Args, + Callback, + get_start_seq(DbName, Args), + Acc0, + Timeout, + TimeoutFun + ) + after + stop_update_notifiers(Notifiers), + couch_changes:get_rest_db_updated() + end; + +go(DbName, "normal", Options, Callback) -> + Args = make_changes_args(Options), + {ok, Acc0} = Callback(start, "normal"), + {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes( + DbName, + Args, + Callback, + get_start_seq(DbName, Args), + Acc0 + ), + Callback({stop, pack_seqs(Seqs)}, AccOut), + {ok, AccOut}. + +keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) -> + #changes_args{limit=Limit, feed=Feed} = Args, + {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn), + #collector{limit=Limit2, counters=Seqs, user_acc=AccOut} = Collector, + LastSeq = pack_seqs(Seqs), + if Limit > Limit2, Feed == "longpoll" -> + Callback({stop, LastSeq}, AccOut); + true -> + case couch_changes:wait_db_updated(Timeout, TFun) of + updated -> + keep_sending_changes( + DbName, + Args#changes_args{limit=Limit2}, + Callback, + LastSeq, + AccIn, + Timeout, + TFun + ); + stop -> + Callback({stop, LastSeq}, AccOut) + end + end. + +send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) -> + AllShards = partitions:all_parts(DbName), + Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> + case lists:member(Shard, AllShards) of + true -> + Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), + [{Shard#shard{ref = Ref}, Seq}]; + false -> + % Find some replacement shards to cover the missing range + % TODO It's possible in rare cases of shard merging to end up + % with overlapping shard ranges from this technique + lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> + Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), + {NewShard#shard{ref = Ref}, 0} + end, find_replacement_shards(Shard, AllShards)) + end + end, unpack_seqs(PackedSeqs, DbName)), + {Workers, _} = lists:unzip(Seqs), + State = #collector{ + query_args = ChangesArgs, + callback = Callback, + counters = fabric_dict:init(Workers, 0), + user_acc = AccIn, + limit = ChangesArgs#changes_args.limit, + rows = Seqs % store sequence positions instead + }, + try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + State, infinity, 5000) + after + fabric_util:cleanup(Workers) + end. + +handle_message({rexi_DOWN, _, _, _}, nil, State) -> + % TODO see if progress can be made here, possibly by removing all shards + % from that node and checking is_progress_possible + {ok, State}; + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), + #collector{ + callback=Callback, + counters=Counters0, + rows = Seqs0, + user_acc=Acc + } = State, + Counters = fabric_dict:erase(Worker, Counters0), + Seqs = fabric_dict:erase(Worker, Seqs0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters, rows=Seqs}}; + false -> + Callback({error, dead_shards}, Acc), + {error, dead_shards} + end; + +handle_message(_, _, #collector{limit=0} = State) -> + {stop, State}; + +handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) -> + #collector{ + query_args = #changes_args{include_docs=IncludeDocs}, + callback = Callback, + counters = S0, + limit = Limit, + user_acc = AccIn + } = St, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + % this worker lost the race with other partition copies, terminate it + gen_server:reply(From, stop), + {ok, St}; + _ -> + S1 = fabric_dict:store(Worker, Seq, S0), + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + Row = Row0#view_row{key = pack_seqs(S2)}, + {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn), + gen_server:reply(From, Go), + {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}} + end; + +handle_message({complete, EndSeq}, Worker, State) -> + #collector{ + counters = S0, + total_rows = Completed % override + } = State, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + {ok, State}; + _ -> + S1 = fabric_dict:store(Worker, EndSeq, S0), + % unlikely to have overlaps here, but possible w/ filters + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + NewState = State#collector{counters=S2, total_rows=Completed+1}, + case fabric_dict:size(S2) =:= (Completed+1) of + true -> + {stop, NewState}; + false -> + {ok, NewState} + end + end. + +make_changes_args(Options) -> + Options. + +get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> + Since; +get_start_seq(DbName, #changes_args{dir=rev}) -> + Shards = partitions:all_parts(DbName), + Workers = fabric_util:submit_jobs(Shards, get_update_seq, []), + {ok, Since} = fabric_util:recv(Workers, #shard.ref, + fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), + Since. + +collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, Counters}; + -1 -> + C1 = fabric_dict:store(Shard, Seq, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(-1, C2) of + true -> + {ok, C2}; + false -> + {stop, pack_seqs(C2)} + end + end. + +pack_seqs(Workers) -> + SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], + couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])). + +unpack_seqs(0, DbName) -> + fabric_dict:init(partitions:all_parts(DbName), 0); + +unpack_seqs("0", DbName) -> + fabric_dict:init(partitions:all_parts(DbName), 0); + +unpack_seqs(Packed, DbName) -> + % TODO relies on internal structure of fabric_dict as keylist + lists:map(fun({Node, [A,B], Seq}) -> + Name = partitions:shard_name(DbName, A), + {#shard{node=Node, range=[A,B], dbname=DbName, name=Name}, Seq} + end, binary_to_term(couch_util:decodeBase64Url(Packed))). + +start_update_notifiers(DbName) -> + lists:map(fun(#shard{node=Node, name=Name}) -> + {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})} + end, partitions:all_parts(DbName)). + +% rexi endpoint +start_update_notifier(DbName) -> + {Caller, _} = get(rexi_from), + Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end, + Id = {couch_db_update_notifier, make_ref()}, + ok = gen_event:add_sup_handler(couch_db_update, Id, Fun), + receive {gen_event_EXIT, Id, Reason} -> + rexi:reply({gen_event_EXIT, DbName, Reason}) + end. + +stop_update_notifiers(Notifiers) -> + [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers]. + +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}. + +find_replacement_shards(#shard{range=Range}, AllShards) -> + % TODO make this moar betta -- we might have split or merged the partition + [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. |