summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/fabric.erl14
-rw-r--r--src/fabric_db_create.erl102
-rw-r--r--src/fabric_db_delete.erl78
-rw-r--r--src/fabric_db_doc_count.erl2
-rw-r--r--src/fabric_db_info.erl2
-rw-r--r--src/fabric_doc_missing_revs.erl2
-rw-r--r--src/fabric_doc_open.erl2
-rw-r--r--src/fabric_doc_open_revs.erl2
-rw-r--r--src/fabric_doc_update.erl2
-rw-r--r--src/fabric_group_info.erl2
-rw-r--r--src/fabric_rpc.erl14
-rw-r--r--src/fabric_view.erl44
-rw-r--r--src/fabric_view_all_docs.erl2
-rw-r--r--src/fabric_view_changes.erl10
-rw-r--r--src/fabric_view_map.erl2
-rw-r--r--src/fabric_view_reduce.erl2
16 files changed, 138 insertions, 144 deletions
diff --git a/src/fabric.erl b/src/fabric.erl
index badc1379..b233677b 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -1,8 +1,8 @@
-module(fabric).
% DBs
--export([all_dbs/0, all_dbs/1, create_db/2, delete_db/2, get_db_info/1,
- get_doc_count/1]).
+-export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1,
+ delete_db/2, get_db_info/1, get_doc_count/1]).
% Documents
-export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3,
@@ -36,11 +36,17 @@ get_db_info(DbName) ->
get_doc_count(DbName) ->
fabric_db_doc_count:go(dbname(DbName)).
+create_db(DbName) ->
+ create_db(DbName, []).
+
create_db(DbName, Options) ->
- fabric_db_create:create_db(dbname(DbName), opts(Options)).
+ fabric_db_create:go(dbname(DbName), opts(Options)).
+
+delete_db(DbName) ->
+ delete_db(DbName, []).
delete_db(DbName, Options) ->
- fabric_db_delete:delete_db(dbname(DbName), opts(Options)).
+ fabric_db_delete:go(dbname(DbName), opts(Options)).
% doc operations
diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl
index 4f4e3b20..80bd1eb1 100644
--- a/src/fabric_db_create.erl
+++ b/src/fabric_db_create.erl
@@ -1,64 +1,56 @@
-module(fabric_db_create).
--author(brad@cloudant.com).
-
--export([create_db/2]).
+-export([go/2]).
-include("fabric.hrl").
%% @doc Create a new database, and all its partition files across the cluster
%% Options is proplist with user_ctx, n, q
--spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
-create_db(DbName, Options) ->
- Fullmap = partitions:fullmap(DbName, Options),
- {ok, FullNodes} = mem3:fullnodes(),
- RefPartMap = send_create_calls(Fullmap, Options),
- Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} ||
- {_,#shard{range=[Beg,_]}} <- RefPartMap])},
- Result = case fabric_util:receive_loop(
- RefPartMap, 1, fun handle_create_msg/3, Acc0) of
- {ok, _Results} -> ok;
- Error -> Error
- end,
- % always install partition map, even w/ errors, so delete is possible
- partitions:install_fullmap(DbName, Fullmap, FullNodes, Options),
- Result.
-
-%%
-%% internal
-%%
+go(DbName, Options) ->
+ Shards = mem3:choose_shards(DbName, Options),
+ Doc = make_document(Shards),
+ Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]),
+ Acc0 = fabric_dict:init(Workers, nil),
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, _} ->
+ ok;
+ Else ->
+ Else
+ end.
-%% @doc create the partitions on all appropriate nodes (rexi calls)
--spec send_create_calls(fullmap(), list()) -> [{reference(), part()}].
-send_create_calls(Fullmap, Options) ->
- lists:map(fun(#shard{node=Node, name=ShardName} = Part) ->
- Ref = rexi:async_server_call({couch_server, Node},
- {create, ShardName, Options}),
- {Ref, Part}
- end, Fullmap).
+handle_message(Msg, Shard, Counters) ->
+ C1 = fabric_dict:store(Shard, Msg, Counters),
+ case fabric_dict:any(nil, C1) of
+ true ->
+ {ok, C1};
+ false ->
+ final_answer(C1)
+ end.
-%% @doc handle create messages from shards
-handle_create_msg(file_exists, _, _) ->
- {error, file_exists};
-handle_create_msg({rexi_EXIT, _Reason}, _, {Complete, N, Parts}) ->
- {ok, {Complete, N-1, Parts}};
-handle_create_msg({rexi_DOWN, _, _, _}, _, {Complete, _N, _Parts}) ->
- if
- Complete -> {stop, ok};
- true -> {error, create_db_fubar}
- end;
-handle_create_msg(_, _, {true, 1, _Acc}) ->
- {stop, ok};
-handle_create_msg({ok, _}, {_, #shard{range=[Beg,_]}}, {false, 1, PartResults0}) ->
- PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
- case is_complete(PartResults) of
- true -> {stop, ok};
- false -> {error, create_db_fubar}
- end;
-handle_create_msg({ok, _}, _RefPart, {true, N, Parts}) ->
- {ok, {true, N-1, Parts}};
-handle_create_msg({ok, _}, {_Ref, #shard{range=[Beg,_]}}, {false, Rem, PartResults0}) ->
- PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
- {ok, {is_complete(PartResults), Rem-1, PartResults}}.
+make_document([#shard{dbname=DbName}|_] = Shards) ->
+ {RawOut, ByNodeOut, ByRangeOut} =
+ lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) ->
+ Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-",
+ couch_util:to_hex(<<E:32/integer>>)]),
+ Node = couch_util:to_binary(N),
+ {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode),
+ orddict:append(Range, Node, ByRange)}
+ end, {[], [], []}, Shards),
+ #doc{id=DbName, body = {[
+ {<<"changelog">>, lists:sort(RawOut)},
+ {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
+ {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
+ ]}}.
-is_complete(List) ->
- lists:all(fun({_,Bool}) -> Bool end, List).
+final_answer(Counters) ->
+ Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists],
+ case fabric_view:is_progress_possible(Successes) of
+ true ->
+ case lists:keymember(file_exists, 2, Successes) of
+ true ->
+ {error, file_exists};
+ false ->
+ {stop, ok}
+ end;
+ false ->
+ {error, internal_server_error}
+ end.
diff --git a/src/fabric_db_delete.erl b/src/fabric_db_delete.erl
index 95b1a5ef..923b38dc 100644
--- a/src/fabric_db_delete.erl
+++ b/src/fabric_db_delete.erl
@@ -1,56 +1,40 @@
-module(fabric_db_delete).
--author(brad@cloudant.com).
-
--export([delete_db/2]).
+-export([go/2]).
-include("fabric.hrl").
-%% @doc Delete a database, and all its partition files across the cluster
-%% Options is proplist with user_ctx, n, q
--spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
-delete_db(DbName, Options) ->
- Fullmap = partitions:all_parts(DbName),
- RefPartMap = send_delete_calls(Fullmap, Options),
- Acc0 = {not_found, length(RefPartMap)},
- case fabric_util:receive_loop(
- RefPartMap, 1, fun handle_delete_msg/3, Acc0) of
- {ok, _Results} ->
- delete_fullmap(DbName),
+go(DbName, Options) ->
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, delete_db, [Options, DbName]),
+ Acc0 = fabric_dict:init(Workers, nil),
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, ok} ->
ok;
- Error -> Error
+ {ok, not_found} ->
+ erlang:error(database_does_not_exist);
+ Error ->
+ Error
end.
-%%
-%% internal
-%%
-
-%% @doc delete the partitions on all appropriate nodes (rexi calls)
--spec send_delete_calls(fullmap(), list()) -> [{reference(), part()}].
-send_delete_calls(Parts, Options) ->
- lists:map(fun(#shard{node=Node, name=ShardName} = Part) ->
- Ref = rexi:async_server_call({couch_server, Node},
- {delete, ShardName, Options}),
- {Ref, Part}
- end, Parts).
-
-handle_delete_msg(ok, _, {_, 1}) ->
- {stop, ok};
-handle_delete_msg(not_found, _, {Acc, 1}) ->
- {stop, Acc};
-handle_delete_msg({rexi_EXIT, _Reason}, _, {Acc, N}) ->
- % TODO is this the appropriate action to take, or should we abort?
- {ok, {Acc, N-1}};
-handle_delete_msg({rexi_DOWN, _, _, _}, _, _Acc) ->
- {error, delete_db_fubar};
-handle_delete_msg(not_found, _, {Acc, N}) ->
- {ok, {Acc, N-1}};
-handle_delete_msg(ok, _, {_, N}) ->
- {ok, {ok, N-1}}.
+handle_message(Msg, Shard, Counters) ->
+ C1 = fabric_dict:store(Shard, Msg, Counters),
+ case fabric_dict:any(nil, C1) of
+ true ->
+ {ok, C1};
+ false ->
+ final_answer(C1)
+ end.
-delete_fullmap(DbName) ->
- case couch_db:open(<<"dbs">>, []) of
- {ok, Db} ->
- {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []),
- couch_api:update_doc(Db, Doc#doc{deleted=true});
- Error -> Error
+final_answer(Counters) ->
+ Successes = [X || {_, M} = X <- Counters, M == ok orelse M == not_found],
+ case fabric_view:is_progress_possible(Successes) of
+ true ->
+ case lists:keymember(ok, 2, Successes) of
+ true ->
+ {stop, ok};
+ false ->
+ {stop, not_found}
+ end;
+ false ->
+ {error, internal_server_error}
end.
diff --git a/src/fabric_db_doc_count.erl b/src/fabric_db_doc_count.erl
index 4c3a72d5..c587d103 100644
--- a/src/fabric_db_doc_count.erl
+++ b/src/fabric_db_doc_count.erl
@@ -5,7 +5,7 @@
-include("fabric.hrl").
go(DbName) ->
- Shards = partitions:all_parts(DbName),
+ Shards = mem3:shards(DbName),
Workers = fabric_util:submit_jobs(Shards, get_doc_count, []),
Acc0 = {length(Workers), [{Beg,nil} || #shard{range=[Beg,_]} <- Workers]},
fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
diff --git a/src/fabric_db_info.erl b/src/fabric_db_info.erl
index 8db7212f..ecb8ce1c 100644
--- a/src/fabric_db_info.erl
+++ b/src/fabric_db_info.erl
@@ -5,7 +5,7 @@
-include("fabric.hrl").
go(DbName) ->
- Shards = partitions:all_parts(DbName),
+ Shards = mem3:shards(DbName),
Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
Acc0 = {fabric_dict:init(Workers, nil), []},
fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
diff --git a/src/fabric_doc_missing_revs.erl b/src/fabric_doc_missing_revs.erl
index 8f1f5b4c..22c11ad6 100644
--- a/src/fabric_doc_missing_revs.erl
+++ b/src/fabric_doc_missing_revs.erl
@@ -51,7 +51,7 @@ group_idrevs_by_shard(DbName, IdsRevs) ->
dict:to_list(lists:foldl(fun({Id, Revs}, D0) ->
lists:foldl(fun(Shard, D1) ->
dict:append(Shard, {Id, Revs}, D1)
- end, D0, partitions:for_key(DbName,Id))
+ end, D0, mem3:shards(DbName,Id))
end, dict:new(), IdsRevs)).
update_dict(D0, KVs) ->
diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl
index 6f39f39e..96545c55 100644
--- a/src/fabric_doc_open.erl
+++ b/src/fabric_doc_open.erl
@@ -5,7 +5,7 @@
-include("fabric.hrl").
go(DbName, Id, Options) ->
- Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_doc,
+ Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc,
[Id, Options]),
SuppressDeletedDoc = not lists:member(deleted, Options),
Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
diff --git a/src/fabric_doc_open_revs.erl b/src/fabric_doc_open_revs.erl
index 2fa91208..5bf5499f 100644
--- a/src/fabric_doc_open_revs.erl
+++ b/src/fabric_doc_open_revs.erl
@@ -5,7 +5,7 @@
-include("fabric.hrl").
go(DbName, Id, Revs, Options) ->
- Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_revs,
+ Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs,
[Id, Revs, Options]),
Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index 9d3cd3d1..77c182f1 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -84,7 +84,7 @@ group_docs_by_shard(DbName, Docs) ->
dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) ->
lists:foldl(fun(Shard, D1) ->
dict:append(Shard, Doc, D1)
- end, D0, partitions:for_key(DbName,Id))
+ end, D0, mem3:shards(DbName,Id))
end, dict:new(), Docs)).
append_update_replies([], [], DocReplyDict) ->
diff --git a/src/fabric_group_info.erl b/src/fabric_group_info.erl
index d2b76674..a1ba92cc 100644
--- a/src/fabric_group_info.erl
+++ b/src/fabric_group_info.erl
@@ -10,7 +10,7 @@ go(DbName, GroupId) when is_binary(GroupId) ->
go(DbName, #doc{} = DDoc) ->
Group = couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc),
- Shards = partitions:all_parts(DbName),
+ Shards = mem3:shards(DbName),
Workers = fabric_util:submit_jobs(Shards, group_info, [Group]),
Acc0 = {fabric_dict:init(Workers, nil), []},
fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 04aebfd1..1a2edf77 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -3,6 +3,7 @@
-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
-export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]).
-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
+-export([create_db/3, delete_db/3]).
-include("fabric.hrl").
@@ -136,6 +137,19 @@ reduce_view(DbName, Group0, ViewName, QueryArgs) ->
end,
rexi:reply(complete).
+create_db(DbName, Options, Doc) ->
+ mem3_util:write_db_doc(Doc),
+ rexi:reply(case couch_server:create(DbName, Options) of
+ {ok, _} ->
+ ok;
+ Error ->
+ Error
+ end).
+
+delete_db(DbName, Options, DocId) ->
+ mem3_util:delete_db_doc(DocId),
+ rexi:reply(couch_server:delete(DbName, Options)).
+
get_db_info(DbName) ->
with_db(DbName, [], {couch_db, get_db_info, []}).
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 738bb7dd..09fcd43c 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -8,34 +8,32 @@
%% @doc looks for a fully covered keyrange in the list of counters
-spec is_progress_possible([{#shard{}, non_neg_integer()}]) -> boolean().
+is_progress_possible([]) ->
+ false;
is_progress_possible(Counters) ->
Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
[], Counters),
- [First | Rest] = lists:ukeysort(1, Ranges),
- {Head, Tail} = lists:foldl(fun
- (_, {Head, Tail}) when Head =:= Tail ->
- % this is the success condition, we can fast-forward
- {Head, Tail};
- (_, {foo, bar}) ->
+ [{0, Tail0} | Rest] = lists:ukeysort(1, Ranges),
+ Result = lists:foldl(fun
+ (_, fail) ->
% we've already declared failure
- {foo, bar};
- ({X,_}, {Head, Tail}) when Head < Tail, X > Tail ->
+ fail;
+ (_, complete) ->
+ % this is the success condition, we can fast-forward
+ complete;
+ ({X,_}, Tail) when X > (Tail+1) ->
% gap in the keyrange, we're dead
- {foo, bar};
- ({X,Y}, {Head, Tail}) when Head < Tail, X < Y ->
- % the normal condition, adding to the tail
- {Head, erlang:max(Tail, Y)};
- ({X,Y}, {Head, Tail}) when Head < Tail, X > Y, Y >= Head ->
- % we've wrapped all the way around, trigger success condition
- {Head, Head};
- ({X,Y}, {Head, Tail}) when Head < Tail, X > Y ->
- % this wraps the keyspace, but there's still a gap. We're dead
- % TODO technically, another shard could be a superset of this one, and
- % we could still be alive. Pretty unlikely though, and impossible if
- % we don't allow shards to wrap around the boundary
- {foo, bar}
- end, First, Rest),
- Head =:= Tail.
+ fail;
+ ({_,Y}, Tail) ->
+ case erlang:max(Tail, Y) of
+ End when (End+1) =:= (2 bsl 31) ->
+ complete;
+ Else ->
+ % the normal condition, adding to the tail
+ Else
+ end
+ end, Tail0, Rest),
+ Result =:= complete.
-spec remove_overlapping_shards(#shard{}, [#shard{}]) -> [#shard{}].
remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 196d6837..f1713b86 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -9,7 +9,7 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}),
Shard#shard{ref = Ref}
- end, partitions:all_parts(DbName)),
+ end, mem3:shards(DbName)),
BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
#view_query_args{limit = Limit, skip = Skip} = QueryArgs,
State = #collector{
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 39a57176..f6989061 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -62,7 +62,7 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) ->
end.
send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) ->
- AllShards = partitions:all_parts(DbName),
+ AllShards = mem3:shards(DbName),
Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
case lists:member(Shard, AllShards) of
true ->
@@ -168,7 +168,7 @@ make_changes_args(Options) ->
get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
Since;
get_start_seq(DbName, #changes_args{dir=rev}) ->
- Shards = partitions:all_parts(DbName),
+ Shards = mem3:shards(DbName),
Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
{ok, Since} = fabric_util:recv(Workers, #shard.ref,
fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
@@ -195,10 +195,10 @@ pack_seqs(Workers) ->
couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])).
unpack_seqs(0, DbName) ->
- fabric_dict:init(partitions:all_parts(DbName), 0);
+ fabric_dict:init(mem3:shards(DbName), 0);
unpack_seqs("0", DbName) ->
- fabric_dict:init(partitions:all_parts(DbName), 0);
+ fabric_dict:init(mem3:shards(DbName), 0);
unpack_seqs(Packed, DbName) ->
% TODO relies on internal structure of fabric_dict as keylist
@@ -210,7 +210,7 @@ unpack_seqs(Packed, DbName) ->
start_update_notifiers(DbName) ->
lists:map(fun(#shard{node=Node, name=Name}) ->
{Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
- end, partitions:all_parts(DbName)).
+ end, mem3:shards(DbName)).
% rexi endpoint
start_update_notifier(DbName) ->
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index 69133f3b..6c6dfc96 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -12,7 +12,7 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
Ref = rexi:cast(Node, {fabric_rpc, map_view, [Name, DDoc, View, Args]}),
Shard#shard{ref = Ref}
- end, partitions:all_parts(DbName)),
+ end, mem3:shards(DbName)),
BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
#view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args,
State = #collector{
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index ca137314..4f08b3ed 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -16,7 +16,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) ->
Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}),
Shard#shard{ref = Ref}
- end, partitions:all_parts(DbName)),
+ end, mem3:shards(DbName)),
BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"),
#view_query_args{limit = Limit, skip = Skip} = Args,
State = #collector{