-%%% File: cluster_ops.erl
-%%% @author Brad Anderson <> []
-%%% @copyright 2009 Brad Anderson
-%%% @doc
-%%% @end
-%%% @since 2009-07-21 by Brad Anderson
-%% API
--export([key_lookup/3, key_lookup/5,
- all_parts/4,
- some_parts/4, some_parts/5,
- quorum_from_each_part/3]).
-%% API
-%% @doc Get to the proper shard on N nodes by key lookup
-%% This fun uses quorum constants from config
-key_lookup(Key, {M,F,A}, Access) ->
- N = list_to_integer(couch_config:get("cluster", "n", "3")),
- key_lookup(Key, {M,F,A}, Access, get_const(Access), N).
-%% @doc Get to the proper shard on N nodes by key lookup
-%% This fun uses a provided quorum constant, possibly from request,
-%% possibly from config
-key_lookup(Key, {M,F,A}, Access, Const, N) ->
- NodeParts = membership2:nodeparts_for_key(Key),
- {ResolveFun, NotFoundFun} = case Access of
- r -> {fun resolve_read/1, fun resolve_not_found/2};
- w -> {fun resolve_write/1, fun(_,_) -> {false, notused, []} end}
- end,
- MapFun = fun({Node,Part}) ->
- try
- rpc:call(Node, M, F, [[Part | A]])
- catch Class:Exception ->
- {error, Class, Exception}
- end
- end,
- {GoodReplies, Bad} = pcall(MapFun, NodeParts, N),
- if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
- Good = lists:map(fun strip_ok/1, GoodReplies),
- final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access).
-%% @doc Do op on all shards (and maybe even replication partners)
-all_parts({M,F,A}, Access, AndPartners, ResolveFun) ->
- NodePartList = membership2:all_nodes_parts(AndPartners),
- MapFun = fun({Node, Part}) ->
- try
- rpc:call(Node, M, F, [[Part | A]])
- catch Class:Exception ->
- {error, Class, Exception}
- end
- end,
- Replies = ?PMAP(MapFun, NodePartList),
- {Good, Bad} = lists:partition(fun valid/1, Replies),
- final_all_parts(Good, Bad, length(NodePartList), ResolveFun, Access).
-%% @doc Do op on some shards, depending on list of keys sent in.
-%% This fun uses quorum constants from config
-some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access) ->
- some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access, get_const(Access)).
-%% @doc Do op on some shards, depending on list of keys sent in.
-%% This fun uses a provided quorum constant, possibly from request,
-%% possibly from config
-some_parts(KeyFun, SeqsKVPairs, {M,F,A}, _Access, Const) ->
- TaskFun = fun({{Node,Part}, Values}) ->
- try
- rpc:call(Node, M, F, [[Part | [Values | A]]])
- catch Class:Exception ->
- {error, Class, Exception}
- end
- end,
- % get tasks per node that are part / values for that partition
- DistTasks = get_dist_tasks(KeyFun, SeqsKVPairs),
- % With the distributed tasklist in hand, do the tasks per partition.
- % For each partition, do the work on all nodes/parts.
- TaskReplies = ?PMAP(TaskFun, DistTasks),
- {GoodReplies, Bad} = lists:partition(fun valid/1, TaskReplies),
- if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
- Good = lists:map(fun strip_ok/1, GoodReplies),
- final_some_parts(Good, Bad, Const).
-quorum_from_each_part({M,F,A}, Access, ResolveFun) ->
- Const = get_const(Access),
- {_, Parts} = lists:unzip(membership2:partitions()),
- PartsMapFun = fun(Part) ->
- Nodes = membership2:nodes_for_part(Part),
- NodesMapFun = fun(Node) -> rpc:call(Node, M, F, [[Part | A]]) end,
- {GoodReplies,BadReplies} = pcall(NodesMapFun, Nodes, Const),
- Good1 = lists:map(fun strip_ok/1, GoodReplies),
- Bad1 = case length(Good1) >= Const of
- true -> [];
- false -> BadReplies
- end,
- {Good1,Bad1}
- end,
- Results1 = ?PMAP(PartsMapFun, Parts),
- {Good,Bad} = lists:foldl(fun({G,B}, {GAcc,BAcc}) ->
- {lists:append(G,GAcc),lists:append(B,BAcc)}
- end, {[],[]}, Results1),
- if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
- final_quorum_from_each_part(Good, Bad, length(Parts), ResolveFun, Access).
-%% Internal functions
-final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access) ->
- {NotFound, Return, Reasons} = NotFoundFun(Bad, Const),
- if
- length(Good) >= Const -> {ok, ResolveFun(Good)};
- NotFound -> {ok, Return, Reasons};
- true -> error_message(Good, Bad, N, Const, Access)
- end.
-final_all_parts(Good, Bad, Total, ResolveFun, Access) ->
- case length(Good) =:= Total of
- true -> {ok, ResolveFun(Good)};
- _ -> error_message(Good, Bad, Total, Total, Access)
- end.
-final_some_parts(Good, _Bad, Const) ->
- Good1 = lists:flatten(Good),
- {Seqs, _} = lists:unzip(Good1),
- {ResG,ResB} =
- lists:foldl(
- fun(Seq, {AccG,AccB}) ->
- Vals = proplists:get_all_values(Seq, Good1),
- case length(Vals) >= Const of
- true -> {[{Seq, Vals}|AccG],AccB};
- _ -> {AccG, [{Seq, Vals}|AccB]}
- end
- end, {[],[]}, lists:usort(Seqs)),
- case length(ResB) of
- 0 -> {ok, ResG};
- _ -> {error, ResB}
- end.
-final_quorum_from_each_part(Good, Bad, Total, ResolveFun, Access) ->
- case length(Good) =:= Total of
- true -> {ok, ResolveFun(Good)};
- _ -> error_message(Good, Bad, Total, Total, Access)
- end.
-resolve_read([First|Responses]) ->
- case First of
- not_found -> not_found;
- _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
- end.
-resolve_write([First|Responses]) ->
- case First of
- not_found -> not_found;
- _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
- end.
-resolve_not_found(Bad, R) ->
- {NotFoundCnt, DeletedCnt, OtherReasons} =
- lists:foldl(fun({Error,Reason}, {NotFoundAcc, DeletedAcc, ReasonAcc}) ->
- case {Error,Reason} of
- {not_found, {_Clock, [missing|_Rest]}} ->
- {NotFoundAcc+1, DeletedAcc, ReasonAcc};
- {not_found, {_Clock, [deleted|_Rest]}} ->
- {NotFoundAcc, DeletedAcc+1, ReasonAcc};
- _ ->
- {NotFoundAcc, DeletedAcc, [Reason|ReasonAcc]}
- end
- end, {0, 0, []}, Bad),
- % TODO: is the comparison to R good here, or should it be N-R?
- if
- NotFoundCnt >= R -> {true, {not_found, missing}, OtherReasons};
- DeletedCnt >= R -> {true, {not_found, deleted}, OtherReasons};
- true -> {false, other, OtherReasons}
- end.
-error_message(Good, Bad, N, T, Access) ->
- Msg = list_to_atom(lists:concat([atom_to_list(Access), "_quorum_not_met"])),
- ?LOG_ERROR("~p~nSuccess on ~p of ~p servers. Needed ~p. Errors: ~w"
- , [Msg, length(Good), N, T, Bad]),
- [{error, Msg}, {good, Good}, {bad, Bad}].
-pcall(MapFun, Servers, Const) ->
- Replies = lib_misc:pmap(MapFun, Servers, Const),
- lists:partition(fun valid/1, Replies).
-valid({ok, _}) -> true;
-valid(ok) -> true;
-valid(_) -> false.
-strip_ok({ok, Val}) -> Val;
-strip_ok(Val) -> Val.
-%% @spec get_dist_tasks(KeyFun::function(), KVPairs::list()) ->
-%% [{{Node::node(), Part::integer()}, SeqVals}]
-%% Type - ordered | ??
-%% SeqVals - [{Seq, Val}]
-%% @doc builds a distributed task list of nodes with a list of shard/values.
-%% This looks like a dict structure
-%% but is a list so we can use ?PMAP with the results
-%% @end
-get_dist_tasks(KeyFun, SeqsKVPairs) ->
- NPSV = lists:flatmap(fun({_,KVPair} = Elem) ->
- [{NP, Elem} || NP <- membership2:nodeparts_for_key(KeyFun(KVPair))]
- end, SeqsKVPairs),
- group_by_key(NPSV).
-group_by_key([]) ->
- [];
-group_by_key(List) ->
- [{FirstK,FirstV} | Rest] = lists:keysort(1,List),
- Acc0 = {FirstK, [FirstV], []},
- FoldFun = fun({K,V}, {K,Vs,Acc}) ->
- {K, [V|Vs], Acc};
- ({NewKey,V}, {OldKey,Vs,Acc}) ->
- {NewKey, [V], [{OldKey,Vs}|Acc]}
- end,
- {LastK, LastVs, Acc} = lists:foldl(FoldFun, Acc0, Rest),
- [{LastK, LastVs} | Acc].
-get_const(r) ->
- list_to_integer(couch_config:get("cluster", "r", "2"));
-get_const(w) ->
- list_to_integer(couch_config:get("cluster", "w", "2"));
-get_const(r1) ->
- 1;
-get_const(Other) ->
- throw({bad_access_term, Other}).