diff options
Diffstat (limited to 'src/cluster_ops.erl')
-rw-r--r-- | src/cluster_ops.erl | 264 |
1 files changed, 0 insertions, 264 deletions
diff --git a/src/cluster_ops.erl b/src/cluster_ops.erl deleted file mode 100644 index 72bba92f..00000000 --- a/src/cluster_ops.erl +++ /dev/null @@ -1,264 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File: cluster_ops.erl -%%% @author Brad Anderson <brad@cloudant.com> [http://cloudant.com] -%%% @copyright 2009 Brad Anderson -%%% @doc -%%% -%%% @end -%%% -%%% @since 2009-07-21 by Brad Anderson -%%%------------------------------------------------------------------- --module(cluster_ops). --author('brad@cloudant.com'). - -%% API --export([key_lookup/3, key_lookup/5, - all_parts/4, - some_parts/4, some_parts/5, - quorum_from_each_part/3]). - --include("../include/common.hrl"). --include("../include/config.hrl"). - --include("../include/profile.hrl"). - - -%%==================================================================== -%% API -%%==================================================================== - -%% @doc Get to the proper shard on N nodes by key lookup -%% -%% This fun uses quorum constants from config -key_lookup(Key, {M,F,A}, Access) -> - N = list_to_integer(couch_config:get("cluster", "n", "3")), - key_lookup(Key, {M,F,A}, Access, get_const(Access), N). - - -%% @doc Get to the proper shard on N nodes by key lookup -%% -%% This fun uses a provided quorum constant, possibly from request, -%% possibly from config -key_lookup(Key, {M,F,A}, Access, Const, N) -> - NodeParts = membership2:nodeparts_for_key(Key), - {ResolveFun, NotFoundFun} = case Access of - r -> {fun resolve_read/1, fun resolve_not_found/2}; - w -> {fun resolve_write/1, fun(_,_) -> {false, notused, []} end} - end, - MapFun = fun({Node,Part}) -> - try - rpc:call(Node, M, F, [[Part | A]]) - catch Class:Exception -> - {error, Class, Exception} - end - end, - {GoodReplies, Bad} = pcall(MapFun, NodeParts, N), - if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end, - Good = lists:map(fun strip_ok/1, GoodReplies), - final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access). - - -%% @doc Do op on all shards (and maybe even replication partners) -all_parts({M,F,A}, Access, AndPartners, ResolveFun) -> - NodePartList = membership2:all_nodes_parts(AndPartners), - MapFun = fun({Node, Part}) -> - try - rpc:call(Node, M, F, [[Part | A]]) - catch Class:Exception -> - {error, Class, Exception} - end - end, - Replies = ?PMAP(MapFun, NodePartList), - {Good, Bad} = lists:partition(fun valid/1, Replies), - final_all_parts(Good, Bad, length(NodePartList), ResolveFun, Access). - - -%% @doc Do op on some shards, depending on list of keys sent in. -%% -%% This fun uses quorum constants from config -some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access) -> - some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access, get_const(Access)). - - -%% @doc Do op on some shards, depending on list of keys sent in. -%% -%% This fun uses a provided quorum constant, possibly from request, -%% possibly from config -some_parts(KeyFun, SeqsKVPairs, {M,F,A}, _Access, Const) -> - TaskFun = fun({{Node,Part}, Values}) -> - try - rpc:call(Node, M, F, [[Part | [Values | A]]]) - catch Class:Exception -> - {error, Class, Exception} - end - end, - - % get tasks per node that are part / values for that partition - DistTasks = get_dist_tasks(KeyFun, SeqsKVPairs), - - % With the distributed tasklist in hand, do the tasks per partition. - % For each partition, do the work on all nodes/parts. - TaskReplies = ?PMAP(TaskFun, DistTasks), - {GoodReplies, Bad} = lists:partition(fun valid/1, TaskReplies), - if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end, - Good = lists:map(fun strip_ok/1, GoodReplies), - final_some_parts(Good, Bad, Const). - - -quorum_from_each_part({M,F,A}, Access, ResolveFun) -> - Const = get_const(Access), - {_, Parts} = lists:unzip(membership2:partitions()), - PartsMapFun = fun(Part) -> - Nodes = membership2:nodes_for_part(Part), - NodesMapFun = fun(Node) -> rpc:call(Node, M, F, [[Part | A]]) end, - {GoodReplies,BadReplies} = pcall(NodesMapFun, Nodes, Const), - Good1 = lists:map(fun strip_ok/1, GoodReplies), - Bad1 = case length(Good1) >= Const of - true -> []; - false -> BadReplies - end, - {Good1,Bad1} - end, - Results1 = ?PMAP(PartsMapFun, Parts), - {Good,Bad} = lists:foldl(fun({G,B}, {GAcc,BAcc}) -> - {lists:append(G,GAcc),lists:append(B,BAcc)} - end, {[],[]}, Results1), - if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end, - final_quorum_from_each_part(Good, Bad, length(Parts), ResolveFun, Access). - - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access) -> - {NotFound, Return, Reasons} = NotFoundFun(Bad, Const), - if - length(Good) >= Const -> {ok, ResolveFun(Good)}; - NotFound -> {ok, Return, Reasons}; - true -> error_message(Good, Bad, N, Const, Access) - end. - - -final_all_parts(Good, Bad, Total, ResolveFun, Access) -> - case length(Good) =:= Total of - true -> {ok, ResolveFun(Good)}; - _ -> error_message(Good, Bad, Total, Total, Access) - end. - - -final_some_parts(Good, _Bad, Const) -> - Good1 = lists:flatten(Good), - {Seqs, _} = lists:unzip(Good1), - {ResG,ResB} = - lists:foldl( - fun(Seq, {AccG,AccB}) -> - Vals = proplists:get_all_values(Seq, Good1), - case length(Vals) >= Const of - true -> {[{Seq, Vals}|AccG],AccB}; - _ -> {AccG, [{Seq, Vals}|AccB]} - end - end, {[],[]}, lists:usort(Seqs)), - case length(ResB) of - 0 -> {ok, ResG}; - _ -> {error, ResB} - end. - - -final_quorum_from_each_part(Good, Bad, Total, ResolveFun, Access) -> - case length(Good) =:= Total of - true -> {ok, ResolveFun(Good)}; - _ -> error_message(Good, Bad, Total, Total, Access) - end. - - -resolve_read([First|Responses]) -> - case First of - not_found -> not_found; - _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses) - end. - - -resolve_write([First|Responses]) -> - case First of - not_found -> not_found; - _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses) - end. - - -resolve_not_found(Bad, R) -> - {NotFoundCnt, DeletedCnt, OtherReasons} = - lists:foldl(fun({Error,Reason}, {NotFoundAcc, DeletedAcc, ReasonAcc}) -> - case {Error,Reason} of - {not_found, {_Clock, [missing|_Rest]}} -> - {NotFoundAcc+1, DeletedAcc, ReasonAcc}; - {not_found, {_Clock, [deleted|_Rest]}} -> - {NotFoundAcc, DeletedAcc+1, ReasonAcc}; - _ -> - {NotFoundAcc, DeletedAcc, [Reason|ReasonAcc]} - end - end, {0, 0, []}, Bad), - % TODO: is the comparison to R good here, or should it be N-R? - if - NotFoundCnt >= R -> {true, {not_found, missing}, OtherReasons}; - DeletedCnt >= R -> {true, {not_found, deleted}, OtherReasons}; - true -> {false, other, OtherReasons} - end. - - -error_message(Good, Bad, N, T, Access) -> - Msg = list_to_atom(lists:concat([atom_to_list(Access), "_quorum_not_met"])), - ?LOG_ERROR("~p~nSuccess on ~p of ~p servers. Needed ~p. Errors: ~w" - , [Msg, length(Good), N, T, Bad]), - [{error, Msg}, {good, Good}, {bad, Bad}]. - - -pcall(MapFun, Servers, Const) -> - Replies = lib_misc:pmap(MapFun, Servers, Const), - lists:partition(fun valid/1, Replies). - - -valid({ok, _}) -> true; -valid(ok) -> true; -valid(_) -> false. - - -strip_ok({ok, Val}) -> Val; -strip_ok(Val) -> Val. - - -%% @spec get_dist_tasks(KeyFun::function(), KVPairs::list()) -> -%% [{{Node::node(), Part::integer()}, SeqVals}] -%% Type - ordered | ?? -%% SeqVals - [{Seq, Val}] -%% @doc builds a distributed task list of nodes with a list of shard/values. -%% This looks like a dict structure -%% but is a list so we can use ?PMAP with the results -%% @end -get_dist_tasks(KeyFun, SeqsKVPairs) -> - NPSV = lists:flatmap(fun({_,KVPair} = Elem) -> - [{NP, Elem} || NP <- membership2:nodeparts_for_key(KeyFun(KVPair))] - end, SeqsKVPairs), - group_by_key(NPSV). - -group_by_key([]) -> - []; -group_by_key(List) -> - [{FirstK,FirstV} | Rest] = lists:keysort(1,List), - Acc0 = {FirstK, [FirstV], []}, - FoldFun = fun({K,V}, {K,Vs,Acc}) -> - {K, [V|Vs], Acc}; - ({NewKey,V}, {OldKey,Vs,Acc}) -> - {NewKey, [V], [{OldKey,Vs}|Acc]} - end, - {LastK, LastVs, Acc} = lists:foldl(FoldFun, Acc0, Rest), - [{LastK, LastVs} | Acc]. - -get_const(r) -> - list_to_integer(couch_config:get("cluster", "r", "2")); -get_const(w) -> - list_to_integer(couch_config:get("cluster", "w", "2")); -get_const(r1) -> - 1; -get_const(Other) -> - throw({bad_access_term, Other}). |