summaryrefslogtreecommitdiff
path: root/src/cluster_ops.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cluster_ops.erl')
-rw-r--r--src/cluster_ops.erl43
1 files changed, 17 insertions, 26 deletions
diff --git a/src/cluster_ops.erl b/src/cluster_ops.erl
index bd2ad83d..5bcb6bfa 100644
--- a/src/cluster_ops.erl
+++ b/src/cluster_ops.erl
@@ -242,32 +242,23 @@ strip_ok(Val) -> Val.
%% but is a list so we can use ?PMAP with the results
%% @end
get_dist_tasks(KeyFun, SeqsKVPairs) ->
- %% loop thru SeqsKVPairs adding node/part to each
- NPSV = lists:flatmap(
- fun({Seq,KVPair}) ->
- NodeParts = membership2:nodeparts_for_key(KeyFun(KVPair)),
- lists:map(
- fun(NodePart) ->
- {NodePart, {Seq, KVPair}}
- end, NodeParts)
- end, SeqsKVPairs),
- nodepart_values_list(NPSV).
-
-
-%% pile up the List by NodePart (like a dict)
-nodepart_values_list(List) ->
- DistTasks =
- lists:foldl(
- fun(NodePart, AccIn) ->
- Values = proplists:get_all_values(NodePart, List),
- case length(Values) of
- 0 -> AccIn;
- _ -> [{NodePart, Values} | AccIn]
- end
- end, [], membership2:all_nodes_parts(true)),
- % ?LOG_DEBUG("~nDistTasks: ~p~n", [DistTasks]),
- DistTasks.
-
+ NPSV = lists:flatmap(fun({_,KVPair} = Elem) ->
+ [{NP, Elem} || NP <- membership2:nodeparts_for_key(KeyFun(KVPair))]
+ end, SeqsKVPairs),
+ group_by_key(NPSV).
+
+group_by_key([]) ->
+ [];
+group_by_key(List) ->
+ [{FirstK,FirstV} | Rest] = lists:keysort(1,List),
+ Acc0 = {FirstK, [FirstV], []},
+ FoldFun = fun({K,V}, {K,Vs,Acc}) ->
+ {K, [V|Vs], Acc};
+ ({NewKey,V}, {OldKey,Vs,Acc}) ->
+ {NewKey, [V], [{OldKey,Vs}|Acc]}
+ end,
+ {LastK, LastVs, Acc} = lists:foldl(FoldFun, Acc0, Rest),
+ [{LastK, LastVs} | Acc].
get_const(Access) ->
get_const(Access, unpack_config(configuration:get_config())).