summaryrefslogtreecommitdiff
path: root/src/partitions.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/partitions.erl')
-rw-r--r--src/partitions.erl334
1 files changed, 334 insertions, 0 deletions
diff --git a/src/partitions.erl b/src/partitions.erl
new file mode 100644
index 00000000..942968e1
--- /dev/null
+++ b/src/partitions.erl
@@ -0,0 +1,334 @@
+%%%-------------------------------------------------------------------
+%%% File: partitions.erl
+%%% @author Cliff Moon <cliff@powerset.com> [http://www.powerset.com/]
+%%% @copyright 2008 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2008-10-12 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(partitions).
+-author('cliff@powerset.com').
+
+%% API
+-export([partition_range/1, create_partitions/3, map_partitions/2,
+ diff/2, pp_diff/1, int_to_partition/2,
+ join/3, leave/3, hash/1, hash_to_partition/2, item_to_nodepart/1,
+ shard_name/2, hash_to_hex/2]).
+
+-define(RINGTOP, trunc(math:pow(2,160)-1)). % SHA-1 space
+
+-include("../../couch/src/couch_db.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% -ifdef(TEST).
+%% -include("etest/partitions_test.erl").
+%% -endif.
+
+%%====================================================================
+%% API
+%%====================================================================
+
+partition_range(Q) ->
+ trunc( ?RINGTOP / math:pow(2,Q) ). % SHA-1 space / 2^Q
+
+create_partitions(Q, Node, _Nodes) ->
+ fresh(trunc(math:pow(2,Q)), Node).
+ % map_partitions(Table, Nodes).
+
+
+%% @spec map_partitions(Table::proplist(),Nodes::list()) -> proplist()
+%% @doc maps partitions to nodes. The resulting list should be Dynomite format,
+%% namely {Node,Part}
+%% @end
+map_partitions(Table, Nodes) ->
+ {_Nodes, Parts} = lists:unzip(Table),
+ do_map(Nodes, Parts).
+
+
+%% @doc in case Hints is undefined, turn it into a list for clauses below.
+join(Node, Table, undefined) ->
+ join(Node, Table, []);
+
+%% @spec join(node(), proplist(), list()) -> {ok, PartTable::proplist()} |
+%% {error, Error}
+%% @doc given a node, current partition table, and hints, this function returns
+%% the new partition table
+join(Node, Table, Hints) ->
+ {NodeList, Parts} = lists:unzip(Table),
+ OtherNodes = lists:delete(Node, NodeList),
+ OtherDistinctNodes = lists:usort(OtherNodes),
+ %% quick check to see if we have more nodes than partitions
+ if
+ length(Parts) == length(OtherDistinctNodes) ->
+ {error, "Too many nodes vs partitions", Table};
+ true ->
+ AlreadyPresent = length(NodeList) - length(OtherNodes),
+ Nodes = lists:usort(NodeList),
+ PartCountToTake = trunc(length(Parts) / (length(Nodes) + 1)),
+ %% calcs done, let's steal some partitions
+ {HintsTaken, NewTable} = steal_hints(Node, Table, Hints),
+ if
+ PartCountToTake - AlreadyPresent - HintsTaken > 0 ->
+ steal_partitions(Node, OtherDistinctNodes, NewTable,
+ PartCountToTake - AlreadyPresent - HintsTaken);
+ true ->
+ %% no partitions to take
+ {ok, NewTable}
+ end
+ end.
+
+
+%% TODO: implement me
+leave(_Node, Table, _Hints) ->
+ Table.
+
+
+diff(From, To) when length(From) =/= length(To) ->
+ {error, badlength, "Cannot diff partition maps with different length"};
+
+diff(From, To) ->
+ diff(sort_for_diff(From), sort_for_diff(To), []).
+
+
+pp_diff(Diff) ->
+ lists:map(
+ fun({F,T,Part}) -> {F,T,showroom_utils:int_to_hexstr(Part)} end,
+ Diff).
+
+
+%% @spec hash(term()) -> Digest::binary()
+%% @doc Showroom uses SHA-1 as its hash
+hash(Item) ->
+ crypto:sha(term_to_binary(Item)).
+
+
+%% @spec hash_to_partition(binary(), integer()) -> integer()
+%% @doc given a hashed value and Q, return the partition
+hash_to_partition(Hash, Q) ->
+ HashInt = hash_int(Hash),
+ Size = partition_range(Q),
+ Factor = (HashInt div Size),
+ Rem = (HashInt rem Size),
+ if
+ Rem > 0 -> Factor * Size;
+ true -> ((Factor-1) * Size)
+ end.
+
+
+hash_to_hex(Hash, Q) ->
+ Part = hash_to_partition(Hash, Q),
+ showroom_utils:int_to_hexstr(Part).
+
+
+%% @doc given an int and a list of partitions, get the first part greater
+%% than Int. Used for a hex part being turned back into an int.
+int_to_partition(Int, Parts) ->
+ Rem = lists:dropwhile(fun(E) -> E < Int end, lists:sort(Parts)),
+ case Rem of
+ [] -> 0; % wrap-around-ring case (back to 0)
+ [H|_T] -> H
+ end.
+
+
+%% @spec item_to_nodepart(bin()) -> {Node::node(),Part::integer()}
+%% @doc given a raw item, return the node/partition/shard
+%% name based on consistent hashing
+item_to_nodepart(Item) when is_binary(Item) ->
+ Q = list_to_integer(couch_config:get("cluster","q")),
+ Hash = hash(?b2l(Item)),
+ Part = hash_to_partition(Hash, Q),
+ {ok, Table} = membership2:partitions(),
+ lists:keyfind(Part, 2, Table);
+
+item_to_nodepart(Item) ->
+ item_to_nodepart(term_to_binary(Item)).
+
+
+%% @spec shard_name(integer(), binary()) -> binary()
+%% @doc create shard name
+shard_name(Part, DbName) ->
+ PartHex = ?l2b(showroom_utils:int_to_hexstr(Part)),
+ <<"x", PartHex/binary, "/", DbName/binary, "_", PartHex/binary>>.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+%% @doc Create a brand new table. The size and seednode are specified;
+%% initially all partitions are owned by the seednode. If NumPartitions
+%% is not much larger than the intended eventual number of
+%% participating nodes, then performance will suffer.
+%% from http://code.google.com/p/distributerl (trunk revision 4) chash:fresh/2
+%% @spec fresh(NumPartitions :: integer(), SeedNode :: node()) -> table()
+fresh(NumPartitions, SeedNode) ->
+ Increment = ?RINGTOP div NumPartitions,
+ [{SeedNode, IndexAsInt} || IndexAsInt <- lists:seq(0,(?RINGTOP-1),Increment)].
+
+
+%% @spec steal_hints(node(), proplist(), list( integer() )) ->
+%% {integer(), proplist()}
+%% @doc move the partitions listed in Hints over to the new owner, Node
+steal_hints(Node, Table, Hints) ->
+ steal_hints(Node, Table, Hints, 0).
+
+
+%% @doc recursive workhorse for hints mechanism, Acc is tracking how many
+%% hints/partitions were successfully moved to a new Node.
+%% @end
+steal_hints(_Node, Table, [], Acc) ->
+ {Acc, Table};
+
+steal_hints(Node, Table, [Hint|RestHints], Acc) ->
+ {Status, NewTable} = swap_node_for_part(Node, Hint, Table),
+ Acc1 = case Status of
+ ok -> Acc+1;
+ _ -> Acc
+ end,
+ steal_hints(Node, NewTable, RestHints, Acc1).
+
+
+%% @doc take a part from one of the other nodes based on most # of parts per
+%% node.
+%% @end
+%% TODO: This fun does list ops on the Table each time through. Inefficient?
+%% Hopefully not, due to small Table sizes
+steal_partitions(_Node, _OtherNodes, Table, 0) ->
+ {ok, Table};
+steal_partitions(Node, OtherNodes, Table, Count) ->
+ %% first, get a list of OtherNodes and their partition counts
+ NPCountFun = fun(N) ->
+ L = proplists:get_all_values(N, Table),
+ {N, length(lists:delete(undefined, L))}
+ end,
+ NPCounts = lists:reverse(lists:keysort(2,lists:map(NPCountFun, OtherNodes))),
+ %% grab the node that has the most partitions
+ [{TakeFrom, _PartsCount}|_RestOfTable] = NPCounts,
+ %% get the highest # partition of the TakeFrom node
+ TakeFromParts = lists:reverse(lists:sort(proplists:get_all_values(TakeFrom,
+ Table))),
+ [Part|_RestOfParts] = TakeFromParts,
+ {ok, NewTable} = swap_node_for_part(Node, Part, Table),
+ steal_partitions(Node, OtherNodes, NewTable, Count-1).
+
+
+%% @doc Make Node the owner of the partition beginning at Part.
+%% from http://code.google.com/p/distributerl (trunk revision 4) chash:update/3
+swap_node_for_part(Node, Part, Table) ->
+ case lists:keymember(Part, 2, Table) of
+ true ->
+ GapList = [{N,P} || {N,P} <- Table, P /= Part],
+ {A, B} = lists:partition(fun({_,K1}) -> K1 < Part end, GapList),
+ {ok, A ++ [{Node, Part}] ++ B};
+ false ->
+ showroom_log:message(info,
+ "'~p' partition was not found in partition table", [Part]),
+ {noswap, Table}
+ end.
+
+
+%% @doc get the difference between two FullPMaps
+%% lists need to be sorted by part, then node
+diff([], [], Results) ->
+ lists:reverse(remove_dupes(Results));
+
+diff([{Node,Part,_}|PartsA], [{Node,Part,_}|PartsB], Results) ->
+ diff(PartsA, PartsB, Results);
+
+diff([{NodeA,Part,_}|PartsA], [{NodeB,Part,_}|PartsB], Results) ->
+ diff(PartsA, PartsB, [{NodeA,NodeB,Part}|Results]).
+
+
+%% @doc sorts the full map for diff/3. This may change to get more accurate
+%% diff w/o dupes
+sort_for_diff(FullMap) ->
+ lists:keysort(2,lists:sort(FullMap)).
+
+
+remove_dupes(Diff) ->
+ {_,_,AllParts} = lists:unzip3(Diff),
+ Parts = lists:usort(AllParts),
+ remove_dupes_from_part(Parts, Diff, []).
+
+
+%% @doc ex: take [{a,b,1},{b,c,1}] diff and make it [{a,c,1}] so we don't go
+%% moving unnecessary shard files. 'Move partition 1 from a to b and
+%% then move partition 1 from b to c' is unnecessary. Just move it a to c.
+remove_dupes_from_part([], _Diff, Acc) ->
+ Acc;
+
+remove_dupes_from_part([Part|Rest], Diff, Acc) ->
+ PartData = lists:filter(fun({_,_,P}) -> P =:= Part end, Diff),
+ NewPartData = process_part_data(Part, PartData, PartData, PartData),
+ remove_dupes_from_part(Rest, Diff, lists:concat([NewPartData, Acc])).
+
+
+%% for one partition of the full diff, remove the dupes
+process_part_data(_Part, _PartData, [], Acc) ->
+ Acc;
+
+process_part_data(Part, PartData, [{From,To,_Part}|Rest], Acc) ->
+ case proplists:lookup(To, PartData) of
+ {To, NewTo, _Part} ->
+
+ Remove1 = proplists:delete(To, PartData),
+ Remove2 = proplists:delete(From, Remove1),
+ NewPartData = [{From, NewTo, Part}|Remove2],
+ %?debugFmt("~nFrom : ~p~nTo : ~p~nNewTo: ~p~n"
+ % "Remove1: ~p~nRemove2: ~p~n"
+ % "NewPartData: ~p~n"
+ % , [From, To, NewTo, Remove1, Remove2, NewPartData]),
+ process_part_data(Part, NewPartData, Rest, NewPartData);
+ none ->
+ process_part_data(Part, PartData, Rest, Acc)
+ end.
+
+
+% %% @doc from dynomite
+% diff([], [], Results) ->
+% lists:reverse(Results);
+
+% diff([{Node,Part}|PartsA], [{Node,Part}|PartsB], Results) ->
+% diff(PartsA, PartsB, Results);
+
+% diff([{NodeA,Part}|PartsA], [{NodeB,Part}|PartsB], Results) ->
+% diff(PartsA, PartsB, [{NodeA,NodeB,Part}|Results]).
+
+
+%% @doc does Node/Partition mapping based on Amazon Dynamo paper,
+%% section 6.2, strategy 3, more or less
+%% http://www.allthingsdistributed.com/2007/10/amazons_dynamo.html
+%% @end
+do_map([Node|RestNodes], Parts) ->
+ Max = length(Parts) / length([Node|RestNodes]),
+ do_map(Node, RestNodes, Parts, [], 1, Max).
+
+
+%% return final mapped list
+do_map(_,_,[],Mapped, _, _) ->
+ lists:keysort(1, Mapped);
+
+%% finish off last node, Cnt & Max no longer needed
+do_map(Node, [], [Part|RestParts], Mapped, _, _) ->
+ do_map(Node, [], RestParts, [{Node, Part}|Mapped], 0,0);
+
+%% workhorse clause, iterates through parts, until Cnt > Max, then advances to
+%% next node, wash, rinse, repeat
+do_map(Node, [NextNode|RestNodes], [Part|RestParts], Mapped, Cnt, Max) ->
+ case Cnt > Max of
+ true ->
+ do_map(NextNode, RestNodes, RestParts, [{Node, Part}|Mapped],
+ 1, Max);
+ false ->
+ do_map(Node, [NextNode|RestNodes], RestParts, [{Node, Part}|Mapped],
+ Cnt+1, Max)
+ end.
+
+
+%% TODO: other guards
+hash_int(Hash) when is_binary(Hash) ->
+ <<IndexAsInt:160/integer>> = Hash,
+ IndexAsInt;
+hash_int(Hash) when is_integer(Hash) ->
+ Hash.