-rw-r--r--  ebin/fabric.app         |   1
-rw-r--r--  include/fabric.hrl      |  16
-rw-r--r--  src/fabric.erl          |   5
-rw-r--r--  src/fabric_all_docs.erl | 264
-rw-r--r--  src/fabric_rpc.erl      |  17
-rw-r--r--  src/fabric_util.erl     |   4
6 files changed, 296 insertions, 11 deletions
diff --git a/ebin/fabric.app b/ebin/fabric.app
index b4e7c461..7b072ca1 100644
--- a/ebin/fabric.app
+++ b/ebin/fabric.app
@@ -6,6 +6,7 @@
     {modules, [
         fabric,
         fabric_all_databases,
+        fabric_all_docs,
         fabric_create_db,
         fabric_delete_db,
         fabric_get_db_info,
diff --git a/include/fabric.hrl b/include/fabric.hrl
index 43c589e0..31e4336c 100644
--- a/include/fabric.hrl
+++ b/include/fabric.hrl
@@ -9,3 +9,19 @@
 -endif.
 
 -include_lib("eunit/include/eunit.hrl").
+
+-record(collector, {
+    query_args,
+    callback,
+    counters,
+    buffer_size,
+    blocked = [],
+    total_rows = 0,
+    offset = 0,
+    rows = [],
+    skip,
+    limit,
+    user_acc
+}).
+
+-record(view_row, {key, id, value, doc, worker}).
diff --git a/src/fabric.erl b/src/fabric.erl
index 7f8f7e23..ed75a5ab 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -3,6 +3,7 @@
 -export([all_databases/1, create_db/2, delete_db/2, get_db_info/2, db_path/2]).
 -export([open_doc/3, open_revs/4, get_missing_revs/2]).
 -export([update_doc/3, update_docs/3]).
+-export([all_docs/4]).
 
 -include("fabric.hrl").
 
@@ -45,6 +46,10 @@ update_docs(DbName, Docs, Options) ->
     fabric_update_docs:go(dbname(DbName), docs(Docs), Options).
 
+all_docs(DbName, #view_query_args{} = QueryArgs, Callback, Acc0) when
+    is_function(Callback, 2) ->
+    fabric_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0).
+
 %% some simple type validation and transcoding
 dbname(DbName) when is_list(DbName) ->
diff --git a/src/fabric_all_docs.erl b/src/fabric_all_docs.erl
new file mode 100644
index 00000000..9304570a
--- /dev/null
+++ b/src/fabric_all_docs.erl
@@ -0,0 +1,264 @@
+-module(fabric_all_docs).
+
+-export([go/4]).
+
+-include("fabric.hrl").
+
+go(DbName, QueryArgs, Callback, Acc0) ->
+    Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
+        Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}),
+        Shard#shard{ref = Ref}
+    end, partitions:all_parts(DbName)),
+    BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
+    #view_query_args{limit = Limit, skip = Skip} = QueryArgs,
+    State = #collector{
+        query_args = QueryArgs,
+        callback = Callback,
+        buffer_size = list_to_integer(BufferSize),
+        counters = init_counters(Workers),
+        skip = Skip,
+        limit = Limit,
+        user_acc = Acc0
+    },
+    case fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+        State, infinity, 5000) of
+    {ok, NewState} ->
+        {ok, NewState#collector.user_acc};
+    Error ->
+        Error
+    end.
+
+handle_message({rexi_DOWN, _, _, _}, nil, State) ->
+    % TODO see if progress can be made here, possibly by removing all shards
+    % from that node and checking is_progress_possible
+    {ok, State};
+
+handle_message({rexi_EXIT, _}, Worker, State) ->
+    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
+    Counters = remove(Worker, Counters0),
+    case is_progress_possible(Counters) of
+    true ->
+        {ok, State#collector{counters = Counters}};
+    false ->
+        Callback({error, dead_shards}, Acc),
+        {error, dead_shards}
+    end;
+
+handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
+    #collector{
+        callback = Callback,
+        counters = Counters0,
+        total_rows = Total0,
+        offset = Offset0,
+        user_acc = AccIn
+    } = State,
+    case lookup_element(Worker, Counters0) of
+    undefined ->
+        % this worker lost the race with other partition copies, terminate
+        gen_server:reply(From, stop),
+        {ok, State};
+    0 ->
+        gen_server:reply(From, ok),
+        Counters1 = update_counter(Worker, 1, Counters0),
+        Counters2 = remove_overlapping_shards(Worker, Counters1),
+        Total = Total0 + Tot,
+        Offset = Offset0 + Off,
+        case waiting_on_shards(Counters2) of
+        true ->
+            {ok, State#collector{
+                counters = Counters2,
+                total_rows = Total,
+                offset = Offset
+            }};
+        false ->
+            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+            {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
+            {Go, State#collector{
+                counters = decrement_all_counters(Counters2),
+                total_rows = Total,
+                offset = FinalOffset,
+                user_acc = Acc
+            }}
+        end
+    end;
+
+handle_message(#view_row{} = Row, {Worker, From}, State) ->
+    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
+    Dir = Args#view_query_args.direction,
+    Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0),
+    Counters1 = update_counter(Worker, 1, Counters0),
+    State1 = State#collector{rows=Rows, counters=Counters1},
+    State2 = maybe_pause_worker(Worker, From, State1),
+    maybe_send_row(State2);
+
+handle_message(complete, Worker, State) ->
+    Counters = update_counter(Worker, 1, State#collector.counters),
+    maybe_send_row(State#collector{counters = Counters}).
+
+
+maybe_pause_worker(Worker, From, State) ->
+    #collector{buffer_size = BufferSize, counters = Counters} = State,
+    case lookup_element(Worker, Counters) of
+    BufferSize ->
+        State#collector{blocked = [{Worker,From} | State#collector.blocked]};
+    _Count ->
+        gen_server:reply(From, ok),
+        State
+    end.
+
+maybe_resume_worker(Worker, State) ->
+    #collector{buffer_size = Buffer, counters = C, blocked = B} = State,
+    case lookup_element(Worker, C) of
+    Count when Count < Buffer/2 ->
+        case couch_util:get_value(Worker, B) of
+        undefined ->
+            State;
+        From ->
+            gen_server:reply(From, ok),
+            State#collector{blocked = lists:keydelete(Worker, 1, B)}
+        end;
+    _Other ->
+        State
+    end.
+
+maybe_send_row(#collector{limit=0} = State) ->
+    #collector{user_acc=AccIn, callback=Callback} = State,
+    {_, Acc} = Callback(complete, AccIn),
+    {stop, State#collector{user_acc=Acc}};
+maybe_send_row(State) ->
+    #collector{
+        callback = Callback,
+        counters = Counters,
+        skip = Skip,
+        limit = Limit,
+        user_acc = AccIn
+    } = State,
+    case waiting_on_shards(Counters) of
+    true ->
+        {ok, State};
+    false ->
+        case get_next_row(State) of
+        complete ->
+            {_, Acc} = Callback(complete, AccIn),
+            {stop, State#collector{user_acc=Acc}};
+        {_, NewState} when Skip > 0 ->
+            maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1});
+        {Row, NewState} ->
+            case Callback(transform_row(Row), AccIn) of
+            {stop, Acc} ->
+                {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
+            {ok, Acc} ->
+                maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
+            end
+        end
+    end.
+
+get_next_row(#collector{rows = []}) ->
+    complete;
+get_next_row(State) ->
+    #collector{query_args=Args, rows=[Row|Rest], counters=Counters0} = State,
+    Worker = Row#view_row.worker,
+    Counters1 = update_counter(Worker, -1, Counters0),
+    NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
+    case stop(Args, Row) of
+    true ->
+        complete;
+    false ->
+        {Row, NewState#collector{rows = Rest}}
+    end.
+
+stop(#view_query_args{direction=fwd, end_key=EndKey}, #view_row{id=Id}) ->
+    couch_db_updater:less_docid(EndKey, Id);
+stop(#view_query_args{direction=rev, end_key=EndKey}, #view_row{id=Id}) ->
+    couch_db_updater:less_docid(Id, EndKey).
+
+transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
+    {row, {[{key,Key}, {id,Id}, {value,Value}]}};
+transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
+    {row, {[{key,Key}, {id,Id}, {value,Value}, {error,Reason}]}};
+transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
+    {row, {[{key,Key}, {id,Id}, {value,Value}, {doc,Doc}]}}.
+
+merge_row(fwd, Row, Rows) ->
+    lists:keymerge(#view_row.id, [Row], Rows);
+merge_row(rev, Row, Rows) ->
+    lists:rkeymerge(#view_row.id, [Row], Rows).
+
+remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
+    filter(fun(#shard{range=[X,Y]} = Shard, _Value) ->
+        if Shard =:= Shard0 ->
+            % we can't remove ourselves
+            true;
+        A < B, X >= A, X < B ->
+            % lower bound is inside our range
+            false;
+        A < B, Y > A, Y =< B ->
+            % upper bound is inside our range
+            false;
+        B < A, X >= A orelse B < A, X < B ->
+            % target shard wraps the key range, lower bound is inside
+            false;
+        B < A, Y > A orelse B < A, Y =< B ->
+            % target shard wraps the key range, upper bound is inside
+            false;
+        true ->
+            true
+        end
+    end, Shards).
+
+%% @doc looks for a fully covered keyrange in the list of counters
+-spec is_progress_possible([{#shard{}, non_neg_integer()}]) -> boolean().
+is_progress_possible(Counters) ->
+    Ranges = fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, [], Counters),
+    [First | Rest] = lists:ukeysort(1, Ranges),
+    {Head, Tail} = lists:foldl(fun
+    (_, {Head, Tail}) when Head =:= Tail ->
+        % this is the success condition, we can fast-forward
+        {Head, Tail};
+    (_, {foo, bar}) ->
+        % we've already declared failure
+        {foo, bar};
+    ({X,_}, {Head, Tail}) when Head < Tail, X > Tail ->
+        % gap in the keyrange, we're dead
+        {foo, bar};
+    ({X,Y}, {Head, Tail}) when Head < Tail, X < Y ->
+        % the normal condition, adding to the tail
+        {Head, erlang:max(Tail, Y)};
+    ({X,Y}, {Head, Tail}) when Head < Tail, X > Y, Y >= Head ->
+        % we've wrapped all the way around, trigger success condition
+        {Head, Head};
+    ({X,Y}, {Head, Tail}) when Head < Tail, X > Y ->
+        % this wraps the keyspace, but there's still a gap. We're dead
+        % TODO technically, another shard could be a superset of this one, and
+        % we could still be alive. Pretty unlikely though, and impossible if
+        % we don't allow shards to wrap around the boundary
+        {foo, bar}
+    end, First, Rest),
+    Head =:= Tail.
+
+% Instead of ets, let's use an ordered keylist. We'll need to revisit if we
+% have >> 100 shards, so a private interface is a good idea. - APK June 2010
+
+init_counters(Keys) ->
+    orddict:from_list([{Key,0} || Key <- Keys]).
+
+decrement_all_counters(Dict) ->
+    [{K,V-1} || {K,V} <- Dict].
+
+update_counter(Key, Incr, Dict0) ->
+    orddict:update_counter(Key, Incr, Dict0).
+
+lookup_element(Key, Dict) ->
+    couch_util:get_value(Key, Dict).
+
+waiting_on_shards(Dict) ->
+    lists:keymember(0, 2, Dict).
+
+remove(Shard, Dict) ->
+    orddict:erase(Shard, Dict).
+
+filter(Fun, Dict) ->
+    orddict:filter(Fun, Dict).
+
+fold(Fun, Acc0, Dict) ->
+    orddict:fold(Fun, Acc0, Dict).
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 0ac58640..1cae5ac5 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -38,7 +38,7 @@ all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) ->
     },
     {ok, Acc} = couch_db:enum_docs(Db, StartId, Dir, fun view_fold/3, Acc0),
     if Acc#view_acc.offset == nil ->
-        Total = couch_db:get_doc_count(Db),
+        {ok, Total} = couch_db:get_doc_count(Db),
         rexi:sync_reply({total_and_offset, Total, Total});
     true -> ok end,
     rexi:reply(complete).
@@ -108,8 +108,9 @@ view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
 view_fold(KV, OffsetReds, #view_acc{offset=nil} = Acc) ->
     % calculates the offset for this shard
     #view_acc{db=Db, reduce_fun=Reduce} = Acc,
+    {ok, Total} = couch_db:get_doc_count(Db),
     Offset = Reduce(OffsetReds),
-    rexi:sync_reply({total_and_offset, couch_db:get_doc_count(Db), Offset}),
+    rexi:sync_reply({total_and_offset, Total, Offset}),
     view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
 view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
     % we scanned through limit+skip local rows
@@ -126,21 +127,21 @@ view_fold({{Key,Id}, Value}, _Offset, Acc) ->
     true ->
         {stop, Acc};
     false ->
-        RowProps = case IncludeDocs of
+        Row = case IncludeDocs of
         true ->
             case couch_db:open_doc(Db, Id, []) of
             {not_found, missing} ->
-                [{id, Id}, {key, Key}, {value, Value}, {error, missing}];
+                #view_row{key=Key, id=Id, value=Value, doc={error,missing}};
             {not_found, deleted} ->
-                [{id, Id}, {key, Key}, {value, Value}];
+                #view_row{key=Key, id=Id, value=Value};
             {ok, Doc} ->
                 JsonDoc = couch_doc:to_json_obj(Doc, []),
-                [{id, Id}, {key, Key}, {value, Value}, {doc, JsonDoc}]
+                #view_row{key=Key, id=Id, value=Value, doc=JsonDoc}
             end;
         false ->
-            [{id, Id}, {key, Key}, {value, Value}]
+            #view_row{key=Key, id=Id, value=Value}
         end,
-        rexi:sync_reply({row, RowProps}),
+        rexi:sync_reply(Row),
         {ok, Acc#view_acc{limit=Limit-1}}
     end.
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index dd1aaf0a..e904b456 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -50,7 +50,6 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
     {timeout, TimeoutRef} ->
         timeout;
     {Ref, Msg} ->
-        io:format("process_message ~p ~p~n", [Ref, Msg]),
         case lists:keyfind(Ref, Keypos, RefList) of
         false ->
             % this was some non-matching message which we will ignore
@@ -59,7 +58,6 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
             Fun(Msg, Worker, Acc0)
         end;
     {Ref, From, Msg} ->
-        io:format("process sync_reply {~p,~p} ~p~n", [Ref, From, Msg]),
         case lists:keyfind(Ref, Keypos, RefList) of
         false ->
             {ok, Acc0};
@@ -68,7 +66,7 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
         end;
     {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg ->
         showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]),
-        Fun(nil, Msg, Acc0)
+        Fun(Msg, nil, Acc0)
     after PerMsgTO ->
         timeout
     end.
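
Not part of the commit: a minimal sketch of how a caller might drive the new fabric:all_docs/4 API, based on the callback contract visible in fabric_all_docs.erl above (the collector calls the arity-2 callback with {total_and_offset, Total, Offset}, one {row, Props} per document, complete at the end, or {error, Reason} on failure, and expects {ok, Acc} or {stop, Acc} back). The module name and accumulator shape here are made up for illustration, and #view_query_args{} is assumed to be in scope via fabric.hrl, as it is in fabric.erl.

    %% Hypothetical caller module, not part of this change.
    -module(all_docs_example).
    -export([collect/1]).

    %% assumption: fabric.hrl (or what it includes) defines #view_query_args{},
    %% as its use in fabric:all_docs/4 suggests
    -include("fabric.hrl").

    collect(DbName) ->
        Callback = fun
            ({total_and_offset, Total, Offset}, {_Meta, Rows}) ->
                % sent once, before any rows; remember the totals
                {ok, {{Total, Offset}, Rows}};
            ({row, Props}, {Meta, Rows}) ->
                % one message per row; Props is the EJSON row object from
                % transform_row/1. Return {stop, Acc} to end the stream early.
                {ok, {Meta, [Props | Rows]}};
            (complete, {Meta, Rows}) ->
                {ok, {Meta, lists:reverse(Rows)}};
            ({error, Reason}, _Acc) ->
                {ok, {error, Reason}}
        end,
        % returns {ok, FinalAcc} once every shard copy has reported
        fabric:all_docs(DbName, #view_query_args{}, Callback, {nil, []}).

The shard workers pause once they are map_buffer_size rows ahead of the merge (maybe_pause_worker/3 above), so a slow callback applies backpressure to the RPC senders rather than buffering unbounded rows.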