summaryrefslogtreecommitdiff
path: root/apps/fabric/src/fabric_view_changes.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/fabric/src/fabric_view_changes.erl')
-rw-r--r--apps/fabric/src/fabric_view_changes.erl271
1 files changed, 0 insertions, 271 deletions
diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl
deleted file mode 100644
index a4421a92..00000000
--- a/apps/fabric/src/fabric_view_changes.erl
+++ /dev/null
@@ -1,271 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_changes).
-
--export([go/5, start_update_notifier/1, pack_seqs/1]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
- Feed == "longpoll" ->
- Args = make_changes_args(Options),
- {ok, Acc} = Callback(start, Acc0),
- Notifiers = start_update_notifiers(DbName),
- {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
- try
- keep_sending_changes(
- DbName,
- Args,
- Callback,
- get_start_seq(DbName, Args),
- Acc,
- Timeout
- )
- after
- stop_update_notifiers(Notifiers),
- couch_changes:get_rest_db_updated()
- end;
-
-go(DbName, "normal", Options, Callback, Acc0) ->
- Args = make_changes_args(Options),
- {ok, Acc} = Callback(start, Acc0),
- {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
- DbName,
- Args,
- Callback,
- get_start_seq(DbName, Args),
- Acc
- ),
- Callback({stop, pack_seqs(Seqs)}, AccOut).
-
-keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout) ->
- #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
- {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn),
- #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
- LastSeq = pack_seqs(NewSeqs),
- if Limit > Limit2, Feed == "longpoll" ->
- Callback({stop, LastSeq}, AccOut);
- true ->
- case wait_db_updated(Timeout) of
- updated ->
- keep_sending_changes(
- DbName,
- Args#changes_args{limit=Limit2},
- Callback,
- LastSeq,
- AccOut,
- Timeout
- );
- timeout ->
- case Heartbeat of undefined ->
- Callback({stop, LastSeq}, AccOut);
- _ ->
- {ok, AccTimeout} = Callback(timeout, AccOut),
- keep_sending_changes(DbName, Args#changes_args{limit=Limit2},
- Callback, LastSeq, AccTimeout, Timeout)
- end
- end
- end.
-
-send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) ->
- AllShards = mem3:shards(DbName),
- Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
- case lists:member(Shard, AllShards) of
- true ->
- Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
- [{Shard#shard{ref = Ref}, Seq}];
- false ->
- % Find some replacement shards to cover the missing range
- % TODO It's possible in rare cases of shard merging to end up
- % with overlapping shard ranges from this technique
- lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
- Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
- {NewShard#shard{ref = Ref}, 0}
- end, find_replacement_shards(Shard, AllShards))
- end
- end, unpack_seqs(PackedSeqs, DbName)),
- {Workers, _} = lists:unzip(Seqs),
- State = #collector{
- query_args = ChangesArgs,
- callback = Callback,
- counters = fabric_dict:init(Workers, 0),
- user_acc = AccIn,
- limit = ChangesArgs#changes_args.limit,
- rows = Seqs % store sequence positions instead
- },
- try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 5000)
- after
- fabric_util:cleanup(Workers)
- end.
-
-handle_message({rexi_DOWN, _, _, _}, nil, State) ->
- % TODO see if progress can be made here, possibly by removing all shards
- % from that node and checking is_progress_possible
- {ok, State};
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
- ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
- #collector{
- callback=Callback,
- counters=Counters0,
- rows = Seqs0,
- user_acc=Acc
- } = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- Seqs = fabric_dict:erase(Worker, Seqs0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters, rows=Seqs}};
- false ->
- Callback({error, dead_shards}, Acc),
- {error, dead_shards}
- end;
-
-handle_message(_, _, #collector{limit=0} = State) ->
- {stop, State};
-
-handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) ->
- #collector{
- query_args = #changes_args{include_docs=IncludeDocs},
- callback = Callback,
- counters = S0,
- limit = Limit,
- user_acc = AccIn
- } = St,
- case fabric_dict:lookup_element(Worker, S0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate it
- gen_server:reply(From, stop),
- {ok, St};
- _ ->
- S1 = fabric_dict:store(Worker, Seq, S0),
- S2 = fabric_view:remove_overlapping_shards(Worker, S1),
- Row = Row0#view_row{key = pack_seqs(S2)},
- {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
- gen_server:reply(From, Go),
- {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}}
- end;
-
-handle_message({complete, EndSeq}, Worker, State) ->
- #collector{
- counters = S0,
- total_rows = Completed % override
- } = State,
- case fabric_dict:lookup_element(Worker, S0) of
- undefined ->
- {ok, State};
- _ ->
- S1 = fabric_dict:store(Worker, EndSeq, S0),
- % unlikely to have overlaps here, but possible w/ filters
- S2 = fabric_view:remove_overlapping_shards(Worker, S1),
- NewState = State#collector{counters=S2, total_rows=Completed+1},
- case fabric_dict:size(S2) =:= (Completed+1) of
- true ->
- {stop, NewState};
- false ->
- {ok, NewState}
- end
- end.
-
-make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
- Args#changes_args{filter = Style};
-make_changes_args(Args) ->
- Args.
-
-get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
- Since;
-get_start_seq(DbName, #changes_args{dir=rev}) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
- {ok, Since} = fabric_util:recv(Workers, #shard.ref,
- fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
- Since.
-
-collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, Counters};
- -1 ->
- C1 = fabric_dict:store(Shard, Seq, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(-1, C2) of
- true ->
- {ok, C2};
- false ->
- {stop, pack_seqs(C2)}
- end
- end.
-
-pack_seqs(Workers) ->
- SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
- SeqSum = lists:sum(element(2, lists:unzip(Workers))),
- Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
- list_to_binary([integer_to_list(SeqSum), $-, Opaque]).
-
-unpack_seqs(0, DbName) ->
- fabric_dict:init(mem3:shards(DbName), 0);
-
-unpack_seqs("0", DbName) ->
- fabric_dict:init(mem3:shards(DbName), 0);
-
-unpack_seqs(Packed, DbName) ->
- {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?<opaque>.*)", [{capture,
- [opaque], binary}]),
- % TODO relies on internal structure of fabric_dict as keylist
- lists:map(fun({Node, [A,B], Seq}) ->
- Shard = #shard{node=Node, range=[A,B], dbname=DbName},
- {mem3_util:name_shard(Shard), Seq}
- end, binary_to_term(couch_util:decodeBase64Url(Opaque))).
-
-start_update_notifiers(DbName) ->
- lists:map(fun(#shard{node=Node, name=Name}) ->
- {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
- end, mem3:shards(DbName)).
-
-% rexi endpoint
-start_update_notifier(DbName) ->
- {Caller, _} = get(rexi_from),
- Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end,
- Id = {couch_db_update_notifier, make_ref()},
- ok = gen_event:add_sup_handler(couch_db_update, Id, Fun),
- receive {gen_event_EXIT, Id, Reason} ->
- rexi:reply({gen_event_EXIT, DbName, Reason})
- end.
-
-stop_update_notifiers(Notifiers) ->
- [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
-
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value}, false) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
-
-find_replacement_shards(#shard{range=Range}, AllShards) ->
- % TODO make this moar betta -- we might have split or merged the partition
- [Shard || Shard <- AllShards, Shard#shard.range =:= Range].
-
-wait_db_updated(Timeout) ->
- receive db_updated -> couch_changes:get_rest_db_updated()
- after Timeout -> timeout end.