Diffstat (limited to 'deps/fabric/src/fabric_view_changes.erl')
-rw-r--r--  deps/fabric/src/fabric_view_changes.erl  334
1 file changed, 334 insertions, 0 deletions
diff --git a/deps/fabric/src/fabric_view_changes.erl b/deps/fabric/src/fabric_view_changes.erl
new file mode 100644
index 00000000..41347095
--- /dev/null
+++ b/deps/fabric/src/fabric_view_changes.erl
@@ -0,0 +1,334 @@
+% Copyright 2012 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_view_changes).
+
+-export([go/5, pack_seqs/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-import(fabric_db_update_listener, [wait_db_updated/1, stop/1]).
+
+go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
+ Feed == "longpoll" ->
+ Args = make_changes_args(Options),
+ Since = get_start_seq(DbName, Args),
+ case validate_start_seq(DbName, Since) of
+ ok ->
+ {ok, Acc} = Callback(start, Acc0),
+ {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
+ Ref = make_ref(),
+ Parent = self(),
+ UpdateListener = {spawn_link(fabric_db_update_listener, go,
+ [Parent, Ref, DbName, Timeout]),
+ Ref},
+ try
+ keep_sending_changes(
+ DbName,
+ Args,
+ Callback,
+ Since,
+ Acc,
+ Timeout,
+ UpdateListener
+ )
+ after
+ stop(UpdateListener)
+ end;
+ Error ->
+ Callback(Error, Acc0)
+ end;
+
+go(DbName, "normal", Options, Callback, Acc0) ->
+ Args = make_changes_args(Options),
+ Since = get_start_seq(DbName, Args),
+ case validate_start_seq(DbName, Since) of
+ ok ->
+ {ok, Acc} = Callback(start, Acc0),
+ {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
+ DbName,
+ Args,
+ Callback,
+ Since,
+ Acc,
+ 5000
+ ),
+ Callback({stop, pack_seqs(Seqs)}, AccOut);
+ Error ->
+ Callback(Error, Acc0)
+ end.
+
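+% Loop for continuous/longpoll feeds: emit one batch of changes, then either
+% stop (longpoll once rows have been sent, or no heartbeat and the db-update
+% listener timed out) or wait for an update and recurse with the remaining
+% limit and the newly packed sequence.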
+keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen) ->
+ #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
+ {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
+ #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
+ LastSeq = pack_seqs(NewSeqs),
+ if Limit > Limit2, Feed == "longpoll" ->
+ Callback({stop, LastSeq}, AccOut);
+ true ->
+ case {Heartbeat, wait_db_updated(UpListen)} of
+ {undefined, timeout} ->
+ Callback({stop, LastSeq}, AccOut);
+ _ ->
+ {ok, AccTimeout} = Callback(timeout, AccOut),
+ keep_sending_changes(
+ DbName,
+ Args#changes_args{limit=Limit2},
+ Callback,
+ LastSeq,
+ AccTimeout,
+ Timeout,
+ UpListen
+ )
+ end
+ end.
+
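+% Fan out one round of changes requests. Each shard named in the unpacked
+% sequence gets a fabric_rpc changes worker started via rexi:cast/2; shards
+% that no longer exist are replaced (restarting from sequence 0), and results
+% are merged by receive_results/5 through handle_message/3.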
+send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
+ AllShards = mem3:shards(DbName),
+ Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
+ case lists:member(Shard, AllShards) of
+ true ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
+ [{Shard#shard{ref = Ref}, Seq}];
+ false ->
+ % Find some replacement shards to cover the missing range
+ % TODO It's possible in rare cases of shard merging to end up
+ % with overlapping shard ranges from this technique
+ lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
+ Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
+ {NewShard#shard{ref = Ref}, 0}
+ end, find_replacement_shards(Shard, AllShards))
+ end
+ end, unpack_seqs(PackedSeqs, DbName)),
+ {Workers, _} = lists:unzip(Seqs),
+ RexiMon = fabric_util:create_monitors(Workers),
+ State = #collector{
+ query_args = ChangesArgs,
+ callback = Callback,
+ counters = orddict:from_list(Seqs),
+ user_acc = AccIn,
+ limit = ChangesArgs#changes_args.limit,
+        rows = Seqs % store sequence positions instead of rows
+ },
+ %% TODO: errors need to be handled here
+ try
+ receive_results(Workers, State, Timeout, Callback, AccIn)
+ after
+ rexi_monitor:stop(RexiMon),
+ fabric_util:cleanup(Workers)
+ end.
+
+receive_results(Workers, State, Timeout, Callback, AccIn) ->
+ case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State,
+ infinity, Timeout) of
+ {timeout, NewState0} ->
+ {ok, AccOut} = Callback(timeout, NewState0#collector.user_acc),
+ NewState = NewState0#collector{user_acc = AccOut},
+ receive_results(Workers, NewState, Timeout, Callback, AccOut);
+ {_, NewState} ->
+ {ok, NewState}
+ end.
+
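+% rexi_utils:recv/6 callback. Clauses handle, in order: a node going down,
+% a worker exit, the row limit being exhausted, an individual #change row
+% (forwarded to the user callback and re-packed), and a worker reporting
+% that it has reached the end of its feed.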
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) ->
+ fabric_view:remove_down_shards(State, NodeRef);
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ #collector{
+ callback=Callback,
+ counters=Counters0,
+ rows = Seqs0,
+ user_acc=Acc
+ } = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ Seqs = fabric_dict:erase(Worker, Seqs0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters, rows=Seqs}};
+ false ->
+ {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
+ {error, Resp}
+ end;
+
+handle_message(_, _, #collector{limit=0} = State) ->
+ {stop, State};
+
+handle_message(#change{key=Seq} = Row0, {Worker, From}, St) ->
+ #collector{
+ query_args = #changes_args{include_docs=IncludeDocs},
+ callback = Callback,
+ counters = S0,
+ limit = Limit,
+ user_acc = AccIn
+ } = St,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate it
+ gen_server:reply(From, stop),
+ {ok, St};
+ _ ->
+ S1 = fabric_dict:store(Worker, Seq, S0),
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ Row = Row0#change{key = pack_seqs(S2)},
+ {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+ gen_server:reply(From, Go),
+ {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}}
+ end;
+
+handle_message({complete, EndSeq}, Worker, State) ->
+ #collector{
+ counters = S0,
+        total_rows = Completed % the total_rows field is reused as a completed-worker count
+ } = State,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ {ok, State};
+ _ ->
+ S1 = fabric_dict:store(Worker, EndSeq, S0),
+ % unlikely to have overlaps here, but possible w/ filters
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ NewState = State#collector{counters=S2, total_rows=Completed+1},
+ case fabric_dict:size(S2) =:= (Completed+1) of
+ true ->
+ {stop, NewState};
+ false ->
+ {ok, NewState}
+ end
+ end.
+
+make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
+ Args#changes_args{filter = Style};
+make_changes_args(Args) ->
+ Args.
+
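+% dir=fwd starts from the supplied since value; dir=rev starts from the
+% current update sequence of each shard, collected via get_update_seq
+% jobs and packed into a single sequence.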
+get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
+ Since;
+get_start_seq(DbName, #changes_args{dir=rev}) ->
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
+ {ok, Since} = fabric_util:recv(Workers, #shard.ref,
+ fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
+ Since.
+
+collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
+ case fabric_dict:lookup_element(Shard, Counters) of
+ undefined ->
+ % already heard from someone else in this range
+ {ok, Counters};
+ -1 ->
+ C1 = fabric_dict:store(Shard, Seq, Counters),
+ C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+ case fabric_dict:any(-1, C2) of
+ true ->
+ {ok, C2};
+ false ->
+ {stop, pack_seqs(C2)}
+ end
+ end.
+
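+% A packed sequence is a two-element list: the sum of the per-shard update
+% sequences, and an opaque string holding the url-safe base64 of the
+% compressed external term format of [{Node, Range, Seq}] triples.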
+pack_seqs(Workers) ->
+ SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
+ SeqSum = lists:sum(element(2, lists:unzip(Workers))),
+ Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
+ [SeqSum, Opaque].
+
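+% Accepts 0 and "0" (start every shard from scratch), the [SeqNum, Opaque]
+% form produced by pack_seqs/1, and string/binary forms matched by regex:
+% the newer "[SeqNum,\"Opaque\"]" encoding and the older one with an optional
+% "SeqNum-" prefix before the opaque part.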
+unpack_seqs(0, DbName) ->
+ fabric_dict:init(mem3:shards(DbName), 0);
+
+unpack_seqs("0", DbName) ->
+ fabric_dict:init(mem3:shards(DbName), 0);
+
+unpack_seqs([_SeqNum, Opaque], DbName) ->
+ do_unpack_seqs(Opaque, DbName);
+
+unpack_seqs(Packed, DbName) ->
+ NewPattern = "^\\[[0-9]+,\"(?<opaque>.*)\"\\]$",
+ OldPattern = "^([0-9]+-)?(?<opaque>.*)$",
+ Options = [{capture, [opaque], binary}],
+ Opaque = case re:run(Packed, NewPattern, Options) of
+ {match, Match} ->
+ Match;
+ nomatch ->
+ {match, Match} = re:run(Packed, OldPattern, Options),
+ Match
+ end,
+ do_unpack_seqs(Opaque, DbName).
+
+do_unpack_seqs(Opaque, DbName) ->
+ % TODO relies on internal structure of fabric_dict as keylist
+ lists:map(fun({Node, [A,B], Seq}) ->
+ Match = #shard{node=Node, range=[A,B], dbname=DbName, _ = '_'},
+ case ets:match_object(partitions, Match) of
+ [Shard] ->
+ {Shard, Seq};
+ [] ->
+ {Match, Seq} % will be replaced in find_replacement_shards
+ end
+ end, binary_to_term(couch_util:decodeBase64Url(Opaque))).
+
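+% Build the EJSON body for a single changes row; the boolean argument is the
+% include_docs flag, and deleted/error markers are included when present.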
+changes_row(#change{key=Seq, id=Id, value=Value, deleted=true, doc=Doc}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, Doc}]}};
+changes_row(#change{key=Seq, id=Id, value=Value, deleted=true}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
+changes_row(#change{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
+changes_row(#change{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
+changes_row(#change{key=Seq, id=Id, value=Value}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
+
+find_replacement_shards(#shard{range=Range}, AllShards) ->
+    % TODO make this more robust -- the partition may have been split or merged
+ [Shard || Shard <- AllShards, Shard#shard.range =:= Range].
+
+validate_start_seq(DbName, Seq) ->
+ try unpack_seqs(Seq, DbName) of _Any ->
+ ok
+ catch _:_ ->
+ Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
+ {error, {bad_request, Reason}}
+ end.
+
+unpack_seqs_test() ->
+ ets:new(partitions, [named_table]),
+
+ % BigCouch 0.3 style.
+ assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"),
+
+ % BigCouch 0.4 style.
+ assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]),
+
+ % BigCouch 0.4 style (as string).
+ assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
+
+    % Opaque value containing an internal hyphen.
+ assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
+ "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
+ "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"),
+ assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
+ "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
+ "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]),
+
+ ets:delete(partitions).
+
+assert_shards(Packed) ->
+ ?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)).