From 916871eb58001d8f261edeb838f6839dbc195303 Mon Sep 17 00:00:00 2001 From: Brad Anderson Date: Fri, 28 May 2010 16:50:35 -0400 Subject: begin move of dynomite to membership --- ebin/dynomite.app | 29 ------- ebin/membership.app | 20 +++++ include/membership.hrl | 11 +++ src/dbs.erl | 46 +++++++++++ src/dbs_cache.erl | 86 ++++++++++++++++++++ src/dynomite.erl | 22 ----- src/dynomite_app.erl | 50 ------------ src/dynomite_prof.erl | 164 ------------------------------------- src/dynomite_sup.erl | 58 ------------- src/mem3.erl | 6 +- src/membership.erl | 15 ++++ src/membership_app.erl | 19 +++++ src/membership_sup.erl | 24 ++++++ src/partitions.erl | 217 +++++++++++++++++++++++++++++++++++++++++++++++++ 14 files changed, 439 insertions(+), 328 deletions(-) delete mode 100644 ebin/dynomite.app create mode 100644 ebin/membership.app create mode 100644 src/dbs.erl create mode 100644 src/dbs_cache.erl delete mode 100644 src/dynomite.erl delete mode 100644 src/dynomite_app.erl delete mode 100644 src/dynomite_prof.erl delete mode 100644 src/dynomite_sup.erl create mode 100644 src/membership.erl create mode 100644 src/membership_app.erl create mode 100644 src/membership_sup.erl create mode 100644 src/partitions.erl diff --git a/ebin/dynomite.app b/ebin/dynomite.app deleted file mode 100644 index 634c09b2..00000000 --- a/ebin/dynomite.app +++ /dev/null @@ -1,29 +0,0 @@ -%% dynomite app resource file - -{application, dynomite, - [{description, "Dynomite Clustering System"}, - {mod, {dynomite_app, []}}, - {vsn, "0.9.5-cloudant"}, - {modules, - [ - bootstrap_manager, - bootstrap_receiver, - cluster_ops, - configuration, - dynomite, - dynomite_app, - dynomite_couch_api, - dynomite_couch_storage, - dynomite_prof, - dynomite_sup, - lib_misc, - mem3, - mem_utils, - membership2, - node, - replication, - vector_clock - ]}, - {registered, [membership]}, - {applications, [kernel, stdlib, sasl, crypto, mochiweb]} - ]}. diff --git a/ebin/membership.app b/ebin/membership.app new file mode 100644 index 00000000..82f0b299 --- /dev/null +++ b/ebin/membership.app @@ -0,0 +1,20 @@ +%% membership app resource file + +{application, membership, + [{description, "cluster membership"}, + {mod, {membership_app, []}}, + {vsn, "0.9.6"}, + {modules, + [ + dbs, + dbs_cache, + membership, + membership_app, + membership_sup, + mem3, + partitions, + vector_clock + ]}, + {registered, [membership]}, + {applications, [kernel, stdlib, sasl, crypto, mochiweb]} + ]}. diff --git a/include/membership.hrl b/include/membership.hrl index 98b47e2c..031c12c4 100644 --- a/include/membership.hrl +++ b/include/membership.hrl @@ -1,3 +1,14 @@ +-define(MEMBERSHIP, true). + +-ifndef(FABRIC). +-include("../../fabric/include/fabric.hrl"). +-endif. + +-ifndef(COUCH). +-include("../../couch/src/couch_db.hrl"). +-endif. + +-include_lib("eunit/include/eunit.hrl"). %% version 3 of membership state -record(mem, {header=3, diff --git a/src/dbs.erl b/src/dbs.erl new file mode 100644 index 00000000..b5e17b6a --- /dev/null +++ b/src/dbs.erl @@ -0,0 +1,46 @@ +-module(dbs). +-behaviour(supervisor). + +-export([start_link/0, init/1, childspec/1, sup_upgrade_notify/2]). + +-include("membership.hrl"). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, MemNodes} = mem3:nodes(), + LiveNodes = nodes(), + ChildSpecs = [childspec(N) || N <- MemNodes, lists:member(N, LiveNodes)], + %gen_event:add_handler(membership_events, showroom_dbs_event, []), + {ok, {{one_for_one, 10, 8}, ChildSpecs}}. + +childspec(Node) -> + ?LOG_INFO("dbs repl ~p --> ~p starting", [node(), Node]), + PostBody = {[ + {<<"source">>, <<"dbs">>}, + {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, <<"dbs">>}]}}, + {<<"continuous">>, true} + ]}, + Id = couch_util:to_hex(erlang:md5(term_to_binary([node(), Node]))), + MFA = {couch_rep, start_link, [Id, PostBody, #user_ctx{}]}, + {Node, MFA, permanent, 100, worker, [couch_rep]}. + +% from http://code.google.com/p/erlrc/wiki/ErlrcHowto +sup_upgrade_notify (_Old, _New) -> + {ok, {_, Specs}} = init([]), + + Old = sets:from_list( + [Name || {Name, _, _, _} <- supervisor:which_children(?MODULE)]), + New = sets:from_list([Name || {Name, _, _, _, _, _} <- Specs]), + Kill = sets:subtract(Old, New), + + sets:fold(fun(Id, ok) -> + supervisor:terminate_child(?MODULE, Id), + supervisor:delete_child(?MODULE, Id), + ok + end, + ok, + Kill), + [supervisor:start_child (?MODULE, Spec) || Spec <- Specs ], + ok. diff --git a/src/dbs_cache.erl b/src/dbs_cache.erl new file mode 100644 index 00000000..96319802 --- /dev/null +++ b/src/dbs_cache.erl @@ -0,0 +1,86 @@ +-module(dbs_cache). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0]). + +-include("membership.hrl"). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + ets:new(partitions, [bag, protected, named_table, {keypos,#shard.dbname}]), + ets:new(memnodes, [bag, protected, named_table]), + cache_dbs(), + Self = self(), + couch_db_update_notifier:start_link(fun({updated, <<"dbs">>}) -> + Self ! rebuild_dbs_cache; + (_) -> ok end), + {ok, nil}. + +handle_call(_Msg, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(rebuild_dbs_cache, State) -> + receive rebuild_dbs_cache -> + handle_info(rebuild_dbs_cache, State) + after 0 -> ok end, + T0 = now(), + ?LOG_INFO("rebuilding dbs DB cache", []), + ets:delete_all_objects(partitions), + ets:delete_all_objects(memnodes), + cache_dbs(), + ?LOG_INFO("rebuild of dbs DB cache complete in ~p ms", + [round(timer:now_diff(now(),T0)/1000)]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +cache_dbs() -> + try couch_db:open(<<"dbs">>, []) of + {ok, Db} -> + Bt = Db#db.id_tree, + FoldFun = fun(#full_doc_info{id=Id, deleted=false} = FullDocInfo, _, _) -> + {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []), + {Props} = couch_doc:to_json_obj(Doc, []), + cache_map(Id, Props), + cache_nodes(Id, Props), + {ok, true}; + (_, _, _) -> + {ok, nil} + end, + couch_btree:foldl(Bt, FoldFun, nil), + couch_db:close(Db) + catch exit:{noproc,{gen_server,call,[couch_server|_]}} -> + timer:sleep(1000), + exit(couch_server_is_dead) + end. + +cache_map(Id, Props) -> + Map = couch_util:get_value(map, Props), + lists:foreach(fun({[{node,Node},{b,Beg},{e,End}]}) -> + Part = #shard{ + name = partitions:shard_name(Beg, Id), + dbname = Id, + node = Node, + range = [Beg,End] + }, + ets:insert(partitions, Part) + end, Map). + +cache_nodes(Id, Props) -> + Nodes = couch_util:get_value(nodes, Props), + lists:foreach(fun({[{order,Order},{node, Node},{options,Opts}]}) -> + ets:insert(memnodes, {Id, {Order, Node, Opts}}) + end, Nodes). + +%{ok, ets:insert(dbs_cache, {Id, Props})}; diff --git a/src/dynomite.erl b/src/dynomite.erl deleted file mode 100644 index bb50986b..00000000 --- a/src/dynomite.erl +++ /dev/null @@ -1,22 +0,0 @@ -%%% @doc convenience start/stop functions for Dynomite -%%% --module(dynomite). --author('Brad Anderson '). - --export([start/0, stop/0, restart/0]). - - -%% @doc start Dynomite app with no args, for -s at the command-line -start() -> - application:start(dynomite). - - -%% @doc stops the Dynomite application -stop() -> - application:stop(dynomite). - - -%% @doc restart Dynomite app, with no args -restart() -> - stop(), - start(). diff --git a/src/dynomite_app.erl b/src/dynomite_app.erl deleted file mode 100644 index 4b520921..00000000 --- a/src/dynomite_app.erl +++ /dev/null @@ -1,50 +0,0 @@ --module(dynomite_app). --author('cliff@powerset.com'). --author('brad@cloudant.com'). - --behaviour(application). - --include("../include/config.hrl"). --include("../../couch/src/couch_db.hrl"). - -%% Application callbacks --export([start/2, stop/1]). - -%%==================================================================== -%% Application callbacks -%%==================================================================== -%%-------------------------------------------------------------------- -%% @spec start(Type, StartArgs) -> {ok, Pid} | -%% {ok, Pid, State} | -%% {error, Reason} -%% @doc This function is called whenever an application -%% is started using application:start/1,2, and should start the processes -%% of the application. If the application is structured according to the -%% OTP design principles as a supervision tree, this means starting the -%% top supervisor of the tree. -%% @end -%%-------------------------------------------------------------------- - - -%% @doc start required apps, join cluster, start dynomite supervisor -start(_Type, _StartArgs) -> - % start dynomite supervisor - dynomite_sup:start_link(). - - -%%-------------------------------------------------------------------- -%% @spec stop(State) -> void() -%% @doc This function is called whenever an application -%% has stopped. It is intended to be the opposite of Module:start/2 and -%% should do any necessary cleaning up. The return value is ignored. -%% @end -%%-------------------------------------------------------------------- -stop({_, Sup}) -> - showroom_log:message(alert, "dynomite application stopped", []), - exit(Sup, normal), - ok. - - -%%==================================================================== -%% Internal functions -%%==================================================================== diff --git a/src/dynomite_prof.erl b/src/dynomite_prof.erl deleted file mode 100644 index 80c4b5b7..00000000 --- a/src/dynomite_prof.erl +++ /dev/null @@ -1,164 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File: dynomite_prof.erl -%%% @author Cliff Moon <> [] -%%% @copyright 2009 Cliff Moon -%%% @doc -%%% -%%% @end -%%% -%%% @since 2009-02-15 by Cliff Moon -%%%------------------------------------------------------------------- --module(dynomite_prof). --author('cliff@powerset.com'). - --behaviour(gen_server). - -%% API --export([start_link/0, start_prof/1, stop_prof/1, stats/1, averages/0, balance_prof/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {ets,balance}). - --record(profile, {name, count, sum}). - -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% @spec start_link() -> {ok,Pid} | ignore | {error,Error} -%% @doc Starts the server -%% @end -%%-------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, dynomite_prof}, ?MODULE, [], []). - -stats(Id) -> - gen_server:call(dynomite_prof, {stats, Id}). - -balance_prof() -> - gen_server:cast(dynomite_prof, {balance, self(), lib_misc:now_float()}). - -start_prof(Id) -> - gen_server:cast(dynomite_prof, {start, self(), Id, lib_misc:now_float()}). - -stop_prof(Id) -> - gen_server:cast(dynomite_prof, {stop, self(), Id, lib_misc:now_float()}). - -averages() -> - gen_server:call(dynomite_prof, averages). - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% @spec init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% @doc Initiates the server -%% @end -%%-------------------------------------------------------------------- -init([]) -> - Tid = ets:new(profiling, [set, {keypos, 2}]), - Bal = ets:new(balance, [set]), - {ok, #state{ets=Tid, balance=Bal}}. - -%%-------------------------------------------------------------------- -%% @spec -%% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% @doc Handling call messages -%% @end -%%-------------------------------------------------------------------- -handle_call({stats, Id}, _From, State = #state{ets=Ets}) -> - Reply = ets:lookup(Ets, Id), - {reply, Reply, State}; - -handle_call(table, _From, State = #state{ets=Ets}) -> - {reply, Ets, State}; - -handle_call(averages, _From, State = #state{ets=Ets,balance=Bal}) -> - Avgs = ets:foldl(fun(#profile{name=Name,count=Count,sum=Sum}, List) -> - [{Name, Sum/Count}|List] - end, [], Ets), - {_, MaxCount} = ets:foldl(fun - ({Pid, Count}, {_P, M}) when Count > M -> {Pid, Count}; - (_, {P, M}) -> {P, M} - end, {pid, 0}, Bal), - Balances = ets:foldl(fun({Pid, Count}, List) -> - [{Pid, Count / MaxCount} | List] - end, [], Bal), - {reply, [Balances, Avgs], State}. - -%%-------------------------------------------------------------------- -%% @spec handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @doc Handling cast messages -%% @end -%%-------------------------------------------------------------------- -handle_cast({balance, Pid, _Time}, State = #state{balance=Ets}) -> - case ets:lookup(Ets, Pid) of - [] -> ets:insert(Ets, {Pid, 1}); - [{Pid, Count}] -> ets:insert(Ets, {Pid, Count+1}) - end, - {noreply, State}; - -handle_cast({start, Pid, Id, Time}, State = #state{ets=_Ets}) -> - put({Pid,Id}, Time), - {noreply, State}; - -handle_cast({stop, Pid, Id, Time}, State = #state{ets=Ets}) -> - case get({Pid, Id}) of - undefined -> ok; - OldTime -> - erase({Pid, Id}), - increment_time(Ets, Time-OldTime, Id) - end, - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @spec handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @doc Handling all non call/cast messages -%% @end -%%-------------------------------------------------------------------- -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @spec terminate(Reason, State) -> void() -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%% @end -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} -%% @doc Convert process state when code is changed -%% @end -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- -increment_time(Ets, Time, Id) -> - case ets:lookup(Ets, Id) of - [] -> ets:insert(Ets, #profile{name=Id,count=1,sum=Time}); - [#profile{name=Id,count=Count,sum=Sum}] -> ets:insert(Ets, #profile{name=Id,count=Count+1,sum=Sum+Time}) - end. diff --git a/src/dynomite_sup.erl b/src/dynomite_sup.erl deleted file mode 100644 index b60824ac..00000000 --- a/src/dynomite_sup.erl +++ /dev/null @@ -1,58 +0,0 @@ --module(dynomite_sup). --author('brad@cloudant.com'). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - --include("../include/config.hrl"). - --define(SERVER, ?MODULE). - -%%==================================================================== -%% API functions -%%==================================================================== -%%-------------------------------------------------------------------- -%% @spec start_link() -> {ok,Pid} | ignore | {error,Error} -%% @doc Starts the supervisor -%% @end -%%-------------------------------------------------------------------- -start_link() -> - supervisor:start_link(?MODULE, []). - -%%==================================================================== -%% Supervisor callbacks -%%==================================================================== -%%-------------------------------------------------------------------- -%% @spec init(Args) -> {ok, {SupFlags, [ChildSpec]}} | -%% ignore | -%% {error, Reason} -%% @doc Whenever a supervisor is started using -%% supervisor:start_link/[2,3], this function is called by the new process -%% to find out about restart strategy, maximum restart frequency and child -%% specifications. -%% @end -%%-------------------------------------------------------------------- -init(_Args) -> - Membership = {membership, - {mem3, start_link, []}, - permanent, - 1000, - worker, - [mem3]}, - MemEventMgr = {mem_event_manager, - {gen_event, start_link, [{local, membership_events}]}, - permanent, - 1000, - worker, - []}, - {ok, {{one_for_one,10,1}, [Membership, MemEventMgr]}}. - - -%%==================================================================== -%% Internal functions -%%==================================================================== diff --git a/src/mem3.erl b/src/mem3.erl index 11c39ef7..cbb7a8d5 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -22,9 +22,6 @@ -export([start_link/0, start_link/1, stop/0, stop/1, reset/0]). -export([join/3, clock/0, state/0, states/0, nodes/0, fullnodes/0, start_gossip/0]). -%-export([partitions/0, fullmap/0]). -%-export([nodes/0, nodes_for_part/1, nodes_for_part/2, all_nodes_parts/1]). -%-export([parts_for_node/1]). %% for testing more than anything else -export([merge_nodes/2, next_up_node/1, next_up_node/3]). @@ -34,8 +31,7 @@ terminate/2, code_change/3]). %% includes --include("../include/membership.hrl"). --include_lib("eunit/include/eunit.hrl"). +-include("membership.hrl"). -define(SERVER, membership). -define(STATE_FILE_PREFIX, "membership"). diff --git a/src/membership.erl b/src/membership.erl new file mode 100644 index 00000000..1e06e798 --- /dev/null +++ b/src/membership.erl @@ -0,0 +1,15 @@ +-module(membership). +-author('Brad Anderson '). + +-export([start/0, stop/0, restart/0]). + + +start() -> + application:start(membership). + +stop() -> + application:stop(membership). + +restart() -> + stop(), + start(). diff --git a/src/membership_app.erl b/src/membership_app.erl new file mode 100644 index 00000000..589a6f81 --- /dev/null +++ b/src/membership_app.erl @@ -0,0 +1,19 @@ +-module(membership_app). +-author('brad@cloudant.com'). + +-behaviour(application). + +-include("membership.hrl"). + +%% Application callbacks +-export([start/2, stop/1]). + +%% @doc start required apps, join cluster, start supervisor +start(_Type, _StartArgs) -> + % start dynomite supervisor + membership_sup:start_link(). + +stop({_, Sup}) -> + ?LOG_ALERT("dynomite application stopped", []), + exit(Sup, normal), + ok. diff --git a/src/membership_sup.erl b/src/membership_sup.erl new file mode 100644 index 00000000..81b6562a --- /dev/null +++ b/src/membership_sup.erl @@ -0,0 +1,24 @@ +-module(membership_sup). +-author('brad@cloudant.com'). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link(?MODULE, []). + +init(_Args) -> + Membership = {membership, + {mem3, start_link, []}, + permanent, + 1000, + worker, + [mem3]}, + {ok, {{one_for_one,10,1}, [Membership]}}. diff --git a/src/partitions.erl b/src/partitions.erl new file mode 100644 index 00000000..ade8efe4 --- /dev/null +++ b/src/partitions.erl @@ -0,0 +1,217 @@ +-module(partitions). +-author('brad@cloudant.com'). + +%% API +-export([fullmap/2, fullmap/3, hash/1, install_fullmap/4]). +-export([for_key/2, all_parts/1]). +-export([shard_name/2]). + +-define(RINGTOP, trunc(math:pow(2,160))). % SHA-1 space + +-include("../../couch/src/couch_db.hrl"). +-include("../../dynomite/include/membership.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%%==================================================================== +%% API +%%==================================================================== + +%% @doc build a full partition map +fullmap(DbName, Options) -> + {ok, FullNodes} = mem3:fullnodes(), + {_, Nodes, _} = lists:unzip3(lists:keysort(1, FullNodes)), + fullmap(DbName, Nodes, Options). + +fullmap(DbName, Nodes, Options) -> + {N,Q} = db_init_constants(Options), + NewNodes = ordered_nodes(DbName, Nodes), + Pmap = pmap(Q, NewNodes), + int_fullmap(DbName, N, Pmap, NewNodes). + +%% @spec hash(term()) -> Digest::binary() +%% @doc uses SHA-1 as its hash +hash(Item) when is_binary(Item) -> + crypto:sha(Item); +hash(Item) -> + crypto:sha(term_to_binary(Item)). + +install_fullmap(DbName, Fullmap, FullNodes, Options) -> + {N,Q} = db_init_constants(Options), + Doc = {[{<<"_id">>,DbName}, + {map, jsonify(map, Fullmap)}, + {nodes, jsonify(nodes, FullNodes)}, + {n,N}, + {q,Q}]}, + write_db_doc(Doc). + +for_key(DbName, Key) -> + HashKey = hash_int(hash(Key)), + Head = #shard{ + name = '_', + node = '_', + dbname = DbName, + range = ['$1','$2'], + ref = '_' + }, + Conditions = [{'<', '$1', HashKey}, {'<', HashKey, '$2'}], + case ets:select(partitions, [{Head, Conditions, ['$_']}]) of + [] -> + erlang:error(database_does_not_exist); + Shards -> + Shards + end. + +all_parts(DbName) -> + ets:lookup(partitions, DbName). + +% %% @doc for the given key, return a list of {Node,Part} tuples. Nodes are both +% %% primary and replication partner nodes, and should number N. +% int_node_parts_for_key(Key) -> +% Config = configuration:get_config(), +% Hash = lib_misc:hash(Key), +% Part = partitions:hash_to_partition(Hash, Config#config.q), +% NodePartList = all_nodes_parts(true), +% lists:filter(fun({_N,P}) -> P =:= Part end, NodePartList). + +%%==================================================================== +%% Internal functions +%%==================================================================== + +%% @doc get cluster constants from options or config +db_init_constants(Options) -> + {const(n, Options), const(q, Options)}. + +%% @doc get individual constant +const(Const, Options) -> + ListResult = case couch_util:get_value(Const, Options) of + undefined -> couch_config:get("cluster", atom_to_list(Const)); + Val -> Val + end, + list_to_integer(ListResult). + +%% @doc hash the dbname, and return the corresponding node for seeding a ring +seednode(DbName, Nodes) -> + Hash = hash(DbName), + HashInt = hash_int(Hash), + Size = partition_range(length(Nodes)), + Factor = (HashInt div Size), + lists:nth(Factor+1, Nodes). + +%% @doc take the list of nodes, and rearrange it, starting with the node that +%% results from hashing the Term +ordered_nodes(Term, Nodes) -> + SeedNode = seednode(Term, Nodes), + {A, B} = lists:splitwith(fun(N) -> N /= SeedNode end, Nodes), + lists:append(B,A). + +%% @doc create a partition map +pmap(NumPartitions, Nodes) -> + Increment = ?RINGTOP div NumPartitions, + Parts = parts(?RINGTOP, Increment, 0, []), + make_map(Nodes, Nodes, Parts, []). + +%% @doc makes a {beg, end} list of partition ranges +%% last range may have an extra few values, because Increment is created +%% with Ringtop 'div' NumPartitions above. +parts(Top, _, Beg, Acc) when Beg > Top -> Acc; +parts(Top, Increment, Beg, Acc) -> + End = case Beg + 2*Increment of + Over when Over > Top -> Top; + _ -> Beg + Increment - 1 + end, + NewAcc = [{Beg, End} | Acc], + parts(Top, Increment, End+1, NewAcc). + +%% @doc create a full map, which is a pmap with N-1 replication partner nodes +%% added per partition +int_fullmap(DbName, N, Pmap, Nodes) -> + Full = lists:foldl(fun({Node,{B,E} = Part}, AccIn) -> + Primary = [#shard{dbname=DbName, node=Node, range=[B,E], + name=shard_name(B,DbName)}], + Partners = partners(DbName, N, Node, Nodes, Part), + lists:append([Primary, Partners, AccIn]) + end, [], Pmap), + lists:reverse(Full). + +partners(DbName, N, Node, Nodes, {Beg,End}) -> + {A, [Node|B]} = lists:splitwith(fun(Nd) -> Nd /= Node end, Nodes), + Nodes1 = lists:append(B,A), + Partners = lists:sublist(Nodes1, N-1), % N-1 replication partner nodes + lists:map(fun(Partner) -> + #shard{dbname=DbName, node=Partner, range=[Beg,End], + name=shard_name(Beg,DbName)} + end, Partners). + + +%% @doc turn hash into an integer +hash_int(Hash) when is_binary(Hash) -> + <> = Hash, + IndexAsInt; +hash_int(Hash) when is_integer(Hash) -> + Hash. + +%% @doc size of one partition in the ring +partition_range(Q) -> + trunc( ?RINGTOP / Q ). % SHA-1 space / Q + +%% @doc assign nodes to each of the partitions. When you run out of nodes, +%% start at the beginning of the node list again. +%% The provided node list starts with the seed node (seednode fun) +make_map(_,_,[], Acc) -> + lists:keysort(2,Acc); +make_map(AllNodes, [], Parts, Acc) -> + % start back at beginning of node list + make_map(AllNodes, AllNodes, Parts, Acc); +make_map(AllNodes, [Node|RestNodes], [Part|RestParts], Acc) -> + % add a node/part combo to the Acc + make_map(AllNodes, RestNodes, RestParts, [{Node,Part}|Acc]). + +jsonify(map, Map) -> + lists:map(fun(#shard{node=Node, range=[Beg,End]}) -> + {[{node, Node}, {b, Beg}, {e, End}]} + end, Map); +jsonify(nodes, Nodes) -> + lists:map(fun({Order, Node, Options}) -> + {[{order, Order}, {node, Node}, {options, Options}]} + end, Nodes). + +write_db_doc(EDoc) -> + Doc = couch_doc:from_json_obj(EDoc), + {ok, Db} = couch_db:open(<<"dbs">>, []), + {ok, NewRev} = couch_db:update_doc(Db, Doc, []), + NewRev. + +shard_name(Part, DbName) when is_list(DbName) -> + shard_name(Part, ?l2b(DbName)); +shard_name(Part, DbName) -> + PartHex = ?l2b(showroom_utils:int_to_hexstr(Part)), + <<"x", PartHex/binary, "/", DbName/binary, "_", PartHex/binary>>. + +% %% @doc given an int and a partition map from ets cache table, +% %% get the first part greater than Int. +% int_to_nps(_, [], _, Acc) -> Acc; +% int_to_nps(Int, [{_,{N,P}} | Rest], CurrentPart, NPAcc) -> +% case P > Int of +% true -> +% case P =/= CurrentPart of +% true -> NPAcc; +% _ -> +% NewAcc = [{N,P}|NPAcc], +% int_to_nps(Int, Rest, P, NewAcc) +% end; +% _ -> int_to_nps(Int, Rest, P, NPAcc) +% end. + + +% % get parts +% {_,NPs} = lists:unzip(Map), +% {_,AllParts} = lists:unzip(NPs), +% Parts = lists:usort(AllParts), +% % lookup part +% Rem = lists:dropwhile(fun(E) -> E < Int end, Parts), +% Part = case Rem of +% [] -> 0; % wrap-around-ring case (back to 0) +% [H|_T] -> H +% end, +% % get nodes/parts +% ok. -- cgit v1.2.3