diff options
Diffstat (limited to 'src/fabric_db_create.erl')
-rw-r--r-- | src/fabric_db_create.erl | 102 |
1 files changed, 47 insertions, 55 deletions
diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl index 4f4e3b20..80bd1eb1 100644 --- a/src/fabric_db_create.erl +++ b/src/fabric_db_create.erl @@ -1,64 +1,56 @@ -module(fabric_db_create). --author(brad@cloudant.com). - --export([create_db/2]). +-export([go/2]). -include("fabric.hrl"). %% @doc Create a new database, and all its partition files across the cluster %% Options is proplist with user_ctx, n, q --spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}. -create_db(DbName, Options) -> - Fullmap = partitions:fullmap(DbName, Options), - {ok, FullNodes} = mem3:fullnodes(), - RefPartMap = send_create_calls(Fullmap, Options), - Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} || - {_,#shard{range=[Beg,_]}} <- RefPartMap])}, - Result = case fabric_util:receive_loop( - RefPartMap, 1, fun handle_create_msg/3, Acc0) of - {ok, _Results} -> ok; - Error -> Error - end, - % always install partition map, even w/ errors, so delete is possible - partitions:install_fullmap(DbName, Fullmap, FullNodes, Options), - Result. - -%% -%% internal -%% +go(DbName, Options) -> + Shards = mem3:choose_shards(DbName, Options), + Doc = make_document(Shards), + Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]), + Acc0 = fabric_dict:init(Workers, nil), + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, _} -> + ok; + Else -> + Else + end. -%% @doc create the partitions on all appropriate nodes (rexi calls) --spec send_create_calls(fullmap(), list()) -> [{reference(), part()}]. -send_create_calls(Fullmap, Options) -> - lists:map(fun(#shard{node=Node, name=ShardName} = Part) -> - Ref = rexi:async_server_call({couch_server, Node}, - {create, ShardName, Options}), - {Ref, Part} - end, Fullmap). +handle_message(Msg, Shard, Counters) -> + C1 = fabric_dict:store(Shard, Msg, Counters), + case fabric_dict:any(nil, C1) of + true -> + {ok, C1}; + false -> + final_answer(C1) + end. -%% @doc handle create messages from shards -handle_create_msg(file_exists, _, _) -> - {error, file_exists}; -handle_create_msg({rexi_EXIT, _Reason}, _, {Complete, N, Parts}) -> - {ok, {Complete, N-1, Parts}}; -handle_create_msg({rexi_DOWN, _, _, _}, _, {Complete, _N, _Parts}) -> - if - Complete -> {stop, ok}; - true -> {error, create_db_fubar} - end; -handle_create_msg(_, _, {true, 1, _Acc}) -> - {stop, ok}; -handle_create_msg({ok, _}, {_, #shard{range=[Beg,_]}}, {false, 1, PartResults0}) -> - PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), - case is_complete(PartResults) of - true -> {stop, ok}; - false -> {error, create_db_fubar} - end; -handle_create_msg({ok, _}, _RefPart, {true, N, Parts}) -> - {ok, {true, N-1, Parts}}; -handle_create_msg({ok, _}, {_Ref, #shard{range=[Beg,_]}}, {false, Rem, PartResults0}) -> - PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), - {ok, {is_complete(PartResults), Rem-1, PartResults}}. +make_document([#shard{dbname=DbName}|_] = Shards) -> + {RawOut, ByNodeOut, ByRangeOut} = + lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> + Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-", + couch_util:to_hex(<<E:32/integer>>)]), + Node = couch_util:to_binary(N), + {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), + orddict:append(Range, Node, ByRange)} + end, {[], [], []}, Shards), + #doc{id=DbName, body = {[ + {<<"changelog">>, lists:sort(RawOut)}, + {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, + {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} + ]}}. -is_complete(List) -> - lists:all(fun({_,Bool}) -> Bool end, List). +final_answer(Counters) -> + Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists], + case fabric_view:is_progress_possible(Successes) of + true -> + case lists:keymember(file_exists, 2, Successes) of + true -> + {error, file_exists}; + false -> + {stop, ok} + end; + false -> + {error, internal_server_error} + end. |