author    Adam Kocoloski <adam@cloudant.com>  2010-06-08 16:20:31 -0400
committer Adam Kocoloski <adam@cloudant.com>  2010-06-08 16:20:31 -0400
commit    44380d7ac0b135fa2d6107cf784a99a1efd393e6 (patch)
tree      d440c0e586eeb3cec3713e0d7fdc93f811935068
parent    d7a6bd635ce60d0fc6a6a40dc3027cee308701f6 (diff)
fabric handler for _changes, BugzID 10219
-rw-r--r--  ebin/fabric.app               1
-rw-r--r--  src/fabric.erl                7
-rw-r--r--  src/fabric_dict.erl           5
-rw-r--r--  src/fabric_rpc.erl           61
-rw-r--r--  src/fabric_view_changes.erl  242
5 files changed, 308 insertions(+), 8 deletions(-)
diff --git a/ebin/fabric.app b/ebin/fabric.app
index 1fb67200..9c26b635 100644
--- a/ebin/fabric.app
+++ b/ebin/fabric.app
@@ -20,6 +20,7 @@
fabric_util,
fabric_view,
fabric_view_all_docs,
+ fabric_view_changes,
fabric_view_map,
fabric_view_reduce
]},
diff --git a/src/fabric.erl b/src/fabric.erl
index 80bc6d4c..80646561 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -9,7 +9,7 @@
update_docs/3, att_receiver/2]).
% Views
--export([all_docs/4, query_view/5]).
+-export([all_docs/4, changes/3, query_view/5]).
% miscellany
-export([db_path/2, design_docs/1]).
@@ -67,6 +67,11 @@ all_docs(DbName, #view_query_args{} = QueryArgs, Callback, Acc0) when
is_function(Callback, 2) ->
fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0).
+changes(DbName, Options, Callback) ->
+ % TODO use a keylist for Options instead of #changes_args, BugzID 10281
+ Feed = Options#changes_args.feed,
+ fabric_view_changes:go(dbname(DbName), Feed, Options, Callback).
+
query_view(DbName, View, #view_query_args{view_type=reduce} = QueryArgs,
Callback, Acc0) ->
fabric_view_reduce:go(dbname(DbName), view(View), QueryArgs, Callback, Acc0);
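
For reference, a minimal sketch of how a caller might drive the new fabric:changes/3 entry point. The callback protocol is inferred from fabric_view_changes below (Callback(start, Feed), one call per change row, a final {stop, PackedSeq}, and possibly {error, Reason}); the module name, the include line, and reliance on the #changes_args{} record defaults are assumptions, not part of this commit.

    -module(changes_example).
    -export([collect/1]).

    % Assumed include: use whichever header exposes #changes_args{} in this
    % tree (it lives in couch_db.hrl upstream).
    -include("couch_db.hrl").

    % Accumulate every change row for DbName using the "normal" feed.
    collect(DbName) ->
        Args = #changes_args{feed = "normal", since = 0},
        Callback = fun
            (start, _Feed) ->
                {ok, []};
            ({change, {Props}}, Acc) ->
                % returning ok keeps the feed going; stop would end it early
                {ok, [Props | Acc]};
            ({stop, _PackedLastSeq}, Acc) ->
                {ok, Acc};
            ({error, _Reason} = Error, _Acc) ->
                Error
        end,
        fabric:changes(DbName, Args, Callback).
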
diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl
index a4191682..42d46b34 100644
--- a/src/fabric_dict.erl
+++ b/src/fabric_dict.erl
@@ -11,6 +11,9 @@ init(Keys, InitialValue) ->
decrement_all(Dict) ->
[{K,V-1} || {K,V} <- Dict].
+store(Key, Value, Dict) ->
+ orddict:store(Key, Value, Dict).
+
erase(Key, Dict) ->
orddict:erase(Key, Dict).
@@ -21,6 +24,8 @@ update_counter(Key, Incr, Dict0) ->
lookup_element(Key, Dict) ->
couch_util:get_value(Key, Dict).
+size(Dict) ->
+ orddict:size(Dict).
any(Value, Dict) ->
lists:keymember(Value, 2, Dict).
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index aa922585..fea95f24 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -2,7 +2,7 @@
-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
-export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]).
--export([all_docs/2, map_view/4, reduce_view/4]).
+-export([all_docs/2, changes/3, map_view/4, reduce_view/4]).
-include("fabric.hrl").
@@ -42,6 +42,28 @@ all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) ->
{ok, Acc} = couch_db:enum_docs(Db, StartId, Dir, fun view_fold/3, Acc0),
final_response(Total, Acc#view_acc.offset).
+changes(DbName, Args0, StartSeq) ->
+ #changes_args{style=Style, dir=Dir, filter=FilterName} = Args0,
+ case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ % couch code has a MochiReq for 2nd argument, ick
+ Args = Args0#changes_args{
+ filter = couch_changes:make_filter_fun(FilterName, nil, Db)
+ },
+ Enum = fun changes_enumerator/2,
+ Opts = [{dir,Dir}],
+ Acc0 = {Db, StartSeq, Args},
+ try
+ {ok, {_, LastSeq, _}} =
+ couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0),
+ rexi:reply({complete, LastSeq})
+ after
+ couch_db:close(Db)
+ end;
+ Error ->
+ rexi:reply(Error)
+ end.
+
map_view(DbName, DDoc, ViewName, QueryArgs) ->
{ok, Db} = couch_db:open(DbName, []),
#view_query_args{
@@ -124,12 +146,7 @@ get_doc_count(DbName) ->
with_db(DbName, [], {couch_db, get_doc_count, []}).
get_update_seq(DbName) ->
- rexi:reply(case couch_db:open(DbName, []) of
- {ok, #db{update_seq = Seq}} ->
- {ok, Seq};
- Error ->
- Error
- end).
+ with_db(DbName, [], {couch_db, get_update_seq, []}).
open_doc(DbName, DocId, Options) ->
with_db(DbName, Options, {couch_db, open_doc_int, [DocId, Options]}).
@@ -288,3 +305,33 @@ send(Key, Value, #view_acc{limit=Limit} = Acc) ->
stop ->
exit(normal)
end.
+
+changes_enumerator(DocInfos, {Db, _Seq, Args}) ->
+ #changes_args{include_docs=IncludeDocs, filter=FilterFun} = Args,
+ [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_]
+ = DocInfos,
+ case [Result || Result <- FilterFun(DocInfos), Result /= null] of
+ [] ->
+ {ok, {Db, Seq, Args}};
+ Results ->
+ ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs),
+ Go = rexi:sync_reply(ChangesRow),
+ {Go, {Db, Seq, Args}}
+ end.
+
+changes_row(_, Seq, Id, Results, _, true, true) ->
+ #view_row{key=Seq, id=Id, value=Results, doc=deleted};
+changes_row(_, Seq, Id, Results, _, true, false) ->
+ #view_row{key=Seq, id=Id, value=Results, doc=deleted};
+changes_row(Db, Seq, Id, Results, Rev, false, true) ->
+ #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)};
+changes_row(_, Seq, Id, Results, _, false, false) ->
+ #view_row{key=Seq, id=Id, value=Results}.
+
+doc_member(Shard, Id, Rev) ->
+ case couch_db:open_doc_revs(Shard, Id, [Rev], []) of
+ {ok, [{ok,Doc}]} ->
+ couch_doc:to_json_obj(Doc, []);
+ Error ->
+ Error
+ end.
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
new file mode 100644
index 00000000..666a85c6
--- /dev/null
+++ b/src/fabric_view_changes.erl
@@ -0,0 +1,242 @@
+-module(fabric_view_changes).
+
+-export([go/4, start_update_notifier/1]).
+
+-include("fabric.hrl").
+
+go(DbName, Feed, Options, Callback) when Feed == "continuous" orelse
+ Feed == "longpoll" ->
+ Args = make_changes_args(Options),
+ {ok, Acc0} = Callback(start, Feed),
+ Notifiers = start_update_notifiers(DbName),
+ {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback),
+ try
+ keep_sending_changes(
+ DbName,
+ Args,
+ Callback,
+ get_start_seq(DbName, Args),
+ Acc0,
+ Timeout,
+ TimeoutFun
+ )
+ after
+ stop_update_notifiers(Notifiers),
+ couch_changes:get_rest_db_updated()
+ end;
+
+go(DbName, "normal", Options, Callback) ->
+ Args = make_changes_args(Options),
+ {ok, Acc0} = Callback(start, "normal"),
+ {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
+ DbName,
+ Args,
+ Callback,
+ get_start_seq(DbName, Args),
+ Acc0
+ ),
+ Callback({stop, pack_seqs(Seqs)}, AccOut),
+ {ok, AccOut}.
+
+keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) ->
+ #changes_args{limit=Limit, feed=Feed} = Args,
+ {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn),
+ #collector{limit=Limit2, counters=Seqs, user_acc=AccOut} = Collector,
+ LastSeq = pack_seqs(Seqs),
+ if Limit > Limit2, Feed == "longpoll" ->
+ Callback({stop, LastSeq}, AccOut);
+ true ->
+ case couch_changes:wait_db_updated(Timeout, TFun) of
+ updated ->
+ keep_sending_changes(
+ DbName,
+ Args#changes_args{limit=Limit2},
+ Callback,
+ LastSeq,
+ AccIn,
+ Timeout,
+ TFun
+ );
+ stop ->
+ Callback({stop, LastSeq}, AccOut)
+ end
+ end.
+
+send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) ->
+ AllShards = partitions:all_parts(DbName),
+ Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
+ case lists:member(Shard, AllShards) of
+ true ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
+ [{Shard#shard{ref = Ref}, Seq}];
+ false ->
+ % Find some replacement shards to cover the missing range
+ % TODO It's possible in rare cases of shard merging to end up
+ % with overlapping shard ranges from this technique
+ lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
+ Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
+ {NewShard#shard{ref = Ref}, 0}
+ end, find_replacement_shards(Shard, AllShards))
+ end
+ end, unpack_seqs(PackedSeqs, DbName)),
+ {Workers, _} = lists:unzip(Seqs),
+ State = #collector{
+ query_args = ChangesArgs,
+ callback = Callback,
+ counters = fabric_dict:init(Workers, 0),
+ user_acc = AccIn,
+ limit = ChangesArgs#changes_args.limit,
+ rows = Seqs % store sequence positions instead
+ },
+ try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ State, infinity, 5000)
+ after
+ fabric_util:cleanup(Workers)
+ end.
+
+handle_message({rexi_DOWN, _, _, _}, nil, State) ->
+ % TODO see if progress can be made here, possibly by removing all shards
+ % from that node and checking is_progress_possible
+ {ok, State};
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
+ #collector{
+ callback=Callback,
+ counters=Counters0,
+ rows = Seqs0,
+ user_acc=Acc
+ } = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ Seqs = fabric_dict:erase(Worker, Seqs0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters, rows=Seqs}};
+ false ->
+ Callback({error, dead_shards}, Acc),
+ {error, dead_shards}
+ end;
+
+handle_message(_, _, #collector{limit=0} = State) ->
+ {stop, State};
+
+handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) ->
+ #collector{
+ query_args = #changes_args{include_docs=IncludeDocs},
+ callback = Callback,
+ counters = S0,
+ limit = Limit,
+ user_acc = AccIn
+ } = St,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate it
+ gen_server:reply(From, stop),
+ {ok, St};
+ _ ->
+ S1 = fabric_dict:store(Worker, Seq, S0),
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ Row = Row0#view_row{key = pack_seqs(S2)},
+ {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+ gen_server:reply(From, Go),
+ {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}}
+ end;
+
+handle_message({complete, EndSeq}, Worker, State) ->
+ #collector{
+ counters = S0,
+ total_rows = Completed % override
+ } = State,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ {ok, State};
+ _ ->
+ S1 = fabric_dict:store(Worker, EndSeq, S0),
+ % unlikely to have overlaps here, but possible w/ filters
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ NewState = State#collector{counters=S2, total_rows=Completed+1},
+ case fabric_dict:size(S2) =:= (Completed+1) of
+ true ->
+ {stop, NewState};
+ false ->
+ {ok, NewState}
+ end
+ end.
+
+make_changes_args(Options) ->
+ Options.
+
+get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
+ Since;
+get_start_seq(DbName, #changes_args{dir=rev}) ->
+ Shards = partitions:all_parts(DbName),
+ Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
+ {ok, Since} = fabric_util:recv(Workers, #shard.ref,
+ fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
+ Since.
+
+collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
+ case fabric_dict:lookup_element(Shard, Counters) of
+ undefined ->
+ % already heard from someone else in this range
+ {ok, Counters};
+ -1 ->
+ C1 = fabric_dict:store(Shard, Seq, Counters),
+ C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+ case fabric_dict:any(-1, C2) of
+ true ->
+ {ok, C2};
+ false ->
+ {stop, pack_seqs(C2)}
+ end
+ end.
+
+pack_seqs(Workers) ->
+ SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
+ couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])).
+
+unpack_seqs(0, DbName) ->
+ fabric_dict:init(partitions:all_parts(DbName), 0);
+
+unpack_seqs("0", DbName) ->
+ fabric_dict:init(partitions:all_parts(DbName), 0);
+
+unpack_seqs(Packed, DbName) ->
+ % TODO relies on internal structure of fabric_dict as keylist
+ lists:map(fun({Node, [A,B], Seq}) ->
+ Name = partitions:shard_name(DbName, A),
+ {#shard{node=Node, range=[A,B], dbname=DbName, name=Name}, Seq}
+ end, binary_to_term(couch_util:decodeBase64Url(Packed))).
+
+start_update_notifiers(DbName) ->
+ lists:map(fun(#shard{node=Node, name=Name}) ->
+ {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
+ end, partitions:all_parts(DbName)).
+
+% rexi endpoint
+start_update_notifier(DbName) ->
+ {Caller, _} = get(rexi_from),
+ Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end,
+ Id = {couch_db_update_notifier, make_ref()},
+ ok = gen_event:add_sup_handler(couch_db_update, Id, Fun),
+ receive {gen_event_EXIT, Id, Reason} ->
+ rexi:reply({gen_event_EXIT, DbName, Reason})
+ end.
+
+stop_update_notifiers(Notifiers) ->
+ [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
+
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
+
+find_replacement_shards(#shard{range=Range}, AllShards) ->
+ % TODO make this moar betta -- we might have split or merged the partition
+ [Shard || Shard <- AllShards, Shard#shard.range =:= Range].
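
The packed sequence token used throughout the new module is worth seeing end to end: pack_seqs/1 flattens the per-shard {Node, Range, Seq} positions into one opaque base64url string that a client can hand back as its since value, and unpack_seqs/2 reverses it. A minimal round-trip sketch, with made-up node names and ranges; couch_util:encodeBase64Url/1 and decodeBase64Url/1 are the same helpers the module itself calls.

    % Round-trip the packed-seq encoding with dummy shard positions.
    pack_demo() ->
        SeqList = [{'dbcore@node1', [0, 1073741823], 42},
                   {'dbcore@node2', [1073741824, 2147483647], 17}],
        Packed = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
        % decoding yields the original {Node, Range, Seq} triples
        SeqList = binary_to_term(couch_util:decodeBase64Url(Packed)),
        Packed.
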