diff options
-rw-r--r-- | ebin/fabric.app | 1 | ||||
-rw-r--r-- | src/fabric.erl | 7 | ||||
-rw-r--r-- | src/fabric_dict.erl | 5 | ||||
-rw-r--r-- | src/fabric_rpc.erl | 61 | ||||
-rw-r--r-- | src/fabric_view_changes.erl | 242 |
5 files changed, 308 insertions, 8 deletions
diff --git a/ebin/fabric.app b/ebin/fabric.app index 1fb67200..9c26b635 100644 --- a/ebin/fabric.app +++ b/ebin/fabric.app @@ -20,6 +20,7 @@ fabric_util, fabric_view, fabric_view_all_docs, + fabric_view_changes, fabric_view_map, fabric_view_reduce ]}, diff --git a/src/fabric.erl b/src/fabric.erl index 80bc6d4c..80646561 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -9,7 +9,7 @@ update_docs/3, att_receiver/2]). % Views --export([all_docs/4, query_view/5]). +-export([all_docs/4, changes/3, query_view/5]). % miscellany -export([db_path/2, design_docs/1]). @@ -67,6 +67,11 @@ all_docs(DbName, #view_query_args{} = QueryArgs, Callback, Acc0) when is_function(Callback, 2) -> fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0). +changes(DbName, Options, Callback) -> + % TODO use a keylist for Options instead of #changes_args, BugzID 10281 + Feed = Options#changes_args.feed, + fabric_view_changes:go(dbname(DbName), Feed, Options, Callback). + query_view(DbName, View, #view_query_args{view_type=reduce} = QueryArgs, Callback, Acc0) -> fabric_view_reduce:go(dbname(DbName), view(View), QueryArgs, Callback, Acc0); diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl index a4191682..42d46b34 100644 --- a/src/fabric_dict.erl +++ b/src/fabric_dict.erl @@ -11,6 +11,9 @@ init(Keys, InitialValue) -> decrement_all(Dict) -> [{K,V-1} || {K,V} <- Dict]. +store(Key, Value, Dict) -> + orddict:store(Key, Value, Dict). + erase(Key, Dict) -> orddict:erase(Key, Dict). @@ -21,6 +24,8 @@ update_counter(Key, Incr, Dict0) -> lookup_element(Key, Dict) -> couch_util:get_value(Key, Dict). +size(Dict) -> + orddict:size(Dict). any(Value, Dict) -> lists:keymember(Value, 2, Dict). diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index aa922585..fea95f24 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -2,7 +2,7 @@ -export([get_db_info/1, get_doc_count/1, get_update_seq/1]). -export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). --export([all_docs/2, map_view/4, reduce_view/4]). +-export([all_docs/2, changes/3, map_view/4, reduce_view/4]). -include("fabric.hrl"). @@ -42,6 +42,28 @@ all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> {ok, Acc} = couch_db:enum_docs(Db, StartId, Dir, fun view_fold/3, Acc0), final_response(Total, Acc#view_acc.offset). +changes(DbName, Args0, StartSeq) -> + #changes_args{style=Style, dir=Dir, filter=FilterName} = Args0, + case couch_db:open(DbName, []) of + {ok, Db} -> + % couch code has a MochiReq for 2nd argument, ick + Args = Args0#changes_args{ + filter = couch_changes:make_filter_fun(FilterName, nil, Db) + }, + Enum = fun changes_enumerator/2, + Opts = [{dir,Dir}], + Acc0 = {Db, StartSeq, Args}, + try + {ok, {_, LastSeq, _}} = + couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0), + rexi:reply({complete, LastSeq}) + after + couch_db:close(Db) + end; + Error -> + rexi:reply(Error) + end. + map_view(DbName, DDoc, ViewName, QueryArgs) -> {ok, Db} = couch_db:open(DbName, []), #view_query_args{ @@ -124,12 +146,7 @@ get_doc_count(DbName) -> with_db(DbName, [], {couch_db, get_doc_count, []}). get_update_seq(DbName) -> - rexi:reply(case couch_db:open(DbName, []) of - {ok, #db{update_seq = Seq}} -> - {ok, Seq}; - Error -> - Error - end). + with_db(DbName, [], {couch_db, get_update_seq, []}). open_doc(DbName, DocId, Options) -> with_db(DbName, Options, {couch_db, open_doc_int, [DocId, Options]}). @@ -288,3 +305,33 @@ send(Key, Value, #view_acc{limit=Limit} = Acc) -> stop -> exit(normal) end. + +changes_enumerator(DocInfos, {Db, _Seq, Args}) -> + #changes_args{include_docs=IncludeDocs, filter=FilterFun} = Args, + [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] + = DocInfos, + case [Result || Result <- FilterFun(DocInfos), Result /= null] of + [] -> + {ok, {Db, Seq, Args}}; + Results -> + ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs), + Go = rexi:sync_reply(ChangesRow), + {Go, {Db, Seq, Args}} + end. + +changes_row(_, Seq, Id, Results, _, true, true) -> + #view_row{key=Seq, id=Id, value=Results, doc=deleted}; +changes_row(_, Seq, Id, Results, _, true, false) -> + #view_row{key=Seq, id=Id, value=Results, doc=deleted}; +changes_row(Db, Seq, Id, Results, Rev, false, true) -> + #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)}; +changes_row(_, Seq, Id, Results, _, false, false) -> + #view_row{key=Seq, id=Id, value=Results}. + +doc_member(Shard, Id, Rev) -> + case couch_db:open_doc_revs(Shard, Id, [Rev], []) of + {ok, [{ok,Doc}]} -> + couch_doc:to_json_obj(Doc, []); + Error -> + Error + end. diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl new file mode 100644 index 00000000..666a85c6 --- /dev/null +++ b/src/fabric_view_changes.erl @@ -0,0 +1,242 @@ +-module(fabric_view_changes). + +-export([go/4, start_update_notifier/1]). + +-include("fabric.hrl"). + +go(DbName, Feed, Options, Callback) when Feed == "continuous" orelse + Feed == "longpoll" -> + Args = make_changes_args(Options), + {ok, Acc0} = Callback(start, Feed), + Notifiers = start_update_notifiers(DbName), + {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback), + try + keep_sending_changes( + DbName, + Args, + Callback, + get_start_seq(DbName, Args), + Acc0, + Timeout, + TimeoutFun + ) + after + stop_update_notifiers(Notifiers), + couch_changes:get_rest_db_updated() + end; + +go(DbName, "normal", Options, Callback) -> + Args = make_changes_args(Options), + {ok, Acc0} = Callback(start, "normal"), + {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes( + DbName, + Args, + Callback, + get_start_seq(DbName, Args), + Acc0 + ), + Callback({stop, pack_seqs(Seqs)}, AccOut), + {ok, AccOut}. + +keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) -> + #changes_args{limit=Limit, feed=Feed} = Args, + {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn), + #collector{limit=Limit2, counters=Seqs, user_acc=AccOut} = Collector, + LastSeq = pack_seqs(Seqs), + if Limit > Limit2, Feed == "longpoll" -> + Callback({stop, LastSeq}, AccOut); + true -> + case couch_changes:wait_db_updated(Timeout, TFun) of + updated -> + keep_sending_changes( + DbName, + Args#changes_args{limit=Limit2}, + Callback, + LastSeq, + AccIn, + Timeout, + TFun + ); + stop -> + Callback({stop, LastSeq}, AccOut) + end + end. + +send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) -> + AllShards = partitions:all_parts(DbName), + Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> + case lists:member(Shard, AllShards) of + true -> + Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), + [{Shard#shard{ref = Ref}, Seq}]; + false -> + % Find some replacement shards to cover the missing range + % TODO It's possible in rare cases of shard merging to end up + % with overlapping shard ranges from this technique + lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> + Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), + {NewShard#shard{ref = Ref}, 0} + end, find_replacement_shards(Shard, AllShards)) + end + end, unpack_seqs(PackedSeqs, DbName)), + {Workers, _} = lists:unzip(Seqs), + State = #collector{ + query_args = ChangesArgs, + callback = Callback, + counters = fabric_dict:init(Workers, 0), + user_acc = AccIn, + limit = ChangesArgs#changes_args.limit, + rows = Seqs % store sequence positions instead + }, + try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + State, infinity, 5000) + after + fabric_util:cleanup(Workers) + end. + +handle_message({rexi_DOWN, _, _, _}, nil, State) -> + % TODO see if progress can be made here, possibly by removing all shards + % from that node and checking is_progress_possible + {ok, State}; + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), + #collector{ + callback=Callback, + counters=Counters0, + rows = Seqs0, + user_acc=Acc + } = State, + Counters = fabric_dict:erase(Worker, Counters0), + Seqs = fabric_dict:erase(Worker, Seqs0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters, rows=Seqs}}; + false -> + Callback({error, dead_shards}, Acc), + {error, dead_shards} + end; + +handle_message(_, _, #collector{limit=0} = State) -> + {stop, State}; + +handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) -> + #collector{ + query_args = #changes_args{include_docs=IncludeDocs}, + callback = Callback, + counters = S0, + limit = Limit, + user_acc = AccIn + } = St, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + % this worker lost the race with other partition copies, terminate it + gen_server:reply(From, stop), + {ok, St}; + _ -> + S1 = fabric_dict:store(Worker, Seq, S0), + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + Row = Row0#view_row{key = pack_seqs(S2)}, + {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn), + gen_server:reply(From, Go), + {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}} + end; + +handle_message({complete, EndSeq}, Worker, State) -> + #collector{ + counters = S0, + total_rows = Completed % override + } = State, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + {ok, State}; + _ -> + S1 = fabric_dict:store(Worker, EndSeq, S0), + % unlikely to have overlaps here, but possible w/ filters + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + NewState = State#collector{counters=S2, total_rows=Completed+1}, + case fabric_dict:size(S2) =:= (Completed+1) of + true -> + {stop, NewState}; + false -> + {ok, NewState} + end + end. + +make_changes_args(Options) -> + Options. + +get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> + Since; +get_start_seq(DbName, #changes_args{dir=rev}) -> + Shards = partitions:all_parts(DbName), + Workers = fabric_util:submit_jobs(Shards, get_update_seq, []), + {ok, Since} = fabric_util:recv(Workers, #shard.ref, + fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), + Since. + +collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, Counters}; + -1 -> + C1 = fabric_dict:store(Shard, Seq, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(-1, C2) of + true -> + {ok, C2}; + false -> + {stop, pack_seqs(C2)} + end + end. + +pack_seqs(Workers) -> + SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], + couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])). + +unpack_seqs(0, DbName) -> + fabric_dict:init(partitions:all_parts(DbName), 0); + +unpack_seqs("0", DbName) -> + fabric_dict:init(partitions:all_parts(DbName), 0); + +unpack_seqs(Packed, DbName) -> + % TODO relies on internal structure of fabric_dict as keylist + lists:map(fun({Node, [A,B], Seq}) -> + Name = partitions:shard_name(DbName, A), + {#shard{node=Node, range=[A,B], dbname=DbName, name=Name}, Seq} + end, binary_to_term(couch_util:decodeBase64Url(Packed))). + +start_update_notifiers(DbName) -> + lists:map(fun(#shard{node=Node, name=Name}) -> + {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})} + end, partitions:all_parts(DbName)). + +% rexi endpoint +start_update_notifier(DbName) -> + {Caller, _} = get(rexi_from), + Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end, + Id = {couch_db_update_notifier, make_ref()}, + ok = gen_event:add_sup_handler(couch_db_update, Id, Fun), + receive {gen_event_EXIT, Id, Reason} -> + rexi:reply({gen_event_EXIT, DbName, Reason}) + end. + +stop_update_notifiers(Notifiers) -> + [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers]. + +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}. + +find_replacement_shards(#shard{range=Range}, AllShards) -> + % TODO make this moar betta -- we might have split or merged the partition + [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. |