author     Adam Kocoloski <adam@cloudant.com>  2010-06-08 16:20:31 -0400
committer  Adam Kocoloski <adam@cloudant.com>  2010-06-08 16:20:31 -0400
commit     44380d7ac0b135fa2d6107cf784a99a1efd393e6 (patch)
tree       d440c0e586eeb3cec3713e0d7fdc93f811935068  /src/fabric_view_changes.erl
parent     d7a6bd635ce60d0fc6a6a40dc3027cee308701f6 (diff)
fabric handler for _changes, BugzID 10219
Diffstat (limited to 'src/fabric_view_changes.erl')
-rw-r--r--  src/fabric_view_changes.erl  242
1 files changed, 242 insertions, 0 deletions
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
new file mode 100644
index 00000000..666a85c6
--- /dev/null
+++ b/src/fabric_view_changes.erl
@@ -0,0 +1,242 @@
+-module(fabric_view_changes).
+
+-export([go/4, start_update_notifier/1]).
+
+-include("fabric.hrl").
+
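+% Clustered _changes handler: fans the request out to the shard copies via
+% rexi, keeps the first copy of each range to respond, and reports progress
+% to the client as a single packed sequence (see pack_seqs/1 below).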
+go(DbName, Feed, Options, Callback) when Feed == "continuous" orelse
+ Feed == "longpoll" ->
+ Args = make_changes_args(Options),
+ {ok, Acc0} = Callback(start, Feed),
+ Notifiers = start_update_notifiers(DbName),
+ {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback),
+ try
+ keep_sending_changes(
+ DbName,
+ Args,
+ Callback,
+ get_start_seq(DbName, Args),
+ Acc0,
+ Timeout,
+ TimeoutFun
+ )
+ after
+ stop_update_notifiers(Notifiers),
+ couch_changes:get_rest_db_updated()
+ end;
+
+go(DbName, "normal", Options, Callback) ->
+ Args = make_changes_args(Options),
+ {ok, Acc0} = Callback(start, "normal"),
+ {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
+ DbName,
+ Args,
+ Callback,
+ get_start_seq(DbName, Args),
+ Acc0
+ ),
+ Callback({stop, pack_seqs(Seqs)}, AccOut),
+ {ok, AccOut}.
+
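+% Loop for continuous/longpoll feeds: send whatever changes are currently
+% available, then wait for a db_updated notification (or the timeout) before
+% sending again.  A longpoll feed stops as soon as any rows have been sent.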
+keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) ->
+ #changes_args{limit=Limit, feed=Feed} = Args,
+ {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn),
+    #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
+    LastSeq = pack_seqs(NewSeqs),
+ if Limit > Limit2, Feed == "longpoll" ->
+ Callback({stop, LastSeq}, AccOut);
+ true ->
+ case couch_changes:wait_db_updated(Timeout, TFun) of
+ updated ->
+ keep_sending_changes(
+ DbName,
+ Args#changes_args{limit=Limit2},
+ Callback,
+ LastSeq,
+ AccIn,
+ Timeout,
+ TFun
+ );
+ stop ->
+ Callback({stop, LastSeq}, AccOut)
+ end
+ end.
+
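+% Start a changes worker on every shard copy listed in the packed sequence
+% (or on replacement shards if a copy has since disappeared) and collect
+% rows until the limit is reached or all workers report completion.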
+send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) ->
+ AllShards = partitions:all_parts(DbName),
+ Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
+ case lists:member(Shard, AllShards) of
+ true ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
+ [{Shard#shard{ref = Ref}, Seq}];
+ false ->
+ % Find some replacement shards to cover the missing range
+ % TODO It's possible in rare cases of shard merging to end up
+ % with overlapping shard ranges from this technique
+ lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
+ Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
+ {NewShard#shard{ref = Ref}, 0}
+ end, find_replacement_shards(Shard, AllShards))
+ end
+ end, unpack_seqs(PackedSeqs, DbName)),
+ {Workers, _} = lists:unzip(Seqs),
+ State = #collector{
+ query_args = ChangesArgs,
+ callback = Callback,
+ counters = fabric_dict:init(Workers, 0),
+ user_acc = AccIn,
+ limit = ChangesArgs#changes_args.limit,
+        rows = Seqs % reuse the rows field to hold per-shard {Shard, Seq} positions
+ },
+ try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ State, infinity, 5000)
+ after
+ fabric_util:cleanup(Workers)
+ end.
+
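+% receive_loop callbacks.  Worker results are merged into the collector;
+% the first copy of a shard range to send a row wins, and overlapping
+% copies are told to stop.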
+handle_message({rexi_DOWN, _, _, _}, nil, State) ->
+ % TODO see if progress can be made here, possibly by removing all shards
+ % from that node and checking is_progress_possible
+ {ok, State};
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
+ #collector{
+ callback=Callback,
+ counters=Counters0,
+ rows = Seqs0,
+ user_acc=Acc
+ } = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ Seqs = fabric_dict:erase(Worker, Seqs0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters, rows=Seqs}};
+ false ->
+ Callback({error, dead_shards}, Acc),
+ {error, dead_shards}
+ end;
+
+handle_message(_, _, #collector{limit=0} = State) ->
+ {stop, State};
+
+handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) ->
+ #collector{
+ query_args = #changes_args{include_docs=IncludeDocs},
+ callback = Callback,
+ counters = S0,
+ limit = Limit,
+ user_acc = AccIn
+ } = St,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate it
+ gen_server:reply(From, stop),
+ {ok, St};
+ _ ->
+ S1 = fabric_dict:store(Worker, Seq, S0),
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ Row = Row0#view_row{key = pack_seqs(S2)},
+ {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+ gen_server:reply(From, Go),
+ {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}}
+ end;
+
+handle_message({complete, EndSeq}, Worker, State) ->
+ #collector{
+ counters = S0,
+        total_rows = Completed % total_rows repurposed as a count of completed workers
+ } = State,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ {ok, State};
+ _ ->
+ S1 = fabric_dict:store(Worker, EndSeq, S0),
+ % unlikely to have overlaps here, but possible w/ filters
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ NewState = State#collector{counters=S2, total_rows=Completed+1},
+ case fabric_dict:size(S2) =:= (Completed+1) of
+ true ->
+ {stop, NewState};
+ false ->
+ {ok, NewState}
+ end
+ end.
+
+make_changes_args(Options) ->
+ Options.
+
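+% A forward feed starts from the client-supplied since value; a reverse
+% feed starts from the current update_seq of every shard.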
+get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
+ Since;
+get_start_seq(DbName, #changes_args{dir=rev}) ->
+ Shards = partitions:all_parts(DbName),
+ Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
+ {ok, Since} = fabric_util:recv(Workers, #shard.ref,
+ fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
+ Since.
+
+collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
+ case fabric_dict:lookup_element(Shard, Counters) of
+ undefined ->
+ % already heard from someone else in this range
+ {ok, Counters};
+ -1 ->
+ C1 = fabric_dict:store(Shard, Seq, Counters),
+ C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+ case fabric_dict:any(-1, C2) of
+ true ->
+ {ok, C2};
+ false ->
+ {stop, pack_seqs(C2)}
+ end
+ end.
+
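+% The opaque sequence handed to clients is a compressed, base64url-encoded
+% Erlang term listing {Node, Range, Seq} for each shard.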
+pack_seqs(Workers) ->
+ SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
+ couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])).
+
+unpack_seqs(0, DbName) ->
+ fabric_dict:init(partitions:all_parts(DbName), 0);
+
+unpack_seqs("0", DbName) ->
+ fabric_dict:init(partitions:all_parts(DbName), 0);
+
+unpack_seqs(Packed, DbName) ->
+ % TODO relies on internal structure of fabric_dict as keylist
+ lists:map(fun({Node, [A,B], Seq}) ->
+ Name = partitions:shard_name(DbName, A),
+ {#shard{node=Node, range=[A,B], dbname=DbName, name=Name}, Seq}
+ end, binary_to_term(couch_util:decodeBase64Url(Packed))).
+
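+% Register a couch_db_update handler on every node hosting a shard so that
+% waiting feeds are woken when any shard copy is updated.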
+start_update_notifiers(DbName) ->
+ lists:map(fun(#shard{node=Node, name=Name}) ->
+ {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
+ end, partitions:all_parts(DbName)).
+
+% rexi endpoint
+start_update_notifier(DbName) ->
+ {Caller, _} = get(rexi_from),
+ Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end,
+ Id = {couch_db_update_notifier, make_ref()},
+ ok = gen_event:add_sup_handler(couch_db_update, Id, Fun),
+ receive {gen_event_EXIT, Id, Reason} ->
+ rexi:reply({gen_event_EXIT, DbName, Reason})
+ end.
+
+stop_update_notifiers(Notifiers) ->
+ [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
+
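+% Format a merged row as the EJSON structure expected by the _changes
+% response, optionally including the document body.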
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
+
+find_replacement_shards(#shard{range=Range}, AllShards) ->
+    % TODO make this more robust -- we might have split or merged the partition
+ [Shard || Shard <- AllShards, Shard#shard.range =:= Range].