summaryrefslogtreecommitdiff
path: root/src/fabric_db_create.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric_db_create.erl')
-rw-r--r--src/fabric_db_create.erl102
1 files changed, 47 insertions, 55 deletions
diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl
index 4f4e3b20..80bd1eb1 100644
--- a/src/fabric_db_create.erl
+++ b/src/fabric_db_create.erl
@@ -1,64 +1,56 @@
-module(fabric_db_create).
--author(brad@cloudant.com).
-
--export([create_db/2]).
+-export([go/2]).
-include("fabric.hrl").
%% @doc Create a new database, and all its partition files across the cluster
%% Options is proplist with user_ctx, n, q
--spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
-create_db(DbName, Options) ->
- Fullmap = partitions:fullmap(DbName, Options),
- {ok, FullNodes} = mem3:fullnodes(),
- RefPartMap = send_create_calls(Fullmap, Options),
- Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} ||
- {_,#shard{range=[Beg,_]}} <- RefPartMap])},
- Result = case fabric_util:receive_loop(
- RefPartMap, 1, fun handle_create_msg/3, Acc0) of
- {ok, _Results} -> ok;
- Error -> Error
- end,
- % always install partition map, even w/ errors, so delete is possible
- partitions:install_fullmap(DbName, Fullmap, FullNodes, Options),
- Result.
-
-%%
-%% internal
-%%
+go(DbName, Options) ->
+ Shards = mem3:choose_shards(DbName, Options),
+ Doc = make_document(Shards),
+ Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]),
+ Acc0 = fabric_dict:init(Workers, nil),
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, _} ->
+ ok;
+ Else ->
+ Else
+ end.
-%% @doc create the partitions on all appropriate nodes (rexi calls)
--spec send_create_calls(fullmap(), list()) -> [{reference(), part()}].
-send_create_calls(Fullmap, Options) ->
- lists:map(fun(#shard{node=Node, name=ShardName} = Part) ->
- Ref = rexi:async_server_call({couch_server, Node},
- {create, ShardName, Options}),
- {Ref, Part}
- end, Fullmap).
+handle_message(Msg, Shard, Counters) ->
+ C1 = fabric_dict:store(Shard, Msg, Counters),
+ case fabric_dict:any(nil, C1) of
+ true ->
+ {ok, C1};
+ false ->
+ final_answer(C1)
+ end.
-%% @doc handle create messages from shards
-handle_create_msg(file_exists, _, _) ->
- {error, file_exists};
-handle_create_msg({rexi_EXIT, _Reason}, _, {Complete, N, Parts}) ->
- {ok, {Complete, N-1, Parts}};
-handle_create_msg({rexi_DOWN, _, _, _}, _, {Complete, _N, _Parts}) ->
- if
- Complete -> {stop, ok};
- true -> {error, create_db_fubar}
- end;
-handle_create_msg(_, _, {true, 1, _Acc}) ->
- {stop, ok};
-handle_create_msg({ok, _}, {_, #shard{range=[Beg,_]}}, {false, 1, PartResults0}) ->
- PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
- case is_complete(PartResults) of
- true -> {stop, ok};
- false -> {error, create_db_fubar}
- end;
-handle_create_msg({ok, _}, _RefPart, {true, N, Parts}) ->
- {ok, {true, N-1, Parts}};
-handle_create_msg({ok, _}, {_Ref, #shard{range=[Beg,_]}}, {false, Rem, PartResults0}) ->
- PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
- {ok, {is_complete(PartResults), Rem-1, PartResults}}.
+make_document([#shard{dbname=DbName}|_] = Shards) ->
+ {RawOut, ByNodeOut, ByRangeOut} =
+ lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) ->
+ Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-",
+ couch_util:to_hex(<<E:32/integer>>)]),
+ Node = couch_util:to_binary(N),
+ {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode),
+ orddict:append(Range, Node, ByRange)}
+ end, {[], [], []}, Shards),
+ #doc{id=DbName, body = {[
+ {<<"changelog">>, lists:sort(RawOut)},
+ {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
+ {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
+ ]}}.
-is_complete(List) ->
- lists:all(fun({_,Bool}) -> Bool end, List).
+final_answer(Counters) ->
+ Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists],
+ case fabric_view:is_progress_possible(Successes) of
+ true ->
+ case lists:keymember(file_exists, 2, Successes) of
+ true ->
+ {error, file_exists};
+ false ->
+ {stop, ok}
+ end;
+ false ->
+ {error, internal_server_error}
+ end.