summaryrefslogtreecommitdiff
path: root/src/cluster_ops.erl
diff options
context:
space:
mode:
authorJoe <joe@ubuntu.localdomain>2010-02-22 12:19:15 -0800
committerJoe <joe@ubuntu.localdomain>2010-02-22 12:19:15 -0800
commit6fce297e9ff9f495b10281f2c5c78e6e0c2d48ad (patch)
tree42f34b519a411ce8f594a375d5be5c885ee37ed6 /src/cluster_ops.erl
merge attempt #1
Diffstat (limited to 'src/cluster_ops.erl')
-rw-r--r--src/cluster_ops.erl282
1 files changed, 282 insertions, 0 deletions
diff --git a/src/cluster_ops.erl b/src/cluster_ops.erl
new file mode 100644
index 00000000..bd2ad83d
--- /dev/null
+++ b/src/cluster_ops.erl
@@ -0,0 +1,282 @@
+%%%-------------------------------------------------------------------
+%%% File: cluster_ops.erl
+%%% @author Brad Anderson <brad@cloudant.com> [http://cloudant.com]
+%%% @copyright 2009 Brad Anderson
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-07-21 by Brad Anderson
+%%%-------------------------------------------------------------------
+-module(cluster_ops).
+-author('brad@cloudant.com').
+
+%% API
+-export([key_lookup/3, key_lookup/5,
+ all_parts/4,
+ some_parts/4, some_parts/5,
+ quorum_from_each_part/3]).
+
+-include("../include/common.hrl").
+-include("../include/config.hrl").
+
+-include("../include/profile.hrl").
+
+
+%%====================================================================
+%% API
+%%====================================================================
+
+%% @doc Get to the proper shard on N nodes by key lookup
+%%
+%% This fun uses quorum constants from config
+key_lookup(Key, {M,F,A}, Access) ->
+ {N,_R,_W} = Consts = unpack_config(configuration:get_config()),
+ Const = get_const(Access, Consts),
+ key_lookup(Key, {M,F,A}, Access, Const, N).
+
+
+%% @doc Get to the proper shard on N nodes by key lookup
+%%
+%% This fun uses a provided quorum constant, possibly from request,
+%% possibly from config
+key_lookup(Key, {M,F,A}, Access, Const, N) ->
+ NodeParts = membership2:nodeparts_for_key(Key),
+ {ResolveFun, NotFoundFun} = case Access of
+ r -> {fun resolve_read/1, fun resolve_not_found/2};
+ w -> {fun resolve_write/1, fun(_,_) -> {false, notused, []} end}
+ end,
+ MapFun = fun({Node,Part}) ->
+ try
+ rpc:call(Node, M, F, [[Part | A]])
+ catch Class:Exception ->
+ {error, Class, Exception}
+ end
+ end,
+ {GoodReplies, Bad} = pcall(MapFun, NodeParts, Const),
+ if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
+ Good = lists:map(fun strip_ok/1, GoodReplies),
+ final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access).
+
+
+%% @doc Do op on all shards (and maybe even replication partners)
+all_parts({M,F,A}, Access, AndPartners, ResolveFun) ->
+ NodePartList = membership2:all_nodes_parts(AndPartners),
+ MapFun = fun({Node, Part}) ->
+ try
+ rpc:call(Node, M, F, [[Part | A]])
+ catch Class:Exception ->
+ {error, Class, Exception}
+ end
+ end,
+ Replies = ?PMAP(MapFun, NodePartList),
+ {Good, Bad} = lists:partition(fun valid/1, Replies),
+ final_all_parts(Good, Bad, length(NodePartList), ResolveFun, Access).
+
+
+%% @doc Do op on some shards, depending on list of keys sent in.
+%%
+%% This fun uses quorum constants from config
+some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access) ->
+ Const = get_const(Access),
+ some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access, Const).
+
+
+%% @doc Do op on some shards, depending on list of keys sent in.
+%%
+%% This fun uses a provided quorum constant, possibly from request,
+%% possibly from config
+some_parts(KeyFun, SeqsKVPairs, {M,F,A}, _Access, Const) ->
+ TaskFun = fun({{Node,Part}, Values}) ->
+ try
+ rpc:call(Node, M, F, [[Part | [Values | A]]])
+ catch Class:Exception ->
+ {error, Class, Exception}
+ end
+ end,
+
+ % get tasks per node that are part / values for that partition
+ DistTasks = get_dist_tasks(KeyFun, SeqsKVPairs),
+
+ % With the distributed tasklist in hand, do the tasks per partition.
+ % For each partition, do the work on all nodes/parts.
+ TaskReplies = ?PMAP(TaskFun, DistTasks),
+ {GoodReplies, Bad} = lists:partition(fun valid/1, TaskReplies),
+ if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
+ Good = lists:map(fun strip_ok/1, GoodReplies),
+ final_some_parts(Good, Bad, Const).
+
+
+quorum_from_each_part({M,F,A}, Access, ResolveFun) ->
+ Const = get_const(Access),
+ {_, Parts} = lists:unzip(membership2:partitions()),
+ PartsMapFun = fun(Part) ->
+ Nodes = membership2:nodes_for_part(Part),
+ NodesMapFun = fun(Node) -> rpc:call(Node, M, F, [[Part | A]]) end,
+ {GoodReplies,BadReplies} = pcall(NodesMapFun, Nodes, Const),
+ Good1 = lists:map(fun strip_ok/1, GoodReplies),
+ Bad1 = case length(Good1) >= Const of
+ true -> [];
+ false -> BadReplies
+ end,
+ {Good1,Bad1}
+ end,
+ Results1 = ?PMAP(PartsMapFun, Parts),
+ {Good,Bad} = lists:foldl(fun({G,B}, {GAcc,BAcc}) ->
+ {lists:append(G,GAcc),lists:append(B,BAcc)}
+ end, {[],[]}, Results1),
+ if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
+ final_quorum_from_each_part(Good, Bad, length(Parts), ResolveFun, Access).
+
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access) ->
+ {NotFound, Return, Reasons} = NotFoundFun(Bad, Const),
+ if
+ length(Good) >= Const -> {ok, ResolveFun(Good)};
+ NotFound -> {ok, Return, Reasons};
+ true -> error_message(Good, Bad, N, Const, Access)
+ end.
+
+
+final_all_parts(Good, Bad, Total, ResolveFun, Access) ->
+ case length(Good) =:= Total of
+ true -> {ok, ResolveFun(Good)};
+ _ -> error_message(Good, Bad, Total, Total, Access)
+ end.
+
+
+final_some_parts(Good, _Bad, Const) ->
+ Good1 = lists:flatten(Good),
+ {Seqs, _} = lists:unzip(Good1),
+ {ResG,ResB} =
+ lists:foldl(
+ fun(Seq, {AccG,AccB}) ->
+ Vals = proplists:get_all_values(Seq, Good1),
+ case length(Vals) >= Const of
+ true -> {[{Seq, Vals}|AccG],AccB};
+ _ -> {AccG, [{Seq, Vals}|AccB]}
+ end
+ end, {[],[]}, lists:usort(Seqs)),
+ case length(ResB) of
+ 0 -> {ok, ResG};
+ _ -> {error, ResB}
+ end.
+
+
+final_quorum_from_each_part(Good, Bad, Total, ResolveFun, Access) ->
+ case length(Good) =:= Total of
+ true -> {ok, ResolveFun(Good)};
+ _ -> error_message(Good, Bad, Total, Total, Access)
+ end.
+
+
+resolve_read([First|Responses]) ->
+ case First of
+ not_found -> not_found;
+ _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
+ end.
+
+
+resolve_write([First|Responses]) ->
+ case First of
+ not_found -> not_found;
+ _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
+ end.
+
+
+resolve_not_found(Bad, R) ->
+ {NotFoundCnt, DeletedCnt, OtherReasons} =
+ lists:foldl(fun({Error,Reason}, {NotFoundAcc, DeletedAcc, ReasonAcc}) ->
+ case {Error,Reason} of
+ {not_found, {_Clock, [missing|_Rest]}} ->
+ {NotFoundAcc+1, DeletedAcc, ReasonAcc};
+ {not_found, {_Clock, [deleted|_Rest]}} ->
+ {NotFoundAcc, DeletedAcc+1, ReasonAcc};
+ _ ->
+ {NotFoundAcc, DeletedAcc, [Reason|ReasonAcc]}
+ end
+ end, {0, 0, []}, Bad),
+ % TODO: is the comparison to R good here, or should it be N-R?
+ if
+ NotFoundCnt >= R -> {true, {not_found, missing}, OtherReasons};
+ DeletedCnt >= R -> {true, {not_found, deleted}, OtherReasons};
+ true -> {false, other, OtherReasons}
+ end.
+
+
+error_message(Good, Bad, N, T, Access) ->
+ Msg = list_to_atom(lists:concat([atom_to_list(Access), "_quorum_not_met"])),
+ ?LOG_ERROR("~p~nSuccess on ~p of ~p servers. Needed ~p. Errors: ~w"
+ , [Msg, length(Good), N, T, Bad]),
+ [{error, Msg}, {good, Good}, {bad, Bad}].
+
+
+unpack_config(#config{n=N,r=R,w=W}) ->
+ {N, R, W}.
+
+
+pcall(MapFun, Servers, Const) ->
+ Replies = lib_misc:pmap(MapFun, Servers, Const),
+ lists:partition(fun valid/1, Replies).
+
+
+valid({ok, _}) -> true;
+valid(ok) -> true;
+valid(_) -> false.
+
+
+strip_ok({ok, Val}) -> Val;
+strip_ok(Val) -> Val.
+
+
+%% @spec get_dist_tasks(KeyFun::function(), KVPairs::list()) ->
+%% [{{Node::node(), Part::integer()}, SeqVals}]
+%% Type - ordered | ??
+%% SeqVals - [{Seq, Val}]
+%% @doc builds a distributed task list of nodes with a list of shard/values.
+%% This looks like a dict structure
+%% but is a list so we can use ?PMAP with the results
+%% @end
+get_dist_tasks(KeyFun, SeqsKVPairs) ->
+ %% loop thru SeqsKVPairs adding node/part to each
+ NPSV = lists:flatmap(
+ fun({Seq,KVPair}) ->
+ NodeParts = membership2:nodeparts_for_key(KeyFun(KVPair)),
+ lists:map(
+ fun(NodePart) ->
+ {NodePart, {Seq, KVPair}}
+ end, NodeParts)
+ end, SeqsKVPairs),
+ nodepart_values_list(NPSV).
+
+
+%% pile up the List by NodePart (like a dict)
+nodepart_values_list(List) ->
+ DistTasks =
+ lists:foldl(
+ fun(NodePart, AccIn) ->
+ Values = proplists:get_all_values(NodePart, List),
+ case length(Values) of
+ 0 -> AccIn;
+ _ -> [{NodePart, Values} | AccIn]
+ end
+ end, [], membership2:all_nodes_parts(true)),
+ % ?LOG_DEBUG("~nDistTasks: ~p~n", [DistTasks]),
+ DistTasks.
+
+
+get_const(Access) ->
+ get_const(Access, unpack_config(configuration:get_config())).
+
+
+get_const(Access, {_N,R,W}) ->
+ case Access of
+ r -> R;
+ w -> W;
+ r1 -> 1;
+ Other -> throw({bad_access_term, Other})
+ end.