Diffstat (limited to 'src')
-rw-r--r--  src/Makefile                   |  11
-rw-r--r--  src/bootstrap_manager.erl      | 261
-rw-r--r--  src/bootstrap_receiver.erl     | 121
-rw-r--r--  src/cluster_ops.erl            | 282
-rw-r--r--  src/configuration.erl          |  99
-rw-r--r--  src/dynomite.erl               |  23
-rw-r--r--  src/dynomite_app.erl           | 145
-rw-r--r--  src/dynomite_couch_api.erl     | 140
-rw-r--r--  src/dynomite_couch_storage.erl |  41
-rw-r--r--  src/dynomite_http.erl          |  21
-rw-r--r--  src/dynomite_prof.erl          | 164
-rw-r--r--  src/dynomite_sup.erl           |  85
-rw-r--r--  src/lib_misc.erl               | 235
-rw-r--r--  src/mem_utils.erl              | 129
-rw-r--r--  src/membership2.erl            | 686
-rw-r--r--  src/node.erl                   |  39
-rw-r--r--  src/partitions.erl             | 334
-rw-r--r--  src/replication.erl            | 165
-rw-r--r--  src/vector_clock.erl           |  99
19 files changed, 3080 insertions(+), 0 deletions(-)
diff --git a/src/Makefile b/src/Makefile
new file mode 100644
index 00000000..32aa1872
--- /dev/null
+++ b/src/Makefile
@@ -0,0 +1,11 @@
+include ../support/include.mk
+
+all: $(EBIN_FILES_NO_DOCS)
+
+doc: $(EBIN_FILES)
+
+debug:
+	$(MAKE) DEBUG=-DDEBUG
+
+clean:
+	rm -rf $(EBIN_FILES)
diff --git a/src/bootstrap_manager.erl b/src/bootstrap_manager.erl
new file mode 100644
index 00000000..f1303223
--- /dev/null
+++ b/src/bootstrap_manager.erl
@@ -0,0 +1,261 @@
+%%%-------------------------------------------------------------------
+%%% File:      bootstrap_manager.erl
+%%% @author    Cliff Moon <> []
+%%% @copyright 2009 Cliff Moon
+%%% @doc This is the bootstrap manager for a cluster.
+%%%
+%%% @end
+%%%
+%%% @since 2009-07-29 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(bootstrap_manager).
+-author('cliff@powerset.com').
+-author('brad@cloudant.com').
+
+-behaviour(gen_server).
+
+%% API
+-export([start_bootstrap/3, end_bootstrap/1,
+         start_link/3, start/3, stop/0,
+         start_transfers/0, transfers/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {transfer_list, nodes, transfers, futurefullmap}).
+-record(transfer, {partition, receivers, rate=0, status=starting}).
+
+-include("../include/config.hrl").
+-include("../include/common.hrl").
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec start_bootstrap(State, OldFullMap, NewFullMap) -> {Map, State}
+%% @doc Diffs the partition maps and, if they differ, starts the
+%%      transfers needed to bootstrap this node into the cluster
+%% @end
+%%--------------------------------------------------------------------
+start_bootstrap(State=#membership{node=Node, nodes=Nodes},
+                OldFullMap, NewFullMap) ->
+  case partitions:diff(OldFullMap, NewFullMap) of
+  [] ->
+    % no difference in pmaps
+    {NewFullMap, State#membership{fullmap=NewFullMap}};
+  TransferList when is_list(TransferList) ->
+    ?LOG_DEBUG("~nBootstrap~nNode         : ~p~nTransferList :~n~p~n",
+               [Node, partitions:pp_diff(TransferList)]),
+    case start_link(TransferList, Nodes, NewFullMap) of
+    {ok, _Pid} ->
+      start_transfers();
+    Other -> throw(Other)
+    end,
+    % bootstrap has some stuff to do (async), so just give the state
+    % passed in for now.  end_bootstrap will be called with the resulting
+    % state when it completes
+    {OldFullMap, State};
+  Other ->
+    % probably occurs because T (# of nodes) < N currently.
+    % more nodes joining should help avoid this error.
+    ?LOG_ERROR("no_bootstrap - Other: ~p", [Other]),
+    {NewFullMap, State#membership{fullmap=NewFullMap}}
+  end.
+
+end_bootstrap(#state{futurefullmap=FutureFullMap}) ->
+  end_bootstrap(FutureFullMap);
+
+end_bootstrap(NewFullMap) ->
+  gen_server:call(membership, {newfullmap, NewFullMap}),
+  stop().
+
+start(TransferList, Nodes, FutureFullMap) ->
+  gen_server:start({global, bootstrap_manager}, ?MODULE,
+                   [TransferList, Nodes, FutureFullMap], []).
+
+start_link(TransferList, Nodes, FutureFullMap) ->
+  gen_server:start_link({global, bootstrap_manager}, ?MODULE,
+                        [TransferList, Nodes, FutureFullMap], []).
+
+stop() ->
+  gen_server:cast({global, bootstrap_manager}, stop).
+
+start_transfers() ->
+  gen_server:cast({global, bootstrap_manager}, start_transfers).
+
+transfers() ->
+  gen_server:call({global, bootstrap_manager}, transfers).
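For orientation: the TransferList consumed above (and by start_transfers/5 further down) is a list of {FromNode, ToNode, Partition} triples produced by partitions:diff/2. A minimal sketch of driving the manager by hand; the node names, partition value, and NewFullMap binding are hypothetical:

    TransferList = [{'dbcore@node-a', 'dbcore@node-b', 16#40000000}],
    Nodes = ['dbcore@node-a', 'dbcore@node-b'],
    {ok, _Pid} = bootstrap_manager:start_link(TransferList, Nodes, NewFullMap),
    ok = bootstrap_manager:start_transfers(),
    {ok, Transfers} = bootstrap_manager:transfers().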
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% @spec init(Args) -> {ok, State} |
+%%                     {ok, State, Timeout} |
+%%                     ignore |
+%%                     {stop, Reason}
+%% @doc Initiates the server
+%% @end
+%%--------------------------------------------------------------------
+init([TransferList, Nodes, FutureFullMap]) ->
+  process_flag(trap_exit, true),
+  {ok, #state{transfer_list=TransferList, nodes=Nodes,
+              futurefullmap=FutureFullMap}}.
+
+%%--------------------------------------------------------------------
+%% @spec
+%% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% @doc Handling call messages
+%% @end
+%%--------------------------------------------------------------------
+handle_call(average_transfer_rate, _From,
+            State=#state{transfers=Transfers}) ->
+  {Sum, Cardinality} = ets:foldl(
+    fun(#transfer{rate=Rate}, {Sum, Cardinality}) ->
+        {Sum+Rate, Cardinality+1}
+    end, {0, 0}, Transfers),
+  AverageRate = Sum / Cardinality,
+  {reply, AverageRate, State};
+
+handle_call(aggregate_transfer_rate, _From,
+            State=#state{transfers=Transfers}) ->
+  Sum = ets:foldl(fun(#transfer{rate=Rate}, Sum) ->
+                      Rate + Sum
+                  end, 0, Transfers),
+  {reply, Sum, State};
+
+handle_call(transfers, _From,
+            State=#state{transfers=Transfers}) ->
+  {reply, {ok, ets:tab2list(Transfers)}, State};
+
+%% at least reply that this 'catch-all' was ignored
+handle_call(_Request, _From, State) ->
+  {reply, ignored, State}.
+
+%%--------------------------------------------------------------------
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%%                                  {noreply, State, Timeout} |
+%%                                  {stop, Reason, State}
+%% @doc Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(stop, State) ->
+  {stop, normal, State};
+
+handle_cast(start_transfers,
+            State=#state{transfer_list=TransferList}) ->
+  Transfers = start_transfers(TransferList, State),
+  {noreply, State#state{transfers=Transfers}};
+
+handle_cast(_Msg, State) ->
+  {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%%                                   {noreply, State, Timeout} |
+%%                                   {stop, Reason, State}
+%% @doc Handling all non call/cast messages
+%% @end
+%%--------------------------------------------------------------------
+handle_info({receiver_done, FromNode, _ToNode, Partition, DbName, Receiver},
+            State = #state{transfers=Transfers}) ->
+  %% TODO use bring_online & ToNode? instead of waiting until end &
+  %%      installing NewFullMap into mem2
+
+  %% handle the old file
+  membership2:decommission_part(FromNode, Partition, DbName),
+
+  %% remove from Transfers table
+  case ets:lookup(Transfers, Partition) of
+  [Transfer] = [#transfer{receivers=Receivers}] ->
+    NewReceivers = lists:delete(Receiver, Receivers),
+    if
+    length(NewReceivers) == 0 -> ets:delete(Transfers, Partition);
+    true -> ets:insert(Transfers, Transfer#transfer{receivers=NewReceivers})
+    end;
+  _ -> ok
+  end,
+  case ets:first(Transfers) of
+  '$end_of_table' ->
+    end_bootstrap(State),
+    {noreply, State};
+  _ -> {noreply, State}
+  end;
+
+handle_info(_Info, State) ->
+  {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @spec terminate(Reason, State) -> void()
+%% @doc This function is called by a gen_server when it is about to
+%%      terminate. It should be the opposite of Module:init/1 and do any
+%%      necessary cleaning up. When it returns, the gen_server terminates
+%%      with Reason. The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+  ok.
+
+%%--------------------------------------------------------------------
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @doc Convert process state when code is changed
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+  {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+start_transfers([], State) ->
+  no_transfers, % no diff in pmaps, so no transfers
+  end_bootstrap(State);
+
+start_transfers(Diff, State=#state{nodes=Nodes}) ->
+  case showroom_db:all_databases("") of
+  {ok, AllDbs} when length(AllDbs) > 0 ->
+    start_transfers(Diff, Nodes, configuration:get_config(), AllDbs,
+                    ets:new(transfers, [public, set, {keypos, 2}]));
+  {ok, []} -> end_bootstrap(State); % no databases, so bootstrap not needed
+  Other -> throw(Other) % problem getting list of dbs
+  end.
+
+start_transfers([], _, _, _, Transfers) ->
+  Transfers;
+
+start_transfers([{FromNode, ToNode, Partition} | Diff], Nodes, Config,
+                AllDbs, Transfers) ->
+  membership2:take_offline(FromNode, Partition),
+  Receivers = lists:map(
+    fun(DbName) ->
+      {ok, Receiver} =
+        bootstrap_receiver:start_link(FromNode, ToNode, Partition,
+                                      DbName, 10000, self()),
+      Receiver
+    end, AllDbs),
+  % NOTE: by using AllDbs, we are omitting .deleted.couch files
+  ets:insert(Transfers, #transfer{partition=Partition,
+                                  receivers=Receivers}),
+  start_transfers(Diff, Nodes, Config, AllDbs, Transfers).
diff --git a/src/bootstrap_receiver.erl b/src/bootstrap_receiver.erl
new file mode 100644
index 00000000..3b4907cb
--- /dev/null
+++ b/src/bootstrap_receiver.erl
@@ -0,0 +1,121 @@
+%%%-------------------------------------------------------------------
+%%% File:      bootstrap_receiver.erl
+%%% @author    Brad Anderson <brad@cloudant.com>
+%%% @copyright 2009 Brad Anderson
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-09-22 by Brad Anderson
+%%%-------------------------------------------------------------------
-module(bootstrap_receiver).
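Since the {M,F,A} triple is applied as rpc:call(Node, M, F, [[Part | A]]), the target must be an arity-1 function taking a list whose head is the partition; dynomite_couch_api:get/1 in this same commit fits that contract. An illustrative read using the configured R quorum (db and doc id hypothetical):

    cluster_ops:key_lookup(<<"some_doc">>,
        {dynomite_couch_api, get, [<<"some_db">>, <<"some_doc">>, nil, []]},
        r).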
+
+%% @doc Get to the proper shard on N nodes by key lookup
+%%
+%%      This fun uses a provided quorum constant, possibly from request,
+%%      possibly from config
+key_lookup(Key, {M,F,A}, Access, Const, N) ->
+  NodeParts = membership2:nodeparts_for_key(Key),
+  {ResolveFun, NotFoundFun} = case Access of
+  r -> {fun resolve_read/1, fun resolve_not_found/2};
+  w -> {fun resolve_write/1, fun(_,_) -> {false, notused, []} end}
+  end,
+  MapFun = fun({Node,Part}) ->
+    try
+      rpc:call(Node, M, F, [[Part | A]])
+    catch Class:Exception ->
+      {error, Class, Exception}
+    end
+  end,
+  {GoodReplies, Bad} = pcall(MapFun, NodeParts, Const),
+  if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
+  Good = lists:map(fun strip_ok/1, GoodReplies),
+  final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access).
+
+%% @doc Do op on all shards (and maybe even replication partners)
+all_parts({M,F,A}, Access, AndPartners, ResolveFun) ->
+  NodePartList = membership2:all_nodes_parts(AndPartners),
+  MapFun = fun({Node, Part}) ->
+    try
+      rpc:call(Node, M, F, [[Part | A]])
+    catch Class:Exception ->
+      {error, Class, Exception}
+    end
+  end,
+  Replies = ?PMAP(MapFun, NodePartList),
+  {Good, Bad} = lists:partition(fun valid/1, Replies),
+  final_all_parts(Good, Bad, length(NodePartList), ResolveFun, Access).
+
+%% @doc Do op on some shards, depending on list of keys sent in.
+%%
+%%      This fun uses quorum constants from config
+some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access) ->
+  Const = get_const(Access),
+  some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access, Const).
+
+%% @doc Do op on some shards, depending on list of keys sent in.
+%%
+%%      This fun uses a provided quorum constant, possibly from request,
+%%      possibly from config
+some_parts(KeyFun, SeqsKVPairs, {M,F,A}, _Access, Const) ->
+  TaskFun = fun({{Node,Part}, Values}) ->
+    try
+      rpc:call(Node, M, F, [[Part | [Values | A]]])
+    catch Class:Exception ->
+      {error, Class, Exception}
+    end
+  end,
+
+  % get tasks per node that are part / values for that partition
+  DistTasks = get_dist_tasks(KeyFun, SeqsKVPairs),
+
+  % With the distributed tasklist in hand, do the tasks per partition.
+  % For each partition, do the work on all nodes/parts.
+  TaskReplies = ?PMAP(TaskFun, DistTasks),
+  {GoodReplies, Bad} = lists:partition(fun valid/1, TaskReplies),
+  if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
+  Good = lists:map(fun strip_ok/1, GoodReplies),
+  final_some_parts(Good, Bad, Const).
+
+quorum_from_each_part({M,F,A}, Access, ResolveFun) ->
+  Const = get_const(Access),
+  {_, Parts} = lists:unzip(membership2:partitions()),
+  PartsMapFun = fun(Part) ->
+    Nodes = membership2:nodes_for_part(Part),
+    NodesMapFun = fun(Node) -> rpc:call(Node, M, F, [[Part | A]]) end,
+    {GoodReplies, BadReplies} = pcall(NodesMapFun, Nodes, Const),
+    Good1 = lists:map(fun strip_ok/1, GoodReplies),
+    Bad1 = case length(Good1) >= Const of
+    true -> [];
+    false -> BadReplies
+    end,
+    {Good1, Bad1}
+  end,
+  Results1 = ?PMAP(PartsMapFun, Parts),
+  {Good, Bad} = lists:foldl(fun({G,B}, {GAcc,BAcc}) ->
+    {lists:append(G,GAcc), lists:append(B,BAcc)}
+  end, {[],[]}, Results1),
+  if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
+  final_quorum_from_each_part(Good, Bad, length(Parts), ResolveFun, Access).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access) ->
+  {NotFound, Return, Reasons} = NotFoundFun(Bad, Const),
+  if
+  length(Good) >= Const -> {ok, ResolveFun(Good)};
+  NotFound -> {ok, Return, Reasons};
+  true -> error_message(Good, Bad, N, Const, Access)
+  end.
+
+final_all_parts(Good, Bad, Total, ResolveFun, Access) ->
+  case length(Good) =:= Total of
+  true -> {ok, ResolveFun(Good)};
+  _ -> error_message(Good, Bad, Total, Total, Access)
+  end.
+
+final_some_parts(Good, _Bad, Const) ->
+  Good1 = lists:flatten(Good),
+  {Seqs, _} = lists:unzip(Good1),
+  {ResG, ResB} =
+    lists:foldl(
+      fun(Seq, {AccG, AccB}) ->
+        Vals = proplists:get_all_values(Seq, Good1),
+        case length(Vals) >= Const of
+        true -> {[{Seq, Vals}|AccG], AccB};
+        _ -> {AccG, [{Seq, Vals}|AccB]}
+        end
+      end, {[],[]}, lists:usort(Seqs)),
+  case length(ResB) of
+  0 -> {ok, ResG};
+  _ -> {error, ResB}
+  end.
+
+final_quorum_from_each_part(Good, Bad, Total, ResolveFun, Access) ->
+  case length(Good) =:= Total of
+  true -> {ok, ResolveFun(Good)};
+  _ -> error_message(Good, Bad, Total, Total, Access)
+  end.
+
+resolve_read([First|Responses]) ->
+  case First of
+  not_found -> not_found;
+  _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
+  end.
+
+resolve_write([First|Responses]) ->
+  case First of
+  not_found -> not_found;
+  _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
+  end.
+
+resolve_not_found(Bad, R) ->
+  {NotFoundCnt, DeletedCnt, OtherReasons} =
+    lists:foldl(fun({Error, Reason}, {NotFoundAcc, DeletedAcc, ReasonAcc}) ->
+      case {Error, Reason} of
+      {not_found, {_Clock, [missing|_Rest]}} ->
+        {NotFoundAcc+1, DeletedAcc, ReasonAcc};
+      {not_found, {_Clock, [deleted|_Rest]}} ->
+        {NotFoundAcc, DeletedAcc+1, ReasonAcc};
+      _ ->
+        {NotFoundAcc, DeletedAcc, [Reason|ReasonAcc]}
+      end
+    end, {0, 0, []}, Bad),
+  % TODO: is the comparison to R good here, or should it be N-R?
+  if
+  NotFoundCnt >= R -> {true, {not_found, missing}, OtherReasons};
+  DeletedCnt >= R -> {true, {not_found, deleted}, OtherReasons};
+  true -> {false, other, OtherReasons}
+  end.
+
+error_message(Good, Bad, N, T, Access) ->
+  Msg = list_to_atom(lists:concat([atom_to_list(Access), "_quorum_not_met"])),
+  ?LOG_ERROR("~p~nSuccess on ~p of ~p servers. Needed ~p. Errors: ~w",
+             [Msg, length(Good), N, T, Bad]),
+  [{error, Msg}, {good, Good}, {bad, Bad}].
+
+unpack_config(#config{n=N, r=R, w=W}) ->
+  {N, R, W}.
+
+pcall(MapFun, Servers, Const) ->
+  Replies = lib_misc:pmap(MapFun, Servers, Const),
+  lists:partition(fun valid/1, Replies).
+
+valid({ok, _}) -> true;
+valid(ok) -> true;
+valid(_) -> false.
+
+strip_ok({ok, Val}) -> Val;
+strip_ok(Val) -> Val.
+
+%% @spec get_dist_tasks(KeyFun::function(), KVPairs::list()) ->
+%%           [{{Node::node(), Part::integer()}, SeqVals}]
+%%       Type - ordered | ??
+%%       SeqVals - [{Seq, Val}]
+%% @doc builds a distributed task list of nodes with a list of shard/values.
+%%      This looks like a dict structure
+%%      but is a list so we can use ?PMAP with the results
+%% @end
+get_dist_tasks(KeyFun, SeqsKVPairs) ->
+  %% loop thru SeqsKVPairs adding node/part to each
+  NPSV = lists:flatmap(
+    fun({Seq, KVPair}) ->
+      NodeParts = membership2:nodeparts_for_key(KeyFun(KVPair)),
+      lists:map(
+        fun(NodePart) ->
+          {NodePart, {Seq, KVPair}}
+        end, NodeParts)
+    end, SeqsKVPairs),
+  nodepart_values_list(NPSV).
+
+%% pile up the List by NodePart (like a dict)
+nodepart_values_list(List) ->
+  DistTasks =
+    lists:foldl(
+      fun(NodePart, AccIn) ->
+        Values = proplists:get_all_values(NodePart, List),
+        case length(Values) of
+        0 -> AccIn;
+        _ -> [{NodePart, Values} | AccIn]
+        end
+      end, [], membership2:all_nodes_parts(true)),
+  % ?LOG_DEBUG("~nDistTasks: ~p~n", [DistTasks]),
+  DistTasks.
+
+get_const(Access) ->
+  get_const(Access, unpack_config(configuration:get_config())).
+
+get_const(Access, {_N, R, W}) ->
+  case Access of
+  r -> R;
+  w -> W;
+  r1 -> 1;
+  Other -> throw({bad_access_term, Other})
+  end.
diff --git a/src/configuration.erl b/src/configuration.erl
new file mode 100644
index 00000000..1caca5ec
--- /dev/null
+++ b/src/configuration.erl
@@ -0,0 +1,99 @@
+%%% -*- erlang-indent-level:2 -*-
+%%%-------------------------------------------------------------------
+%%% File:      configuration.erl
+%%% @author    Cliff Moon <cliff@powerset.com>
+%%% @author    Brad Anderson <brad@cloudant.com>
+%%% @copyright 2008 Cliff Moon
+%%% @doc
+%%%   This module keeps Dynomite source relatively unchanged, but
+%%%   reads from couchdb config stuffs
+%%% @end
+%%%
+%%% @since 2008-07-18 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(configuration).
+-author('cliff@powerset.com').
+-author('brad@cloudant.com').
+
+%%-behaviour(gen_server).
+
+%% API
+-export([start_link/1, get_config/1, get_config/0, set_config/1, stop/0]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-include("../include/config.hrl").
+-include("../include/common.hrl").
+
+-define(SERVER, couch_config).
+-define(i2l(V), integer_to_list(V)).
+-define(l2i(V), list_to_integer(V)).
+
+%% -----------------------------------------------------------------
+%%  API
+%% -----------------------------------------------------------------
+
+%% @doc starts couch_config gen_server if it's not already started
+start_link(DynomiteConfig) ->
+  couch_config:start_link([]),
+  set_config(DynomiteConfig).
+
+%% @doc get the config for a remote node
+get_config(Node) ->
+  ClusterConfig = rpc:call(Node, couch_config, get, ["cluster"]),
+  Directory = rpc:call(Node, couch_config, get, ["couchdb", "database_dir"]),
+  couch2dynomite_config(ClusterConfig, Directory).
+
+%% @doc get the config for the local node
+get_config() ->
+  get_config(node()).
+
+%% @doc given a Dynomite config record, put the values into the Couch config
+set_config(DynomiteConfig) ->
+  dynomite2couch_config(DynomiteConfig).
+
+%% @doc stop the config server (nothing to do until after couch_config
+%%      refactor)
+stop() ->
+  couch_config:stop().
+
+%% -----------------------------------------------------------------
+%%  Internal functions
+%% -----------------------------------------------------------------
+
+%% @doc turn a couch config proplist into a dynomite configuration record
+couch2dynomite_config(ClusterConfig, Directory) ->
+  Q = ?l2i(proplists:get_value("q", ClusterConfig, "3")),
+  R = ?l2i(proplists:get_value("r", ClusterConfig, "2")),
+  W = ?l2i(proplists:get_value("w", ClusterConfig, "1")),
+  N = ?l2i(proplists:get_value("n", ClusterConfig, "4")),
+  %% use couch's database_dir here, to avoid /tmp/data not existing
+  Webport = ?l2i(proplists:get_value("webport", ClusterConfig, "8080")),
+  Meta = proplists:get_value("meta", ClusterConfig, []),
+  StorageMod = proplists:get_value("storage_mod", ClusterConfig, []),
+  #config{q=Q, r=R, w=W, n=N, directory=Directory, web_port=Webport,
+          meta=Meta, storage_mod=StorageMod}.
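With the defaults above, a fresh cluster runs Q=3 (2^3 = 8 partitions), N=4 copies of each partition, read quorum R=2, and write quorum W=1. A round-trip sketch using the #config record from include/config.hrl; the directory and storage_mod values here are hypothetical:

    DynConfig = #config{q=3, r=2, w=1, n=4, directory="/srv/dbcore",
                        web_port=8080, meta=[],
                        storage_mod="dynomite_couch_storage"},
    ok = configuration:set_config(DynConfig),
    DynConfig2 = configuration:get_config().  % reads back via couch_config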
+
+%% @doc workhorse for set_config/1 above
+dynomite2couch_config(DynomiteConfig) ->
+  couch_config:set("cluster", "q", ?i2l(DynomiteConfig#config.q), false),
+  couch_config:set("cluster", "r", ?i2l(DynomiteConfig#config.r), false),
+  couch_config:set("cluster", "w", ?i2l(DynomiteConfig#config.w), false),
+  couch_config:set("cluster", "n", ?i2l(DynomiteConfig#config.n), false),
+  couch_config:set("couchdb", "database_dir", DynomiteConfig#config.directory,
+                   false),
+  couch_config:set("cluster", "webport",
+                   case DynomiteConfig#config.web_port of
+                   undefined -> "8080";
+                   _ -> ?i2l(DynomiteConfig#config.web_port)
+                   end, false),
+  couch_config:set("cluster", "meta", DynomiteConfig#config.meta, false),
+  couch_config:set("cluster", "storage_mod",
+                   DynomiteConfig#config.storage_mod, false),
+  ok.
diff --git a/src/dynomite.erl b/src/dynomite.erl
new file mode 100644
index 00000000..1b9798c0
--- /dev/null
+++ b/src/dynomite.erl
@@ -0,0 +1,23 @@
+%%% @author Brad Anderson <brad@cloudant.com>
+%%% @doc convenience start/stop functions for Dynomite
+%%%
+-module(dynomite).
+-author('Brad Anderson <brad@cloudant.com>').
+
+-export([start/0, stop/0, restart/0]).
+
+%% @doc start Dynomite app with no args, for -s at the command-line
+start() ->
+  application:start(dynomite).
+
+%% @doc stops the Dynomite application
+stop() ->
+  application:stop(dynomite).
+
+%% @doc restart Dynomite app, with no args
+restart() ->
+  stop(),
+  start().
diff --git a/src/dynomite_app.erl b/src/dynomite_app.erl
new file mode 100644
index 00000000..6ee0b978
--- /dev/null
+++ b/src/dynomite_app.erl
@@ -0,0 +1,145 @@
+%%%-------------------------------------------------------------------
+%%% File:      dynomite.erl
+%%% @author    Cliff Moon <cliff@powerset.com> []
+%%% @copyright 2008 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2008-06-27 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(dynomite_app).
+-author('cliff@powerset.com').
+-author('brad@cloudant.com').
+
+-behaviour(application).
+
+-include("../include/config.hrl").
+-include("../../couch/src/couch_db.hrl").
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+-define(APPS, [crypto, sasl, mochiweb]).
+-define(DEFAULT_CLUSTER_URL, "http://localhost:5984/_cluster").
+
+%%====================================================================
+%% Application callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec start(Type, StartArgs) -> {ok, Pid} |
+%%                                 {ok, Pid, State} |
+%%                                 {error, Reason}
+%% @doc This function is called whenever an application
+%%      is started using application:start/1,2, and should start the
+%%      processes of the application. If the application is structured
+%%      according to the OTP design principles as a supervision tree,
+%%      this means starting the top supervisor of the tree.
+%% @end
+%%--------------------------------------------------------------------
+
+%% @doc start required apps, join cluster, start dynomite supervisor
+start(_Type, _StartArgs) ->
+  % get process_dict hack for startargs (i.e. not from .app file)
+  PdStartArgs = case erase(startargs) of
+  undefined ->
+    [];
+  Args ->
+    Args
+  end,
+
+  % start required apps
+  State = start_apps(),
+
+  % start dynomite supervisor
+  ok = start_node(),
+  case dynomite_sup:start_link(PdStartArgs) of
+  {ok, Supervisor} ->
+    {ok, Supervisor, State};
+  Error ->
+    Error
+  end.
+
+%%--------------------------------------------------------------------
+%% @spec stop(State) -> void()
+%% @doc This function is called whenever an application
+%%      has stopped. It is intended to be the opposite of Module:start/2
+%%      and should do any necessary cleaning up. The return value is
+%%      ignored.
+%% @end
+%%--------------------------------------------------------------------
+stop({_, Sup}) ->
+  showroom_log:message(alert, "dynomite application stopped", []),
+  exit(Sup, normal),
+  ok.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+start_apps() ->
+  Fun = fun(App, AccIn) ->
+    Result = case application:start(App) of
+    ok ->
+      App;
+    {error, {already_started, App}} ->
+      nil;
+    _Error ->
+      exit(app_start_fail)
+    end,
+    if
+    Result =/= nil -> [App|AccIn];
+    true -> AccIn
+    end
+  end,
+  lists:foldl(Fun, [], ?APPS).
+
+%% @spec start_node() -> ok | {error, Reason}
+%% @doc start this node (join to dist. erlang cluster)
+start_node() ->
+  PingUrl = couch_config:get("cluster", "ping", ?DEFAULT_CLUSTER_URL),
+  ?LOG_DEBUG("PingUrl: ~p", [PingUrl]),
+  Result = case get_pingnode(PingUrl, 1) of
+  {ok, PingNode} ->
+    join(PingNode);
+  _ ->
+    ?LOG_INFO("No pingnode found.  Becoming single-node cluster", [])
+  end,
+  couch_api:create_db(<<"users">>, []), % all nodes have local 'users' db
+  Result.
+
+%% @spec get_pingnode(Url::string(), Retries::int()) -> node() |
+%%           {error, Reason}
+%% @doc make a http get call to Url to get cluster information
+get_pingnode(Url, Retries) ->
+  try couch_rep_httpc:request(#http_db{url=Url, retries=Retries}) of
+  {[{<<"ping_node">>, Node}]} ->
+    {ok, list_to_atom(binary_to_list(Node))};
+  _ ->
+    {error, pingnode_not_found}
+  catch
+  _:_ ->
+    {error, pingnode_not_found}
+  end.
+
+join(PingNode) ->
+  if
+  node() =:= PingNode ->
+    ok; % we must be brain, so we'll take over the world
+  true ->
+    case net_adm:ping(PingNode) of
+    pong ->
+      % there is a cluster, we just joined it
+      ?LOG_DEBUG("ping successful, we're in.", []),
+      timer:sleep(1000); %% grr, what a hack, erlang. rly?
+    pang ->
+      ?LOG_ERROR("ping not successful.", []),
+      throw({cluster_error, ?l2b("cluster ping not successful")})
+    end
+  end,
+  ok.
diff --git a/src/dynomite_couch_api.erl b/src/dynomite_couch_api.erl
new file mode 100644
index 00000000..a5ad53c4
--- /dev/null
+++ b/src/dynomite_couch_api.erl
@@ -0,0 +1,140 @@
+%% This is a Dynomite plugin for calling the CouchDB raw Erlang API
+%%
+%% Most calls will have come from any of the web endpoints to execute
+%% these functions on the proper node for the key(s).
+
+-module(dynomite_couch_api).
+-author('brad@cloudant.com').
+
+-export([create_db/1, delete_db/1, get/1, put/1,
+         bulk_docs/1, missing_revs/1, get_db_info/1, get_view_group_info/1,
+         ensure_full_commit/1
+        ]).
+
+-include("../../couch/src/couch_db.hrl").
+-include("../include/common.hrl").
+
+%%--------------------------------------------------------------------
+%% @spec create_db([Part, DbName, Options]) -> ok | {error, Error}
+%% Description: Creates the database shard.
+%%--------------------------------------------------------------------
+create_db([Part, DbName, Options]) ->
+  case couch_server:create(partitions:shard_name(Part, DbName), Options) of
+  {ok, Shard} ->
+    couch_db:close(Shard),
+    ok;
+  Error -> Error
+  end.
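Note the argument convention: every exported function here takes a single list, so the calls can be dispatched generically through cluster_ops, which applies rpc:call(Node, M, F, [[Part | A]]). Calling one directly on the owning node looks like this; the partition value is hypothetical:

    ok = dynomite_couch_api:create_db([16#40000000, <<"some_db">>, []]).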
+
+%%--------------------------------------------------------------------
+%% @spec delete_db([Part, DbName, Options]) -> {ok,deleted} | {error,Error}
+%% Description: Deletes the database shard.
+%%--------------------------------------------------------------------
+delete_db([Part, DbName, Options]) ->
+  couch_server:delete(partitions:shard_name(Part, DbName), Options).
+
+get([Part, Db, DocId, Revs, Options]) ->
+  case showroom_db:open_shard(node(), Part, Db) of
+  {ok, Shard} ->
+    {Status, Doc} = couch_api:open_doc(Shard, DocId, Revs, Options),
+    showroom_db:close_shard(Shard),
+    {Status, {[], [Doc]}};
+  Error ->
+    Error
+  end.
+
+put([Part, Db, Doc = #doc{clock=Clock}, Options]) ->
+  case showroom_db:open_shard(node(), Part, Db) of
+  {ok, Shard} ->
+    {Status, NewRev} = couch_db:update_doc(Shard, Doc, Options),
+    showroom_db:close_shard(Shard),
+    {Status, {Clock, [NewRev]}};
+  Error ->
+    Error
+  end.
+
+bulk_docs([Part, SeqsDocs, Db, Options, Type]) ->
+  {Seqs, Docs} = lists:unzip(SeqsDocs),
+  case Docs of
+  [] -> {ok, []};
+  _ ->
+    case showroom_db:open_shard(node(), Part, Db) of
+    {ok, Shard} ->
+      {ok, Results1} = couch_db:update_docs(Shard, Docs, Options, Type),
+      showroom_db:close_shard(Shard),
+      Results = int_zip(Seqs, Results1),
+      {ok, Results};
+    Error ->
+      Error
+    end
+  end.
+
+missing_revs([Part, SeqsIdsRevs, Db]) ->
+  {_Seqs, IdsRevs} = lists:unzip(SeqsIdsRevs),
+  case IdsRevs of
+  [] -> {ok, []};
+  _ ->
+    case showroom_db:open_shard(node(), Part, Db) of
+    {ok, Shard} ->
+      {ok, Results1} = couch_db:get_missing_revs(Shard, IdsRevs),
+      showroom_db:close_shard(Shard),
+      {ok, Results1};
+    Error ->
+      Error
+    end
+  end.
+
+get_db_info([Part, Db]) ->
+  case showroom_db:open_shard(node(), Part, Db) of
+  {ok, Shard} ->
+    {Status, Info} = couch_db:get_db_info(Shard),
+    showroom_db:close_shard(Shard),
+    {Status, {[], Info}};
+  Error ->
+    Error
+  end.
+
+get_view_group_info([Part, Db, DesignId]) ->
+  case showroom_db:open_shard(node(), Part, Db) of
+  {ok, Shard} ->
+    {ok, EmptyGroup} = showroom_view:build_skeleton_view_group(Db, DesignId),
+    <<"S", ShardName/binary>> = Shard#db.name,
+    {ok, Pid} = gen_server:call(couch_view, {get_group_server,
+                                             ShardName, EmptyGroup}),
+    {ok, Info} = couch_view_group:request_group_info(Pid),
+    showroom_db:close_shard(Shard),
+    {ok, {[], Info}};
+  Error ->
+    Error
+  end.
+
+ensure_full_commit([Part, Db]) ->
+  case showroom_db:open_shard(node(), Part, Db) of
+  {ok, Shard} ->
+    {Status, Info} = couch_db:ensure_full_commit(Shard),
+    showroom_db:close_shard(Shard),
+    {Status, {[], Info}};
+  Error ->
+    Error
+  end.
+
+%% =======================
+%%   internal
+%% =======================
+
+int_zip(Seqs, Docs) when length(Seqs) == length(Docs) ->
+  lists:zip(Seqs, Docs);
+int_zip(_Seqs, []) ->
+  [];
+int_zip(Seqs, Docs) ->
+  ?debugFmt("~nint_zip: length mismatch~nSeqs: ~p~nDocs: ~p~n",
+            [Seqs, Docs]),
+  [].
diff --git a/src/dynomite_couch_storage.erl b/src/dynomite_couch_storage.erl
new file mode 100644
index 00000000..4fd21b80
--- /dev/null
+++ b/src/dynomite_couch_storage.erl
@@ -0,0 +1,41 @@
+%%%-------------------------------------------------------------------
+%%% File:      dynomite_couch_storage.erl
+%%% @author    Brad Anderson
+%%% @copyright 2009 Brad Anderson
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-07-14
+%%%-------------------------------------------------------------------
+-module(dynomite_couch_storage).
+-author('brad@cloudant.com').
+
+%% API
+-export([name/1, open/2, close/1, create/2]).
+%%           , close/1, get/2, put/4, has_key/2, delete/2, fold/3
+
+-include_lib("../include/common.hrl").
+
+%% -record(row, {key, context, values}).
+
+%%====================================================================
+%% API
+%%====================================================================
+
+name(Boundary) ->
+  showroom_utils:int_to_hexstr(Boundary).
+
+open(Directory, Name) ->
+%%   ?debugFmt("~nDirectory: ~p~nName     : ~p~n", [Directory, Name]),
+  {ok, {Directory, Name}}.
+
+close(_Table) -> ok.
+
+create(_Directory, _Name) ->
+  ok.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
diff --git a/src/dynomite_http.erl b/src/dynomite_http.erl
new file mode 100644
index 00000000..8b6f7fbb
--- /dev/null
+++ b/src/dynomite_http.erl
@@ -0,0 +1,21 @@
+%%%-------------------------------------------------------------------
+%%% File    : dynomite_http.erl
+%%% Author  : Brad Anderson <brad@cloudant.com>
+%%% Description :
+%%%
+%%% Created : 10 Jan 2010 by Brad Anderson <brad@cloudant.com>
+%%%-------------------------------------------------------------------
+-module(dynomite_http).
+-author('Brad Anderson <brad@cloudant.com>').
+
+-include("../couch/src/couch_db.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-export([handle_cluster_info/1]).
+
+%% GET /_cluster
+handle_cluster_info(#httpd{method='GET', path_parts=[_]}=Req) ->
+  ClusterInfo = [{<<"ping_node">>, ?l2b(atom_to_list(node()))}],
+  showroom_log:message(info, "Cluster Info: ~p", [ClusterInfo]),
+  couch_httpd:send_json(Req, {ClusterInfo}).
diff --git a/src/dynomite_prof.erl b/src/dynomite_prof.erl
new file mode 100644
index 00000000..80c4b5b7
--- /dev/null
+++ b/src/dynomite_prof.erl
@@ -0,0 +1,164 @@
+%%%-------------------------------------------------------------------
+%%% File:      dynomite_prof.erl
+%%% @author    Cliff Moon <> []
+%%% @copyright 2009 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-02-15 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(dynomite_prof).
+-author('cliff@powerset.com').
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0, start_prof/1, stop_prof/1, stats/1, averages/0,
+         balance_prof/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {ets, balance}).
+
+-record(profile, {name, count, sum}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec start_link() -> {ok,Pid} | ignore | {error,Error}
+%% @doc Starts the server
+%% @end
+%%--------------------------------------------------------------------
+start_link() ->
+  gen_server:start_link({local, dynomite_prof}, ?MODULE, [], []).
+
+stats(Id) ->
+  gen_server:call(dynomite_prof, {stats, Id}).
+
+balance_prof() ->
+  gen_server:cast(dynomite_prof, {balance, self(), lib_misc:now_float()}).
+
+start_prof(Id) ->
+  gen_server:cast(dynomite_prof, {start, self(), Id, lib_misc:now_float()}).
+
+stop_prof(Id) ->
+  gen_server:cast(dynomite_prof, {stop, self(), Id, lib_misc:now_float()}).
+
+averages() ->
+  gen_server:call(dynomite_prof, averages).
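The intended pattern is to bracket a region of code per caller/id pair: elapsed wall-clock time accumulates into the #profile table, and averages/0 reports mean seconds per id plus a per-pid balance ratio. A sketch, with a hypothetical label and workload:

    dynomite_prof:start_prof(shard_open),
    do_work(),                            % hypothetical workload
    dynomite_prof:stop_prof(shard_open),
    [Balances, Averages] = dynomite_prof:averages().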
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% @spec init(Args) -> {ok, State} |
+%%                     {ok, State, Timeout} |
+%%                     ignore |
+%%                     {stop, Reason}
+%% @doc Initiates the server
+%% @end
+%%--------------------------------------------------------------------
+init([]) ->
+  Tid = ets:new(profiling, [set, {keypos, 2}]),
+  Bal = ets:new(balance, [set]),
+  {ok, #state{ets=Tid, balance=Bal}}.
+
+%%--------------------------------------------------------------------
+%% @spec
+%% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% @doc Handling call messages
+%% @end
+%%--------------------------------------------------------------------
+handle_call({stats, Id}, _From, State = #state{ets=Ets}) ->
+  Reply = ets:lookup(Ets, Id),
+  {reply, Reply, State};
+
+handle_call(table, _From, State = #state{ets=Ets}) ->
+  {reply, Ets, State};
+
+handle_call(averages, _From, State = #state{ets=Ets, balance=Bal}) ->
+  Avgs = ets:foldl(fun(#profile{name=Name, count=Count, sum=Sum}, List) ->
+                       [{Name, Sum/Count}|List]
+                   end, [], Ets),
+  {_, MaxCount} = ets:foldl(fun
+    ({Pid, Count}, {_P, M}) when Count > M -> {Pid, Count};
+    (_, {P, M}) -> {P, M}
+  end, {pid, 0}, Bal),
+  Balances = ets:foldl(fun({Pid, Count}, List) ->
+                           [{Pid, Count / MaxCount} | List]
+                       end, [], Bal),
+  {reply, [Balances, Avgs], State}.
+
+%%--------------------------------------------------------------------
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%%                                  {noreply, State, Timeout} |
+%%                                  {stop, Reason, State}
+%% @doc Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+handle_cast({balance, Pid, _Time}, State = #state{balance=Ets}) ->
+  case ets:lookup(Ets, Pid) of
+  [] -> ets:insert(Ets, {Pid, 1});
+  [{Pid, Count}] -> ets:insert(Ets, {Pid, Count+1})
+  end,
+  {noreply, State};
+
+handle_cast({start, Pid, Id, Time}, State = #state{ets=_Ets}) ->
+  put({Pid, Id}, Time),
+  {noreply, State};
+
+handle_cast({stop, Pid, Id, Time}, State = #state{ets=Ets}) ->
+  case get({Pid, Id}) of
+  undefined -> ok;
+  OldTime ->
+    erase({Pid, Id}),
+    increment_time(Ets, Time-OldTime, Id)
+  end,
+  {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%%                                   {noreply, State, Timeout} |
+%%                                   {stop, Reason, State}
+%% @doc Handling all non call/cast messages
+%% @end
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+  {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @spec terminate(Reason, State) -> void()
+%% @doc This function is called by a gen_server when it is about to
+%%      terminate. It should be the opposite of Module:init/1 and do any
+%%      necessary cleaning up. When it returns, the gen_server terminates
+%%      with Reason. The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+  ok.
+
+%%--------------------------------------------------------------------
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @doc Convert process state when code is changed
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+  {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+increment_time(Ets, Time, Id) ->
+  case ets:lookup(Ets, Id) of
+  [] -> ets:insert(Ets, #profile{name=Id, count=1, sum=Time});
+  [#profile{name=Id, count=Count, sum=Sum}] ->
+    ets:insert(Ets, #profile{name=Id, count=Count+1, sum=Sum+Time})
+  end.
diff --git a/src/dynomite_sup.erl b/src/dynomite_sup.erl
new file mode 100644
index 00000000..f8136934
--- /dev/null
+++ b/src/dynomite_sup.erl
@@ -0,0 +1,85 @@
+%%%-------------------------------------------------------------------
+%%% File:      dynomite_sup.erl
+%%% @author    Cliff Moon <cliff@powerset.com> []
+%%% @copyright 2008 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2008-06-27 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(dynomite_sup).
+-author('cliff@powerset.com').
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/1]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-include("../include/config.hrl").
+
+-define(SERVER, ?MODULE).
+
+%%====================================================================
+%% API functions
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec start_link(Hints) -> {ok,Pid} | ignore | {error,Error}
+%% @doc Starts the supervisor
+%% @end
+%%--------------------------------------------------------------------
+start_link(Hints) ->
+  supervisor:start_link(?MODULE, [Hints]).
+
+%%====================================================================
+%% Supervisor callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec init(Args) -> {ok, {SupFlags, [ChildSpec]}} |
+%%                     ignore |
+%%                     {error, Reason}
+%% @doc Whenever a supervisor is started using
+%%      supervisor:start_link/[2,3], this function is called by the new
+%%      process to find out about restart strategy, maximum restart
+%%      frequency and child specifications.
+%% @end
+%%--------------------------------------------------------------------
+init(Args) ->
+  Node = node(),
+  Nodes = running_nodes() ++ [node()],
+  Membership = {membership,
+                {membership2, start_link, [Node, Nodes, Args]},
+                permanent,
+                1000,
+                worker,
+                [membership2]},
+  MemEventMgr = {mem_event_manager,
+                 {gen_event, start_link, [{local, membership_events}]},
+                 permanent,
+                 1000,
+                 worker,
+                 []},
+  {ok, {{one_for_one, 10, 1}, [Membership, MemEventMgr]}}.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+%% @doc get a list of running nodes visible to this local node
+running_nodes() ->
+  [Node || Node <- nodes([this, visible]), running(Node)].
+
+%% @doc monitor the membership server on Node from here
+running(Node) ->
+  Ref = erlang:monitor(process, {membership, Node}),
+  R = receive
+  {'DOWN', Ref, _, _, _} -> false
+  after 1 ->
+    true
+  end,
+  erlang:demonitor(Ref),
+  R.
diff --git a/src/lib_misc.erl b/src/lib_misc.erl
new file mode 100644
index 00000000..f5449295
--- /dev/null
+++ b/src/lib_misc.erl
@@ -0,0 +1,235 @@
+-module(lib_misc).
+
+-define(OFFSET_BASIS, 2166136261).
+-define(FNV_PRIME, 16777619).
+
+-export([rm_rf/1, pmap/3, succ/1, fast_acc/3, hash/1, hash/2, fnv/1,
+         nthdelete/2, zero_split/1, nthreplace/3, rand_str/1, position/2,
+         shuffle/1, floor/1, ceiling/1, time_to_epoch_int/1,
+         time_to_epoch_float/1, now_int/0, now_float/0, byte_size/1,
+         listify/1, reverse_bits/1]).
+
+-include("../include/config.hrl").
+-include("../include/profile.hrl").
+
+rm_rf(Name) when is_list(Name) ->
+  case filelib:is_dir(Name) of
+  false ->
+    file:delete(Name);
+  true ->
+    case file:list_dir(Name) of
+    {ok, Filenames} ->
+      lists:foreach(fun rm_rf/1, [filename:join(Name, F) || F <- Filenames]),
+      file:del_dir(Name);
+    {error, Reason} ->
+      error_logger:info_msg("rm_rf failed because ~p~n", [Reason])
+    end
+  end.
+
+zero_split(Bin) ->
+  zero_split(0, Bin).
+
+zero_split(N, Bin) when N > erlang:byte_size(Bin) -> Bin;
+
+zero_split(N, Bin) ->
+  case Bin of
+  <<_:N/binary, 0:8, _/binary>> -> split_binary(Bin, N);
+  _ -> zero_split(N+1, Bin)
+  end.
+
+rand_str(N) ->
+  lists:map(fun(_I) ->
+    random:uniform(26) + $a - 1
+  end, lists:seq(1, N)).
+
+nthreplace(N, E, List) ->
+  lists:sublist(List, N-1) ++ [E] ++ lists:nthtail(N, List).
+
+nthdelete(N, List) ->
+  nthdelete(N, List, []).
+
+nthdelete(0, List, Ret) ->
+  lists:reverse(Ret) ++ List;
+
+nthdelete(_, [], Ret) ->
+  lists:reverse(Ret);
+
+nthdelete(1, [_E|L], Ret) ->
+  nthdelete(0, L, Ret);
+
+nthdelete(N, [E|L], Ret) ->
+  nthdelete(N-1, L, [E|Ret]).
+
+floor(X) ->
+  T = erlang:trunc(X),
+  case (X - T) of
+  Neg when Neg < 0 -> T - 1;
+  Pos when Pos > 0 -> T;
+  _ -> T
+  end.
+
+ceiling(X) ->
+  T = erlang:trunc(X),
+  case (X - T) of
+  Neg when Neg < 0 -> T;
+  Pos when Pos > 0 -> T + 1;
+  _ -> T
+  end.
+
+succ([]) ->
+  [];
+
+succ(Str) ->
+  succ_int(lists:reverse(Str), []).
+
+succ_int([Char|Str], Acc) ->
+  if
+  Char >= $z -> succ_int(Str, [$a|Acc]);
+  true -> lists:reverse(lists:reverse([Char+1|Acc]) ++ Str)
+  end.
+
+fast_acc(_, Acc, 0) -> Acc;
+
+fast_acc(Fun, Acc, N) ->
+  fast_acc(Fun, Fun(Acc), N-1).
+
+shuffle(List) when is_list(List) ->
+  [N || {_R, N} <- lists:keysort(1, [{random:uniform(), X} || X <- List])].
+
+pmap(Fun, List, ReturnNum) ->
+  N = if
+  ReturnNum > length(List) -> length(List);
+  true -> ReturnNum
+  end,
+  SuperParent = self(),
+  SuperRef = erlang:make_ref(),
+  Ref = erlang:make_ref(),
+  %% we spawn an intermediary to collect the results
+  %% this is so that there will be no leaked messages sitting in our mailbox
+  Parent = spawn(fun() ->
+    L = gather(N, length(List), Ref, []),
+    SuperParent ! {SuperRef, pmap_sort(List, L)}
+  end),
+  Pids = [spawn(fun() ->
+    Parent ! {Ref, {Elem, (catch Fun(Elem))}}
+  end) || Elem <- List],
+  Ret = receive
+  {SuperRef, Ret1} -> Ret1
+  end,
+  % i think we need to cleanup here.
+  lists:foreach(fun(P) -> exit(P, die) end, Pids),
+  Ret.
+
+pmap_sort(Original, Results) ->
+  pmap_sort([], Original, lists:reverse(Results)).
+
+% pmap_sort(Sorted, [], _) -> lists:reverse(Sorted);
+pmap_sort(Sorted, _, []) -> lists:reverse(Sorted);
+pmap_sort(Sorted, [E|Original], Results) ->
+  case lists:keytake(E, 1, Results) of
+  {value, {E, Val}, Rest} -> pmap_sort([Val|Sorted], Original, Rest);
+  false -> pmap_sort(Sorted, Original, Results)
+  end.
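Note that pmap/3 is a first-N parallel map, not a full map: it returns once ReturnNum workers have replied, and error-shaped replies (badrpc, 'EXIT', not_found) do not count toward that quorum, as gather/4 below shows. This is exactly what pcall/3 in cluster_ops leans on for quorum reads and writes. An illustrative call, node names hypothetical:

    %% ask three nodes, return as soon as two have answered:
    Replies = lib_misc:pmap(fun(Node) -> rpc:call(Node, erlang, node, []) end,
                            ['a@host', 'b@host', 'c@host'], 2).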
+
+gather(_, Max, _, L) when length(L) == Max -> L;
+gather(0, _, _, L) -> L;
+gather(N, Max, Ref, L) ->
+  receive
+  {Ref, {Elem, {not_found, Ret}}} ->
+    gather(N, Max, Ref, [{Elem, {not_found, Ret}}|L]);
+  {Ref, {Elem, {badrpc, Ret}}} ->
+    gather(N, Max, Ref, [{Elem, {badrpc, Ret}}|L]);
+  {Ref, {Elem, {'EXIT', Ret}}} ->
+    gather(N, Max, Ref, [{Elem, {'EXIT', Ret}}|L]);
+  {Ref, Ret} ->
+    gather(N-1, Max, Ref, [Ret|L])
+  end.
+
+get_hash_module(#config{hash_module=HashModule}) ->
+  HashModule.
+
+hash(Term) ->
+  HashModule = get_hash_module(configuration:get_config()),
+  ?prof(hash),
+  R = HashModule:hash(Term),
+  ?forp(hash),
+  R.
+
+hash(Term, Seed) ->
+  HashModule = get_hash_module(configuration:get_config()),
+  ?prof(hash),
+  R = HashModule:hash(Term, Seed),
+  ?forp(hash),
+  R.
+
+%% 32 bit fnv. magic numbers ahoy
+fnv(Term) when is_binary(Term) ->
+  fnv_int(?OFFSET_BASIS, 0, Term);
+
+fnv(Term) ->
+  fnv_int(?OFFSET_BASIS, 0, term_to_binary(Term)).
+
+fnv_int(Hash, ByteOffset, Bin) when erlang:byte_size(Bin) == ByteOffset ->
+  Hash;
+
+fnv_int(Hash, ByteOffset, Bin) ->
+  <<_:ByteOffset/binary, Octet:8, _/binary>> = Bin,
+  Xord = Hash bxor Octet,
+  fnv_int((Xord * ?FNV_PRIME) rem (2 bsl 31), ByteOffset+1, Bin).
+
+position(Predicate, List) when is_function(Predicate) ->
+  position(Predicate, List, 1);
+
+position(E, List) ->
+  position(E, List, 1).
+
+position(Predicate, [], _N) when is_function(Predicate) -> false;
+
+position(Predicate, [E|List], N) when is_function(Predicate) ->
+  case Predicate(E) of
+  true -> N;
+  false -> position(Predicate, List, N+1)
+  end;
+
+position(_, [], _) -> false;
+
+position(E, [E|_List], N) -> N;
+
+position(E, [_|List], N) -> position(E, List, N+1).
+
+now_int() ->
+  time_to_epoch_int(now()).
+
+now_float() ->
+  time_to_epoch_float(now()).
+
+time_to_epoch_int(Time) when is_integer(Time) or is_float(Time) ->
+  Time;
+
+time_to_epoch_int({Mega, Sec, _}) ->
+  Mega * 1000000 + Sec.
+
+time_to_epoch_float(Time) when is_integer(Time) or is_float(Time) ->
+  Time;
+
+time_to_epoch_float({Mega, Sec, Micro}) ->
+  Mega * 1000000 + Sec + Micro / 1000000.
+
+byte_size(List) when is_list(List) ->
+  lists:foldl(fun(El, Acc) -> Acc + lib_misc:byte_size(El) end, 0, List);
+
+byte_size(Term) ->
+  erlang:byte_size(Term).
+
+listify(List) when is_list(List) ->
+  List;
+
+listify(El) -> [El].
+
+reverse_bits(V) when is_integer(V) ->
+  % swap odd and even bits
+  V1 = ((V bsr 1) band 16#55555555) bor
+       (((V band 16#55555555) bsl 1) band 16#ffffffff),
+  % swap consecutive pairs
+  V2 = ((V1 bsr 2) band 16#33333333) bor
+       (((V1 band 16#33333333) bsl 2) band 16#ffffffff),
+  % swap nibbles
+  V3 = ((V2 bsr 4) band 16#0F0F0F0F) bor
+       (((V2 band 16#0F0F0F0F) bsl 4) band 16#ffffffff),
+  % swap bytes
+  V4 = ((V3 bsr 8) band 16#00FF00FF) bor
+       (((V3 band 16#00FF00FF) bsl 8) band 16#ffffffff),
+  % swap 2-byte long pairs
+  ((V4 bsr 16) band 16#ffffffff) bor ((V4 bsl 16) band 16#ffffffff).
diff --git a/src/mem_utils.erl b/src/mem_utils.erl
new file mode 100644
index 00000000..ffefd5cb
--- /dev/null
+++ b/src/mem_utils.erl
@@ -0,0 +1,129 @@
+-module(mem_utils).
+
+-export([fix_mappings/3, get_remote_fullmap/1, join_type/3, pmap_from_full/1,
+         nodeparts_up/1, remove_partition/3, use_persistent/2,
+         was_i_nodedown/2]).
+
+-include("../include/common.hrl").
+
+join_type(Node, Fullmap, Options) ->
+  case proplists:get_value(replace, Options) of
+  undefined ->
+    case lists:filter(fun({N, _P, _T}) -> N =:= Node end, Fullmap) of
+    [] -> new;
+    _ -> rejoin
+    end;
+  OldNode when is_atom(OldNode) ->
+    % not a particularly strong guard, but will have to do
+    {replace, OldNode};
+  _ -> new
+  end.
+
+%% @doc return a {PMap, Fullmap} tuple that has corrections for
+%%      down, rejoining, or replacing Node
+fix_mappings(nodedown, Node, OldFullmap) ->
+  fix_mappings_fold(fun({N, P, T}, AccIn) ->
+    case {N, T} of
+    {Node, {nodedown, Type}} ->
+      % already marked as nodedown, so leave it
+      [{N, P, {nodedown, Type}} | AccIn];
+    {Node, _} ->
+      % mark it as nodedown
+      [{N, P, {nodedown, T}} | AccIn];
+    _ -> [{N, P, T} | AccIn]
+    end
+  end, [], OldFullmap);
+
+fix_mappings(rejoin, Node, OldFullmap) ->
+  fix_mappings_fold(fun({N, P, {nodedown, T}}, AccIn) when N =:= Node ->
+                        [{N, P, T} | AccIn];
+                       (NPT, AccIn) -> [NPT | AccIn]
+                    end, [], OldFullmap);
+
+fix_mappings(replace, {OldNode, NewNode}, OldFullmap) ->
+  fix_mappings_fold(fun({N, P, T}, AccIn) ->
+    case {N, T} of
+    {OldNode, {nodedown, T1}} -> [{NewNode, P, T1} | AccIn];
+    {OldNode, _} -> [{NewNode, P, T} | AccIn];
+    _ -> [{N, P, T} | AccIn]
+    end
+  end, [], OldFullmap).
+
+fix_mappings_fold(Fun, Acc0, OldFullmap) ->
+  NewFullmap = lists:foldl(Fun, Acc0, OldFullmap),
+  NewPMap = pmap_from_full(NewFullmap),
+  {NewPMap, NewFullmap}.
+
+%% @doc create a PMap (primary nodes only) from provided Fullmap
+%%      If a primary node is down, a partner will be supplied
+pmap_from_full(Fullmap) ->
+  NodePartList = nodeparts_up(Fullmap),
+  lists:keysort(2, lists:foldl(fun({N, P, T}, AccIn) ->
+    case T of
+    primary -> [{N, P} | AccIn];
+    {nodedown, primary} ->
+      NewNode = case lists:delete(N,
+          membership2:nodes_for_part(P, NodePartList)) of
+      [First|_] -> First;
+      [] -> N % wtf, are all partners down too?
+      end,
+      [{NewNode, P} | AccIn];
+    _ -> AccIn
+    end
+  end, [], Fullmap)).
+
+nodeparts_up(Fullmap) ->
+  lists:foldl(fun({_N, _P, {nodedown, _}}, AccIn) -> AccIn;
+                 ({N, P, _T}, AccIn) -> [{N, P} | AccIn]
+              end, [], Fullmap).
+
+%% @doc if Node is in the Fullmap as {nodedown,_} return true
+was_i_nodedown(Node, Fullmap) ->
+  lists:member(yes, lists:map(fun({N, _P, {nodedown, _T}}) ->
+    case N of
+    Node -> yes;
+    _ -> no
+    end;
+   (_) -> no
+  end, Fullmap)).
+
+remove_partition(FullMap, Node, Partition) ->
+  case lists:filter(
+    fun({N, P, _Type}) -> N =:= Node andalso P =:= Partition end,
+    FullMap) of
+  [Elem|_] ->
+    lists:delete(Elem, FullMap);
+  Other ->
+    ?LOG_ERROR("~nNo partition to remove: ~p~n"
+               "Node: ~p~nPartition: ~p~n", [Other, Node, Partition]),
+    FullMap
+  end.
+
+use_persistent(_PartnersPlus, undefined) ->
+  false;
+
+use_persistent(PartnersPlus, _PersistentParts) ->
+  % get a fullmap from a partner
+  % this may need rework for network partitions, as you could get a bad
+  % fullmap from another node that was partitioned w/ this one :\
+  RemoteFullmap = get_remote_fullmap(PartnersPlus),
+  % return opposite of was_i_nodedown
+  not mem_utils:was_i_nodedown(node(), RemoteFullmap).
+
+get_remote_fullmap([]) ->
+  []; % no remote fullmap available, so return empty list
+
+get_remote_fullmap([Node|Rest]) ->
+  case gen_server:call({membership, Node}, fullmap) of
+  {ok, Fullmap} -> Fullmap;
+  _ -> get_remote_fullmap(Rest)
+  end.
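Fullmap entries throughout this module are {Node, Partition, Type} triples, where Type is primary, a replication-partner marker, or {nodedown, OriginalType} once a node drops out. A sketch of the nodedown/rejoin cycle, with hypothetical entries:

    Full0 = [{'a@host', 0, primary}, {'b@host', 0, partner}],
    {PMap1, Full1} = mem_utils:fix_mappings(nodedown, 'a@host', Full0),
    %% Full1 marks 'a@host' as {nodedown, primary}; PMap1 promotes
    %% partner 'b@host' for partition 0
    {_PMap2, _Full2} = mem_utils:fix_mappings(rejoin, 'a@host', Full1).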
diff --git a/src/membership2.erl b/src/membership2.erl new file mode 100644 index 00000000..4c4780c3 --- /dev/null +++ b/src/membership2.erl @@ -0,0 +1,686 @@ +%%%------------------------------------------------------------------- +%%% File: membership2.erl +%%% @author Cliff Moon <cliff@powerset.com> [] +%%% @copyright 2009 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-05-04 by Cliff Moon +%%%------------------------------------------------------------------- +-module(membership2). +-author('cliff@powerset.com'). +-author('brad@cloudant.com'). + +-behaviour(gen_server). + +%% API +-export([start_link/2, start_link/3, stop/1, check_nodes/0, + partitions/0, partition_for_key/1, fullmap/0, + all_nodes_parts/1, clock/0, + nodes/0, nodeparts_for_key/1, nodes_for_part/1, nodes_for_part/2, + nodes_for_shard/1, nodes_down/0, + parts_for_node/1, + take_offline/2, bring_online/2, + decommission_part/3, pp_fullmap/0, snafu/1, snafu/3]). + + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% includes +-include("../include/config.hrl"). +-include("../include/common.hrl"). +-include("../include/profile.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%%==================================================================== +%% API +%%==================================================================== +%% @doc Starts the server +%% @end +%%-------------------------------------------------------------------- + +start_link(Node, Nodes) -> + start_link(Node, Nodes, []). + + +start_link(Node, Nodes, Args) -> + gen_server:start_link({local, membership}, ?MODULE, [Node, Nodes, Args], []). + + +stop(Server) -> + gen_server:cast(Server, stop). + + +%% @doc for when things have really gone south. Install a new state on all +%% nodes, given a filename, or node list, partition map, and fullmap. +%% @end +snafu(Filename) -> + NewState = case file:consult(Filename) of + {ok, [Terms]} -> + Terms; + Error -> + throw(Error) + end, + #membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap} = NewState, + snafu(Nodes, PMap, Fullmap). + + +snafu(Nodes, PMap, Fullmap) -> + NewState = #membership{node=node(), nodes=Nodes, + partitions=PMap, fullmap=Fullmap, version=vector_clock:create(dbcore)}, + update_ets(ets_name(node()), NewState), + fire_gossip(node(), Nodes, NewState), + save(NewState). + + +check_nodes() -> + ErlangNodes = lists:usort([node() | erlang:nodes()]), + {ok, MemNodeList} = membership2:nodes(), + MemNodes = lists:usort(MemNodeList), + {PMapNodeList, _PMapPartList} = lists:unzip(partitions()), + PMapNodes = lists:usort(PMapNodeList), + case ErlangNodes =:= MemNodes andalso + ErlangNodes =:= PMapNodes andalso + MemNodes =:= PMapNodes of + true -> true; + _ -> + Msg = "membership: Node Lists do not match.~n" + "Erlang Nodes : ~p~n" + "Membership Nodes : ~p~n" + "PMap Nodes : ~p~n", + Lst = [ErlangNodes, MemNodes, PMapNodes], + showroom_log:message(error, Msg, Lst), + io:format(Msg, Lst), + false + end. + + +%% @doc retrieve the primary partition map. This is a list of partitions and +%% their corresponding primary node, no replication partner nodes. +partitions() -> + ets_pmap(). + + +%% @doc retrieve the full partition map, like above, but including replication +%% partner nodes. List should number 2^Q * N +fullmap() -> + lists:keysort(2, ets_fullmap()). 
+ + +%% @doc pretty-print the full partition map (sorted by node, then part) +pp_fullmap() -> + lists:foreach( + fun({N,P}) -> + io:format("~-60s ~s~n", [N, showroom_utils:int_to_hexstr(P)]) + end, + lists:sort(membership2:all_nodes_parts(true))). + + +%% @doc get the current vector clock from membership state +clock() -> + gen_server:call(membership, clock). + + +%% @doc get the list of cluster nodes (according to membership module) +%% This may differ from erlang:nodes() +nodes() -> + gen_server:call(membership, nodes). + + +%% @doc get all the responsible nodes for a given partition, including +%% replication partner nodes +nodes_for_part(Part) -> + nodes_for_part(Part, all_nodes_parts(true)). + + +nodes_for_part(Part, NodePartList) -> + Filtered = lists:filter(fun({_N, P}) -> P =:= Part end, NodePartList), + {Nodes, _Parts} = lists:unzip(Filtered), + lists:usort(Nodes). + + +nodes_for_shard(ShardName) when is_binary(ShardName) -> + nodes_for_shard(binary_to_list(ShardName)); + +nodes_for_shard(ShardName) when is_list(ShardName) -> + HexPart = case string:rchr(ShardName, $_) + 1 of + 1 -> ShardName; + Last -> string:substr(ShardName, Last) + end, + Int = showroom_utils:hexstr_to_int(HexPart), + {_, Parts} = lists:unzip(membership2:partitions()), + nodes_for_part(partitions:int_to_partition(Int, Parts)). + + +%% @doc get all the responsible nodes and partitions for a given key, including +%% nodes/parts on replication partner nodes +nodeparts_for_key(Key) -> + int_node_parts_for_key(Key). + + +%% @doc get a list of all the nodes marked down in this node's fullmap +nodes_down() -> + Downs = lists:foldl(fun({N,_P,{nodedown, _T}}, AccIn) -> [N|AccIn]; + (_, AccIn) -> AccIn end, [], fullmap()), + lists:usort(Downs). + + +%% @doc return the partition responsible for the given Key +partition_for_key(Key) -> + Config = configuration:get_config(), + Hash = lib_misc:hash(Key), + partitions:hash_to_partition(Hash, Config#config.q). + + +%% @doc return the partitions that reside on a given node +parts_for_node(Node) -> + lists:sort(lists:foldl(fun({N,P,_Type}, AccIn) -> + case N of + Node -> [P | AccIn]; + _ -> AccIn + end + end, [], fullmap())). + + +%% @doc get all the nodes and partitions in the cluster. Depending on the +%% AllPartners param, you get only primary nodes or replication partner +%% nodes, as well. +%% No nodes/parts currently down are returned. +all_nodes_parts(false) -> + ets_pmap(); +all_nodes_parts(true) -> + mem_utils:nodeparts_up(ets_fullmap()). + + +%% @doc If a local storage server exists for this partition it will be taken +%% out of rotation until put back in. +%% @end +take_offline(Node, Partition) when Node =:= node() -> + gen_server:call(membership, {take_offline, Partition}); + +take_offline(Node, Partition)-> + gen_server:call({membership, Node}, {take_offline, Partition}). + + +%% @doc Brings a storage server that has been taken offline back online. +%% @end +bring_online(Node, Partition) -> + showroom_log:message(debug, "membership: bring_online Node: ~p Partition: ~p", + [Node, Partition]), + gen_server:call({membership, Node}, {bring_online, Partition}). + + +%% @doc cleans up the remaining .couch shard/partition file after it has been +%% moved to a new node. +decommission_part(Node, Part, DbName) -> + gen_server:cast({membership, Node}, {decommission, Part, DbName}). 
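Reading the map for a single document key chains these calls together. A shell-style sketch (the key is illustrative):

    Part  = membership2:partition_for_key(<<"someuser/somedoc">>),
    Nodes = membership2:nodes_for_part(Part),
    %% Nodes holds the primary plus its replication partners, N in total,
    %% with currently-down nodes already filtered by all_nodes_parts(true).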
+ + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @doc Initiates the server +%% @end +%%-------------------------------------------------------------------- +init([Node, Nodes, Args]) -> + process_flag(trap_exit,true), + showroom_log:message(info, "membership: membership server starting...", []), + Options = lists:flatten(Args), + showroom_log:message(info, "membership: options ~p", [Options]), + net_kernel:monitor_nodes(true), + Config = configuration:get_config(), + PersistentState=#membership{partitions=PersistentParts} = load(Node), + PartnersPlus = replication:partners_plus(Node, Nodes), + State = + case mem_utils:use_persistent(PartnersPlus, PersistentParts) of + false -> + showroom_log:message(info, "membership: not using persisted state", []), + % didn't find persistent state on disk or this node was nodedown + % so we don't want to use persisted state + PartialNodes = lists:usort(Nodes), + {NewVersion, RemoteNodes, NewPMap1, NewFullMap1} = + join_to(Node, PartnersPlus, Options), + NewWorldNodes = lists:usort(PartialNodes ++ RemoteNodes), + NewPMap = case NewPMap1 of + [] -> partitions:create_partitions(Config#config.q, Node, + NewWorldNodes); + _ -> NewPMap1 + end, + NewFullMap = case NewFullMap1 of + [] -> make_all_nodes_parts(NewPMap); + _ -> NewFullMap1 + end, + #membership{ + node=Node, + nodes=NewWorldNodes, + partitions=lists:keysort(2,NewPMap), + % version=vector_clock:increment(dbcore, NewVersion), + version=NewVersion, + fullmap=NewFullMap}; + _ -> + % found persistent state on disk + showroom_log:message(info, "membership: using persisted state", []), + case Options of + [] -> ok; + _ -> + showroom_log:message(info, "membership: options ~p ignored.", [Options]) + end, + %% fire gossip even if state comes from disk + fire_gossip(Node, Nodes, PersistentState), + PersistentState + end, + save(State), + % ets table is an optimization for cluster_ops performance + Ets = ets:new(ets_name(Node), [public, set, named_table]), + update_ets(Ets, State), + {ok, State}. 
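init/1 leaves behind a small node-local ets cache that cluster_ops can read without a gen_server round trip. A sketch of its layout (node name illustrative; see update_ets/2 near the end of the module):

    %% the table is named mem_<nodename> and holds exactly two rows:
    ets:lookup(mem_a@host, pmap),     %% [{pmap,    [{Node, Part}, ...]}]
    ets:lookup(mem_a@host, fullmap).  %% [{fullmap, [{Node, Part, Type}, ...]}]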
+ + +%%-------------------------------------------------------------------- +%% @spec +%% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @doc Handling call messages +%% @end +%%-------------------------------------------------------------------- + +%% join +handle_call({join, JoiningNode, Options}, _From, + State = #membership{version=Version, node=Node, nodes=Nodes, + partitions=Partitions, fullmap=OldFullMap}) -> + JoinType = mem_utils:join_type(JoiningNode, OldFullMap, Options), + showroom_log:message(alert, "membership: node ~p wants to join, type '~p'", + [JoiningNode, JoinType]), + {PMap, NewFullmap} = case JoinType of + rejoin -> + mem_utils:fix_mappings(rejoin, JoiningNode, OldFullMap); + {replace, OldNode} -> + mem_utils:fix_mappings(replace, {OldNode, JoiningNode}, OldFullMap); + new -> + Hints = proplists:get_value(hints, Options), + PMap1 = case partitions:join(JoiningNode, Partitions, Hints) of + {ok, Table} -> Table; + {error, Error, _Table} -> throw({join_error, Error}) + end, + Fullmap1 = make_all_nodes_parts(PMap1), + {PMap1, Fullmap1} + end, + WorldNodes = lists:usort(Nodes ++ [JoiningNode]), + NewVersion = vector_clock:increment(dbcore, Version), + NewState1 = State#membership{nodes=WorldNodes, partitions=PMap, + version=NewVersion}, + {Fullmap, NewState2} = case proplists:get_value(bootstrap, Options) of + true -> + % join not complete until bootstrap finishes, + % so this NewState isn't the final (i.e. NewState1 will be installed) + showroom_log:message(info, "membership: bootstrap process starting", []), + bootstrap_manager:start_bootstrap(NewState1, OldFullMap, NewFullmap); + _ -> + % no bootstrap, so install NewFullmap now + showroom_log:message(info, "membership: no bootstrap", []), + {NewFullmap, NewState1#membership{fullmap=NewFullmap}} + end, + save(NewState2), + update_ets(ets_name(node()), NewState2), + notify(node_join, [JoiningNode]), + fire_gossip(Node, WorldNodes, NewState2), + % If we're bootstrapping, then the join is not complete. + % So return FullMap for now. 
bootstrap_manager:end_bootstrap will fix it
+  {reply, {ok, NewVersion, WorldNodes, PMap, Fullmap}, NewState2};
+
+%% clock
+handle_call(clock, _From, State = #membership{version=Version}) ->
+  {reply, Version, State};
+
+%% state
+handle_call(state, _From, State) ->
+  {reply, State, State};
+
+%% newfullmap
+handle_call({newfullmap, NewFullMap}, _From,
+            State = #membership{node=Node, nodes=Nodes, version=Version}) ->
+  NewVersion = vector_clock:increment(dbcore, Version),
+  NewState = State#membership{version=NewVersion, fullmap=NewFullMap},
+  save(NewState),
+  update_ets(ets_name(node()), NewState),
+  fire_gossip(Node, Nodes, NewState),
+  {reply, installed, NewState};
+
+%% partitions
+handle_call(partitions, _From, State = #membership{partitions=Parts}) ->
+  {reply, {ok, Parts}, State};
+
+%% fullmap
+handle_call(fullmap, _From, State = #membership{fullmap=FullMap}) ->
+  {reply, {ok, FullMap}, State};
+
+%% nodes
+handle_call(nodes, _From, State = #membership{nodes=Nodes}) ->
+  {reply, {ok, Nodes}, State};
+
+%% take_offline
+handle_call({take_offline, Partition}, _From,
+            State = #membership{node=Node, nodes=Nodes, fullmap=OldFullMap}) ->
+  showroom_log:message(info, "membership: take_offline Node: ~p Partition: ~p",
+                       [Node, Partition]),
+  NewFullMap = mem_utils:remove_partition(OldFullMap, Node, Partition),
+  NewState = State#membership{fullmap=NewFullMap},
+  fire_gossip(Node, Nodes, NewState),
+  update_ets(ets_name(node()), NewState),
+  {reply, {offline, Node, Partition}, NewState};
+
+%% at least reply that this 'catch-all' was ignored
+handle_call(_Request, _From, State) ->
+  {reply, ignored, State}.
+
+
+%%--------------------------------------------------------------------
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%%                                  {noreply, State, Timeout} |
+%%                                  {stop, Reason, State}
+%% @doc Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+
+handle_cast({gossip, RemoteState = #membership{node=RemoteNode}},
+            LocalState = #membership{node=_Me}) ->
+  showroom_log:message(info, "membership: received gossip from ~p",
+                       [RemoteNode]),
+  {MergeType, MergedState = #membership{nodes=_MergedNodes}} =
+      merge_state(RemoteState, LocalState),
+  case MergeType of
+  equal -> {noreply, MergedState};
+  merged ->
+      showroom_log:message(info, "membership: merged new gossip", []),
+      % fire_gossip(Me, MergedNodes, MergedState),
+      update_ets(ets_name(node()), MergedState),
+      save(MergedState),
+      {noreply, MergedState}
+  end;
+
+% decommission
+% renaming for now, until case 1245 can be completed
+handle_cast({decommission, Part, DbName}, State) ->
+  {{Y,Mon,D}, {H,Min,S}} = calendar:universal_time(),
+  Directory = couch_config:get("couchdb", "database_dir"),
+  OrigFilename = showroom_utils:full_filename(Part, DbName, Directory),
+  Moved = lists:flatten(io_lib:format(".~w~2.10.0B~2.10.0B." ++
+      "~2.10.0B~2.10.0B~2.10.0B.moved.couch", [Y,Mon,D,H,Min,S])),
+  % Note: re:replace/4 returns iodata; converting each binary element
+  % yields a deep list such as
+  % ["/Users/brad/dev/erlang/dbcore/tmp/lib/x800000/test_800000",
+  %  ".20091001.162640.moved.couch"], which file:rename/2 accepts.
+  % The dot is escaped and the match anchored so that only the trailing
+  % ".couch" extension is replaced.
+  MovedFilename = lists:map(fun(E) -> binary_to_list(E) end,
+      re:replace(OrigFilename, "\\.couch$", Moved, [])),
+  ok = file:rename(OrigFilename, MovedFilename),
+  {noreply, State}.
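The net effect of the decommission cast is a rename in place; with the sample values from the comment above:

    %% before: .../tmp/lib/x800000/test_800000.couch
    %% after:  .../tmp/lib/x800000/test_800000.20091001.162640.moved.couch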
+ + +%% @doc handle nodedown messages because we have +%% net_kernel:monitor_nodes(true) +handle_info({nodedown, Node}, + State = #membership{nodes=OldNodes, fullmap=OldFullmap, + version=OldVersion}) -> + showroom_log:message(alert, "membership: nodedown from ~p", [Node]), + case lists:member(Node, OldNodes) of + true -> + notify(nodedown, [Node]), + % clean up membership state + Nodes = lists:delete(Node, OldNodes), + {PMap, Fullmap} = mem_utils:fix_mappings(nodedown, Node, OldFullmap), + % Do we increment clock here? w/o gossip? + % This is happening near simultaneously on the other nodes, too :\ + % Only reason to increment is persisted clock on down node will be older + % when it returns + Version = vector_clock:increment(dbcore, OldVersion), + NewState = State#membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap, + version=Version}, + update_ets(ets_name(node()), NewState), + save(NewState), + {noreply, NewState}; + _ -> {noreply, State} + end; + +%% @doc handle nodeup messages because we have +%% net_kernel:monitor_nodes(true) +handle_info({nodeup, Node}, State) -> + showroom_log:message(alert, "membership: nodeup Node: ~p", [Node]), + {noreply, State}; + +handle_info(Info, State) -> + showroom_log:message(info, "membership: handle_info Info: ~p", [Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @spec terminate(Reason, State) -> void() +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%% 0.5.6 to 0.5.7 +code_change(184380560337424323902805568963460261434, State, _Extra) -> + backup_old_config_file(), + % update State to the new version + {membership, _Hdr, Node, Nodes, PMap, Version} = State, + NewState = #membership{ + node = Node, + nodes = Nodes, + partitions = PMap, + version = Version, + fullmap = make_all_nodes_parts(PMap) + }, + save(NewState), + % also create new ets table + Ets = ets:new(ets_name(Node), [public, set, named_table]), + update_ets(Ets, NewState), + {ok, NewState}; + +%% 0.8.8 to 0.9.0 +code_change(239470595681156900105628017899543243419, State, _Extra) -> + net_kernel:monitor_nodes(true), + {ok, State}; + +code_change(OldVsn, State, _Extra) -> + io:format("Unknown Old Version!~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]), + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + +backup_old_config_file() -> + Config = configuration:get_config(), + FileName = filename:join([Config#config.directory, + lists:concat([node:name(node()), ".state"])]), + BackupName = filename:join([Config#config.directory, + lists:concat([node:name(node()), ".state.bak"])]), + file:copy(FileName, BackupName). + + +%% return State from membership file +load(Node) -> + Config = configuration:get_config(), + case file:consult(filename:join([Config#config.directory, + lists:concat([node:name(Node), ".state"])])) of + {error, Reason} -> + showroom_log:message(info, "membership: could not load state: ~p~n", + [Reason]), + #membership{nodes=[]}; + {ok, [Terms]} -> + Terms + end. 
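load/1 and its counterpart save/1 (just below) round-trip the #membership{} record through a single consultable term. A sketch of the read side (the path is illustrative):

    %% save/1 writes the state with io:format(File, "~w.~n", [State]),
    %% so reading it back is a one-term file:consult/1:
    {ok, [State = #membership{}]} = file:consult("/var/dbcore/a@host.state").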
+ + +%% save the State to a file +save(State) -> + Config = configuration:get_config(), + Filename = filename:join([Config#config.directory, + lists:concat([node:name(State#membership.node), ".state"])]), + {ok, File} = file:open(Filename, [binary, write]), + io:format(File, "~w.~n", [State]), + file:close(File). + + +%% joining is bi-directional, as opposed to gossip which is unidirectional +%% we want to collect the list of known nodes to compute the partition map +%% which isn't necessarily the same as the list of running nodes +join_to(Node, Partners, Options) -> + join_to(Node, Partners, + {vector_clock:create(dbcore), [], [], []}, Options). + + +%% @doc join this node to one of its partners (or PartnersPlus if no partners +%% are available). +join_to(_, [], {Version, World, PMap, FullMap}, _Options) -> + {Version, World, PMap, FullMap}; + +join_to(Node, [Partner|Rest], {Version, World, PMap, FullMap}, Options) -> + case call_join(Partner, Node, Options) of + {ok, RemoteVersion, NewNodes, NewPMap, NewFullMap} -> + {vector_clock:merge(Version, RemoteVersion), + lists:usort(World ++ NewNodes), + NewPMap, + NewFullMap}; + Other -> + showroom_log:message(info, "membership: join_to Other: ~p~n", [Other]), + join_to(Node, Rest, {Version, World, PMap, FullMap}, Options) + end. + + +%% @doc make the join call to Remote node (usually a partner of Node) +call_join(Remote, Node, Options) -> + showroom_log:message(info, "membership: call_join From: ~p To: ~p", + [Node, Remote]), + catch gen_server:call({membership, node:name(Remote)}, + {join, Node, Options}). + + +merge_state(_RemoteState=#membership{version=RemoteVersion, nodes=RemoteNodes, + partitions=RemotePMap, + fullmap=RemoteFullMap}, + LocalState=#membership{version=LocalVersion, nodes=LocalNodes, + partitions=LocalPMap, + fullmap=LocalFullMap}) -> + case vector_clock:equals(RemoteVersion, LocalVersion) of + true -> + {equal, LocalState}; + false -> + % Note, we're matching MergedVersion from these funs. + % They should be the same. + {MergedVersion, MergedNodes} = + merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes), + {MergedVersion, MergedPMap} = + merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap), + {MergedVersion, MergedFullMap} = + merge_fullmaps(RemoteVersion, RemoteFullMap, + LocalVersion, LocalFullMap), + + % notify of arrivals & departures + Arrived = MergedNodes -- LocalNodes, + notify(node_join, Arrived), + % Departed = LocalNodes -- MergedNodes, + % notify(node_leave, Departed), + + {merged, LocalState#membership{version=MergedVersion, nodes=MergedNodes, + partitions=MergedPMap, + fullmap=MergedFullMap}} + end. + + +merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes) -> + {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteNodes}, + {LocalVersion, LocalNodes}), + {MergedVersion, lists:usort(Merged)}. + + +merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap) -> + {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemotePMap}, + {LocalVersion, LocalPMap}), + {MergedVersion, lists:ukeysort(2, Merged)}. + + +merge_fullmaps(RemoteVersion, RemoteFullMap, LocalVersion, LocalFullMap) -> + {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteFullMap}, + {LocalVersion, LocalFullMap}), + {MergedVersion, lists:usort(Merged)}. + + +notify(Type, Nodes) -> + lists:foreach(fun(Node) -> + gen_event:notify(membership_events, {Type, Node}) + end, Nodes). + + +%% @doc fires a gossip message (membership state) to partners nodes in the +%% cluster. 
+%% @end +fire_gossip(Me, WorldNodes, Gossip) -> + % GossipPartners = partners_plus(Me, WorldNodes), + % random experiment, gossip with all nodes, not just partners_plus + GossipPartners = lists:delete(Me, WorldNodes), + lists:foreach(fun(TargetNode) -> + showroom_log:message(info, "membership: firing gossip from ~p to ~p", + [Me, TargetNode]), + gen_server:cast({membership, TargetNode}, {gossip, Gossip}) + end, GossipPartners). + + +%% @doc construct a table with all partitions, with the primary node and all +%% replication partner nodes as well. +make_all_nodes_parts(PMap) -> + {Nodes, _Parts} = lists:unzip(PMap), + NodeParts = lists:flatmap( + fun({Node,Part}) -> + Partners = replication:partners(Node, lists:usort(Nodes)), + PartnerList = [{Partner, Part, partner} || Partner <- Partners], + [{Node, Part, primary} | PartnerList] + end, PMap), + NodeParts. + + +%% @doc for the given key, return a list of {Node,Part} tuples. Nodes are both +%% primary and replication partner nodes, and should number N. +int_node_parts_for_key(Key) -> + Config = configuration:get_config(), + Hash = lib_misc:hash(Key), + Part = partitions:hash_to_partition(Hash, Config#config.q), + NodePartList = all_nodes_parts(true), + lists:filter(fun({_N,P}) -> P =:= Part end, NodePartList). + + +%% ets table helper functions +ets_name(Node) -> + list_to_atom(lists:concat(["mem_", atom_to_list(Node)])). + + +update_ets(Table, #membership{partitions=PMap, fullmap=FullMap}) -> + ets:insert(Table, {pmap, PMap}), + ets:insert(Table, {fullmap, FullMap}), + ok. + + +ets_pmap() -> + [{pmap, PMap}] = ets:lookup(ets_name(node()), pmap), + PMap. + + +ets_fullmap() -> + [{fullmap, FullMap}] = ets:lookup(ets_name(node()), fullmap), + FullMap. diff --git a/src/node.erl b/src/node.erl new file mode 100644 index 00000000..9a9c82c1 --- /dev/null +++ b/src/node.erl @@ -0,0 +1,39 @@ +%%%------------------------------------------------------------------- +%%% File: node.erl +%%% @author Cliff Moon <> [] +%%% @copyright 2009 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-05-11 by Cliff Moon +%%%------------------------------------------------------------------- +-module(node). +-author('cliff@powerset.com'). + +%% API +-export([name/1, attributes/1]). + +-include("../include/common.hrl"). + +%% -ifdef(TEST). +%% -include("../etest/node_test.erl"). +%% -endif. + +%%==================================================================== +%% API +%%==================================================================== + +name(Name) when is_atom(Name) -> + Name; +name(Node) when is_tuple(Node) -> + element(1, Node); +name(Node) -> + Node. + +attributes(Name) when is_atom(Name) -> + []; +attributes(Node) when is_tuple(Node) -> + element(2, Node); +attributes(_) -> + []. diff --git a/src/partitions.erl b/src/partitions.erl new file mode 100644 index 00000000..942968e1 --- /dev/null +++ b/src/partitions.erl @@ -0,0 +1,334 @@ +%%%------------------------------------------------------------------- +%%% File: partitions.erl +%%% @author Cliff Moon <cliff@powerset.com> [http://www.powerset.com/] +%%% @copyright 2008 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2008-10-12 by Cliff Moon +%%%------------------------------------------------------------------- +-module(partitions). +-author('cliff@powerset.com'). + +%% API +-export([partition_range/1, create_partitions/3, map_partitions/2, + diff/2, pp_diff/1, int_to_partition/2, + join/3, leave/3, hash/1, hash_to_partition/2, item_to_nodepart/1, + shard_name/2, hash_to_hex/2]). 
+ +-define(RINGTOP, trunc(math:pow(2,160)-1)). % SHA-1 space + +-include("../../couch/src/couch_db.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%% -ifdef(TEST). +%% -include("etest/partitions_test.erl"). +%% -endif. + +%%==================================================================== +%% API +%%==================================================================== + +partition_range(Q) -> + trunc( ?RINGTOP / math:pow(2,Q) ). % SHA-1 space / 2^Q + +create_partitions(Q, Node, _Nodes) -> + fresh(trunc(math:pow(2,Q)), Node). + % map_partitions(Table, Nodes). + + +%% @spec map_partitions(Table::proplist(),Nodes::list()) -> proplist() +%% @doc maps partitions to nodes. The resulting list should be Dynomite format, +%% namely {Node,Part} +%% @end +map_partitions(Table, Nodes) -> + {_Nodes, Parts} = lists:unzip(Table), + do_map(Nodes, Parts). + + +%% @doc in case Hints is undefined, turn it into a list for clauses below. +join(Node, Table, undefined) -> + join(Node, Table, []); + +%% @spec join(node(), proplist(), list()) -> {ok, PartTable::proplist()} | +%% {error, Error} +%% @doc given a node, current partition table, and hints, this function returns +%% the new partition table +join(Node, Table, Hints) -> + {NodeList, Parts} = lists:unzip(Table), + OtherNodes = lists:delete(Node, NodeList), + OtherDistinctNodes = lists:usort(OtherNodes), + %% quick check to see if we have more nodes than partitions + if + length(Parts) == length(OtherDistinctNodes) -> + {error, "Too many nodes vs partitions", Table}; + true -> + AlreadyPresent = length(NodeList) - length(OtherNodes), + Nodes = lists:usort(NodeList), + PartCountToTake = trunc(length(Parts) / (length(Nodes) + 1)), + %% calcs done, let's steal some partitions + {HintsTaken, NewTable} = steal_hints(Node, Table, Hints), + if + PartCountToTake - AlreadyPresent - HintsTaken > 0 -> + steal_partitions(Node, OtherDistinctNodes, NewTable, + PartCountToTake - AlreadyPresent - HintsTaken); + true -> + %% no partitions to take + {ok, NewTable} + end + end. + + +%% TODO: implement me +leave(_Node, Table, _Hints) -> + Table. + + +diff(From, To) when length(From) =/= length(To) -> + {error, badlength, "Cannot diff partition maps with different length"}; + +diff(From, To) -> + diff(sort_for_diff(From), sort_for_diff(To), []). + + +pp_diff(Diff) -> + lists:map( + fun({F,T,Part}) -> {F,T,showroom_utils:int_to_hexstr(Part)} end, + Diff). + + +%% @spec hash(term()) -> Digest::binary() +%% @doc Showroom uses SHA-1 as its hash +hash(Item) -> + crypto:sha(term_to_binary(Item)). + + +%% @spec hash_to_partition(binary(), integer()) -> integer() +%% @doc given a hashed value and Q, return the partition +hash_to_partition(Hash, Q) -> + HashInt = hash_int(Hash), + Size = partition_range(Q), + Factor = (HashInt div Size), + Rem = (HashInt rem Size), + if + Rem > 0 -> Factor * Size; + true -> ((Factor-1) * Size) + end. + + +hash_to_hex(Hash, Q) -> + Part = hash_to_partition(Hash, Q), + showroom_utils:int_to_hexstr(Part). + + +%% @doc given an int and a list of partitions, get the first part greater +%% than Int. Used for a hex part being turned back into an int. +int_to_partition(Int, Parts) -> + Rem = lists:dropwhile(fun(E) -> E < Int end, lists:sort(Parts)), + case Rem of + [] -> 0; % wrap-around-ring case (back to 0) + [H|_T] -> H + end. 
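The arithmetic in hash_to_partition/2 floors a hash to the start of its partition, with exact boundary values assigned to the partition below. An eunit-style sketch that follows directly from that arithmetic (a hypothetical test, not part of the module):

    hash_to_partition_boundary_test() ->
        Size = partitions:partition_range(2),  %% q = 2: four partitions
        %% one past a boundary falls into the partition starting at Size
        ?assertEqual(Size, partitions:hash_to_partition(Size + 1, 2)),
        %% an exact boundary hash belongs to the preceding partition
        ?assertEqual(0, partitions:hash_to_partition(Size, 2)).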
+ + +%% @spec item_to_nodepart(bin()) -> {Node::node(),Part::integer()} +%% @doc given a raw item, return the node/partition/shard +%% name based on consistent hashing +item_to_nodepart(Item) when is_binary(Item) -> + Q = list_to_integer(couch_config:get("cluster","q")), + Hash = hash(?b2l(Item)), + Part = hash_to_partition(Hash, Q), + {ok, Table} = membership2:partitions(), + lists:keyfind(Part, 2, Table); + +item_to_nodepart(Item) -> + item_to_nodepart(term_to_binary(Item)). + + +%% @spec shard_name(integer(), binary()) -> binary() +%% @doc create shard name +shard_name(Part, DbName) -> + PartHex = ?l2b(showroom_utils:int_to_hexstr(Part)), + <<"x", PartHex/binary, "/", DbName/binary, "_", PartHex/binary>>. + +%%==================================================================== +%% Internal functions +%%==================================================================== + +%% @doc Create a brand new table. The size and seednode are specified; +%% initially all partitions are owned by the seednode. If NumPartitions +%% is not much larger than the intended eventual number of +%% participating nodes, then performance will suffer. +%% from http://code.google.com/p/distributerl (trunk revision 4) chash:fresh/2 +%% @spec fresh(NumPartitions :: integer(), SeedNode :: node()) -> table() +fresh(NumPartitions, SeedNode) -> + Increment = ?RINGTOP div NumPartitions, + [{SeedNode, IndexAsInt} || IndexAsInt <- lists:seq(0,(?RINGTOP-1),Increment)]. + + +%% @spec steal_hints(node(), proplist(), list( integer() )) -> +%% {integer(), proplist()} +%% @doc move the partitions listed in Hints over to the new owner, Node +steal_hints(Node, Table, Hints) -> + steal_hints(Node, Table, Hints, 0). + + +%% @doc recursive workhorse for hints mechanism, Acc is tracking how many +%% hints/partitions were successfully moved to a new Node. +%% @end +steal_hints(_Node, Table, [], Acc) -> + {Acc, Table}; + +steal_hints(Node, Table, [Hint|RestHints], Acc) -> + {Status, NewTable} = swap_node_for_part(Node, Hint, Table), + Acc1 = case Status of + ok -> Acc+1; + _ -> Acc + end, + steal_hints(Node, NewTable, RestHints, Acc1). + + +%% @doc take a part from one of the other nodes based on most # of parts per +%% node. +%% @end +%% TODO: This fun does list ops on the Table each time through. Inefficient? +%% Hopefully not, due to small Table sizes +steal_partitions(_Node, _OtherNodes, Table, 0) -> + {ok, Table}; +steal_partitions(Node, OtherNodes, Table, Count) -> + %% first, get a list of OtherNodes and their partition counts + NPCountFun = fun(N) -> + L = proplists:get_all_values(N, Table), + {N, length(lists:delete(undefined, L))} + end, + NPCounts = lists:reverse(lists:keysort(2,lists:map(NPCountFun, OtherNodes))), + %% grab the node that has the most partitions + [{TakeFrom, _PartsCount}|_RestOfTable] = NPCounts, + %% get the highest # partition of the TakeFrom node + TakeFromParts = lists:reverse(lists:sort(proplists:get_all_values(TakeFrom, + Table))), + [Part|_RestOfParts] = TakeFromParts, + {ok, NewTable} = swap_node_for_part(Node, Part, Table), + steal_partitions(Node, OtherNodes, NewTable, Count-1). + + +%% @doc Make Node the owner of the partition beginning at Part. 
+%% from http://code.google.com/p/distributerl (trunk revision 4) chash:update/3 +swap_node_for_part(Node, Part, Table) -> + case lists:keymember(Part, 2, Table) of + true -> + GapList = [{N,P} || {N,P} <- Table, P /= Part], + {A, B} = lists:partition(fun({_,K1}) -> K1 < Part end, GapList), + {ok, A ++ [{Node, Part}] ++ B}; + false -> + showroom_log:message(info, + "'~p' partition was not found in partition table", [Part]), + {noswap, Table} + end. + + +%% @doc get the difference between two FullPMaps +%% lists need to be sorted by part, then node +diff([], [], Results) -> + lists:reverse(remove_dupes(Results)); + +diff([{Node,Part,_}|PartsA], [{Node,Part,_}|PartsB], Results) -> + diff(PartsA, PartsB, Results); + +diff([{NodeA,Part,_}|PartsA], [{NodeB,Part,_}|PartsB], Results) -> + diff(PartsA, PartsB, [{NodeA,NodeB,Part}|Results]). + + +%% @doc sorts the full map for diff/3. This may change to get more accurate +%% diff w/o dupes +sort_for_diff(FullMap) -> + lists:keysort(2,lists:sort(FullMap)). + + +remove_dupes(Diff) -> + {_,_,AllParts} = lists:unzip3(Diff), + Parts = lists:usort(AllParts), + remove_dupes_from_part(Parts, Diff, []). + + +%% @doc ex: take [{a,b,1},{b,c,1}] diff and make it [{a,c,1}] so we don't go +%% moving unnecessary shard files. 'Move partition 1 from a to b and +%% then move partition 1 from b to c' is unnecessary. Just move it a to c. +remove_dupes_from_part([], _Diff, Acc) -> + Acc; + +remove_dupes_from_part([Part|Rest], Diff, Acc) -> + PartData = lists:filter(fun({_,_,P}) -> P =:= Part end, Diff), + NewPartData = process_part_data(Part, PartData, PartData, PartData), + remove_dupes_from_part(Rest, Diff, lists:concat([NewPartData, Acc])). + + +%% for one partition of the full diff, remove the dupes +process_part_data(_Part, _PartData, [], Acc) -> + Acc; + +process_part_data(Part, PartData, [{From,To,_Part}|Rest], Acc) -> + case proplists:lookup(To, PartData) of + {To, NewTo, _Part} -> + + Remove1 = proplists:delete(To, PartData), + Remove2 = proplists:delete(From, Remove1), + NewPartData = [{From, NewTo, Part}|Remove2], + %?debugFmt("~nFrom : ~p~nTo : ~p~nNewTo: ~p~n" + % "Remove1: ~p~nRemove2: ~p~n" + % "NewPartData: ~p~n" + % , [From, To, NewTo, Remove1, Remove2, NewPartData]), + process_part_data(Part, NewPartData, Rest, NewPartData); + none -> + process_part_data(Part, PartData, Rest, Acc) + end. + + +% %% @doc from dynomite +% diff([], [], Results) -> +% lists:reverse(Results); + +% diff([{Node,Part}|PartsA], [{Node,Part}|PartsB], Results) -> +% diff(PartsA, PartsB, Results); + +% diff([{NodeA,Part}|PartsA], [{NodeB,Part}|PartsB], Results) -> +% diff(PartsA, PartsB, [{NodeA,NodeB,Part}|Results]). + + +%% @doc does Node/Partition mapping based on Amazon Dynamo paper, +%% section 6.2, strategy 3, more or less +%% http://www.allthingsdistributed.com/2007/10/amazons_dynamo.html +%% @end +do_map([Node|RestNodes], Parts) -> + Max = length(Parts) / length([Node|RestNodes]), + do_map(Node, RestNodes, Parts, [], 1, Max). 
+ + +%% return final mapped list +do_map(_,_,[],Mapped, _, _) -> + lists:keysort(1, Mapped); + +%% finish off last node, Cnt & Max no longer needed +do_map(Node, [], [Part|RestParts], Mapped, _, _) -> + do_map(Node, [], RestParts, [{Node, Part}|Mapped], 0,0); + +%% workhorse clause, iterates through parts, until Cnt > Max, then advances to +%% next node, wash, rinse, repeat +do_map(Node, [NextNode|RestNodes], [Part|RestParts], Mapped, Cnt, Max) -> + case Cnt > Max of + true -> + do_map(NextNode, RestNodes, RestParts, [{Node, Part}|Mapped], + 1, Max); + false -> + do_map(Node, [NextNode|RestNodes], RestParts, [{Node, Part}|Mapped], + Cnt+1, Max) + end. + + +%% TODO: other guards +hash_int(Hash) when is_binary(Hash) -> + <<IndexAsInt:160/integer>> = Hash, + IndexAsInt; +hash_int(Hash) when is_integer(Hash) -> + Hash. diff --git a/src/replication.erl b/src/replication.erl new file mode 100644 index 00000000..96be0ad3 --- /dev/null +++ b/src/replication.erl @@ -0,0 +1,165 @@ +%%%------------------------------------------------------------------- +%%% File: replication.erl +%%% @author Brad Anderson <brad@cloudant.com> [http://www.cloudant.com] +%%% @copyright 2009 Brad Anderson +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-06-14 by Brad Anderson +%%%------------------------------------------------------------------- +-module(replication). +-author('brad@cloudant.com'). + +%% API +-export([partners/2, partners/3, partners_plus/2]). + +-include_lib("eunit/include/eunit.hrl"). +-include("../include/config.hrl"). +-include("../include/common.hrl"). + + +%%==================================================================== +%% API +%%==================================================================== + +partners(Node, Nodes) -> + partners(Node, Nodes, configuration:get_config()). + + +%%-------------------------------------------------------------------- +%% @spec partners(Node::atom(), Nodes::list(), Config::config()) -> +%% list() +%% @doc returns the list of all replication partners for the specified node +%% @end +%%-------------------------------------------------------------------- +partners(Node, Nodes, Config) -> + N = Config#config.n, + Meta = Config#config.meta, + pick_partners(Meta, Node, Nodes, [], N - 1). + + +%% return a list of live/up Partners, and if all Partners are down, +%% walk the ring to get one other remote node and return it. +partners_plus(Node, Nodes) -> + Partners = partners(Node, Nodes), + PartnersDown = lists:subtract(Partners, erlang:nodes()), + PartnersUp = lists:subtract(Partners, PartnersDown), + case PartnersUp of + [] -> + TargetNodes = target_list(Node, Nodes), + NonPartners = lists:subtract(TargetNodes, + lists:flatten([Node, Partners])), + walk_ring(NonPartners); + _ -> + %% at least one partner is up, so gossip w/ them + PartnersUp + end. 
+
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+%% @spec pick_partners(proplist(), Node::dynomite_node(), [Node], [Node],
+%%                     integer()) -> list()
+%% @doc iterate through N-1 partner picks, returning the resulting list sorted
+pick_partners(_Meta, Node, _Nodes, Acc, 0) ->
+  lists:sort(lists:delete(Node, Acc));
+pick_partners(Meta, Node, Nodes, Acc, Count) ->
+  Partner = pick_partner(Meta, Node, Nodes, Acc, 1),
+  NewNodes = lists:filter(fun(Elem) ->
+      case Elem of
+      no_partner_found -> false;
+      Partner -> false;
+      _ -> true
+      end
+  end, Nodes),
+  NewAcc = case Partner of
+  no_partner_found -> Acc;
+  _ -> [Partner|Acc]
+  end,
+  pick_partners(Meta, Node, NewNodes, NewAcc, Count-1).
+
+
+%% @spec pick_partner(proplist(), Node::dynomite_node(), [Node], [Node],
+%%                    integer()) -> Node::dynomite_node()
+%% @doc pick a specific replication partner at the given level
+pick_partner([], Node, Nodes, _Acc, 1) ->
+  %% handle the no-metadata situation
+  %% Note: this clause must come before the Level > length(Meta) guarded clause
+  target_key(node:name(Node), lists:map(fun node:name/1, Nodes), roundrobin);
+
+pick_partner(Meta, _Node, _Nodes, Acc, Level) when Level > length(Meta) ->
+  Acc;
+
+pick_partner(Meta, Node, Nodes, Acc, Level) ->
+  MetaDict = meta_dict(Nodes, Level, dict:new()),
+  NodeKey = lists:sublist(node:attributes(Node), Level),
+  Keys = dict:fetch_keys(MetaDict),
+  {_MetaName, Strategy} = lists:nth(Level, Meta),
+  TargetKey = target_key(NodeKey, Keys, Strategy),
+  Candidates = dict:fetch(TargetKey, MetaDict),
+  case length(Candidates) of
+  0 ->
+      %% didn't find a candidate
+      no_partner_found;
+  1 ->
+      %% found only one candidate, return it
+      [Partner] = Candidates,
+      Partner;
+  _ ->
+      pick_partner(Meta, Node, Nodes, Acc, Level + 1)
+  end.
+
+
+%% @doc construct a dict keyed by the metadata values seen so far (up to the
+%%      current level), with a dynomite_node() list as each value.  This is
+%%      used to select a partner in pick_partner/5
+%% @end
+meta_dict([], _Level, Dict) ->
+  Dict;
+
+meta_dict([Node|Rest], Level, Dict) ->
+  Key = lists:sublist(node:attributes(Node), Level),
+  DictNew = dict:append(Key, Node, Dict),
+  meta_dict(Rest, Level, DictNew).
+
+
+%% @spec target_key(term(), list(), Strategy::atom()) -> term()
+%% @doc given the key and keys, sort the list of keys based on strategy (e.g.
+%%      for roundrobin: sort them, put the NodeKey at the end of the list, and
+%%      return the head of the list as the target)
+%% @end
+%% TODO: more strategies than roundrobin?
+target_key(NodeKey, Keys, roundrobin) ->
+  SortedKeys = lists:sort(Keys),
+  TargetKey = case target_list(NodeKey, SortedKeys) of
+  [] -> no_partner_found;
+  [Key|_Rest] -> Key
+  end,
+  TargetKey.
+
+
+%% @spec target_list(term(), list()) -> list()
+%% @doc split the list of keys into 'less than NodeKey', NodeKey, and
+%%      'greater than NodeKey', then move the 'less than' section to the
+%%      end of the list
+%% @end
+target_list(_NodeKey, []) ->
+  [];
+target_list(NodeKey, Keys) ->
+  {A, [NodeKey|B]} = lists:splitwith(fun(K) -> K /= NodeKey end, Keys),
+  lists:append([B, A, [NodeKey]]).
+
+
+walk_ring([]) ->
+  %% TODO: should we be more forceful here and throw? not for now
+  showroom_log:message(info,
+      "~p:walk_ring/1 - could not find node for gossip", [?MODULE]),
+  [];
+
+walk_ring([Node|Rest]) ->
+  case lists:member(Node, erlang:nodes()) of
+  true -> [Node];
+  _ -> walk_ring(Rest)
+  end.
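With no placement metadata, partner selection reduces to round-robin over the sorted node-name ring: target_list/2 rotates the sorted list so the names after NodeKey come first. A sketch (assumes the #config{} record from config.hrl tolerates defaults for its other fields; node names illustrative):

    Config = #config{n = 3, meta = []},
    %% node a's N-1 partners are the next two names around the ring:
    [b, c] = replication:partners(a, [a, b, c, d], Config).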
diff --git a/src/vector_clock.erl b/src/vector_clock.erl
new file mode 100644
index 00000000..0a89d41e
--- /dev/null
+++ b/src/vector_clock.erl
@@ -0,0 +1,99 @@
+%%% @author Cliff Moon <cliff@powerset.com> []
+%%% @copyright 2008 Cliff Moon
+
+-module(vector_clock).
+-export([create/1, truncate/1, increment/2, compare/2, resolve/2, merge/2,
+         equals/2]).
+
+%% -ifdef(TEST).
+%% -include("etest/vector_clock_test.erl").
+%% -endif.
+
+create(NodeName) -> [{NodeName, lib_misc:now_float()}].
+
+truncate(Clock) when length(Clock) > 10 ->
+  lists:nthtail(length(Clock) - 10, lists:keysort(2, Clock));
+
+truncate(Clock) -> Clock.
+
+increment(NodeName, [{NodeName, _Version}|Clocks]) ->
+  [{NodeName, lib_misc:now_float()}|Clocks];
+
+increment(NodeName, [NodeClock|Clocks]) ->
+  [NodeClock|increment(NodeName, Clocks)];
+
+increment(NodeName, []) ->
+  [{NodeName, lib_misc:now_float()}].
+
+resolve({ClockA, ValuesA}, {ClockB, ValuesB}) ->
+  case compare(ClockA, ClockB) of
+  less -> {ClockB, ValuesB};
+  greater -> {ClockA, ValuesA};
+  equal -> {ClockA, ValuesA};
+  concurrent ->
+      io:format("~nConcurrent Clocks~n"
+                "ClockA : ~p~nClockB : ~p~n"
+                "ValuesA: ~p~nValuesB: ~p~n",
+                [ClockA, ClockB, ValuesA, ValuesB]),
+      {merge(ClockA,ClockB), ValuesA ++ ValuesB}
+  end;
+resolve(not_found, {Clock, Values}) ->
+  {Clock, Values};
+resolve({Clock, Values}, not_found) ->
+  {Clock, Values}.
+
+merge(ClockA, ClockB) ->
+  merge([], ClockA, ClockB).
+
+merge(Merged, [], ClockB) -> lists:keysort(1, Merged ++ ClockB);
+
+merge(Merged, ClockA, []) -> lists:keysort(1, Merged ++ ClockA);
+
+merge(Merged, [{NodeA, VersionA}|ClockA], ClockB) ->
+  case lists:keytake(NodeA, 1, ClockB) of
+  {value, {NodeA, VersionB}, TrunkClockB} when VersionA > VersionB ->
+      merge([{NodeA,VersionA}|Merged], ClockA, TrunkClockB);
+  {value, {NodeA, VersionB}, TrunkClockB} ->
+      merge([{NodeA,VersionB}|Merged], ClockA, TrunkClockB);
+  false ->
+      merge([{NodeA,VersionA}|Merged], ClockA, ClockB)
+  end.
+
+compare(ClockA, ClockB) ->
+  AltB = less_than(ClockA, ClockB),
+  if AltB -> less; true ->
+      BltA = less_than(ClockB, ClockA),
+      if BltA -> greater; true ->
+          AeqB = equals(ClockA, ClockB),
+          if AeqB -> equal; true -> concurrent end
+      end
+  end.
+
+%% ClockA is less than ClockB if and only if ClockA[z] <= ClockB[z] for all
+%% instances z and there exists an index z' such that ClockA[z'] < ClockB[z']
+less_than(ClockA, ClockB) ->
+  ForAll = lists:all(fun({Node, VersionA}) ->
+      case lists:keysearch(Node, 1, ClockB) of
+      {value, {_NodeB, VersionB}} -> VersionA =< VersionB;
+      false -> false
+      end
+  end, ClockA),
+  Exists = lists:any(fun({NodeA, VersionA}) ->
+      case lists:keysearch(NodeA, 1, ClockB) of
+      {value, {_NodeB, VersionB}} -> VersionA /= VersionB;
+      false -> true
+      end
+  end, ClockA),
+  % length/1 takes care of the case where ClockA is shorter than ClockB
+  ForAll and (Exists or (length(ClockA) < length(ClockB))).
+
+equals(ClockA, ClockB) ->
+  Equivalent = lists:all(fun({NodeA, VersionA}) ->
+      lists:any(fun(NodeClockB) ->
+          case NodeClockB of
+          {NodeA, VersionA} -> true;
+          _ -> false
+          end
+      end, ClockB)
+  end, ClockA),
+  Equivalent and (length(ClockA) == length(ClockB)).
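A shell-style sketch of the clock algebra above (node names illustrative):

    A = vector_clock:create(node_a),
    B = vector_clock:increment(node_b, A),
    less = vector_clock:compare(A, B),        %% A is an ancestor of B
    C = vector_clock:create(node_c),
    concurrent = vector_clock:compare(B, C),  %% no shared history
    %% merge/2 keeps the newer timestamp per node; disjoint clocks just union:
    [{node_a, _}, {node_b, _}, {node_c, _}] = vector_clock:merge(B, C).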