diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-07-02 03:02:12 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-07-02 03:02:12 -0400 |
commit | ced7cd5afff2f79b91f364f713d9851038e3b6ab (patch) | |
tree | ec813d676a632582cd376fe32bb8cbfc393153bf | |
parent | 2436c4fba43d02bd0a73893895b43e198715082b (diff) |
update for mem3 refactor, more robust DB create/delete
-rw-r--r-- | src/fabric.erl | 14 | ||||
-rw-r--r-- | src/fabric_db_create.erl | 102 | ||||
-rw-r--r-- | src/fabric_db_delete.erl | 78 | ||||
-rw-r--r-- | src/fabric_db_doc_count.erl | 2 | ||||
-rw-r--r-- | src/fabric_db_info.erl | 2 | ||||
-rw-r--r-- | src/fabric_doc_missing_revs.erl | 2 | ||||
-rw-r--r-- | src/fabric_doc_open.erl | 2 | ||||
-rw-r--r-- | src/fabric_doc_open_revs.erl | 2 | ||||
-rw-r--r-- | src/fabric_doc_update.erl | 2 | ||||
-rw-r--r-- | src/fabric_group_info.erl | 2 | ||||
-rw-r--r-- | src/fabric_rpc.erl | 14 | ||||
-rw-r--r-- | src/fabric_view.erl | 44 | ||||
-rw-r--r-- | src/fabric_view_all_docs.erl | 2 | ||||
-rw-r--r-- | src/fabric_view_changes.erl | 10 | ||||
-rw-r--r-- | src/fabric_view_map.erl | 2 | ||||
-rw-r--r-- | src/fabric_view_reduce.erl | 2 |
16 files changed, 138 insertions, 144 deletions
diff --git a/src/fabric.erl b/src/fabric.erl index badc1379..b233677b 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -1,8 +1,8 @@ -module(fabric). % DBs --export([all_dbs/0, all_dbs/1, create_db/2, delete_db/2, get_db_info/1, - get_doc_count/1]). +-export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1, + delete_db/2, get_db_info/1, get_doc_count/1]). % Documents -export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3, @@ -36,11 +36,17 @@ get_db_info(DbName) -> get_doc_count(DbName) -> fabric_db_doc_count:go(dbname(DbName)). +create_db(DbName) -> + create_db(DbName, []). + create_db(DbName, Options) -> - fabric_db_create:create_db(dbname(DbName), opts(Options)). + fabric_db_create:go(dbname(DbName), opts(Options)). + +delete_db(DbName) -> + delete_db(DbName, []). delete_db(DbName, Options) -> - fabric_db_delete:delete_db(dbname(DbName), opts(Options)). + fabric_db_delete:go(dbname(DbName), opts(Options)). % doc operations diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl index 4f4e3b20..80bd1eb1 100644 --- a/src/fabric_db_create.erl +++ b/src/fabric_db_create.erl @@ -1,64 +1,56 @@ -module(fabric_db_create). --author(brad@cloudant.com). - --export([create_db/2]). +-export([go/2]). -include("fabric.hrl"). %% @doc Create a new database, and all its partition files across the cluster %% Options is proplist with user_ctx, n, q --spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}. -create_db(DbName, Options) -> - Fullmap = partitions:fullmap(DbName, Options), - {ok, FullNodes} = mem3:fullnodes(), - RefPartMap = send_create_calls(Fullmap, Options), - Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} || - {_,#shard{range=[Beg,_]}} <- RefPartMap])}, - Result = case fabric_util:receive_loop( - RefPartMap, 1, fun handle_create_msg/3, Acc0) of - {ok, _Results} -> ok; - Error -> Error - end, - % always install partition map, even w/ errors, so delete is possible - partitions:install_fullmap(DbName, Fullmap, FullNodes, Options), - Result. - -%% -%% internal -%% +go(DbName, Options) -> + Shards = mem3:choose_shards(DbName, Options), + Doc = make_document(Shards), + Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]), + Acc0 = fabric_dict:init(Workers, nil), + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, _} -> + ok; + Else -> + Else + end. -%% @doc create the partitions on all appropriate nodes (rexi calls) --spec send_create_calls(fullmap(), list()) -> [{reference(), part()}]. -send_create_calls(Fullmap, Options) -> - lists:map(fun(#shard{node=Node, name=ShardName} = Part) -> - Ref = rexi:async_server_call({couch_server, Node}, - {create, ShardName, Options}), - {Ref, Part} - end, Fullmap). +handle_message(Msg, Shard, Counters) -> + C1 = fabric_dict:store(Shard, Msg, Counters), + case fabric_dict:any(nil, C1) of + true -> + {ok, C1}; + false -> + final_answer(C1) + end. -%% @doc handle create messages from shards -handle_create_msg(file_exists, _, _) -> - {error, file_exists}; -handle_create_msg({rexi_EXIT, _Reason}, _, {Complete, N, Parts}) -> - {ok, {Complete, N-1, Parts}}; -handle_create_msg({rexi_DOWN, _, _, _}, _, {Complete, _N, _Parts}) -> - if - Complete -> {stop, ok}; - true -> {error, create_db_fubar} - end; -handle_create_msg(_, _, {true, 1, _Acc}) -> - {stop, ok}; -handle_create_msg({ok, _}, {_, #shard{range=[Beg,_]}}, {false, 1, PartResults0}) -> - PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), - case is_complete(PartResults) of - true -> {stop, ok}; - false -> {error, create_db_fubar} - end; -handle_create_msg({ok, _}, _RefPart, {true, N, Parts}) -> - {ok, {true, N-1, Parts}}; -handle_create_msg({ok, _}, {_Ref, #shard{range=[Beg,_]}}, {false, Rem, PartResults0}) -> - PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), - {ok, {is_complete(PartResults), Rem-1, PartResults}}. +make_document([#shard{dbname=DbName}|_] = Shards) -> + {RawOut, ByNodeOut, ByRangeOut} = + lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> + Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-", + couch_util:to_hex(<<E:32/integer>>)]), + Node = couch_util:to_binary(N), + {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), + orddict:append(Range, Node, ByRange)} + end, {[], [], []}, Shards), + #doc{id=DbName, body = {[ + {<<"changelog">>, lists:sort(RawOut)}, + {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, + {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} + ]}}. -is_complete(List) -> - lists:all(fun({_,Bool}) -> Bool end, List). +final_answer(Counters) -> + Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists], + case fabric_view:is_progress_possible(Successes) of + true -> + case lists:keymember(file_exists, 2, Successes) of + true -> + {error, file_exists}; + false -> + {stop, ok} + end; + false -> + {error, internal_server_error} + end. diff --git a/src/fabric_db_delete.erl b/src/fabric_db_delete.erl index 95b1a5ef..923b38dc 100644 --- a/src/fabric_db_delete.erl +++ b/src/fabric_db_delete.erl @@ -1,56 +1,40 @@ -module(fabric_db_delete). --author(brad@cloudant.com). - --export([delete_db/2]). +-export([go/2]). -include("fabric.hrl"). -%% @doc Delete a database, and all its partition files across the cluster -%% Options is proplist with user_ctx, n, q --spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}. -delete_db(DbName, Options) -> - Fullmap = partitions:all_parts(DbName), - RefPartMap = send_delete_calls(Fullmap, Options), - Acc0 = {not_found, length(RefPartMap)}, - case fabric_util:receive_loop( - RefPartMap, 1, fun handle_delete_msg/3, Acc0) of - {ok, _Results} -> - delete_fullmap(DbName), +go(DbName, Options) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, delete_db, [Options, DbName]), + Acc0 = fabric_dict:init(Workers, nil), + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, ok} -> ok; - Error -> Error + {ok, not_found} -> + erlang:error(database_does_not_exist); + Error -> + Error end. -%% -%% internal -%% - -%% @doc delete the partitions on all appropriate nodes (rexi calls) --spec send_delete_calls(fullmap(), list()) -> [{reference(), part()}]. -send_delete_calls(Parts, Options) -> - lists:map(fun(#shard{node=Node, name=ShardName} = Part) -> - Ref = rexi:async_server_call({couch_server, Node}, - {delete, ShardName, Options}), - {Ref, Part} - end, Parts). - -handle_delete_msg(ok, _, {_, 1}) -> - {stop, ok}; -handle_delete_msg(not_found, _, {Acc, 1}) -> - {stop, Acc}; -handle_delete_msg({rexi_EXIT, _Reason}, _, {Acc, N}) -> - % TODO is this the appropriate action to take, or should we abort? - {ok, {Acc, N-1}}; -handle_delete_msg({rexi_DOWN, _, _, _}, _, _Acc) -> - {error, delete_db_fubar}; -handle_delete_msg(not_found, _, {Acc, N}) -> - {ok, {Acc, N-1}}; -handle_delete_msg(ok, _, {_, N}) -> - {ok, {ok, N-1}}. +handle_message(Msg, Shard, Counters) -> + C1 = fabric_dict:store(Shard, Msg, Counters), + case fabric_dict:any(nil, C1) of + true -> + {ok, C1}; + false -> + final_answer(C1) + end. -delete_fullmap(DbName) -> - case couch_db:open(<<"dbs">>, []) of - {ok, Db} -> - {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []), - couch_api:update_doc(Db, Doc#doc{deleted=true}); - Error -> Error +final_answer(Counters) -> + Successes = [X || {_, M} = X <- Counters, M == ok orelse M == not_found], + case fabric_view:is_progress_possible(Successes) of + true -> + case lists:keymember(ok, 2, Successes) of + true -> + {stop, ok}; + false -> + {stop, not_found} + end; + false -> + {error, internal_server_error} end. diff --git a/src/fabric_db_doc_count.erl b/src/fabric_db_doc_count.erl index 4c3a72d5..c587d103 100644 --- a/src/fabric_db_doc_count.erl +++ b/src/fabric_db_doc_count.erl @@ -5,7 +5,7 @@ -include("fabric.hrl"). go(DbName) -> - Shards = partitions:all_parts(DbName), + Shards = mem3:shards(DbName), Workers = fabric_util:submit_jobs(Shards, get_doc_count, []), Acc0 = {length(Workers), [{Beg,nil} || #shard{range=[Beg,_]} <- Workers]}, fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). diff --git a/src/fabric_db_info.erl b/src/fabric_db_info.erl index 8db7212f..ecb8ce1c 100644 --- a/src/fabric_db_info.erl +++ b/src/fabric_db_info.erl @@ -5,7 +5,7 @@ -include("fabric.hrl"). go(DbName) -> - Shards = partitions:all_parts(DbName), + Shards = mem3:shards(DbName), Workers = fabric_util:submit_jobs(Shards, get_db_info, []), Acc0 = {fabric_dict:init(Workers, nil), []}, fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). diff --git a/src/fabric_doc_missing_revs.erl b/src/fabric_doc_missing_revs.erl index 8f1f5b4c..22c11ad6 100644 --- a/src/fabric_doc_missing_revs.erl +++ b/src/fabric_doc_missing_revs.erl @@ -51,7 +51,7 @@ group_idrevs_by_shard(DbName, IdsRevs) -> dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> lists:foldl(fun(Shard, D1) -> dict:append(Shard, {Id, Revs}, D1) - end, D0, partitions:for_key(DbName,Id)) + end, D0, mem3:shards(DbName,Id)) end, dict:new(), IdsRevs)). update_dict(D0, KVs) -> diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl index 6f39f39e..96545c55 100644 --- a/src/fabric_doc_open.erl +++ b/src/fabric_doc_open.erl @@ -5,7 +5,7 @@ -include("fabric.hrl"). go(DbName, Id, Options) -> - Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_doc, + Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc, [Id, Options]), SuppressDeletedDoc = not lists:member(deleted, Options), Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, diff --git a/src/fabric_doc_open_revs.erl b/src/fabric_doc_open_revs.erl index 2fa91208..5bf5499f 100644 --- a/src/fabric_doc_open_revs.erl +++ b/src/fabric_doc_open_revs.erl @@ -5,7 +5,7 @@ -include("fabric.hrl"). go(DbName, Id, Revs, Options) -> - Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_revs, + Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs, [Id, Revs, Options]), Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl index 9d3cd3d1..77c182f1 100644 --- a/src/fabric_doc_update.erl +++ b/src/fabric_doc_update.erl @@ -84,7 +84,7 @@ group_docs_by_shard(DbName, Docs) -> dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> lists:foldl(fun(Shard, D1) -> dict:append(Shard, Doc, D1) - end, D0, partitions:for_key(DbName,Id)) + end, D0, mem3:shards(DbName,Id)) end, dict:new(), Docs)). append_update_replies([], [], DocReplyDict) -> diff --git a/src/fabric_group_info.erl b/src/fabric_group_info.erl index d2b76674..a1ba92cc 100644 --- a/src/fabric_group_info.erl +++ b/src/fabric_group_info.erl @@ -10,7 +10,7 @@ go(DbName, GroupId) when is_binary(GroupId) -> go(DbName, #doc{} = DDoc) -> Group = couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc), - Shards = partitions:all_parts(DbName), + Shards = mem3:shards(DbName), Workers = fabric_util:submit_jobs(Shards, group_info, [Group]), Acc0 = {fabric_dict:init(Workers, nil), []}, fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index 04aebfd1..1a2edf77 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -3,6 +3,7 @@ -export([get_db_info/1, get_doc_count/1, get_update_seq/1]). -export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). -export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). +-export([create_db/3, delete_db/3]). -include("fabric.hrl"). @@ -136,6 +137,19 @@ reduce_view(DbName, Group0, ViewName, QueryArgs) -> end, rexi:reply(complete). +create_db(DbName, Options, Doc) -> + mem3_util:write_db_doc(Doc), + rexi:reply(case couch_server:create(DbName, Options) of + {ok, _} -> + ok; + Error -> + Error + end). + +delete_db(DbName, Options, DocId) -> + mem3_util:delete_db_doc(DocId), + rexi:reply(couch_server:delete(DbName, Options)). + get_db_info(DbName) -> with_db(DbName, [], {couch_db, get_db_info, []}). diff --git a/src/fabric_view.erl b/src/fabric_view.erl index 738bb7dd..09fcd43c 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -8,34 +8,32 @@ %% @doc looks for a fully covered keyrange in the list of counters -spec is_progress_possible([{#shard{}, non_neg_integer()}]) -> boolean(). +is_progress_possible([]) -> + false; is_progress_possible(Counters) -> Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, [], Counters), - [First | Rest] = lists:ukeysort(1, Ranges), - {Head, Tail} = lists:foldl(fun - (_, {Head, Tail}) when Head =:= Tail -> - % this is the success condition, we can fast-forward - {Head, Tail}; - (_, {foo, bar}) -> + [{0, Tail0} | Rest] = lists:ukeysort(1, Ranges), + Result = lists:foldl(fun + (_, fail) -> % we've already declared failure - {foo, bar}; - ({X,_}, {Head, Tail}) when Head < Tail, X > Tail -> + fail; + (_, complete) -> + % this is the success condition, we can fast-forward + complete; + ({X,_}, Tail) when X > (Tail+1) -> % gap in the keyrange, we're dead - {foo, bar}; - ({X,Y}, {Head, Tail}) when Head < Tail, X < Y -> - % the normal condition, adding to the tail - {Head, erlang:max(Tail, Y)}; - ({X,Y}, {Head, Tail}) when Head < Tail, X > Y, Y >= Head -> - % we've wrapped all the way around, trigger success condition - {Head, Head}; - ({X,Y}, {Head, Tail}) when Head < Tail, X > Y -> - % this wraps the keyspace, but there's still a gap. We're dead - % TODO technically, another shard could be a superset of this one, and - % we could still be alive. Pretty unlikely though, and impossible if - % we don't allow shards to wrap around the boundary - {foo, bar} - end, First, Rest), - Head =:= Tail. + fail; + ({_,Y}, Tail) -> + case erlang:max(Tail, Y) of + End when (End+1) =:= (2 bsl 31) -> + complete; + Else -> + % the normal condition, adding to the tail + Else + end + end, Tail0, Rest), + Result =:= complete. -spec remove_overlapping_shards(#shard{}, [#shard{}]) -> [#shard{}]. remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl index 196d6837..f1713b86 100644 --- a/src/fabric_view_all_docs.erl +++ b/src/fabric_view_all_docs.erl @@ -9,7 +9,7 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) -> Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}), Shard#shard{ref = Ref} - end, partitions:all_parts(DbName)), + end, mem3:shards(DbName)), BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), #view_query_args{limit = Limit, skip = Skip} = QueryArgs, State = #collector{ diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl index 39a57176..f6989061 100644 --- a/src/fabric_view_changes.erl +++ b/src/fabric_view_changes.erl @@ -62,7 +62,7 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) -> end. send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) -> - AllShards = partitions:all_parts(DbName), + AllShards = mem3:shards(DbName), Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> case lists:member(Shard, AllShards) of true -> @@ -168,7 +168,7 @@ make_changes_args(Options) -> get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> Since; get_start_seq(DbName, #changes_args{dir=rev}) -> - Shards = partitions:all_parts(DbName), + Shards = mem3:shards(DbName), Workers = fabric_util:submit_jobs(Shards, get_update_seq, []), {ok, Since} = fabric_util:recv(Workers, #shard.ref, fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), @@ -195,10 +195,10 @@ pack_seqs(Workers) -> couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])). unpack_seqs(0, DbName) -> - fabric_dict:init(partitions:all_parts(DbName), 0); + fabric_dict:init(mem3:shards(DbName), 0); unpack_seqs("0", DbName) -> - fabric_dict:init(partitions:all_parts(DbName), 0); + fabric_dict:init(mem3:shards(DbName), 0); unpack_seqs(Packed, DbName) -> % TODO relies on internal structure of fabric_dict as keylist @@ -210,7 +210,7 @@ unpack_seqs(Packed, DbName) -> start_update_notifiers(DbName) -> lists:map(fun(#shard{node=Node, name=Name}) -> {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})} - end, partitions:all_parts(DbName)). + end, mem3:shards(DbName)). % rexi endpoint start_update_notifier(DbName) -> diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl index 69133f3b..6c6dfc96 100644 --- a/src/fabric_view_map.erl +++ b/src/fabric_view_map.erl @@ -12,7 +12,7 @@ go(DbName, DDoc, View, Args, Callback, Acc0) -> Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) -> Ref = rexi:cast(Node, {fabric_rpc, map_view, [Name, DDoc, View, Args]}), Shard#shard{ref = Ref} - end, partitions:all_parts(DbName)), + end, mem3:shards(DbName)), BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args, State = #collector{ diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl index ca137314..4f08b3ed 100644 --- a/src/fabric_view_reduce.erl +++ b/src/fabric_view_reduce.erl @@ -16,7 +16,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) -> Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) -> Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}), Shard#shard{ref = Ref} - end, partitions:all_parts(DbName)), + end, mem3:shards(DbName)), BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"), #view_query_args{limit = Limit, skip = Skip} = Args, State = #collector{ |