39 files changed, 4442 insertions, 0 deletions
diff --git a/ebin/.gitignore b/ebin/.gitignore
new file mode 100644
index 00000000..13d94f8b
--- /dev/null
+++ b/ebin/.gitignore
@@ -0,0 +1 @@
+*.app
diff --git a/ebin/dynomite.appup b/ebin/dynomite.appup
new file mode 100644
index 00000000..d6d7726b
--- /dev/null
+++ b/ebin/dynomite.appup
@@ -0,0 +1,6 @@
+{"0.9.0-cloudant", [{"0.9.0-cloudant", [
+  {apply, {supervisor, terminate_child, [showroom_sup, dynomite_sup]}},
+  {restart_application, dynomite},
+  {apply, {supervisor, delete_child, [showroom_sup, dynomite_sup]}},
+  {update, showroom_sup, supervisor}
+]}],[{"0.9.0-cloudant",[]}]}.
diff --git a/include/chunk_size.hrl b/include/chunk_size.hrl
new file mode 100644
index 00000000..f9906b5f
--- /dev/null
+++ b/include/chunk_size.hrl
@@ -0,0 +1 @@
+-define(CHUNK_SIZE, 5120).
diff --git a/include/common.hrl b/include/common.hrl
new file mode 100644
index 00000000..2299950d
--- /dev/null
+++ b/include/common.hrl
@@ -0,0 +1,41 @@
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(fmt(Msg, Args), lists:flatten(io_lib:format(Msg, Args))).
+-define(infoFmt(Msg, Args), error_logger:info_msg(Msg, Args)).
+-define(infoMsg(Msg), error_logger:info_msg(Msg)).
+
+
+%% from couch_db.hrl
+-ifndef(LOG_DEBUG).
+-define(LOG_DEBUG(Format, Args),
+  showroom_log:message(debug, Format, Args)).
+-endif.
+
+-ifndef(LOG_INFO).
+-define(LOG_INFO(Format, Args),
+  showroom_log:message(info, Format, Args)).
+-endif.
+
+-ifndef(LOG_ERROR).
+-define(LOG_ERROR(Format, Args),
+  showroom_log:message(error, Format, Args)).
+-endif.
+
+%% -define(PMAP(F,L), lists:map(F,L)).
+-define(PMAP(F,L), showroom_utils:pmap(F,L)).
+
+
+%%
+%% membership2 (in here for separate testing module)
+%%
+
+-define(VERSION,2).
+
+-record(membership, {header=?VERSION,
+  node,
+  nodes,
+  partitions,
+  version,
+  fullmap
+  }).
diff --git a/include/config.hrl b/include/config.hrl
new file mode 100644
index 00000000..20983d26
--- /dev/null
+++ b/include/config.hrl
@@ -0,0 +1,24 @@
+
+-ifndef(CONFIG_HRL).
+-define(CONFIG_HRL, true).
+%we don't want to turn protocol buffers on by default, since the library is not included
+%it should be very easy for new users to start up an instance
+-record(config, {n=3,
+  r=1,
+  w=1,
+  q=6,
+  directory,
+  web_port,
+  text_port=11222,
+  storage_mod=dets_storage,
+  blocksize=4096,
+  thrift_port=9200,
+  pb_port=undefined,
+  buffered_writes=undefined,
+  cache=undefined,
+  cache_size=1048576,
+  hash_module=partitions,
+  meta=[]
+  }).
+
+-endif.
diff --git a/include/dmerkle.hrl b/include/dmerkle.hrl
new file mode 100644
index 00000000..b4fe2a08
--- /dev/null
+++ b/include/dmerkle.hrl
@@ -0,0 +1,14 @@
+-define(DMERKLE_VERSION, 2).
+-define(STATIC_HEADER, 93).
+
+-define(d_from_blocksize(BlockSize), trunc((BlockSize - 17)/16)).
+-define(pointers_from_blocksize(BlockSize), (lib_misc:ceiling(math:log(BlockSize)/math:log(2)) - 3)).
+-define(pointer_for_size(Size, BlockSize), (if Size =< 16 -> 1; Size =< BlockSize -> ?pointers_from_blocksize(Size); true -> last end)).
+-define(size_for_pointer(N), (2 bsl (N+2))).
+-define(headersize_from_blocksize(BlockSize), (?STATIC_HEADER + ?pointers_from_blocksize(BlockSize) * 8)).
+-define(aligned(Ptr, HeaderSize, BlockSize), (((Ptr - (HeaderSize)) rem BlockSize) == 0)).
+-define(block(Ptr, HeaderSize, BlockSize), ((Ptr - (HeaderSize)) div BlockSize)).
+
+-record(node, {m=0, keys=[], children=[], offset=eof}).
+-record(leaf, {m=0, values=[], offset=eof}).
+-record(free, {offset,size=0,pointer=0}).
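The dmerkle sizing macros are easiest to read with concrete numbers. A hand-worked sketch for the default blocksize of 4096 from config.hrl (values computed from the macro definitions above, not taken from the patch):

    %% BlockSize = 4096 (the config.hrl default)
    ?d_from_blocksize(4096)           %% trunc((4096 - 17) / 16)       = 254
    ?pointers_from_blocksize(4096)    %% ceiling(log2(4096)) - 3       = 9
    ?headersize_from_blocksize(4096)  %% 93 + 9 * 8                    = 165
    ?size_for_pointer(9)              %% 2 bsl (9 + 2)                 = 4096
    ?aligned(4261, 165, 4096)         %% ((4261 - 165) rem 4096) == 0  = true

So the first block starts 165 bytes into the file, and each subsequent block begins at a 4096-byte stride from there.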
diff --git a/include/profile.hrl b/include/profile.hrl new file mode 100644 index 00000000..2ffd8009 --- /dev/null +++ b/include/profile.hrl @@ -0,0 +1,9 @@ +-ifdef(PROF). +-define(balance_prof, dynomite_prof:balance_prof()). +-define(prof(Label), dynomite_prof:start_prof(Label)). +-define(forp(Label), dynomite_prof:stop_prof(Label)). +-else. +-define(prof(Label), true). +-define(forp(Label), true). +-define(balance_prof, true). +-endif. diff --git a/include/test.hrl b/include/test.hrl new file mode 100644 index 00000000..38fb850f --- /dev/null +++ b/include/test.hrl @@ -0,0 +1,13 @@ +-define(TMP_DIR, "../../../tmp/lib"). + +-define(TMP_FILE, fun(File) -> + filename:join(?TMP_DIR, File) + end). + +%% priv_dir() -> +%% Dir = filename:join([t:config(priv_dir), "data", atom_to_list(?MODULE), pid_to_list(self())]), +%% filelib:ensure_dir(filename:join([Dir, atom_to_list(?MODULE)])), +%% Dir. + +%% priv_file(File) -> +%% filename:join(priv_dir(), File). diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 00000000..32aa1872 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,11 @@ +include ../support/include.mk + +all: $(EBIN_FILES_NO_DOCS) + +doc: $(EBIN_FILES) + +debug: + $(MAKE) DEBUG=-DDEBUG + +clean: + rm -rf $(EBIN_FILES) diff --git a/src/bootstrap_manager.erl b/src/bootstrap_manager.erl new file mode 100644 index 00000000..f1303223 --- /dev/null +++ b/src/bootstrap_manager.erl @@ -0,0 +1,261 @@ +%%%------------------------------------------------------------------- +%%% File: bootstrap_manager.erl +%%% @author Cliff Moon <> [] +%%% @copyright 2009 Cliff Moon +%%% @doc This is the bootstrap manager for a cluster. +%%% +%%% @end +%%% +%%% @since 2009-07-29 by Cliff Moon +%%%------------------------------------------------------------------- +-module(bootstrap_manager). +-author('cliff@powerset.com'). +-author('brad@cloudant.com'). + +-behaviour(gen_server). + +%% API +-export([start_bootstrap/3, end_bootstrap/1, + start_link/3, start/3, stop/0, + start_transfers/0, transfers/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {transfer_list, nodes, transfers, futurefullmap}). +-record(transfer, {partition, receivers, rate=0, status=starting}). + +-include("../include/config.hrl"). +-include("../include/common.hrl"). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% @spec start_link() -> {ok,Pid} | ignore | {error,Error} +%% @doc Starts the server +%% @end +%%-------------------------------------------------------------------- +start_bootstrap(State=#membership{node=Node, nodes=Nodes}, + OldFullMap, NewFullMap) -> + case partitions:diff(OldFullMap, NewFullMap) of + [] -> + % no difference in pmaps + {NewFullMap, State#membership{fullmap=NewFullMap}}; + TransferList when is_list(TransferList) -> + ?LOG_DEBUG("~nBootstrap~nNode : ~p~nTransferList :~n~p~n", + [Node, partitions:pp_diff(TransferList)]), + case start_link(TransferList, Nodes, NewFullMap) of + {ok, _Pid} -> + start_transfers(); + Other -> throw(Other) + end, + + % bootstrap has some stuff to do (async), so just give the state + % passed in for now. end_bootstrap will be called with the resulting + % state when it completes + {OldFullMap, State}; + Other -> + % probably occurs b/c T (# of nodes) < N currently. 
+ % more nodes joining should help avoid this error. + ?LOG_ERROR("no_bootstrap - Other: ~p", [Other]), + {NewFullMap, State#membership{fullmap=NewFullMap}} + end. + + +end_bootstrap(#state{futurefullmap=FutureFullMap}) -> + end_bootstrap(FutureFullMap); + +end_bootstrap(NewFullMap) -> + gen_server:call(membership, {newfullmap, NewFullMap}), + stop(). + + +start(TransferList, Nodes, FutureFullMap) -> + gen_server:start({global, bootstrap_manager}, ?MODULE, + [TransferList, Nodes, FutureFullMap], []). + + +start_link(TransferList, Nodes, FutureFullMap) -> + gen_server:start_link({global, bootstrap_manager}, ?MODULE, + [TransferList, Nodes, FutureFullMap], []). + + +stop() -> + gen_server:cast({global, bootstrap_manager}, stop). + + +start_transfers() -> + gen_server:cast({global, bootstrap_manager}, start_transfers). + + +transfers() -> + gen_server:call({global, bootstrap_manager}, transfers). + + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @doc Initiates the server +%% @end +%%-------------------------------------------------------------------- +init([TransferList, Nodes, FutureFullMap]) -> + process_flag(trap_exit, true), + {ok, #state{transfer_list=TransferList,nodes=Nodes, + futurefullmap=FutureFullMap}}. + + +%%-------------------------------------------------------------------- +%% @spec +%% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @doc Handling call messages +%% @end +%%-------------------------------------------------------------------- +handle_call(average_transfer_rate, _From, + State=#state{transfers=Transfers}) -> + {Sum, Cardinality} = ets:foldl( + fun(#transfer{rate=Rate}, {Sum, Cardinality}) -> + {Sum+Rate,Cardinality+1} + end, {0, 0}, Transfers), + AverageRate = Sum / Cardinality, + {reply, AverageRate, State}; + +handle_call(aggregate_transfer_rate, _From, + State=#state{transfers=Transfers}) -> + Sum = ets:foldl(fun(#transfer{rate=Rate}, Sum) -> + Rate + Sum + end, 0, Transfers), + {reply, Sum, State}; + +handle_call(transfers, _From, + State=#state{transfers=Transfers}) -> + {reply, {ok, ets:tab2list(Transfers)}, State}; + +%% at least reply that this 'catch-all' was ignored +handle_call(_Request, _From, State) -> + {reply, ignored, State}. + + +%%-------------------------------------------------------------------- +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @doc Handling cast messages +%% @end +%%-------------------------------------------------------------------- +handle_cast(stop, State) -> + {stop, normal, State}; + +handle_cast(start_transfers, + State=#state{transfer_list=TransferList}) -> + Transfers = start_transfers(TransferList, State), + {noreply, State#state{transfers=Transfers}}; + +handle_cast(_Msg, State) -> + {noreply, State}. 
+ + +%%-------------------------------------------------------------------- +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @doc Handling all non call/cast messages +%% @end +%%-------------------------------------------------------------------- + +handle_info({receiver_done, FromNode, _ToNode, Partition, DbName, Receiver}, + State = #state{transfers=Transfers}) -> + %% TODO use bring_online & ToNode? instead of waiting until end & installing + %% NewFullMap into mem2 + + %% handle the old file + membership2:decommission_part(FromNode, Partition, DbName), + + %% remove from Transfers table + case ets:lookup(Transfers, Partition) of + [Transfer] = [#transfer{receivers=Receivers}] -> + NewReceivers = lists:delete(Receiver, Receivers), + if + length(NewReceivers) == 0 -> ets:delete(Transfers, Partition); + true -> ets:insert(Transfers, Transfer#transfer{receivers=NewReceivers}) + end; + _ -> ok + end, + case ets:first(Transfers) of + '$end_of_table' -> + end_bootstrap(State), + {noreply, State}; + _ -> {noreply, State} + end; + +handle_info(_Info, State) -> + {noreply, State}. + + +%%-------------------------------------------------------------------- +%% @spec terminate(Reason, State) -> void() +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + + +%%-------------------------------------------------------------------- +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @doc Convert process state when code is changed +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +start_transfers([], State) -> + no_transfers, % no diff in pmaps, so no transfers + end_bootstrap(State); + +start_transfers(Diff, State=#state{nodes=Nodes}) -> + case showroom_db:all_databases("") of + {ok, AllDbs} when length(AllDbs) > 0 -> + start_transfers(Diff, Nodes, configuration:get_config(), AllDbs, + ets:new(transfers, [public, set, {keypos, 2}])); + {ok, []} -> end_bootstrap(State); % no databases, so bootstrap not needed + Other -> throw(Other) % problem getting list of dbs + end. + + +start_transfers([], _, _, _, Transfers) -> + Transfers; + +start_transfers([{FromNode, ToNode, Partition} | Diff], Nodes, Config, + AllDbs, Transfers) -> + membership2:take_offline(FromNode, Partition), + Receivers = lists:map( + fun(DbName) -> + {ok, Receiver} = + bootstrap_receiver:start_link(FromNode, ToNode, Partition, + DbName, 10000, self()), + Receiver + end, AllDbs), + % NOTE: by using AllDbs, we are omitting .deleted.couch files + ets:insert(Transfers, #transfer{partition=Partition, + receivers=Receivers}), + start_transfers(Diff, Nodes, Config, AllDbs, Transfers). 
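For orientation, start_transfers/5 consumes the output of partitions:diff/2, a list of {FromNode, ToNode, Partition} triples. A minimal sketch of a two-partition rebalance (node names and partition numbers are made up):

    Diff = [{'dbcore@node1.example.com', 'dbcore@node3.example.com', 0},
            {'dbcore@node2.example.com', 'dbcore@node3.example.com', 12345}],
    %% for each triple the manager takes the source partition offline and
    %% spawns one bootstrap_receiver per database on the target node; the
    %% transfer is removed from the ets table as each receiver reports
    %% receiver_done, and end_bootstrap/1 fires once the table is empty.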
diff --git a/src/bootstrap_receiver.erl b/src/bootstrap_receiver.erl new file mode 100644 index 00000000..3b4907cb --- /dev/null +++ b/src/bootstrap_receiver.erl @@ -0,0 +1,121 @@ +%%%------------------------------------------------------------------- +%%% File: bootstrap_receiver.erl +%%% @author Brad Anderson <brad@cloudant.com> +%%% @copyright 2009 Brad Anderson +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-09-22 by Brad Anderson +%%%------------------------------------------------------------------- +-module(bootstrap_receiver). +-author('brad@cloudant.com'). + +-include("../include/config.hrl"). +-include("../include/common.hrl"). + +%% API +-export([start_link/6, loop/6, fetch_shard/5]). + + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% @spec +%% @doc +%% @end +%%-------------------------------------------------------------------- +start_link(FromNode, ToNode, Partition, DbName, Timeout, Manager) -> + Pid = proc_lib:spawn_link(ToNode, bootstrap_receiver, loop, + [FromNode, Partition, DbName, Timeout, Manager, + self()]), + sync_wait(Pid, Timeout). + + +loop(FromNode, Partition, DbName, Timeout, Manager, Parent) -> + proc_lib:init_ack(Parent, {ok, self()}), + fetch_shard(FromNode, Partition, DbName, Timeout, Manager). + + +%% @doc run at "ToNode" via spawn_link +fetch_shard(FromNode, Partition, DbName, Timeout, Manager) -> + Directory = couch_config:get("couchdb", "database_dir"), + [_NodeName, Hostname] = string:tokens(atom_to_list(FromNode), "@"), + SrcFile = binary_to_list(partitions:shard_name(Partition, DbName)), + DestFile = showroom_utils:full_filename(Partition, DbName, Directory), + Authn = fetch_authn(), + Port = fetch_port(), + Url = lists:concat(["http://", Authn, Hostname, Port, "/", SrcFile, + ".couch"]), + Options = [{save_response_to_file, DestFile}, + {inactivity_timeout, Timeout}], + case filelib:ensure_dir(DestFile) of + ok -> ok; + {error, eexist} -> ok; % duh! + Other -> throw(Other) + end, + ?LOG_DEBUG("~n" + "Directory: ~p~n" + "Hostname : ~p~n" + "SrcFile : ~p~n" + "DestFile : ~p~n" + "Url : ~p~n" + "Options : ~p~n" + , [Directory, Hostname, SrcFile, DestFile, Url, Options]), + case ibrowse:send_req(Url, [], get, [], Options, infinity) of + {ok, "200", _Headers, Body} -> + ?LOG_DEBUG("~nBootstrap ibrowse req Body: ~p~n", [Body]), + Manager ! {receiver_done, FromNode, node(), Partition, DbName, + self()}; + Error -> + ?LOG_ERROR("~nBootstrap ibrowse req Error: ~p~n", [Error]), + throw(Error) + end. + + +%%==================================================================== +%% Internal functions +%%==================================================================== + + +%% from proc_lib.erl in otp r13b01 +sync_wait(Pid, Timeout) -> + receive + {ack, Pid, Return} -> + Return; + {'EXIT', Pid, Reason} -> + {error, Reason} + after Timeout -> + unlink(Pid), + exit(Pid, kill), + flush(Pid), + {error, timeout} + end. + + +flush(Pid) -> + receive + {'EXIT', Pid, _} -> + true + after 0 -> + true + end. + + +fetch_authn() -> + User = couch_config:get("shard_moving", "user", ""), + Pass = couch_config:get("shard_moving", "pass", ""), + if + length(User) > 0 andalso length(Pass) > 0 -> + lists:concat([User, ":", Pass, "@"]); + true -> "" + end. + + +fetch_port() -> + Port = couch_config:get("shard_moving", "port", "8080"), + if + Port =:= "80" -> ""; + true -> lists:concat([":", Port]) + end. 
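Given the defaults above, the pull URL assembled in fetch_shard/5 takes roughly the following shape (host, credentials and shard path are illustrative; the real path comes from partitions:shard_name/2):

    %% [shard_moving] user = admin, pass = secret, port = 8080,
    %% FromNode = 'dbcore@db1.example.com'  (hypothetical values)
    "http://admin:secret@db1.example.com:8080/<shard_name>.couch"

The response body is written to DestFile as it arrives via ibrowse's save_response_to_file option rather than being accumulated in memory, and the whole request runs on the destination node because loop/6 is spawned there with proc_lib:spawn_link/4.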
diff --git a/src/cluster_ops.erl b/src/cluster_ops.erl new file mode 100644 index 00000000..bd2ad83d --- /dev/null +++ b/src/cluster_ops.erl @@ -0,0 +1,282 @@ +%%%------------------------------------------------------------------- +%%% File: cluster_ops.erl +%%% @author Brad Anderson <brad@cloudant.com> [http://cloudant.com] +%%% @copyright 2009 Brad Anderson +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-07-21 by Brad Anderson +%%%------------------------------------------------------------------- +-module(cluster_ops). +-author('brad@cloudant.com'). + +%% API +-export([key_lookup/3, key_lookup/5, + all_parts/4, + some_parts/4, some_parts/5, + quorum_from_each_part/3]). + +-include("../include/common.hrl"). +-include("../include/config.hrl"). + +-include("../include/profile.hrl"). + + +%%==================================================================== +%% API +%%==================================================================== + +%% @doc Get to the proper shard on N nodes by key lookup +%% +%% This fun uses quorum constants from config +key_lookup(Key, {M,F,A}, Access) -> + {N,_R,_W} = Consts = unpack_config(configuration:get_config()), + Const = get_const(Access, Consts), + key_lookup(Key, {M,F,A}, Access, Const, N). + + +%% @doc Get to the proper shard on N nodes by key lookup +%% +%% This fun uses a provided quorum constant, possibly from request, +%% possibly from config +key_lookup(Key, {M,F,A}, Access, Const, N) -> + NodeParts = membership2:nodeparts_for_key(Key), + {ResolveFun, NotFoundFun} = case Access of + r -> {fun resolve_read/1, fun resolve_not_found/2}; + w -> {fun resolve_write/1, fun(_,_) -> {false, notused, []} end} + end, + MapFun = fun({Node,Part}) -> + try + rpc:call(Node, M, F, [[Part | A]]) + catch Class:Exception -> + {error, Class, Exception} + end + end, + {GoodReplies, Bad} = pcall(MapFun, NodeParts, Const), + if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end, + Good = lists:map(fun strip_ok/1, GoodReplies), + final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access). + + +%% @doc Do op on all shards (and maybe even replication partners) +all_parts({M,F,A}, Access, AndPartners, ResolveFun) -> + NodePartList = membership2:all_nodes_parts(AndPartners), + MapFun = fun({Node, Part}) -> + try + rpc:call(Node, M, F, [[Part | A]]) + catch Class:Exception -> + {error, Class, Exception} + end + end, + Replies = ?PMAP(MapFun, NodePartList), + {Good, Bad} = lists:partition(fun valid/1, Replies), + final_all_parts(Good, Bad, length(NodePartList), ResolveFun, Access). + + +%% @doc Do op on some shards, depending on list of keys sent in. +%% +%% This fun uses quorum constants from config +some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access) -> + Const = get_const(Access), + some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access, Const). + + +%% @doc Do op on some shards, depending on list of keys sent in. +%% +%% This fun uses a provided quorum constant, possibly from request, +%% possibly from config +some_parts(KeyFun, SeqsKVPairs, {M,F,A}, _Access, Const) -> + TaskFun = fun({{Node,Part}, Values}) -> + try + rpc:call(Node, M, F, [[Part | [Values | A]]]) + catch Class:Exception -> + {error, Class, Exception} + end + end, + + % get tasks per node that are part / values for that partition + DistTasks = get_dist_tasks(KeyFun, SeqsKVPairs), + + % With the distributed tasklist in hand, do the tasks per partition. + % For each partition, do the work on all nodes/parts. 
+ TaskReplies = ?PMAP(TaskFun, DistTasks), + {GoodReplies, Bad} = lists:partition(fun valid/1, TaskReplies), + if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end, + Good = lists:map(fun strip_ok/1, GoodReplies), + final_some_parts(Good, Bad, Const). + + +quorum_from_each_part({M,F,A}, Access, ResolveFun) -> + Const = get_const(Access), + {_, Parts} = lists:unzip(membership2:partitions()), + PartsMapFun = fun(Part) -> + Nodes = membership2:nodes_for_part(Part), + NodesMapFun = fun(Node) -> rpc:call(Node, M, F, [[Part | A]]) end, + {GoodReplies,BadReplies} = pcall(NodesMapFun, Nodes, Const), + Good1 = lists:map(fun strip_ok/1, GoodReplies), + Bad1 = case length(Good1) >= Const of + true -> []; + false -> BadReplies + end, + {Good1,Bad1} + end, + Results1 = ?PMAP(PartsMapFun, Parts), + {Good,Bad} = lists:foldl(fun({G,B}, {GAcc,BAcc}) -> + {lists:append(G,GAcc),lists:append(B,BAcc)} + end, {[],[]}, Results1), + if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end, + final_quorum_from_each_part(Good, Bad, length(Parts), ResolveFun, Access). + + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access) -> + {NotFound, Return, Reasons} = NotFoundFun(Bad, Const), + if + length(Good) >= Const -> {ok, ResolveFun(Good)}; + NotFound -> {ok, Return, Reasons}; + true -> error_message(Good, Bad, N, Const, Access) + end. + + +final_all_parts(Good, Bad, Total, ResolveFun, Access) -> + case length(Good) =:= Total of + true -> {ok, ResolveFun(Good)}; + _ -> error_message(Good, Bad, Total, Total, Access) + end. + + +final_some_parts(Good, _Bad, Const) -> + Good1 = lists:flatten(Good), + {Seqs, _} = lists:unzip(Good1), + {ResG,ResB} = + lists:foldl( + fun(Seq, {AccG,AccB}) -> + Vals = proplists:get_all_values(Seq, Good1), + case length(Vals) >= Const of + true -> {[{Seq, Vals}|AccG],AccB}; + _ -> {AccG, [{Seq, Vals}|AccB]} + end + end, {[],[]}, lists:usort(Seqs)), + case length(ResB) of + 0 -> {ok, ResG}; + _ -> {error, ResB} + end. + + +final_quorum_from_each_part(Good, Bad, Total, ResolveFun, Access) -> + case length(Good) =:= Total of + true -> {ok, ResolveFun(Good)}; + _ -> error_message(Good, Bad, Total, Total, Access) + end. + + +resolve_read([First|Responses]) -> + case First of + not_found -> not_found; + _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses) + end. + + +resolve_write([First|Responses]) -> + case First of + not_found -> not_found; + _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses) + end. + + +resolve_not_found(Bad, R) -> + {NotFoundCnt, DeletedCnt, OtherReasons} = + lists:foldl(fun({Error,Reason}, {NotFoundAcc, DeletedAcc, ReasonAcc}) -> + case {Error,Reason} of + {not_found, {_Clock, [missing|_Rest]}} -> + {NotFoundAcc+1, DeletedAcc, ReasonAcc}; + {not_found, {_Clock, [deleted|_Rest]}} -> + {NotFoundAcc, DeletedAcc+1, ReasonAcc}; + _ -> + {NotFoundAcc, DeletedAcc, [Reason|ReasonAcc]} + end + end, {0, 0, []}, Bad), + % TODO: is the comparison to R good here, or should it be N-R? + if + NotFoundCnt >= R -> {true, {not_found, missing}, OtherReasons}; + DeletedCnt >= R -> {true, {not_found, deleted}, OtherReasons}; + true -> {false, other, OtherReasons} + end. + + +error_message(Good, Bad, N, T, Access) -> + Msg = list_to_atom(lists:concat([atom_to_list(Access), "_quorum_not_met"])), + ?LOG_ERROR("~p~nSuccess on ~p of ~p servers. Needed ~p. 
Errors: ~w" + , [Msg, length(Good), N, T, Bad]), + [{error, Msg}, {good, Good}, {bad, Bad}]. + + +unpack_config(#config{n=N,r=R,w=W}) -> + {N, R, W}. + + +pcall(MapFun, Servers, Const) -> + Replies = lib_misc:pmap(MapFun, Servers, Const), + lists:partition(fun valid/1, Replies). + + +valid({ok, _}) -> true; +valid(ok) -> true; +valid(_) -> false. + + +strip_ok({ok, Val}) -> Val; +strip_ok(Val) -> Val. + + +%% @spec get_dist_tasks(KeyFun::function(), KVPairs::list()) -> +%% [{{Node::node(), Part::integer()}, SeqVals}] +%% Type - ordered | ?? +%% SeqVals - [{Seq, Val}] +%% @doc builds a distributed task list of nodes with a list of shard/values. +%% This looks like a dict structure +%% but is a list so we can use ?PMAP with the results +%% @end +get_dist_tasks(KeyFun, SeqsKVPairs) -> + %% loop thru SeqsKVPairs adding node/part to each + NPSV = lists:flatmap( + fun({Seq,KVPair}) -> + NodeParts = membership2:nodeparts_for_key(KeyFun(KVPair)), + lists:map( + fun(NodePart) -> + {NodePart, {Seq, KVPair}} + end, NodeParts) + end, SeqsKVPairs), + nodepart_values_list(NPSV). + + +%% pile up the List by NodePart (like a dict) +nodepart_values_list(List) -> + DistTasks = + lists:foldl( + fun(NodePart, AccIn) -> + Values = proplists:get_all_values(NodePart, List), + case length(Values) of + 0 -> AccIn; + _ -> [{NodePart, Values} | AccIn] + end + end, [], membership2:all_nodes_parts(true)), + % ?LOG_DEBUG("~nDistTasks: ~p~n", [DistTasks]), + DistTasks. + + +get_const(Access) -> + get_const(Access, unpack_config(configuration:get_config())). + + +get_const(Access, {_N,R,W}) -> + case Access of + r -> R; + w -> W; + r1 -> 1; + Other -> throw({bad_access_term, Other}) + end. diff --git a/src/configuration.erl b/src/configuration.erl new file mode 100644 index 00000000..1caca5ec --- /dev/null +++ b/src/configuration.erl @@ -0,0 +1,99 @@ +%%% -*- erlang-indent-level:2 -*- +%%%------------------------------------------------------------------- +%%% File: configuration.erl +%%% @author Cliff Moon <cliff@powerset.com> +%%% @author Brad Anderson <brad@cloudant.com> +%%% @copyright 2008 Cliff Moon +%%% @doc +%%% This module keeps Dynomite source relatively unchanged, but +%%% reads from couchdb config stuffs +%%% @end +%%% +%%% @since 2008-07-18 by Cliff Moon +%%%------------------------------------------------------------------- +-module(configuration). +-author('cliff@powerset.com'). +-author('brad@cloudant.com'). + +%%-behaviour(gen_server). + +%% API +-export([start_link/1, get_config/1, get_config/0, set_config/1, stop/0]). + +-include_lib("eunit/include/eunit.hrl"). + +-include("../include/config.hrl"). +-include("../include/common.hrl"). + +-define(SERVER, couch_config). +-define(i2l(V), integer_to_list(V)). +-define(l2i(V), list_to_integer(V)). + + +%% ----------------------------------------------------------------- +%% API +%% ----------------------------------------------------------------- + +%% @doc starts couch_config gen_server if it's not already started +start_link(DynomiteConfig) -> + couch_config:start_link([]), + set_config(DynomiteConfig). + + +%% @doc get the config for a remote node +get_config(Node) -> + ClusterConfig = rpc:call(Node, couch_config, get, ["cluster"]), + Directory = rpc:call(Node, couch_config, get, ["couchdb", "database_dir"]), + couch2dynomite_config(ClusterConfig, Directory). + + +%% @doc get the config for the local node +get_config() -> + get_config(node()). 
+ + +%% @doc given a Dynomite config record, put the values into the Couch config +set_config(DynomiteConfig) -> + dynomite2couch_config(DynomiteConfig). + + +%% @doc stop the config server (nothing to do until after couch_config refactor) +stop() -> + couch_config:stop(). + + +%% ----------------------------------------------------------------- +%% Internal functions +%% ----------------------------------------------------------------- + +%% @doc turn a couch config proplist into a dynomite configuration record +couch2dynomite_config(ClusterConfig, Directory) -> + Q = ?l2i(proplists:get_value("q", ClusterConfig, "3")), + R = ?l2i(proplists:get_value("r", ClusterConfig, "2")), + W = ?l2i(proplists:get_value("w", ClusterConfig, "1")), + N = ?l2i(proplists:get_value("n", ClusterConfig, "4")), + %% use couch's database_dir here, to avoid /tmp/data not existing + Webport = ?l2i(proplists:get_value("webport", ClusterConfig, "8080")), + Meta = proplists:get_value("meta", ClusterConfig, []), + StorageMod = proplists:get_value("storage_mod", ClusterConfig, []), + #config{q=Q, r=R, w=W, n=N, directory=Directory, web_port=Webport, + meta=Meta, storage_mod=StorageMod}. + + +%% @doc workhorse for set_config/1 above +dynomite2couch_config(DynomiteConfig) -> + couch_config:set("cluster", "q", ?i2l(DynomiteConfig#config.q), false), + couch_config:set("cluster", "r", ?i2l(DynomiteConfig#config.r), false), + couch_config:set("cluster", "w", ?i2l(DynomiteConfig#config.w), false), + couch_config:set("cluster", "n", ?i2l(DynomiteConfig#config.n), false), + couch_config:set("couchdb", "database_dir", DynomiteConfig#config.directory, + false), + couch_config:set("cluster", "webport", + case DynomiteConfig#config.web_port of + undefined -> "8080"; + _ -> ?i2l(DynomiteConfig#config.web_port) + end, false), + couch_config:set("cluster", "meta", DynomiteConfig#config.meta, false), + couch_config:set("cluster", "storage_mod", + DynomiteConfig#config.storage_mod, false), + ok. diff --git a/src/dynomite.erl b/src/dynomite.erl new file mode 100644 index 00000000..1b9798c0 --- /dev/null +++ b/src/dynomite.erl @@ -0,0 +1,23 @@ +%%% @author Brad Anderson <brad@cloudant.com> +%%% @doc convenience start/stop functions for Dynomite +%%% +-module(dynomite). +-author('Brad Anderson <brad@cloudant.com>'). + +-export([start/0, stop/0, restart/0]). + + +%% @doc start Dynomite app with no args, for -s at the command-line +start() -> + application:start(dynomite). + + +%% @doc stops the Dynomite application +stop() -> + application:stop(dynomite). + + +%% @doc restart Dynomite app, with no args +restart() -> + stop(), + start(). diff --git a/src/dynomite_app.erl b/src/dynomite_app.erl new file mode 100644 index 00000000..6ee0b978 --- /dev/null +++ b/src/dynomite_app.erl @@ -0,0 +1,145 @@ +%%%------------------------------------------------------------------- +%%% File: dynomite.erl +%%% @author Cliff Moon <cliff@powerset.com> [] +%%% @copyright 2008 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2008-06-27 by Cliff Moon +%%%------------------------------------------------------------------- +-module(dynomite_app). +-author('cliff@powerset.com'). +-author('brad@cloudant.com'). + +-behaviour(application). + +-include("../include/config.hrl"). +-include("../../couch/src/couch_db.hrl"). + +%% Application callbacks +-export([start/2, stop/1]). + +-define(APPS, [crypto,sasl,mochiweb]). +-define(DEFAULT_CLUSTER_URL, "http://localhost:5984/_cluster"). 
+ +%%==================================================================== +%% Application callbacks +%%==================================================================== +%%-------------------------------------------------------------------- +%% @spec start(Type, StartArgs) -> {ok, Pid} | +%% {ok, Pid, State} | +%% {error, Reason} +%% @doc This function is called whenever an application +%% is started using application:start/1,2, and should start the processes +%% of the application. If the application is structured according to the +%% OTP design principles as a supervision tree, this means starting the +%% top supervisor of the tree. +%% @end +%%-------------------------------------------------------------------- + + +%% @doc start required apps, join cluster, start dynomite supervisor +start(_Type, _StartArgs) -> + % get process_dict hack for startargs (i.e. not from .app file) + PdStartArgs = case erase(startargs) of + undefined -> + []; + Args -> + Args + end, + + % start required apps + State = start_apps(), + + % start dynomite supervisor + ok = start_node(), + case dynomite_sup:start_link(PdStartArgs) of + {ok, Supervisor} -> + {ok, Supervisor, State}; + Error -> + Error + end. + + +%%-------------------------------------------------------------------- +%% @spec stop(State) -> void() +%% @doc This function is called whenever an application +%% has stopped. It is intended to be the opposite of Module:start/2 and +%% should do any necessary cleaning up. The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +stop({_, Sup}) -> + showroom_log:message(alert, "dynomite application stopped", []), + exit(Sup, normal), + ok. + + +%%==================================================================== +%% Internal functions +%%==================================================================== + +start_apps() -> + Fun = fun(App, AccIn) -> + Result = case application:start(App) of + ok -> + App; + {error, {already_started, App}} -> + nil; + _Error -> + exit(app_start_fail) + end, + if + Result =/= nil -> [App|AccIn]; + true -> AccIn + end + end, + lists:foldl(Fun, [], ?APPS). + + +%% @spec start_node() -> ok | {error, Reason} +%% @doc start this node (join to dist. erlang cluster) +start_node() -> + PingUrl = couch_config:get("cluster","ping", ?DEFAULT_CLUSTER_URL), + ?LOG_DEBUG("PingUrl: ~p", [PingUrl]), + Result = case get_pingnode(PingUrl, 1) of + {ok, PingNode} -> + join(PingNode); + _ -> + ?LOG_INFO("No pingnode found. Becoming single-node cluster", []) + end, + couch_api:create_db(<<"users">>, []), % all nodes have local 'users' db + Result. + + +%% @spec get_pingnode(Url::string(), Retries::int()) -> node() | +%% {error, Reason} +%% @doc make a http get call to Url to get cluster information +get_pingnode(Url, Retries) -> + try couch_rep_httpc:request(#http_db{url=Url, retries=Retries}) of + {[{<<"ping_node">>, Node}]} -> + {ok, list_to_atom(binary_to_list(Node))}; + _ -> + {error, pingnode_not_found} + catch + _:_ -> + {error, pingnode_not_found} + end. + + +join(PingNode) -> + if + node() =:= PingNode -> + ok; % we must be brain, so we'll take over the world + true -> + case net_adm:ping(PingNode) of + pong -> + % there is a cluster, we just joined it + ?LOG_DEBUG("ping successful, we're in.", []), + timer:sleep(1000); %% grr, what a hack, erlang. rly? + pang -> + ?LOG_ERROR("ping not successful.", []), + throw({cluster_error, ?l2b("cluster ping not successful")}) + end + end, + ok. 
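The join handshake in start_node/0 expects the _cluster endpoint (served by dynomite_http:handle_cluster_info/1 further down in this patch) to answer with a single-field JSON object, along these lines (node name illustrative):

    GET /_cluster   ->   {"ping_node": "dbcore@db1.example.com"}

get_pingnode/2 converts that binary into an atom, and join/1 then runs an ordinary net_adm:ping/1 against it; it is the distributed-Erlang handshake, not the HTTP call, that actually joins this node to the cluster.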
diff --git a/src/dynomite_couch_api.erl b/src/dynomite_couch_api.erl new file mode 100644 index 00000000..a5ad53c4 --- /dev/null +++ b/src/dynomite_couch_api.erl @@ -0,0 +1,140 @@ +%% This is a Dynomite plugin for calling the CouchDB raw Erlang API +%% +%% Most calls will have come from any of the web endpoints to execute +%% these functions on the proper node for the key(s). + +-module(dynomite_couch_api). +-author('brad@cloudant.com'). + +-export([create_db/1, delete_db/1, get/1, put/1, + bulk_docs/1, missing_revs/1, get_db_info/1, get_view_group_info/1, + ensure_full_commit/1 + ]). + +-include("../../couch/src/couch_db.hrl"). +-include("../include/common.hrl"). + + +%%-------------------------------------------------------------------- +%% @spec create_db([Part, DbName, Options]) -> {ok,Db} | {error,Error} +%% Description: Creates the database shard. +%%-------------------------------------------------------------------- +create_db([Part, DbName, Options]) -> + case couch_server:create(partitions:shard_name(Part, DbName), Options) of + {ok, Shard} -> + couch_db:close(Shard), + ok; + Error -> Error + end. + + +%%-------------------------------------------------------------------- +%% @spec delete_db([Part, DbName, Options]) -> {ok,deleted} | {error,Error} +%% Description: Deletes the database shard. +%%-------------------------------------------------------------------- +delete_db([Part, DbName, Options]) -> + couch_server:delete(partitions:shard_name(Part, DbName), Options). + + +get([Part, Db, DocId, Revs, Options]) -> + case showroom_db:open_shard(node(), Part, Db) of + {ok, Shard} -> + {Status, Doc} = couch_api:open_doc(Shard, DocId, Revs, Options), + showroom_db:close_shard(Shard), + {Status, {[], [Doc]}}; + Error -> + Error + end. + + +put([Part, Db, Doc = #doc{clock=Clock}, Options]) -> + case showroom_db:open_shard(node(), Part, Db) of + {ok, Shard} -> + {Status, NewRev} = couch_db:update_doc(Shard, Doc, Options), + showroom_db:close_shard(Shard), + {Status, {Clock, [NewRev]}}; + Error -> + Error + end. + + +bulk_docs([Part, SeqsDocs, Db, Options, Type]) -> + {Seqs, Docs} = lists:unzip(SeqsDocs), + case Docs of + [] -> {ok, []}; + _ -> + case showroom_db:open_shard(node(), Part, Db) of + {ok, Shard} -> + {ok, Results1} = couch_db:update_docs(Shard, Docs, Options, Type), + showroom_db:close_shard(Shard), + Results = int_zip(Seqs, Results1), + {ok, Results}; + Error -> + Error + end + end. + + +missing_revs([Part, SeqsIdsRevs, Db]) -> + {_Seqs, IdsRevs} = lists:unzip(SeqsIdsRevs), + case IdsRevs of + [] -> {ok, []}; + _ -> + case showroom_db:open_shard(node(), Part, Db) of + {ok, Shard} -> + {ok, Results1} = couch_db:get_missing_revs(Shard, IdsRevs), + showroom_db:close_shard(Shard), + {ok, Results1}; + Error -> + Error + end + end. + + +get_db_info([Part, Db]) -> + case showroom_db:open_shard(node(), Part, Db) of + {ok, Shard} -> + {Status, Info} = couch_db:get_db_info(Shard), + showroom_db:close_shard(Shard), + {Status, {[], Info}}; + Error -> + Error + end. + +get_view_group_info([Part, Db, DesignId]) -> + case showroom_db:open_shard(node(), Part, Db) of + {ok, Shard} -> + {ok, EmptyGroup} = showroom_view:build_skeleton_view_group(Db, DesignId), + <<"S", ShardName/binary>> = Shard#db.name, + {ok, Pid} = gen_server:call(couch_view, {get_group_server, + ShardName, EmptyGroup}), + {ok, Info} = couch_view_group:request_group_info(Pid), + showroom_db:close_shard(Shard), + {ok, {[], Info}}; + Error -> + Error + end. 
+ + +ensure_full_commit([Part, Db]) -> + case showroom_db:open_shard(node(), Part, Db) of + {ok, Shard} -> + {Status, Info} = couch_db:ensure_full_commit(Shard), + showroom_db:close_shard(Shard), + {Status, {[], Info}}; + Error -> + Error + end. + + +%% ======================= +%% internal +%% ======================= + +int_zip(Seqs, Docs) when length(Seqs) == length(Docs) -> + lists:zip(Seqs, Docs); +int_zip(_Seqs, []) -> + []; +int_zip(Seqs, Docs) -> + ?debugFmt("~nWTF? int_zip~nSeqs: ~p~nDocs: ~p~n", [Seqs, Docs]), + []. diff --git a/src/dynomite_couch_storage.erl b/src/dynomite_couch_storage.erl new file mode 100644 index 00000000..4fd21b80 --- /dev/null +++ b/src/dynomite_couch_storage.erl @@ -0,0 +1,41 @@ +%%%------------------------------------------------------------------- +%%% File: dynomite_couch_storage.erl +%%% @author Brad Anderson +%%% @copyright 2009 Brad Anderson +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-07-14 +%%%------------------------------------------------------------------- +-module(dynomite_couch_storage). +-author('brad@cloudant.com'). + +%% API +-export([name/1, open/2, close/1, create/2]). +%% , close/1, get/2, put/4, has_key/2, delete/2, fold/3 + +-include_lib("../include/common.hrl"). + +%% -record(row, {key, context, values}). + +%%==================================================================== +%% API +%%==================================================================== + +name(Boundary) -> + showroom_utils:int_to_hexstr(Boundary). + +open(Directory, Name) -> +%% ?debugFmt("~nDirectory: ~p~nName : ~p~n", [Directory,Name]), + {ok, {Directory, Name}}. + +close(_Table) -> ok. + +create(_Directory, _Name) -> + ok. + + +%%==================================================================== +%% Internal functions +%%==================================================================== diff --git a/src/dynomite_http.erl b/src/dynomite_http.erl new file mode 100644 index 00000000..8b6f7fbb --- /dev/null +++ b/src/dynomite_http.erl @@ -0,0 +1,21 @@ +%%%------------------------------------------------------------------- +%%% File : dynomite_http.erl +%%% Author : Brad Anderson <brad@cloudant.com> +%%% Description : +%%% +%%% Created : 10 Jan 2010 by Brad Anderson <brad@cloudant.com> +%%%------------------------------------------------------------------- +-module(dynomite_http). +-author('Brad Anderson <brad@cloudant.com>'). + +-include("../couch/src/couch_db.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-export([handle_cluster_info/1]). + + +%% GET /_cluster +handle_cluster_info(#httpd{method='GET', path_parts=[_]}=Req) -> + ClusterInfo = [{<<"ping_node">>, ?l2b(atom_to_list(node()))}], + showroom_log:message(info, "Cluster Info: ~p", [ClusterInfo]), + couch_httpd:send_json(Req, {ClusterInfo}). diff --git a/src/dynomite_prof.erl b/src/dynomite_prof.erl new file mode 100644 index 00000000..80c4b5b7 --- /dev/null +++ b/src/dynomite_prof.erl @@ -0,0 +1,164 @@ +%%%------------------------------------------------------------------- +%%% File: dynomite_prof.erl +%%% @author Cliff Moon <> [] +%%% @copyright 2009 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-02-15 by Cliff Moon +%%%------------------------------------------------------------------- +-module(dynomite_prof). +-author('cliff@powerset.com'). + +-behaviour(gen_server). + +%% API +-export([start_link/0, start_prof/1, stop_prof/1, stats/1, averages/0, balance_prof/0]). 
+ +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {ets,balance}). + +-record(profile, {name, count, sum}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% @spec start_link() -> {ok,Pid} | ignore | {error,Error} +%% @doc Starts the server +%% @end +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, dynomite_prof}, ?MODULE, [], []). + +stats(Id) -> + gen_server:call(dynomite_prof, {stats, Id}). + +balance_prof() -> + gen_server:cast(dynomite_prof, {balance, self(), lib_misc:now_float()}). + +start_prof(Id) -> + gen_server:cast(dynomite_prof, {start, self(), Id, lib_misc:now_float()}). + +stop_prof(Id) -> + gen_server:cast(dynomite_prof, {stop, self(), Id, lib_misc:now_float()}). + +averages() -> + gen_server:call(dynomite_prof, averages). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @doc Initiates the server +%% @end +%%-------------------------------------------------------------------- +init([]) -> + Tid = ets:new(profiling, [set, {keypos, 2}]), + Bal = ets:new(balance, [set]), + {ok, #state{ets=Tid, balance=Bal}}. + +%%-------------------------------------------------------------------- +%% @spec +%% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @doc Handling call messages +%% @end +%%-------------------------------------------------------------------- +handle_call({stats, Id}, _From, State = #state{ets=Ets}) -> + Reply = ets:lookup(Ets, Id), + {reply, Reply, State}; + +handle_call(table, _From, State = #state{ets=Ets}) -> + {reply, Ets, State}; + +handle_call(averages, _From, State = #state{ets=Ets,balance=Bal}) -> + Avgs = ets:foldl(fun(#profile{name=Name,count=Count,sum=Sum}, List) -> + [{Name, Sum/Count}|List] + end, [], Ets), + {_, MaxCount} = ets:foldl(fun + ({Pid, Count}, {_P, M}) when Count > M -> {Pid, Count}; + (_, {P, M}) -> {P, M} + end, {pid, 0}, Bal), + Balances = ets:foldl(fun({Pid, Count}, List) -> + [{Pid, Count / MaxCount} | List] + end, [], Bal), + {reply, [Balances, Avgs], State}. 
+ +%%-------------------------------------------------------------------- +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @doc Handling cast messages +%% @end +%%-------------------------------------------------------------------- +handle_cast({balance, Pid, _Time}, State = #state{balance=Ets}) -> + case ets:lookup(Ets, Pid) of + [] -> ets:insert(Ets, {Pid, 1}); + [{Pid, Count}] -> ets:insert(Ets, {Pid, Count+1}) + end, + {noreply, State}; + +handle_cast({start, Pid, Id, Time}, State = #state{ets=_Ets}) -> + put({Pid,Id}, Time), + {noreply, State}; + +handle_cast({stop, Pid, Id, Time}, State = #state{ets=Ets}) -> + case get({Pid, Id}) of + undefined -> ok; + OldTime -> + erase({Pid, Id}), + increment_time(Ets, Time-OldTime, Id) + end, + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @doc Handling all non call/cast messages +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @spec terminate(Reason, State) -> void() +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @doc Convert process state when code is changed +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +increment_time(Ets, Time, Id) -> + case ets:lookup(Ets, Id) of + [] -> ets:insert(Ets, #profile{name=Id,count=1,sum=Time}); + [#profile{name=Id,count=Count,sum=Sum}] -> ets:insert(Ets, #profile{name=Id,count=Count+1,sum=Sum+Time}) + end. diff --git a/src/dynomite_sup.erl b/src/dynomite_sup.erl new file mode 100644 index 00000000..f8136934 --- /dev/null +++ b/src/dynomite_sup.erl @@ -0,0 +1,85 @@ +%%%------------------------------------------------------------------- +%%% File: dynomite_sup.erl +%%% @author Cliff Moon <cliff@powerset.com> [] +%%% @copyright 2008 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2008-06-27 by Cliff Moon +%%%------------------------------------------------------------------- +-module(dynomite_sup). +-author('cliff@powerset.com'). + +-behaviour(supervisor). + +%% API +-export([start_link/1]). + +%% Supervisor callbacks +-export([init/1]). + +-include("../include/config.hrl"). + +-define(SERVER, ?MODULE). 
+ +%%==================================================================== +%% API functions +%%==================================================================== +%%-------------------------------------------------------------------- +%% @spec start_link() -> {ok,Pid} | ignore | {error,Error} +%% @doc Starts the supervisor +%% @end +%%-------------------------------------------------------------------- +start_link(Hints) -> + supervisor:start_link(?MODULE, [Hints]). + +%%==================================================================== +%% Supervisor callbacks +%%==================================================================== +%%-------------------------------------------------------------------- +%% @spec init(Args) -> {ok, {SupFlags, [ChildSpec]}} | +%% ignore | +%% {error, Reason} +%% @doc Whenever a supervisor is started using +%% supervisor:start_link/[2,3], this function is called by the new process +%% to find out about restart strategy, maximum restart frequency and child +%% specifications. +%% @end +%%-------------------------------------------------------------------- +init(Args) -> + Node = node(), + Nodes = running_nodes() ++ [node()], + Membership = {membership, + {membership2, start_link, [Node, Nodes, Args]}, + permanent, + 1000, + worker, + [membership2]}, + MemEventMgr = {mem_event_manager, + {gen_event, start_link, [{local, membership_events}]}, + permanent, + 1000, + worker, + []}, + {ok, {{one_for_one,10,1}, [Membership, MemEventMgr]}}. + + +%%==================================================================== +%% Internal functions +%%==================================================================== + +%% @doc get a list of running nodes visible to this local node +running_nodes() -> + [Node || Node <- nodes([this,visible]), running(Node)]. + +%% @doc monitor the membership server on Node from here +running(Node) -> + Ref = erlang:monitor(process, {membership, Node}), + R = receive + {'DOWN', Ref, _, _, _} -> false + after 1 -> + true + end, + erlang:demonitor(Ref), + R. diff --git a/src/lib_misc.erl b/src/lib_misc.erl new file mode 100644 index 00000000..f5449295 --- /dev/null +++ b/src/lib_misc.erl @@ -0,0 +1,235 @@ +-module(lib_misc). + +-define(OFFSET_BASIS, 2166136261). +-define(FNV_PRIME, 16777619). + +-export([rm_rf/1, pmap/3, succ/1, fast_acc/3, hash/1, hash/2, fnv/1, + nthdelete/2, zero_split/1, nthreplace/3, rand_str/1, position/2, + shuffle/1, floor/1, ceiling/1, time_to_epoch_int/1, + time_to_epoch_float/1, now_int/0, now_float/0, byte_size/1, listify/1, + reverse_bits/1]). + +-include("../include/config.hrl"). +-include("../include/profile.hrl"). + + +rm_rf(Name) when is_list(Name) -> + case filelib:is_dir(Name) of + false -> + file:delete(Name); + true -> + case file:list_dir(Name) of + {ok, Filenames} -> + lists:foreach(fun rm_rf/1, [ filename:join(Name, F) || F <- Filenames]), + file:del_dir(Name); + {error, Reason} -> error_logger:info_msg("rm_rf failed because ~p~n", [Reason]) + end + end. + +zero_split(Bin) -> + zero_split(0, Bin). + +zero_split(N, Bin) when N > erlang:byte_size(Bin) -> Bin; + +zero_split(N, Bin) -> + case Bin of + <<_:N/binary, 0:8, _/binary>> -> split_binary(Bin, N); + _ -> zero_split(N+1, Bin) + end. + +rand_str(N) -> + lists:map(fun(_I) -> + random:uniform(26) + $a - 1 + end, lists:seq(1,N)). + +nthreplace(N, E, List) -> + lists:sublist(List, N-1) ++ [E] ++ lists:nthtail(N, List). + +nthdelete(N, List) -> + nthdelete(N, List, []). 
+ +nthdelete(0, List, Ret) -> + lists:reverse(Ret) ++ List; + +nthdelete(_, [], Ret) -> + lists:reverse(Ret); + +nthdelete(1, [_E|L], Ret) -> + nthdelete(0, L, Ret); + +nthdelete(N, [E|L], Ret) -> + nthdelete(N-1, L, [E|Ret]). + +floor(X) -> + T = erlang:trunc(X), + case (X - T) of + Neg when Neg < 0 -> T - 1; + Pos when Pos > 0 -> T; + _ -> T + end. + +ceiling(X) -> + T = erlang:trunc(X), + case (X - T) of + Neg when Neg < 0 -> T; + Pos when Pos > 0 -> T + 1; + _ -> T + end. + +succ([]) -> + []; + +succ(Str) -> + succ_int(lists:reverse(Str), []). + +succ_int([Char|Str], Acc) -> + if + Char >= $z -> succ_int(Str, [$a|Acc]); + true -> lists:reverse(lists:reverse([Char+1|Acc]) ++ Str) + end. + +fast_acc(_, Acc, 0) -> Acc; + +fast_acc(Fun, Acc, N) -> + fast_acc(Fun, Fun(Acc), N-1). + +shuffle(List) when is_list(List) -> + [ N || {_R,N} <- lists:keysort(1, [{random:uniform(),X} || X <- List]) ]. + +pmap(Fun, List, ReturnNum) -> + N = if + ReturnNum > length(List) -> length(List); + true -> ReturnNum + end, + SuperParent = self(), + SuperRef = erlang:make_ref(), + Ref = erlang:make_ref(), + %% we spawn an intermediary to collect the results + %% this is so that there will be no leaked messages sitting in our mailbox + Parent = spawn(fun() -> + L = gather(N, length(List), Ref, []), + SuperParent ! {SuperRef, pmap_sort(List, L)} + end), + Pids = [spawn(fun() -> + Parent ! {Ref, {Elem, (catch Fun(Elem))}} + end) || Elem <- List], + Ret = receive + {SuperRef, Ret1} -> Ret1 + end, + % i think we need to cleanup here. + lists:foreach(fun(P) -> exit(P, die) end, Pids), + Ret. + +pmap_sort(Original, Results) -> + pmap_sort([], Original, lists:reverse(Results)). + +% pmap_sort(Sorted, [], _) -> lists:reverse(Sorted); +pmap_sort(Sorted, _, []) -> lists:reverse(Sorted); +pmap_sort(Sorted, [E|Original], Results) -> + case lists:keytake(E, 1, Results) of + {value, {E, Val}, Rest} -> pmap_sort([Val|Sorted], Original, Rest); + false -> pmap_sort(Sorted, Original, Results) + end. + +gather(_, Max, _, L) when length(L) == Max -> L; +gather(0, _, _, L) -> L; +gather(N, Max, Ref, L) -> + receive + {Ref, {Elem, {not_found, Ret}}} -> gather(N, Max, Ref, [{Elem, {not_found, Ret}}|L]); + {Ref, {Elem, {badrpc, Ret}}} -> gather(N, Max, Ref, [{Elem, {badrpc, Ret}}|L]); + {Ref, {Elem, {'EXIT', Ret}}} -> gather(N, Max, Ref, [{Elem, {'EXIT', Ret}}|L]); + {Ref, Ret} -> gather(N-1, Max, Ref, [Ret|L]) + end. + +get_hash_module(#config{hash_module=HashModule}) -> + HashModule. + +hash(Term) -> + HashModule = get_hash_module(configuration:get_config()), + ?prof(hash), + R = HashModule:hash(Term), + ?forp(hash), + R. + +hash(Term, Seed) -> + HashModule = get_hash_module(configuration:get_config()), + ?prof(hash), + R = HashModule:hash(Term, Seed), + ?forp(hash), + R. + +%32 bit fnv. magic numbers ahoy +fnv(Term) when is_binary(Term) -> + fnv_int(?OFFSET_BASIS, 0, Term); + +fnv(Term) -> + fnv_int(?OFFSET_BASIS, 0, term_to_binary(Term)). + +fnv_int(Hash, ByteOffset, Bin) when erlang:byte_size(Bin) == ByteOffset -> + Hash; + +fnv_int(Hash, ByteOffset, Bin) -> + <<_:ByteOffset/binary, Octet:8, _/binary>> = Bin, + Xord = Hash bxor Octet, + fnv_int((Xord * ?FNV_PRIME) rem (2 bsl 31), ByteOffset+1, Bin). + +position(Predicate, List) when is_function(Predicate) -> + position(Predicate, List, 1); + +position(E, List) -> + position(E, List, 1). 
+ +position(Predicate, [], _N) when is_function(Predicate) -> false; + +position(Predicate, [E|List], N) when is_function(Predicate) -> + case Predicate(E) of + true -> N; + false -> position(Predicate, List, N+1) + end; + +position(_, [], _) -> false; + +position(E, [E|_List], N) -> N; + +position(E, [_|List], N) -> position(E, List, N+1). + +now_int() -> + time_to_epoch_int(now()). + +now_float() -> + time_to_epoch_float(now()). + +time_to_epoch_int(Time) when is_integer(Time) or is_float(Time) -> + Time; + +time_to_epoch_int({Mega,Sec,_}) -> + Mega * 1000000 + Sec. + +time_to_epoch_float(Time) when is_integer(Time) or is_float(Time) -> + Time; + +time_to_epoch_float({Mega,Sec,Micro}) -> + Mega * 1000000 + Sec + Micro / 1000000. + +byte_size(List) when is_list(List) -> + lists:foldl(fun(El, Acc) -> Acc + lib_misc:byte_size(El) end, 0, List); + +byte_size(Term) -> + erlang:byte_size(Term). + +listify(List) when is_list(List) -> + List; + +listify(El) -> [El]. + +reverse_bits(V) when is_integer(V) -> + % swap odd and even bits + V1 = ((V bsr 1) band 16#55555555) bor (((V band 16#55555555) bsl 1) band 16#ffffffff), + % swap consecutive pairs + V2 = ((V1 bsr 2) band 16#33333333) bor (((V1 band 16#33333333) bsl 2) band 16#ffffffff), + % swap nibbles ... + V3 = ((V2 bsr 4) band 16#0F0F0F0F) bor (((V2 band 16#0F0F0F0F) bsl 4) band 16#ffffffff), + % swap bytes + V4 = ((V3 bsr 8) band 16#00FF00FF) bor (((V3 band 16#00FF00FF) bsl 8) band 16#ffffffff), + % swap 2-byte long pairs + ((V4 bsr 16) band 16#ffffffff) bor ((V4 bsl 16) band 16#ffffffff). diff --git a/src/mem_utils.erl b/src/mem_utils.erl new file mode 100644 index 00000000..ffefd5cb --- /dev/null +++ b/src/mem_utils.erl @@ -0,0 +1,129 @@ +-module(mem_utils). + +-export([fix_mappings/3, get_remote_fullmap/1, join_type/3, pmap_from_full/1, + nodeparts_up/1, remove_partition/3, use_persistent/2, + was_i_nodedown/2]). + +-include("../include/common.hrl"). + +join_type(Node, Fullmap, Options) -> + case proplists:get_value(replace, Options) of + undefined -> + case lists:filter(fun({N,_P,_T}) -> N =:= Node end, Fullmap) of + [] -> new; + _ -> rejoin + end; + OldNode when is_atom(OldNode) -> + % not a particularly strong guard, but will have to do + {replace, OldNode}; + _ -> new + end. + + +%% @doc return a {PMap, Fullmap} tuple that has corrections for +%% down, rejoining, or replacing Node +fix_mappings(nodedown, Node, OldFullmap) -> + fix_mappings_fold(fun({N,P,T}, AccIn) -> + case {N,T} of + {Node, {nodedown, Type}} -> + % already marked as nodedown, so leave it + [{N,P, {nodedown, Type}} | AccIn]; + {Node, _} -> + % mark it as nodedown + [{N,P, {nodedown, T}} | AccIn]; + _ -> [{N,P,T} | AccIn] + end + end, [], OldFullmap); + +fix_mappings(rejoin, Node, OldFullmap) -> + fix_mappings_fold(fun({N,P,{nodedown,T}}, AccIn) when N =:= Node -> + [{N,P,T} | AccIn]; + (NPT, AccIn) -> [NPT | AccIn] + end, [], OldFullmap); + +fix_mappings(replace, {OldNode, NewNode}, OldFullmap) -> + fix_mappings_fold(fun({N,P,T}, AccIn) -> + case {N, T} of + {OldNode, {nodedown,T1}} -> [{NewNode,P,T1} | AccIn]; + {OldNode, _} -> [{NewNode,P,T} | AccIn]; + _ -> [{N,P,T} | AccIn] + end + end, [], OldFullmap). + + +fix_mappings_fold(Fun, Acc0, OldFullmap) -> + NewFullmap = lists:foldl(Fun, Acc0, OldFullmap), + NewPMap = pmap_from_full(NewFullmap), + {NewPMap, NewFullmap}. 
+ + +%% @doc create a PMap (primary nodes only) from provided Fullmap +%% If a primary node is down, a partner will be supplied +pmap_from_full(Fullmap) -> + NodePartList = nodeparts_up(Fullmap), + lists:keysort(2,lists:foldl(fun({N,P,T}, AccIn) -> + case T of + primary -> [{N,P} | AccIn]; + {nodedown, primary} -> + NewNode = case lists:delete(N, + membership2:nodes_for_part(P, NodePartList)) of + [First|_] -> First; + [] -> N % wtf, are all partners down too? + end, + [{NewNode,P} | AccIn]; + _ -> AccIn + end + end, [], Fullmap)). + + +nodeparts_up(Fullmap) -> + lists:foldl(fun({_N,_P,{nodedown,_}}, AccIn) -> AccIn; + ({N,P,_T}, AccIn) -> [{N,P} | AccIn] + end, [], Fullmap). + + + +%% @doc if Node is in the Fullmap as {nodedown,_} return true +was_i_nodedown(Node, Fullmap) -> + lists:member(yes, lists:map(fun({N,_P,{nodedown,_T}}) -> + case N of + Node -> yes; + _ -> no + end; + (_) -> no + end, Fullmap)). + + +remove_partition(FullMap, Node, Partition) -> + case lists:filter( + fun({N,P,_Type}) -> N =:= Node andalso P =:= Partition end, + FullMap) of + [Elem|_] -> + lists:delete(Elem, FullMap); + Other -> + ?LOG_ERROR("~nNo partition to remove: ~p~n" + "Node: ~p~nPartition: ~p~n", [Other, Node, Partition]), + FullMap + end. + + +use_persistent(_PartnersPlus, undefined) -> + false; + +use_persistent(PartnersPlus, _PersistentParts) -> + % get a fullmap from a partner + % this may need rework for network partitions, as you could get a bad + % fullmap from another node that was partitioned w/ this one :\ + RemoteFullmap = get_remote_fullmap(PartnersPlus), + % return opposite of was_i_nodedown + not mem_utils:was_i_nodedown(node(), RemoteFullmap). + + +get_remote_fullmap([]) -> + []; % no remote fullmap available, so return empty list + +get_remote_fullmap([Node|Rest]) -> + case gen_server:call({membership, Node}, fullmap) of + {ok, Fullmap} -> Fullmap; + _ -> get_remote_fullmap(Rest) + end. diff --git a/src/membership2.erl b/src/membership2.erl new file mode 100644 index 00000000..4c4780c3 --- /dev/null +++ b/src/membership2.erl @@ -0,0 +1,686 @@ +%%%------------------------------------------------------------------- +%%% File: membership2.erl +%%% @author Cliff Moon <cliff@powerset.com> [] +%%% @copyright 2009 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-05-04 by Cliff Moon +%%%------------------------------------------------------------------- +-module(membership2). +-author('cliff@powerset.com'). +-author('brad@cloudant.com'). + +-behaviour(gen_server). + +%% API +-export([start_link/2, start_link/3, stop/1, check_nodes/0, + partitions/0, partition_for_key/1, fullmap/0, + all_nodes_parts/1, clock/0, + nodes/0, nodeparts_for_key/1, nodes_for_part/1, nodes_for_part/2, + nodes_for_shard/1, nodes_down/0, + parts_for_node/1, + take_offline/2, bring_online/2, + decommission_part/3, pp_fullmap/0, snafu/1, snafu/3]). + + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% includes +-include("../include/config.hrl"). +-include("../include/common.hrl"). +-include("../include/profile.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%%==================================================================== +%% API +%%==================================================================== +%% @doc Starts the server +%% @end +%%-------------------------------------------------------------------- + +start_link(Node, Nodes) -> + start_link(Node, Nodes, []). 
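An illustrative use of mem_utils:pmap_from_full/1 and was_i_nodedown/2 above (node names n1..n3 are hypothetical). When a primary is tagged nodedown, an up partner for that partition is promoted in the resulting primary map:

    1> Full = [{n1,0,{nodedown,primary}}, {n2,0,partner}, {n3,0,partner},
               {n2,1,primary}, {n3,1,partner}, {n1,1,{nodedown,partner}}].
    2> mem_utils:pmap_from_full(Full).
    [{n2,0},{n2,1}]    %% n2 stands in for the downed primary on partition 0
    3> mem_utils:was_i_nodedown(n1, Full).
    true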
+ + +start_link(Node, Nodes, Args) -> + gen_server:start_link({local, membership}, ?MODULE, [Node, Nodes, Args], []). + + +stop(Server) -> + gen_server:cast(Server, stop). + + +%% @doc for when things have really gone south. Install a new state on all +%% nodes, given a filename, or node list, partition map, and fullmap. +%% @end +snafu(Filename) -> + NewState = case file:consult(Filename) of + {ok, [Terms]} -> + Terms; + Error -> + throw(Error) + end, + #membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap} = NewState, + snafu(Nodes, PMap, Fullmap). + + +snafu(Nodes, PMap, Fullmap) -> + NewState = #membership{node=node(), nodes=Nodes, + partitions=PMap, fullmap=Fullmap, version=vector_clock:create(dbcore)}, + update_ets(ets_name(node()), NewState), + fire_gossip(node(), Nodes, NewState), + save(NewState). + + +check_nodes() -> + ErlangNodes = lists:usort([node() | erlang:nodes()]), + {ok, MemNodeList} = membership2:nodes(), + MemNodes = lists:usort(MemNodeList), + {PMapNodeList, _PMapPartList} = lists:unzip(partitions()), + PMapNodes = lists:usort(PMapNodeList), + case ErlangNodes =:= MemNodes andalso + ErlangNodes =:= PMapNodes andalso + MemNodes =:= PMapNodes of + true -> true; + _ -> + Msg = "membership: Node Lists do not match.~n" + "Erlang Nodes : ~p~n" + "Membership Nodes : ~p~n" + "PMap Nodes : ~p~n", + Lst = [ErlangNodes, MemNodes, PMapNodes], + showroom_log:message(error, Msg, Lst), + io:format(Msg, Lst), + false + end. + + +%% @doc retrieve the primary partition map. This is a list of partitions and +%% their corresponding primary node, no replication partner nodes. +partitions() -> + ets_pmap(). + + +%% @doc retrieve the full partition map, like above, but including replication +%% partner nodes. List should number 2^Q * N +fullmap() -> + lists:keysort(2, ets_fullmap()). + + +%% @doc pretty-print the full partition map (sorted by node, then part) +pp_fullmap() -> + lists:foreach( + fun({N,P}) -> + io:format("~-60s ~s~n", [N, showroom_utils:int_to_hexstr(P)]) + end, + lists:sort(membership2:all_nodes_parts(true))). + + +%% @doc get the current vector clock from membership state +clock() -> + gen_server:call(membership, clock). + + +%% @doc get the list of cluster nodes (according to membership module) +%% This may differ from erlang:nodes() +nodes() -> + gen_server:call(membership, nodes). + + +%% @doc get all the responsible nodes for a given partition, including +%% replication partner nodes +nodes_for_part(Part) -> + nodes_for_part(Part, all_nodes_parts(true)). + + +nodes_for_part(Part, NodePartList) -> + Filtered = lists:filter(fun({_N, P}) -> P =:= Part end, NodePartList), + {Nodes, _Parts} = lists:unzip(Filtered), + lists:usort(Nodes). + + +nodes_for_shard(ShardName) when is_binary(ShardName) -> + nodes_for_shard(binary_to_list(ShardName)); + +nodes_for_shard(ShardName) when is_list(ShardName) -> + HexPart = case string:rchr(ShardName, $_) + 1 of + 1 -> ShardName; + Last -> string:substr(ShardName, Last) + end, + Int = showroom_utils:hexstr_to_int(HexPart), + {_, Parts} = lists:unzip(membership2:partitions()), + nodes_for_part(partitions:int_to_partition(Int, Parts)). + + +%% @doc get all the responsible nodes and partitions for a given key, including +%% nodes/parts on replication partner nodes +nodeparts_for_key(Key) -> + int_node_parts_for_key(Key). 
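A small sketch of nodes_for_part/2 above with an explicit node/part list (names are hypothetical); nodes_for_shard/1 does the same after extracting the trailing hex partition from a shard name such as "test_800000":

    1> NodeParts = [{n1,0},{n2,0},{n3,0},{n2,1},{n3,1},{n4,1}].
    2> membership2:nodes_for_part(0, NodeParts).
    [n1,n2,n3]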
+ + +%% @doc get a list of all the nodes marked down in this node's fullmap +nodes_down() -> + Downs = lists:foldl(fun({N,_P,{nodedown, _T}}, AccIn) -> [N|AccIn]; + (_, AccIn) -> AccIn end, [], fullmap()), + lists:usort(Downs). + + +%% @doc return the partition responsible for the given Key +partition_for_key(Key) -> + Config = configuration:get_config(), + Hash = lib_misc:hash(Key), + partitions:hash_to_partition(Hash, Config#config.q). + + +%% @doc return the partitions that reside on a given node +parts_for_node(Node) -> + lists:sort(lists:foldl(fun({N,P,_Type}, AccIn) -> + case N of + Node -> [P | AccIn]; + _ -> AccIn + end + end, [], fullmap())). + + +%% @doc get all the nodes and partitions in the cluster. Depending on the +%% AllPartners param, you get only primary nodes or replication partner +%% nodes, as well. +%% No nodes/parts currently down are returned. +all_nodes_parts(false) -> + ets_pmap(); +all_nodes_parts(true) -> + mem_utils:nodeparts_up(ets_fullmap()). + + +%% @doc If a local storage server exists for this partition it will be taken +%% out of rotation until put back in. +%% @end +take_offline(Node, Partition) when Node =:= node() -> + gen_server:call(membership, {take_offline, Partition}); + +take_offline(Node, Partition)-> + gen_server:call({membership, Node}, {take_offline, Partition}). + + +%% @doc Brings a storage server that has been taken offline back online. +%% @end +bring_online(Node, Partition) -> + showroom_log:message(debug, "membership: bring_online Node: ~p Partition: ~p", + [Node, Partition]), + gen_server:call({membership, Node}, {bring_online, Partition}). + + +%% @doc cleans up the remaining .couch shard/partition file after it has been +%% moved to a new node. +decommission_part(Node, Part, DbName) -> + gen_server:cast({membership, Node}, {decommission, Part, DbName}). 
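To make the read-side calls above concrete, assume a running membership server whose fullmap holds the four entries below (hypothetical nodes). parts_for_node/1 returns every partition a node carries, whether primary or partner, and nodes_down/0 lists nodes tagged nodedown:

    %% fullmap(): [{n1,0,primary}, {n2,0,{nodedown,partner}},
    %%             {n2,1,{nodedown,primary}}, {n1,1,partner}]
    1> membership2:parts_for_node(n1).
    [0,1]
    2> membership2:nodes_down().
    [n2]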
+ + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @doc Initiates the server +%% @end +%%-------------------------------------------------------------------- +init([Node, Nodes, Args]) -> + process_flag(trap_exit,true), + showroom_log:message(info, "membership: membership server starting...", []), + Options = lists:flatten(Args), + showroom_log:message(info, "membership: options ~p", [Options]), + net_kernel:monitor_nodes(true), + Config = configuration:get_config(), + PersistentState=#membership{partitions=PersistentParts} = load(Node), + PartnersPlus = replication:partners_plus(Node, Nodes), + State = + case mem_utils:use_persistent(PartnersPlus, PersistentParts) of + false -> + showroom_log:message(info, "membership: not using persisted state", []), + % didn't find persistent state on disk or this node was nodedown + % so we don't want to use persisted state + PartialNodes = lists:usort(Nodes), + {NewVersion, RemoteNodes, NewPMap1, NewFullMap1} = + join_to(Node, PartnersPlus, Options), + NewWorldNodes = lists:usort(PartialNodes ++ RemoteNodes), + NewPMap = case NewPMap1 of + [] -> partitions:create_partitions(Config#config.q, Node, + NewWorldNodes); + _ -> NewPMap1 + end, + NewFullMap = case NewFullMap1 of + [] -> make_all_nodes_parts(NewPMap); + _ -> NewFullMap1 + end, + #membership{ + node=Node, + nodes=NewWorldNodes, + partitions=lists:keysort(2,NewPMap), + % version=vector_clock:increment(dbcore, NewVersion), + version=NewVersion, + fullmap=NewFullMap}; + _ -> + % found persistent state on disk + showroom_log:message(info, "membership: using persisted state", []), + case Options of + [] -> ok; + _ -> + showroom_log:message(info, "membership: options ~p ignored.", [Options]) + end, + %% fire gossip even if state comes from disk + fire_gossip(Node, Nodes, PersistentState), + PersistentState + end, + save(State), + % ets table is an optimization for cluster_ops performance + Ets = ets:new(ets_name(Node), [public, set, named_table]), + update_ets(Ets, State), + {ok, State}. 
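The Options flattened in init/1 are the ones consumed by join_to/3, the remote {join, Node, Options} call and mem_utils:join_type/3: {replace, OldNode}, {hints, Parts} and {bootstrap, true}. A hypothetical replacement start (node names invented):

    membership2:start_link(node(), ['node1@host','node2@host'],
                           [{replace, 'dead@host'}]).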
+ + +%%-------------------------------------------------------------------- +%% @spec +%% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @doc Handling call messages +%% @end +%%-------------------------------------------------------------------- + +%% join +handle_call({join, JoiningNode, Options}, _From, + State = #membership{version=Version, node=Node, nodes=Nodes, + partitions=Partitions, fullmap=OldFullMap}) -> + JoinType = mem_utils:join_type(JoiningNode, OldFullMap, Options), + showroom_log:message(alert, "membership: node ~p wants to join, type '~p'", + [JoiningNode, JoinType]), + {PMap, NewFullmap} = case JoinType of + rejoin -> + mem_utils:fix_mappings(rejoin, JoiningNode, OldFullMap); + {replace, OldNode} -> + mem_utils:fix_mappings(replace, {OldNode, JoiningNode}, OldFullMap); + new -> + Hints = proplists:get_value(hints, Options), + PMap1 = case partitions:join(JoiningNode, Partitions, Hints) of + {ok, Table} -> Table; + {error, Error, _Table} -> throw({join_error, Error}) + end, + Fullmap1 = make_all_nodes_parts(PMap1), + {PMap1, Fullmap1} + end, + WorldNodes = lists:usort(Nodes ++ [JoiningNode]), + NewVersion = vector_clock:increment(dbcore, Version), + NewState1 = State#membership{nodes=WorldNodes, partitions=PMap, + version=NewVersion}, + {Fullmap, NewState2} = case proplists:get_value(bootstrap, Options) of + true -> + % join not complete until bootstrap finishes, + % so this NewState isn't the final (i.e. NewState1 will be installed) + showroom_log:message(info, "membership: bootstrap process starting", []), + bootstrap_manager:start_bootstrap(NewState1, OldFullMap, NewFullmap); + _ -> + % no bootstrap, so install NewFullmap now + showroom_log:message(info, "membership: no bootstrap", []), + {NewFullmap, NewState1#membership{fullmap=NewFullmap}} + end, + save(NewState2), + update_ets(ets_name(node()), NewState2), + notify(node_join, [JoiningNode]), + fire_gossip(Node, WorldNodes, NewState2), + % If we're bootstrapping, then the join is not complete. + % So return FullMap for now. 
bootstrap_manager:end_bootstrap will fix it + {reply, {ok, NewVersion, WorldNodes, PMap, Fullmap}, NewState2}; + +%% clock +handle_call(clock, _From, State = #membership{version=Version}) -> + {reply, Version, State}; + +%% state +handle_call(state, _From, State) -> + {reply, State, State}; + +%% newfullmap +handle_call({newfullmap, NewFullMap}, _From, + State = #membership{node=Node, nodes=Nodes, version=Version}) -> + NewVersion = vector_clock:increment(dbcore, Version), + NewState = State#membership{version=NewVersion, fullmap=NewFullMap}, + save(NewState), + update_ets(ets_name(node()), NewState), + fire_gossip(Node, Nodes, NewState), + {reply, installed, NewState}; + +%% partitions +handle_call(partitions, _From, State = #membership{partitions=Parts}) -> + {reply, {ok, Parts}, State}; + +%% fullmap +handle_call(fullmap, _From, State = #membership{fullmap=FullMap}) -> + {reply, {ok, FullMap}, State}; + +%% nodes +handle_call(nodes, _From, State = #membership{nodes=Nodes}) -> + {reply, {ok, Nodes}, State}; + +%% take_offline +handle_call({take_offline, Partition}, _From, + State = #membership{node=Node, nodes=Nodes, fullmap=OldFullMap}) -> + showroom_log:message(info, "membership: take_offline Node: ~p Partition: ~p", + [Node, Partition]), + NewFullMap = mem_utils:remove_partition(OldFullMap, Node, Partition), + NewState = State#membership{fullmap=NewFullMap}, + fire_gossip(Node, Nodes, NewState), + update_ets(ets_name(node()), NewState), + {reply, {offline, Node, Partition}, NewState}; + +%% at least reply that this 'catch-all' was ignored +handle_call(_Request, _From, State) -> + {reply, ignored, State}. + + +%%-------------------------------------------------------------------- +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @doc Handling cast messages +%% @end +%%-------------------------------------------------------------------- + +handle_cast({gossip, RemoteState = #membership{node=RemoteNode}}, + LocalState = #membership{node=_Me}) -> + showroom_log:message(info, "membership: received gossip from ~p", + [RemoteNode]), + {MergeType, MergedState = #membership{nodes=_MergedNodes}} = + merge_state(RemoteState, LocalState), + case MergeType of + equal -> {noreply, MergedState}; + merged -> + showroom_log:message(info, "membership: merged new gossip", []), + % fire_gossip(Me, MergedNodes, MergedState), + update_ets(ets_name(node()), MergedState), + save(MergedState), + {noreply, MergedState} + end; + +% decommission +% renaming for now, until case 1245 can be completed +handle_cast({decommission, Part, DbName}, State) -> + {{Y,Mon,D}, {H,Min,S}} = calendar:universal_time(), + Directory = couch_config:get("couchdb", "database_dir"), + OrigFilename = showroom_utils:full_filename(Part, DbName, Directory), + Moved = lists:flatten(io_lib:format(".~w~2.10.0B~2.10.0B." ++ + "~2.10.0B~2.10.0B~2.10.0B.moved.couch", [Y,Mon,D,H,Min,S])), + % Note: this MovedFilename bit below gives weird results: + % ["/Users/brad/dev/erlang/dbcore/tmp/lib/x800000/test_800000", + % ".20091001.162640.moved.couch"] but list/string behavior handles it. + MovedFilename = lists:map(fun(E) -> binary_to_list(E) end, + re:replace(OrigFilename, "\.couch", Moved, [])), + ok = file:rename(OrigFilename, MovedFilename), + {noreply, State}. 
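Once a membership server is running, the call surface above can be exercised directly; the reply shapes follow the clauses, though the values shown here are invented:

    1> gen_server:call(membership, nodes).
    {ok,['node1@host','node2@host']}
    2> membership2:clock().
    [{dbcore,1254405866.896}]    %% vector clock entries carry float timestamps
    3> membership2:take_offline(node(), 0).
    {offline,'node1@host',0}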
+ + +%% @doc handle nodedown messages because we have +%% net_kernel:monitor_nodes(true) +handle_info({nodedown, Node}, + State = #membership{nodes=OldNodes, fullmap=OldFullmap, + version=OldVersion}) -> + showroom_log:message(alert, "membership: nodedown from ~p", [Node]), + case lists:member(Node, OldNodes) of + true -> + notify(nodedown, [Node]), + % clean up membership state + Nodes = lists:delete(Node, OldNodes), + {PMap, Fullmap} = mem_utils:fix_mappings(nodedown, Node, OldFullmap), + % Do we increment clock here? w/o gossip? + % This is happening near simultaneously on the other nodes, too :\ + % Only reason to increment is persisted clock on down node will be older + % when it returns + Version = vector_clock:increment(dbcore, OldVersion), + NewState = State#membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap, + version=Version}, + update_ets(ets_name(node()), NewState), + save(NewState), + {noreply, NewState}; + _ -> {noreply, State} + end; + +%% @doc handle nodeup messages because we have +%% net_kernel:monitor_nodes(true) +handle_info({nodeup, Node}, State) -> + showroom_log:message(alert, "membership: nodeup Node: ~p", [Node]), + {noreply, State}; + +handle_info(Info, State) -> + showroom_log:message(info, "membership: handle_info Info: ~p", [Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @spec terminate(Reason, State) -> void() +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%% 0.5.6 to 0.5.7 +code_change(184380560337424323902805568963460261434, State, _Extra) -> + backup_old_config_file(), + % update State to the new version + {membership, _Hdr, Node, Nodes, PMap, Version} = State, + NewState = #membership{ + node = Node, + nodes = Nodes, + partitions = PMap, + version = Version, + fullmap = make_all_nodes_parts(PMap) + }, + save(NewState), + % also create new ets table + Ets = ets:new(ets_name(Node), [public, set, named_table]), + update_ets(Ets, NewState), + {ok, NewState}; + +%% 0.8.8 to 0.9.0 +code_change(239470595681156900105628017899543243419, State, _Extra) -> + net_kernel:monitor_nodes(true), + {ok, State}; + +code_change(OldVsn, State, _Extra) -> + io:format("Unknown Old Version!~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]), + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + +backup_old_config_file() -> + Config = configuration:get_config(), + FileName = filename:join([Config#config.directory, + lists:concat([node:name(node()), ".state"])]), + BackupName = filename:join([Config#config.directory, + lists:concat([node:name(node()), ".state.bak"])]), + file:copy(FileName, BackupName). + + +%% return State from membership file +load(Node) -> + Config = configuration:get_config(), + case file:consult(filename:join([Config#config.directory, + lists:concat([node:name(Node), ".state"])])) of + {error, Reason} -> + showroom_log:message(info, "membership: could not load state: ~p~n", + [Reason]), + #membership{nodes=[]}; + {ok, [Terms]} -> + Terms + end. 
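A note on the state persisted by load/1 above and save/1 below: the file is named <nodename>.state under #config.directory and contains the #membership{} record (node, node list, primary map, vector clock, fullmap) as a single consultable term. With an illustrative directory of "/srv/dbcore" on node 'node1@host':

    1> {ok, [State]} = file:consult("/srv/dbcore/node1@host.state").
    %% State is the #membership{} record last written by save/1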
+ + +%% save the State to a file +save(State) -> + Config = configuration:get_config(), + Filename = filename:join([Config#config.directory, + lists:concat([node:name(State#membership.node), ".state"])]), + {ok, File} = file:open(Filename, [binary, write]), + io:format(File, "~w.~n", [State]), + file:close(File). + + +%% joining is bi-directional, as opposed to gossip which is unidirectional +%% we want to collect the list of known nodes to compute the partition map +%% which isn't necessarily the same as the list of running nodes +join_to(Node, Partners, Options) -> + join_to(Node, Partners, + {vector_clock:create(dbcore), [], [], []}, Options). + + +%% @doc join this node to one of its partners (or PartnersPlus if no partners +%% are available). +join_to(_, [], {Version, World, PMap, FullMap}, _Options) -> + {Version, World, PMap, FullMap}; + +join_to(Node, [Partner|Rest], {Version, World, PMap, FullMap}, Options) -> + case call_join(Partner, Node, Options) of + {ok, RemoteVersion, NewNodes, NewPMap, NewFullMap} -> + {vector_clock:merge(Version, RemoteVersion), + lists:usort(World ++ NewNodes), + NewPMap, + NewFullMap}; + Other -> + showroom_log:message(info, "membership: join_to Other: ~p~n", [Other]), + join_to(Node, Rest, {Version, World, PMap, FullMap}, Options) + end. + + +%% @doc make the join call to Remote node (usually a partner of Node) +call_join(Remote, Node, Options) -> + showroom_log:message(info, "membership: call_join From: ~p To: ~p", + [Node, Remote]), + catch gen_server:call({membership, node:name(Remote)}, + {join, Node, Options}). + + +merge_state(_RemoteState=#membership{version=RemoteVersion, nodes=RemoteNodes, + partitions=RemotePMap, + fullmap=RemoteFullMap}, + LocalState=#membership{version=LocalVersion, nodes=LocalNodes, + partitions=LocalPMap, + fullmap=LocalFullMap}) -> + case vector_clock:equals(RemoteVersion, LocalVersion) of + true -> + {equal, LocalState}; + false -> + % Note, we're matching MergedVersion from these funs. + % They should be the same. + {MergedVersion, MergedNodes} = + merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes), + {MergedVersion, MergedPMap} = + merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap), + {MergedVersion, MergedFullMap} = + merge_fullmaps(RemoteVersion, RemoteFullMap, + LocalVersion, LocalFullMap), + + % notify of arrivals & departures + Arrived = MergedNodes -- LocalNodes, + notify(node_join, Arrived), + % Departed = LocalNodes -- MergedNodes, + % notify(node_leave, Departed), + + {merged, LocalState#membership{version=MergedVersion, nodes=MergedNodes, + partitions=MergedPMap, + fullmap=MergedFullMap}} + end. + + +merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes) -> + {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteNodes}, + {LocalVersion, LocalNodes}), + {MergedVersion, lists:usort(Merged)}. + + +merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap) -> + {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemotePMap}, + {LocalVersion, LocalPMap}), + {MergedVersion, lists:ukeysort(2, Merged)}. + + +merge_fullmaps(RemoteVersion, RemoteFullMap, LocalVersion, LocalFullMap) -> + {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteFullMap}, + {LocalVersion, LocalFullMap}), + {MergedVersion, lists:usort(Merged)}. + + +notify(Type, Nodes) -> + lists:foreach(fun(Node) -> + gen_event:notify(membership_events, {Type, Node}) + end, Nodes). + + +%% @doc fires a gossip message (membership state) to partners nodes in the +%% cluster. 
+%% @end +fire_gossip(Me, WorldNodes, Gossip) -> + % GossipPartners = partners_plus(Me, WorldNodes), + % random experiment, gossip with all nodes, not just partners_plus + GossipPartners = lists:delete(Me, WorldNodes), + lists:foreach(fun(TargetNode) -> + showroom_log:message(info, "membership: firing gossip from ~p to ~p", + [Me, TargetNode]), + gen_server:cast({membership, TargetNode}, {gossip, Gossip}) + end, GossipPartners). + + +%% @doc construct a table with all partitions, with the primary node and all +%% replication partner nodes as well. +make_all_nodes_parts(PMap) -> + {Nodes, _Parts} = lists:unzip(PMap), + NodeParts = lists:flatmap( + fun({Node,Part}) -> + Partners = replication:partners(Node, lists:usort(Nodes)), + PartnerList = [{Partner, Part, partner} || Partner <- Partners], + [{Node, Part, primary} | PartnerList] + end, PMap), + NodeParts. + + +%% @doc for the given key, return a list of {Node,Part} tuples. Nodes are both +%% primary and replication partner nodes, and should number N. +int_node_parts_for_key(Key) -> + Config = configuration:get_config(), + Hash = lib_misc:hash(Key), + Part = partitions:hash_to_partition(Hash, Config#config.q), + NodePartList = all_nodes_parts(true), + lists:filter(fun({_N,P}) -> P =:= Part end, NodePartList). + + +%% ets table helper functions +ets_name(Node) -> + list_to_atom(lists:concat(["mem_", atom_to_list(Node)])). + + +update_ets(Table, #membership{partitions=PMap, fullmap=FullMap}) -> + ets:insert(Table, {pmap, PMap}), + ets:insert(Table, {fullmap, FullMap}), + ok. + + +ets_pmap() -> + [{pmap, PMap}] = ets:lookup(ets_name(node()), pmap), + PMap. + + +ets_fullmap() -> + [{fullmap, FullMap}] = ets:lookup(ets_name(node()), fullmap), + FullMap. diff --git a/src/node.erl b/src/node.erl new file mode 100644 index 00000000..9a9c82c1 --- /dev/null +++ b/src/node.erl @@ -0,0 +1,39 @@ +%%%------------------------------------------------------------------- +%%% File: node.erl +%%% @author Cliff Moon <> [] +%%% @copyright 2009 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-05-11 by Cliff Moon +%%%------------------------------------------------------------------- +-module(node). +-author('cliff@powerset.com'). + +%% API +-export([name/1, attributes/1]). + +-include("../include/common.hrl"). + +%% -ifdef(TEST). +%% -include("../etest/node_test.erl"). +%% -endif. + +%%==================================================================== +%% API +%%==================================================================== + +name(Name) when is_atom(Name) -> + Name; +name(Node) when is_tuple(Node) -> + element(1, Node); +name(Node) -> + Node. + +attributes(Name) when is_atom(Name) -> + []; +attributes(Node) when is_tuple(Node) -> + element(2, Node); +attributes(_) -> + []. diff --git a/src/partitions.erl b/src/partitions.erl new file mode 100644 index 00000000..942968e1 --- /dev/null +++ b/src/partitions.erl @@ -0,0 +1,334 @@ +%%%------------------------------------------------------------------- +%%% File: partitions.erl +%%% @author Cliff Moon <cliff@powerset.com> [http://www.powerset.com/] +%%% @copyright 2008 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2008-10-12 by Cliff Moon +%%%------------------------------------------------------------------- +-module(partitions). +-author('cliff@powerset.com'). + +%% API +-export([partition_range/1, create_partitions/3, map_partitions/2, + diff/2, pp_diff/1, int_to_partition/2, + join/3, leave/3, hash/1, hash_to_partition/2, item_to_nodepart/1, + shard_name/2, hash_to_hex/2]). 
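As an aside on the fullmap shape used by membership2 above: with a replication factor n of 3 (two partners per partition) and no placement metadata, make_all_nodes_parts/1 expands a three-node primary map such as [{n1,0},{n2,1},{n3,2}] (hypothetical nodes) into one {Node, Part, primary|partner} triple per copy:

    [{n1,0,primary},{n2,0,partner},{n3,0,partner},
     {n2,1,primary},{n1,1,partner},{n3,1,partner},
     {n3,2,primary},{n1,2,partner},{n2,2,partner}]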
+ +-define(RINGTOP, trunc(math:pow(2,160)-1)). % SHA-1 space + +-include("../../couch/src/couch_db.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%% -ifdef(TEST). +%% -include("etest/partitions_test.erl"). +%% -endif. + +%%==================================================================== +%% API +%%==================================================================== + +partition_range(Q) -> + trunc( ?RINGTOP / math:pow(2,Q) ). % SHA-1 space / 2^Q + +create_partitions(Q, Node, _Nodes) -> + fresh(trunc(math:pow(2,Q)), Node). + % map_partitions(Table, Nodes). + + +%% @spec map_partitions(Table::proplist(),Nodes::list()) -> proplist() +%% @doc maps partitions to nodes. The resulting list should be Dynomite format, +%% namely {Node,Part} +%% @end +map_partitions(Table, Nodes) -> + {_Nodes, Parts} = lists:unzip(Table), + do_map(Nodes, Parts). + + +%% @doc in case Hints is undefined, turn it into a list for clauses below. +join(Node, Table, undefined) -> + join(Node, Table, []); + +%% @spec join(node(), proplist(), list()) -> {ok, PartTable::proplist()} | +%% {error, Error} +%% @doc given a node, current partition table, and hints, this function returns +%% the new partition table +join(Node, Table, Hints) -> + {NodeList, Parts} = lists:unzip(Table), + OtherNodes = lists:delete(Node, NodeList), + OtherDistinctNodes = lists:usort(OtherNodes), + %% quick check to see if we have more nodes than partitions + if + length(Parts) == length(OtherDistinctNodes) -> + {error, "Too many nodes vs partitions", Table}; + true -> + AlreadyPresent = length(NodeList) - length(OtherNodes), + Nodes = lists:usort(NodeList), + PartCountToTake = trunc(length(Parts) / (length(Nodes) + 1)), + %% calcs done, let's steal some partitions + {HintsTaken, NewTable} = steal_hints(Node, Table, Hints), + if + PartCountToTake - AlreadyPresent - HintsTaken > 0 -> + steal_partitions(Node, OtherDistinctNodes, NewTable, + PartCountToTake - AlreadyPresent - HintsTaken); + true -> + %% no partitions to take + {ok, NewTable} + end + end. + + +%% TODO: implement me +leave(_Node, Table, _Hints) -> + Table. + + +diff(From, To) when length(From) =/= length(To) -> + {error, badlength, "Cannot diff partition maps with different length"}; + +diff(From, To) -> + diff(sort_for_diff(From), sort_for_diff(To), []). + + +pp_diff(Diff) -> + lists:map( + fun({F,T,Part}) -> {F,T,showroom_utils:int_to_hexstr(Part)} end, + Diff). + + +%% @spec hash(term()) -> Digest::binary() +%% @doc Showroom uses SHA-1 as its hash +hash(Item) -> + crypto:sha(term_to_binary(Item)). + + +%% @spec hash_to_partition(binary(), integer()) -> integer() +%% @doc given a hashed value and Q, return the partition +hash_to_partition(Hash, Q) -> + HashInt = hash_int(Hash), + Size = partition_range(Q), + Factor = (HashInt div Size), + Rem = (HashInt rem Size), + if + Rem > 0 -> Factor * Size; + true -> ((Factor-1) * Size) + end. + + +hash_to_hex(Hash, Q) -> + Part = hash_to_partition(Hash, Q), + showroom_utils:int_to_hexstr(Part). + + +%% @doc given an int and a list of partitions, get the first part greater +%% than Int. Used for a hex part being turned back into an int. +int_to_partition(Int, Parts) -> + Rem = lists:dropwhile(fun(E) -> E < Int end, lists:sort(Parts)), + case Rem of + [] -> 0; % wrap-around-ring case (back to 0) + [H|_T] -> H + end. 
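The arithmetic in hash_to_partition/2 above maps a hash onto the identifier of the range that owns it, where partition P owns hashes in (P, P + Size]. A scaled-down illustration (the real Size is trunc(?RINGTOP / 2^Q), not 100):

    %% Size = 100, for readability only
    hash 250  ->  Factor = 2, Rem = 50 > 0  ->  partition 200
    hash 300  ->  Factor = 3, Rem = 0       ->  partition 200
    hash 200  ->  Factor = 2, Rem = 0       ->  partition 100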
+ + +%% @spec item_to_nodepart(bin()) -> {Node::node(),Part::integer()} +%% @doc given a raw item, return the node/partition/shard +%% name based on consistent hashing +item_to_nodepart(Item) when is_binary(Item) -> + Q = list_to_integer(couch_config:get("cluster","q")), + Hash = hash(?b2l(Item)), + Part = hash_to_partition(Hash, Q), + {ok, Table} = membership2:partitions(), + lists:keyfind(Part, 2, Table); + +item_to_nodepart(Item) -> + item_to_nodepart(term_to_binary(Item)). + + +%% @spec shard_name(integer(), binary()) -> binary() +%% @doc create shard name +shard_name(Part, DbName) -> + PartHex = ?l2b(showroom_utils:int_to_hexstr(Part)), + <<"x", PartHex/binary, "/", DbName/binary, "_", PartHex/binary>>. + +%%==================================================================== +%% Internal functions +%%==================================================================== + +%% @doc Create a brand new table. The size and seednode are specified; +%% initially all partitions are owned by the seednode. If NumPartitions +%% is not much larger than the intended eventual number of +%% participating nodes, then performance will suffer. +%% from http://code.google.com/p/distributerl (trunk revision 4) chash:fresh/2 +%% @spec fresh(NumPartitions :: integer(), SeedNode :: node()) -> table() +fresh(NumPartitions, SeedNode) -> + Increment = ?RINGTOP div NumPartitions, + [{SeedNode, IndexAsInt} || IndexAsInt <- lists:seq(0,(?RINGTOP-1),Increment)]. + + +%% @spec steal_hints(node(), proplist(), list( integer() )) -> +%% {integer(), proplist()} +%% @doc move the partitions listed in Hints over to the new owner, Node +steal_hints(Node, Table, Hints) -> + steal_hints(Node, Table, Hints, 0). + + +%% @doc recursive workhorse for hints mechanism, Acc is tracking how many +%% hints/partitions were successfully moved to a new Node. +%% @end +steal_hints(_Node, Table, [], Acc) -> + {Acc, Table}; + +steal_hints(Node, Table, [Hint|RestHints], Acc) -> + {Status, NewTable} = swap_node_for_part(Node, Hint, Table), + Acc1 = case Status of + ok -> Acc+1; + _ -> Acc + end, + steal_hints(Node, NewTable, RestHints, Acc1). + + +%% @doc take a part from one of the other nodes based on most # of parts per +%% node. +%% @end +%% TODO: This fun does list ops on the Table each time through. Inefficient? +%% Hopefully not, due to small Table sizes +steal_partitions(_Node, _OtherNodes, Table, 0) -> + {ok, Table}; +steal_partitions(Node, OtherNodes, Table, Count) -> + %% first, get a list of OtherNodes and their partition counts + NPCountFun = fun(N) -> + L = proplists:get_all_values(N, Table), + {N, length(lists:delete(undefined, L))} + end, + NPCounts = lists:reverse(lists:keysort(2,lists:map(NPCountFun, OtherNodes))), + %% grab the node that has the most partitions + [{TakeFrom, _PartsCount}|_RestOfTable] = NPCounts, + %% get the highest # partition of the TakeFrom node + TakeFromParts = lists:reverse(lists:sort(proplists:get_all_values(TakeFrom, + Table))), + [Part|_RestOfParts] = TakeFromParts, + {ok, NewTable} = swap_node_for_part(Node, Part, Table), + steal_partitions(Node, OtherNodes, NewTable, Count-1). + + +%% @doc Make Node the owner of the partition beginning at Part. 
+%% from http://code.google.com/p/distributerl (trunk revision 4) chash:update/3 +swap_node_for_part(Node, Part, Table) -> + case lists:keymember(Part, 2, Table) of + true -> + GapList = [{N,P} || {N,P} <- Table, P /= Part], + {A, B} = lists:partition(fun({_,K1}) -> K1 < Part end, GapList), + {ok, A ++ [{Node, Part}] ++ B}; + false -> + showroom_log:message(info, + "'~p' partition was not found in partition table", [Part]), + {noswap, Table} + end. + + +%% @doc get the difference between two FullPMaps +%% lists need to be sorted by part, then node +diff([], [], Results) -> + lists:reverse(remove_dupes(Results)); + +diff([{Node,Part,_}|PartsA], [{Node,Part,_}|PartsB], Results) -> + diff(PartsA, PartsB, Results); + +diff([{NodeA,Part,_}|PartsA], [{NodeB,Part,_}|PartsB], Results) -> + diff(PartsA, PartsB, [{NodeA,NodeB,Part}|Results]). + + +%% @doc sorts the full map for diff/3. This may change to get more accurate +%% diff w/o dupes +sort_for_diff(FullMap) -> + lists:keysort(2,lists:sort(FullMap)). + + +remove_dupes(Diff) -> + {_,_,AllParts} = lists:unzip3(Diff), + Parts = lists:usort(AllParts), + remove_dupes_from_part(Parts, Diff, []). + + +%% @doc ex: take [{a,b,1},{b,c,1}] diff and make it [{a,c,1}] so we don't go +%% moving unnecessary shard files. 'Move partition 1 from a to b and +%% then move partition 1 from b to c' is unnecessary. Just move it a to c. +remove_dupes_from_part([], _Diff, Acc) -> + Acc; + +remove_dupes_from_part([Part|Rest], Diff, Acc) -> + PartData = lists:filter(fun({_,_,P}) -> P =:= Part end, Diff), + NewPartData = process_part_data(Part, PartData, PartData, PartData), + remove_dupes_from_part(Rest, Diff, lists:concat([NewPartData, Acc])). + + +%% for one partition of the full diff, remove the dupes +process_part_data(_Part, _PartData, [], Acc) -> + Acc; + +process_part_data(Part, PartData, [{From,To,_Part}|Rest], Acc) -> + case proplists:lookup(To, PartData) of + {To, NewTo, _Part} -> + + Remove1 = proplists:delete(To, PartData), + Remove2 = proplists:delete(From, Remove1), + NewPartData = [{From, NewTo, Part}|Remove2], + %?debugFmt("~nFrom : ~p~nTo : ~p~nNewTo: ~p~n" + % "Remove1: ~p~nRemove2: ~p~n" + % "NewPartData: ~p~n" + % , [From, To, NewTo, Remove1, Remove2, NewPartData]), + process_part_data(Part, NewPartData, Rest, NewPartData); + none -> + process_part_data(Part, PartData, Rest, Acc) + end. + + +% %% @doc from dynomite +% diff([], [], Results) -> +% lists:reverse(Results); + +% diff([{Node,Part}|PartsA], [{Node,Part}|PartsB], Results) -> +% diff(PartsA, PartsB, Results); + +% diff([{NodeA,Part}|PartsA], [{NodeB,Part}|PartsB], Results) -> +% diff(PartsA, PartsB, [{NodeA,NodeB,Part}|Results]). + + +%% @doc does Node/Partition mapping based on Amazon Dynamo paper, +%% section 6.2, strategy 3, more or less +%% http://www.allthingsdistributed.com/2007/10/amazons_dynamo.html +%% @end +do_map([Node|RestNodes], Parts) -> + Max = length(Parts) / length([Node|RestNodes]), + do_map(Node, RestNodes, Parts, [], 1, Max). 
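A minimal diff/2 example for the functions above (hypothetical nodes): moving partition 0's partner copy from n2 to n3 produces a single {FromNode, ToNode, Part} transfer, and remove_dupes/1 collapses chained moves of the same partition (a to b plus b to c becomes a to c), as the comment above describes:

    1> From = [{n1,0,primary},{n2,0,partner}].
    2> To   = [{n1,0,primary},{n3,0,partner}].
    3> partitions:diff(From, To).
    [{n2,n3,0}]    %% move partition 0 from n2 to n3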
+ + +%% return final mapped list +do_map(_,_,[],Mapped, _, _) -> + lists:keysort(1, Mapped); + +%% finish off last node, Cnt & Max no longer needed +do_map(Node, [], [Part|RestParts], Mapped, _, _) -> + do_map(Node, [], RestParts, [{Node, Part}|Mapped], 0,0); + +%% workhorse clause, iterates through parts, until Cnt > Max, then advances to +%% next node, wash, rinse, repeat +do_map(Node, [NextNode|RestNodes], [Part|RestParts], Mapped, Cnt, Max) -> + case Cnt > Max of + true -> + do_map(NextNode, RestNodes, RestParts, [{Node, Part}|Mapped], + 1, Max); + false -> + do_map(Node, [NextNode|RestNodes], RestParts, [{Node, Part}|Mapped], + Cnt+1, Max) + end. + + +%% TODO: other guards +hash_int(Hash) when is_binary(Hash) -> + <<IndexAsInt:160/integer>> = Hash, + IndexAsInt; +hash_int(Hash) when is_integer(Hash) -> + Hash. diff --git a/src/replication.erl b/src/replication.erl new file mode 100644 index 00000000..96be0ad3 --- /dev/null +++ b/src/replication.erl @@ -0,0 +1,165 @@ +%%%------------------------------------------------------------------- +%%% File: replication.erl +%%% @author Brad Anderson <brad@cloudant.com> [http://www.cloudant.com] +%%% @copyright 2009 Brad Anderson +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-06-14 by Brad Anderson +%%%------------------------------------------------------------------- +-module(replication). +-author('brad@cloudant.com'). + +%% API +-export([partners/2, partners/3, partners_plus/2]). + +-include_lib("eunit/include/eunit.hrl"). +-include("../include/config.hrl"). +-include("../include/common.hrl"). + + +%%==================================================================== +%% API +%%==================================================================== + +partners(Node, Nodes) -> + partners(Node, Nodes, configuration:get_config()). + + +%%-------------------------------------------------------------------- +%% @spec partners(Node::atom(), Nodes::list(), Config::config()) -> +%% list() +%% @doc returns the list of all replication partners for the specified node +%% @end +%%-------------------------------------------------------------------- +partners(Node, Nodes, Config) -> + N = Config#config.n, + Meta = Config#config.meta, + pick_partners(Meta, Node, Nodes, [], N - 1). + + +%% return a list of live/up Partners, and if all Partners are down, +%% walk the ring to get one other remote node and return it. +partners_plus(Node, Nodes) -> + Partners = partners(Node, Nodes), + PartnersDown = lists:subtract(Partners, erlang:nodes()), + PartnersUp = lists:subtract(Partners, PartnersDown), + case PartnersUp of + [] -> + TargetNodes = target_list(Node, Nodes), + NonPartners = lists:subtract(TargetNodes, + lists:flatten([Node, Partners])), + walk_ring(NonPartners); + _ -> + %% at least one partner is up, so gossip w/ them + PartnersUp + end. 
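partners/3 above accepts an explicit #config{} (the record from config.hrl), which makes the ring behaviour easy to see: with n = 2 and no metadata each node is partnered with its successor in the sorted node list, wrapping at the end (node names hypothetical):

    1> rr("include/config.hrl").    %% load the record definition; path relative to the app root
    2> replication:partners(n1, [n1,n2,n3], #config{n=2, meta=[]}).
    [n2]
    3> replication:partners(n3, [n1,n2,n3], #config{n=2, meta=[]}).
    [n1]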
+ + +%%==================================================================== +%% Internal functions +%%==================================================================== + +%% @spec pick_partners(proplist(), Node::dynomite_node(), [Node], [Node], +%% integer()) -> list() +%% @doc iterate through N-1 partner picks, returning the resulting list sorted +pick_partners(_Meta, Node, _Nodes, Acc, 0) -> + lists:sort(lists:delete(Node, Acc)); +pick_partners(Meta, Node, Nodes, Acc, Count) -> + Partner = pick_partner(Meta, Node, Nodes, Acc, 1), + NewNodes = lists:filter(fun(Elem) -> + case Elem of + no_partner_found -> false; + Partner -> false; + _ -> true + end + end, Nodes), + NewAcc = case Partner of + no_partner_found -> Acc; + _ -> [Partner|Acc] + end, + pick_partners(Meta, Node, NewNodes, NewAcc, Count-1). + + +%% @spec pick_partner(proplist(), Node::dynomite_node(), [Node], [Node], +%% integer()) -> Node::dynomite_node() +%% @doc pick a specific replication partner at the given level +pick_partner([], Node, Nodes, _Acc, 1) -> + %% handle the no metadata situation + %% Note: This clause must be before the Level > length(Meta) guarded clause + target_key(node:name(Node), lists:map(fun node:name/1, Nodes), roundrobin); + +pick_partner(Meta, _Node, _Nodes, Acc, Level) when Level > length(Meta) -> + Acc; + +pick_partner(Meta, Node, Nodes, Acc, Level) -> + MetaDict = meta_dict(Nodes, Level, dict:new()), + NodeKey = lists:sublist(node:attributes(Node), Level), + Keys = dict:fetch_keys(MetaDict), + {_MetaName, Strategy} = lists:nth(Level, Meta), + TargetKey = target_key(NodeKey, Keys, Strategy), + Candidates = dict:fetch(TargetKey, MetaDict), + case length(Candidates) of + 0 -> + %% didn't find a candidate + no_partner_found; + 1 -> + %% found only one candidate, return it + [Partner] = Candidates, + Partner; + _ -> + pick_partner(Meta, Node, Nodes, Acc, Level + 1) + end. + + +%% @doc construct a dict that holds the key of metadata values so far (up to +%% the current level, and dynomite_node() list as the value. This is used +%% to select a partner in pick_partner/5 +%% @end +meta_dict([], _Level, Dict) -> + Dict; + +meta_dict([Node|Rest], Level, Dict) -> + Key = lists:sublist(node:attributes(Node), Level), + DictNew = dict:append(Key, Node, Dict), + meta_dict(Rest, Level, DictNew). + + +%% @spec target_key(term(), list(), Strategy::atom()) -> term() +%% @doc given the key and keys, sort the list of keys based on stragety (i.e. +%% for roundrobin, sort them, put the NodeKey on the end of the list, and +%% then return the head of the list as the target. +%% @end +%% TODO: moar strategies other than roundrobin? +target_key(NodeKey, Keys, roundrobin) -> + SortedKeys = lists:sort(Keys), + TargetKey = case target_list(NodeKey, SortedKeys) of + [] -> no_partner_found; + [Key|_Rest] -> Key + end, + TargetKey. + + +%% @spec target_list(term(), list()) -> list() +%% @doc split the list of keys into 'lessthan NodeKey', NodeKey, and 'greaterthan +%% Nodekey' and then put the lessthan section on the end of the list +%% @end +target_list(_NodeKey, []) -> + []; +target_list(NodeKey, Keys) -> + {A, [NodeKey|B]} = lists:splitwith(fun(K) -> K /= NodeKey end, Keys), + lists:append([B, A, [NodeKey]]). + + +walk_ring([]) -> + %% TODO: should we be more forceful here and throw? not for now + showroom_log:message(info, + "~p:walk_ring/1 - could not find node for gossip", [?MODULE]), + []; + +walk_ring([Node|Rest]) -> + case lists:member(Node, erlang:nodes()) of + true -> [Node]; + _ -> walk_ring(Rest) + end. 
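The rotation in target_list/2 above is what gives round-robin partner selection its wrap-around: keys greater than NodeKey come first, then the smaller ones, then NodeKey itself, and target_key/3 takes the head. For example (keys invented):

    target_list(b, [a,b,c,d])  ->  [c,d,a,b]    %% so b's first partner pick is c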
diff --git a/src/vector_clock.erl b/src/vector_clock.erl new file mode 100644 index 00000000..0a89d41e --- /dev/null +++ b/src/vector_clock.erl @@ -0,0 +1,99 @@ +%%% @author Cliff Moon <cliff@powerset.com> [] +%%% @copyright 2008 Cliff Moon + +-module (vector_clock). +-export ([create/1, truncate/1, increment/2, compare/2, resolve/2, merge/2, + equals/2]). + +%% -ifdef(TEST). +%% -include("etest/vector_clock_test.erl"). +%% -endif. + +create(NodeName) -> [{NodeName, lib_misc:now_float()}]. + +truncate(Clock) when length(Clock) > 10 -> + lists:nthtail(length(Clock) - 10, lists:keysort(2, Clock)); + +truncate(Clock) -> Clock. + +increment(NodeName, [{NodeName, _Version}|Clocks]) -> + [{NodeName, lib_misc:now_float()}|Clocks]; + +increment(NodeName, [NodeClock|Clocks]) -> + [NodeClock|increment(NodeName, Clocks)]; + +increment(NodeName, []) -> + [{NodeName, lib_misc:now_float()}]. + +resolve({ClockA, ValuesA}, {ClockB, ValuesB}) -> + case compare(ClockA, ClockB) of + less -> {ClockB, ValuesB}; + greater -> {ClockA, ValuesA}; + equal -> {ClockA, ValuesA}; + concurrent -> + io:format("~nConcurrent Clocks~n" + "ClockA : ~p~nClockB : ~p~n" + "ValuesA: ~p~nValuesB: ~p~n" + , [ClockA, ClockB, ValuesA, ValuesB]), + {merge(ClockA,ClockB), ValuesA ++ ValuesB} + end; +resolve(not_found, {Clock, Values}) -> + {Clock, Values}; +resolve({Clock, Values}, not_found) -> + {Clock, Values}. + +merge(ClockA, ClockB) -> + merge([], ClockA, ClockB). + +merge(Merged, [], ClockB) -> lists:keysort(1, Merged ++ ClockB); + +merge(Merged, ClockA, []) -> lists:keysort(1, Merged ++ ClockA); + +merge(Merged, [{NodeA, VersionA}|ClockA], ClockB) -> + case lists:keytake(NodeA, 1, ClockB) of + {value, {NodeA, VersionB}, TrunkClockB} when VersionA > VersionB -> + merge([{NodeA,VersionA}|Merged],ClockA,TrunkClockB); + {value, {NodeA, VersionB}, TrunkClockB} -> + merge([{NodeA,VersionB}|Merged],ClockA,TrunkClockB); + false -> + merge([{NodeA,VersionA}|Merged],ClockA,ClockB) + end. + +compare(ClockA, ClockB) -> + AltB = less_than(ClockA, ClockB), + if AltB -> less; true -> + BltA = less_than(ClockB, ClockA), + if BltA -> greater; true -> + AeqB = equals(ClockA, ClockB), + if AeqB -> equal; true -> concurrent end + end + end. + +%% ClockA is less than ClockB if and only if ClockA[z] <= ClockB[z] for all +%% instances z and there exists an index z' such that ClockA[z'] < ClockB[z'] +less_than(ClockA, ClockB) -> + ForAll = lists:all(fun({Node, VersionA}) -> + case lists:keysearch(Node, 1, ClockB) of + {value, {_NodeB, VersionB}} -> VersionA =< VersionB; + false -> false + end + end, ClockA), + Exists = lists:any(fun({NodeA, VersionA}) -> + case lists:keysearch(NodeA, 1, ClockB) of + {value, {_NodeB, VersionB}} -> VersionA /= VersionB; + false -> true + end + end, ClockA), + %length takes care of the case when clockA is shorter than B + ForAll and (Exists or (length(ClockA) < length(ClockB))). + +equals(ClockA, ClockB) -> + Equivalent = lists:all(fun({NodeA, VersionA}) -> + lists:any(fun(NodeClockB) -> + case NodeClockB of + {NodeA, VersionA} -> true; + _ -> false + end + end, ClockB) + end, ClockA), + Equivalent and (length(ClockA) == length(ClockB)). diff --git a/test/Emakefile b/test/Emakefile new file mode 100644 index 00000000..d05e4d94 --- /dev/null +++ b/test/Emakefile @@ -0,0 +1,4 @@ +{"*", [warn_obsolete_guard, warn_unused_import, + warn_shadow_vars, warn_export_vars, debug_info, + {i, "../include"}, + {outdir, "../tests_ebin"}]}. 
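Referring to vector_clock.erl above: the integer versions below are for readability only, since real clocks store lib_misc:now_float() timestamps. compare/2 distinguishes ordered histories from concurrent ones (where resolve/2 then keeps both value sets), and merge/2 keeps the per-node maximum:

    1> vector_clock:compare([{a,1}], [{a,1},{b,1}]).
    less
    2> vector_clock:compare([{a,2},{b,1}], [{a,1},{b,2}]).
    concurrent
    3> vector_clock:merge([{a,2},{b,1}], [{a,1},{b,3}]).
    [{a,2},{b,3}]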
diff --git a/test/Makefile b/test/Makefile new file mode 100644 index 00000000..45998c6e --- /dev/null +++ b/test/Makefile @@ -0,0 +1,12 @@ +include ../support/include.mk + +all: $(EBIN_FILES_NO_DOCS) + +doc: $(EBIN_FILES) + +debug: + $(MAKE) DEBUG=-DDEBUG + +clean: + rm -rf $(EBIN_FILES) + rm -rf ../tests_ebin
\ No newline at end of file diff --git a/test/cluster_ops_test.erl b/test/cluster_ops_test.erl new file mode 100644 index 00000000..1c692dcf --- /dev/null +++ b/test/cluster_ops_test.erl @@ -0,0 +1,83 @@ +-module(cluster_ops_test). + +-include("../../couchdb/couch_db.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +% read_quorum_test() -> +% % we need to be running a cluster here... +% % not sure how to start things up for unit tests + +% % but we're testing reads when a node is missing a doc, so disable internal +% % replication - a bit harsh if anything else is here, but hey, it's a test +% rpc:multicall(showroom, stop, []), +% rpc:multicall(supervisor, terminate_child, +% [couch_primary_services, couch_replication_supervisor]), +% rpc:multicall(supervisor, delete_child, +% [couch_primary_services, couch_replication_supervisor]), + +% % create db +% DbName = <<"cluster_ops_test">>, +% showroom_db:delete_db(DbName, []), +% {Status, #db{name=DbName}} = showroom_db:create_db(DbName, []), +% ?assertEqual(ok, Status), + +% % open db +% {ok, Db} = showroom_db:open_db(DbName, []), + +% % make a test doc +% Key = <<"a">>, +% Json = {[{<<"_id">>,Key}]}, +% Doc = couch_doc:from_json_obj(Json), +% Clock = vector_clock:create(node()), +% NewDoc = Doc#doc{clock=Clock}, + +% % insert a doc in two shards out of three +% % TODO: we need N=3, need to fix that at db create time Options above +% % (fb 1001) +% {M,F,A} = {dynomite_couch_api, put,[Db, NewDoc, []]}, +% CorrectNodeParts = membership2:nodeparts_for_key(Key), +% [{MissingNode, MissingPart} | BadNodeParts] = CorrectNodeParts, +% MapFun = fun({Node,Part}) -> +% rpc:call(Node, M, F, [[Part | A]]) +% end, +% {Good, Bad} = pcall(MapFun, BadNodeParts, 2), +% ?assertEqual(2, length(Good)), +% ?assertEqual([], Bad), + +% % make sure it's notfound on the MissingNode +% MissingNodeGet = rpc:call(MissingNode, dynomite_couch_api, get, +% [[MissingPart, Db, Key, nil, []]]), +% ?assertEqual({not_found, {[], [missing]}}, MissingNodeGet), + +% JsonDoc = {[{<<"_id">>,<<"a">>}, +% {<<"_rev">>, +% <<"1-967a00dff5e02add41819138abb3284d">>}]}, + +% % r=3 should fail +% {r_quorum_not_met, {[{message, _M}, {good, G}, {bad, B}]}} = +% showroom_doc:open_doc(Db, Key, nil, [{r, "3"}]), +% ?assertEqual([JsonDoc,JsonDoc], G), +% ?assertEqual([{not_found, missing}], B), + +% % r=2 should never fail (run it many times to make sure) +% do_opens({Db, Key, nil, [{r, "2"}]}, 20), + +% ok. + + +% pcall(MapFun, Servers, Const) -> +% Replies = lib_misc:pmap(MapFun, Servers, Const), +% lists:partition(fun valid/1, Replies). + + +% valid({ok, _}) -> true; +% valid(ok) -> true; +% valid(_) -> false. + + +% do_opens(_,0) -> ok; +% do_opens({Db, DocId, Refs, Options} = Payload, Times) -> +% {Status, _Doc} = showroom_doc:open_doc(Db, DocId, Refs, Options), +% ?assertEqual(ok, Status), +% do_opens(Payload, Times-1). diff --git a/test/mem2_code_change.erl b/test/mem2_code_change.erl new file mode 100644 index 00000000..3b0c73fb --- /dev/null +++ b/test/mem2_code_change.erl @@ -0,0 +1,12 @@ +-module(mem2_code_change). + +-export([run/0]). + +run() -> + Pid = whereis(membership), + OldVsn = "0.7.1-cloudant", + Extra = "", + + sys:suspend(Pid), + sys:change_code(Pid, membership2, OldVsn, Extra), + sys:resume(Pid). diff --git a/test/mem_utils_test.erl b/test/mem_utils_test.erl new file mode 100644 index 00000000..b884d94e --- /dev/null +++ b/test/mem_utils_test.erl @@ -0,0 +1,97 @@ +-module(mem_utils_test). + +-include_lib("eunit/include/eunit.hrl"). 
+ + +join_type_test() -> + Options = [{replace,node3}], + ?assertEqual({replace,node3}, mem_utils:join_type(dummy,dummy,Options)). + + +pmap_from_full_test() -> + ?assertEqual([{n1,0},{n2,1},{n3,2},{n4,3}], + mem_utils:pmap_from_full(t_fullmap(0))). + + +fix_mappings_nodedown_test() -> + {PMap0, Fullmap0} = mem_utils:fix_mappings(nodedown, n3, t_fullmap(0)), + % with n3 down, n1 takes over + ?assertEqual([{n1,0},{n2,1},{n1,2},{n4,3}], PMap0), + ?assertEqual(t_fullmap(1), lists:sort(Fullmap0)). + + +fix_mappings_rejoin_test() -> + {PMap0, Fullmap0} = mem_utils:fix_mappings(nodedown, n3, t_fullmap(0)), + % with n3 down, n1 takes over + ?assertEqual([{n1,0},{n2,1},{n1,2},{n4,3}], PMap0), + ?assertEqual(t_fullmap(1), lists:sort(Fullmap0)), + % now have n3 rejoin + {PMap1, Fullmap1} = mem_utils:fix_mappings(rejoin, n3, Fullmap0), + ?assertEqual([{n1,0},{n2,1},{n3,2},{n4,3}], PMap1), + ?assertEqual(lists:sort(t_fullmap(0)), lists:sort(Fullmap1)). + + +fix_mappings_replace_test() -> + {PMap0, Fullmap0} = mem_utils:fix_mappings(nodedown, n3, t_fullmap(0)), + % with n3 down, n1 takes over + ?assertEqual([{n1,0},{n2,1},{n1,2},{n4,3}], PMap0), + ?assertEqual(t_fullmap(1), lists:sort(Fullmap0)), + % now replace n3 with n5 + {PMap2, Fullmap2} = mem_utils:fix_mappings(replace, {n3,n5}, Fullmap0), + ?assertEqual([{n1,0},{n2,1},{n5,2},{n4,3}], PMap2), + ?assertEqual(lists:sort(t_fullmap(2)), lists:sort(Fullmap2)). + + +fix_mappings_already_down_test() -> + {_PMap0, Fullmap0} = mem_utils:fix_mappings(nodedown, n3, t_fullmap(1)), + ?assertEqual(t_fullmap(1), lists:sort(Fullmap0)). + + +was_i_nodedown_test() -> + ?assertEqual(true, mem_utils:was_i_nodedown(n3, t_fullmap(1))), + ?assertEqual(false, mem_utils:was_i_nodedown(n3, t_fullmap(0))). + + +%% test helper funs + +t_fullmap(0) -> % four node, four part fullmap (unsorted) + [{n1,0,primary}, + {n2,0,partner}, + {n3,0,partner}, + {n2,1,primary}, + {n3,1,partner}, + {n4,1,partner}, + {n3,2,primary}, + {n4,2,partner}, + {n1,2,partner}, + {n4,3,primary}, + {n1,3,partner}, + {n2,3,partner}]; +t_fullmap(1) -> % like (0) above, but n3 is down (sorted) + [{n1,0,primary}, + {n1,2,partner}, + {n1,3,partner}, + {n2,0,partner}, + {n2,1,primary}, + {n2,3,partner}, + {n3,0,{nodedown,partner}}, + {n3,1,{nodedown,partner}}, + {n3,2,{nodedown,primary}}, + {n4,1,partner}, + {n4,2,partner}, + {n4,3,primary}]; +t_fullmap(2) -> % like (0) above, but n3 is replaced w/ n5 (unsorted) + [{n1,0,primary}, + {n2,0,partner}, + {n5,0,partner}, + {n2,1,primary}, + {n5,1,partner}, + {n4,1,partner}, + {n5,2,primary}, + {n4,2,partner}, + {n1,2,partner}, + {n4,3,primary}, + {n1,3,partner}, + {n2,3,partner}]; +t_fullmap(_Huh) -> + huh. diff --git a/test/membership2_test.erl b/test/membership2_test.erl new file mode 100644 index 00000000..ed804cc2 --- /dev/null +++ b/test/membership2_test.erl @@ -0,0 +1,126 @@ +%%% -*- erlang-indent-level:2 -*- +-module(membership2_test). +-author('cliff@powerset.com'). +-author('brad@cloudant.com'). + +-include("../include/config.hrl"). +-include("../include/common.hrl"). +-include("../include/test.hrl"). + +-include_lib("eunit/include/eunit.hrl"). 
+ +% singular_startup_sequence_test() -> +% %% configuration:start_link(#config{n=1,r=1,w=1,q=6,directory=?TMP_DIR}), +% {ok, _} = mock:mock(configuration), +% mock:expects(configuration, get_config, fun(_Args) -> true end, +% #config{n=1,r=1,w=1,q=6,directory=?TMP_DIR}, 3), +% {ok, _} = mock:mock(replication), +% mock:expects(replication, partners, fun({_, [a], _}) -> true end, []), +% mock:expects(replication, partners_plus, fun({a, [a]}) -> true end, []), +% {ok, M} = membership2:start_link(a, [a]), +% State = gen_server:call(M, state), +% ?assertEqual(a, State#membership.node), +% ?assertEqual([a], State#membership.nodes), +% mock:verify_and_stop(replication), +% membership2:stop(M), +% %% configuration:stop(), +% mock:verify_and_stop(configuration), +% ?assertMatch({ok, [[a]]}, file:consult(?TMP_FILE("a.world"))), +% file:delete(?TMP_FILE("a.world")). + +% -define(NODEA, {a, ["d", "1", "4"]}). +% -define(NODEB, {b, ["e", "3", "1"]}). +% -define(NODEC, {c, ["f", "1", "2"]}). +% -define(NODES, [?NODEA, ?NODEB, ?NODEC]). + +% multi_startup_sequence_test() -> +% {ok, _} = mock:mock(configuration), +% mock:expects(configuration, get_config, fun(_Args) -> true end, +% (#config{n=3,r=1,w=1,q=6,directory=?TMP_DIR}), 3), +% {ok, _} = mock:mock(replication), +% VersionOne = vector_clock:create(make_ref()), +% Pid1 = make_ref(), +% VersionTwo = vector_clock:create(make_ref()), +% Pid2 = make_ref(), +% mock:expects(replication, partners, fun({_, ?NODES, _}) -> true end, [?NODEB, ?NODEC]), +% {ok, _} = stub:stub(membership2, call_join, fun(?NODEB, ?NODEA) -> +% {VersionOne, ?NODES, [{1,Pid1}]}; +% (?NODEC, ?NODEA) -> +% {VersionTwo, ?NODES, [{2,Pid2}]} +% end, 2), +% ?debugMsg("proxied"), +% ?debugFmt("check process code: ~p", [erlang:check_process_code(self(), membership2)]), +% {ok, M} = membership2:start_link(?NODEA, ?NODES), +% State = gen_server:call(M, state), +% ?assertEqual(?NODEA, State#membership.node), +% ?assertEqual(?NODES, State#membership.nodes), +% % Servers = State#membership.servers, +% % ?assertMatch([{1,Pid1},{2,Pid2}], membership2:servers_to_list(Servers)), +% ?assertEqual(greater, vector_clock:compare(State#membership.version, VersionOne)), +% ?assertEqual(greater, vector_clock:compare(State#membership.version, VersionTwo)), +% mock:verify_and_stop(replication), +% membership2:stop(M), +% mock:verify_and_stop(configuration), +% ?assertMatch({ok, [?NODES]}, file:consult(?TMP_FILE("a.world"))), +% file:delete(?TMP_FILE("a.world")). + +% startup_and_first_servers_for_key_test() -> +% configuration:start_link(#config{n=1,r=1,w=1,q=6,directory=?TMP_DIR}), +% {ok, _} = mock:mock(replication), +% mock:expects(replication, partners, fun({_, [a], _}) -> true end, []), +% {ok, M} = membership2:start_link(a, [a]), +% _State = gen_server:call(M, state), +% ?assertEqual([], membership2:servers_for_key("blah")), +% mock:verify_and_stop(replication), +% membership2:stop(M), +% configuration:stop(), +% ?assertMatch({ok, [[a]]}, file:consult(?TMP_FILE("a.world"))), +% file:delete(?TMP_FILE("a.world")). 
+ +% startup_and_register_test() -> +% configuration:start_link(#config{n=1,r=1,w=1,q=0,directory=?TMP_DIR}), +% {ok, _} = mock:mock(replication), +% mock:expects(replication, partners, fun({_, [?NODEA], _}) -> true end, [], 3), +% {ok, M} = membership2:start_link(?NODEA, [?NODEA]), +% SServer1 = make_server(), +% SServer2 = make_server(), +% membership2:register(1, SServer1), +% membership2:register(1, SServer2), +% ?assertEqual([SServer1, SServer2], membership2:servers_for_key("blah")), +% mock:verify_and_stop(replication), +% membership2:stop(M), +% configuration:stop(), +% SServer1 ! stop, +% SServer2 ! stop, +% file:delete(?TMP_FILE("a.world")). + +% handle_local_server_outage_test() -> +% configuration:start_link(#config{n=1,r=1,w=1,q=0,directory=?TMP_DIR}), +% {ok, _} = mock:mock(replication), +% mock:expects(replication, partners, fun({_, [?NODEA], _}) -> true end, [], 4), +% {ok, M} = membership2:start_link(?NODEA, [?NODEA]), +% SServer1 = make_server(), +% SServer2 = make_server(), +% membership2:register(1, SServer1), +% membership2:register(1, SServer2), +% SServer1 ! stop, +% timer:sleep(1), +% ?assertEqual([SServer2], membership2:servers_for_key("blah")), +% mock:verify_and_stop(replication), +% membership2:stop(M), +% configuration:stop(), +% SServer2 ! stop, +% file:delete(?TMP_FILE("a.world")). + +% full_gossip_test() -> +% configuration:start_link(#config{n=1,r=1,w=1,q=2,directory=priv_dir()}), +% {ok, _} = mock:mock(replication), +% mock:expects(replication, partners, fun({_, ?NODES, _}) -> true end, [?NODEB, ?NODEC],4), + + +% make_server() -> +% spawn(fun() -> +% receive +% stop -> ok +% end +% end). diff --git a/test/mock.erl b/test/mock.erl new file mode 100644 index 00000000..2ecbf4f7 --- /dev/null +++ b/test/mock.erl @@ -0,0 +1,322 @@ +%%% -*- erlang-indent-level:2 -*- +%%%------------------------------------------------------------------- +%%% File: mock.erl +%%% @author Cliff Moon <> [] +%%% @copyright 2009 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-01-04 by Cliff Moon +%%%------------------------------------------------------------------- +-module(mock). +-author('cliff@powerset.com'). + +%% API +-export([mock/1, proxy_call/2, proxy_call/3, expects/4, expects/5, + verify_and_stop/1, verify/1, stub_proxy_call/3, stop/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include("../include/common.hrl"). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(mockstate, {old_code, module, expectations=[]}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% @spec mock(Module::atom()) -> {ok,Mock::record()} | ignore | {error,Error} +%% @doc Starts the server +%% @end +%%-------------------------------------------------------------------- +mock(Module) -> + case gen_server:start_link({local, mod_to_name(Module)}, mock, Module, []) of + {ok, Pid} -> {ok, Pid}; + {error, Reason} -> {error, Reason} + end. + +%% @spec proxy_call(Module::atom(), Function::atom()) -> term() +%% @doc Proxies a call to the mock server for Module without arguments +%% @end +proxy_call(Module, Function) -> + gen_server:call(mod_to_name(Module), {proxy_call, Function, {}}). 
+ +%% @spec proxy_call(Module::atom(), Function::atom(), Args::tuple()) -> term() +%% @doc Proxies a call to the mock server for Module with arguments +%% @end +proxy_call(Module, Function, Args) -> + gen_server:call(mod_to_name(Module), {proxy_call, Function, Args}). + +stub_proxy_call(Module, Function, Args) -> + RegName = list_to_atom(lists:concat([Module, "_", Function, "_stub"])), + Ref = make_ref(), + RegName ! {Ref, self(), Args}, + ?debugFmt("sending {~p,~p,~p}", [Ref, self(), Args]), + receive + {Ref, Answer} -> Answer + end. + +%% @spec expects(Module::atom(), +%% Function::atom(), +%% Args::function(), +%% Ret::function() | term() ) -> term() + +%% Times:: {at_least, integer()} | never | {no_more_than, integer()} | integer()) -> term() + +%% @doc Sets the expectation that Function of Module will be called during a +%% test with Args. Args should be a fun predicate that will return true or +%% false whether or not the argument list matches. The argument list of the +%% function is passed in as a tuple. Ret is either a value to return or a fun +%% of arity 2 to be evaluated in response to a proxied call. The first argument +%% is the actual args from the call, the second is the call count starting +%% with 1. +expects(Module, Function, Args, Ret) -> + gen_server:call(mod_to_name(Module), {expects, Function, Args, Ret, 1}). + +expects(Module, Function, Args, Ret, Times) -> + gen_server:call(mod_to_name(Module), {expects, Function, Args, Ret, Times}). + +%% stub(Module, Function, Args, Ret) -> +%% gen_server:call(mod_to_name(Module), {stub, Function, Args, Ret}). + +verify_and_stop(Module) -> + verify(Module), + stop(Module). + +verify(Module) -> + ?assertEqual(ok, gen_server:call(mod_to_name(Module), verify)). + +stop(Module) -> + gen_server:cast(mod_to_name(Module), stop), + timer:sleep(10). + + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @doc Initiates the server +%% @end +%%-------------------------------------------------------------------- +init(Module) -> + case code:get_object_code(Module) of + {Module, Bin, Filename} -> + case replace_code(Module) of + ok -> {ok, #mockstate{module=Module,old_code={Module, Bin, Filename}}}; + {error, Reason} -> {stop, Reason} + end; + error -> {stop, ?fmt("Could not get object code for module ~p", [Module])} + end. 
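Taken together, mock/1, expects/4,5 and verify_and_stop/1 form the test-facing API: mock/1 swaps the target module's exports for generated proxies, expects registers an argument-matching fun plus a return value (or an arity-2 fun), and verify_and_stop/1 checks that every expectation was consumed before restoring the original code. A minimal sketch of that flow, in the spirit of the commented-out membership2 tests above; it assumes the real replication module (with partners/3) is on the code path, and the node names and return value are purely illustrative:

    mock_usage_sketch() ->
        {ok, _Pid} = mock:mock(replication),
        %% The matcher receives the call's arguments as a tuple; the last
        %% argument of expects/4 is the canned return (or an arity-2 fun).
        ok = mock:expects(replication, partners,
                          fun({_Node, _Nodes, _Config}) -> true end,
                          [partner_b, partner_c]),
        %% Code under test now hits the generated proxy, not the real module.
        [partner_b, partner_c] = replication:partners(node_a, [node_a], some_config),
        %% Fails the enclosing test if any registered expectation went unused.
        mock:verify_and_stop(replication).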
+ +%%-------------------------------------------------------------------- +%% @spec +%% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @doc Handling call messages +%% @end +%%-------------------------------------------------------------------- +handle_call({proxy_call, Function, Args}, _From, + State = #mockstate{module=Mod,expectations=Expects}) -> + case match_expectation(Function, Args, Expects) of + {matched, ReturnTerm, NewExpects} -> + {reply, ReturnTerm, State#mockstate{expectations=NewExpects}}; + unmatched -> + {stop, ?fmt("got unexpected call to ~p:~p", [Mod,Function])} + end; + +handle_call({expects, Function, Args, Ret, Times}, _From, + State = #mockstate{expectations=Expects}) -> + {reply, ok, State#mockstate{ + expectations=add_expectation(Function, Args, Ret, Times, Expects)}}; + +handle_call(verify, _From, State = #mockstate{expectations=Expects,module=Mod}) -> + ?infoFmt("verifying ~p~n", [Mod]), + if + length(Expects) > 0 -> + {reply, {mismatch, format_missing_expectations(Expects, Mod)}, State}; + true -> {reply, ok, State} + end. + +%%-------------------------------------------------------------------- +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @doc Handling cast messages +%% @end +%%-------------------------------------------------------------------- +handle_cast(stop, State) -> + timer:sleep(10), + {stop, normal, State}. + +%%-------------------------------------------------------------------- +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @doc Handling all non call/cast messages +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @spec terminate(Reason, State) -> void() +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, #mockstate{old_code={Module, Binary, Filename}}) -> + code:purge(Module), + code:delete(Module), + code:load_binary(Module, Filename, Binary), + timer:sleep(10). + +%%-------------------------------------------------------------------- +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @doc Convert process state when code is changed +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +format_missing_expectations(Expects, Mod) -> + format_missing_expectations(Expects, Mod, []). + +format_missing_expectations([], _, Msgs) -> + lists:reverse(Msgs); + +format_missing_expectations([{Function, _Args, _Ret, Times, Called}|Expects], Mod, Msgs) -> + Msgs1 = [?fmt("expected ~p:~p to be called ~p times but was called ~p", [Mod,Function,Times,Called])|Msgs], + format_missing_expectations(Expects, Mod, Msgs1). 
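A leftover expectation makes verify/1's ?assertEqual(ok, ...) fail, surfacing the messages built by format_missing_expectations/2. The reply has roughly the shape below; the module name and counts are made up for illustration:

    %% Shape of the verify reply when an expectation registered for 3 calls
    %% was only exercised once:
    {mismatch, ["expected replication:partners to be called 3 times but was called 1"]}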
+ +add_expectation(Function, Args, Ret, Times, Expects) -> + Expects ++ [{Function, Args, Ret, Times, 0}]. + +match_expectation(Function, Args, Expectations) -> + match_expectation(Function, Args, Expectations, []). + +match_expectation(_Function, _Args, [], _Rest) -> + unmatched; + +match_expectation(Function, Args, [{Function, Matcher, Ret, MaxTimes, Invoked}|Expects], Rest) -> + case Matcher(Args) of + true -> + ReturnTerm = prepare_return(Args, Ret, Invoked+1), + if + Invoked + 1 >= MaxTimes -> {matched, ReturnTerm, lists:reverse(Rest) ++ Expects}; + true -> {matched, ReturnTerm, lists:reverse(Rest) ++ [{Function, Matcher, Ret, MaxTimes, Invoked+1}] ++ Expects} + end; + false -> match_expectation(Function, Args, Expects, [{Function,Matcher,Ret,MaxTimes,Invoked}|Rest]) + end; + +match_expectation(Function, Args, [Expect|Expects], Rest) -> + match_expectation(Function, Args, Expects, [Expect|Rest]). + +prepare_return(Args, Ret, Invoked) when is_function(Ret) -> + Ret(Args, Invoked); + +prepare_return(_Args, Ret, _Invoked) -> + Ret. + +replace_code(Module) -> + Info = Module:module_info(), + Exports = get_exports(Info), + unload_code(Module), + NewFunctions = generate_functions(Module, Exports), + Forms = [ + {attribute,1,module,Module}, + {attribute,2,export,Exports} + ] ++ NewFunctions, + case compile:forms(Forms, [binary]) of + {ok, Module, Binary} -> case code:load_binary(Module, atom_to_list(Module) ++ ".erl", Binary) of + {module, Module} -> ok; + {error, Reason} -> {error, Reason} + end; + error -> {error, "An undefined error happened when compiling."}; + {error, Errors, Warnings} -> {error, Errors ++ Warnings} + end. + +unload_code(Module) -> + code:purge(Module), + code:delete(Module). + +get_exports(Info) -> + get_exports(Info, []). + +get_exports(Info, Acc) -> + case lists:keytake(exports, 1, Info) of + {value, {exports, Exports}, ModInfo} -> + get_exports(ModInfo, Acc ++ lists:filter(fun({module_info, _}) -> false; (_) -> true end, Exports)); + _ -> Acc + end. + +%% stub_function_loop(Fun) -> +%% receive +%% {Ref, Pid, Args} -> +%% ?debugFmt("received {~p,~p,~p}", [Ref, Pid, Args]), +%% Ret = (catch Fun(Args) ), +%% ?debugFmt("sending {~p,~p}", [Ref,Ret]), +%% Pid ! {Ref, Ret}, +%% stub_function_loop(Fun) +%% end. + +% Function -> {function, Lineno, Name, Arity, [Clauses]} +% Clause -> {clause, Lineno, [Variables], [Guards], [Expressions]} +% Variable -> {var, Line, Name} +% +generate_functions(Module, Exports) -> + generate_functions(Module, Exports, []). + +generate_functions(_Module, [], FunctionForms) -> + lists:reverse(FunctionForms); + +generate_functions(Module, [{Name,Arity}|Exports], FunctionForms) -> + generate_functions(Module, Exports, [generate_function(Module, Name, Arity)|FunctionForms]). + +generate_function(Module, Name, Arity) -> + {function, 1, Name, Arity, [{clause, 1, generate_variables(Arity), [], generate_expression(mock, proxy_call, Module, Name, Arity)}]}. + +generate_variables(0) -> []; +generate_variables(Arity) -> + lists:map(fun(N) -> + {var, 1, list_to_atom(lists:concat(['Arg', N]))} + end, lists:seq(1, Arity)). + +generate_expression(M, F, Module, Name, 0) -> + [{call,1,{remote,1,{atom,1,M},{atom,1,F}}, [{atom,1,Module}, {atom,1,Name}]}]; +generate_expression(M, F, Module, Name, Arity) -> + [{call,1,{remote,1,{atom,1,M},{atom,1,F}}, [{atom,1,Module}, {atom,1,Name}, {tuple,1,lists:map(fun(N) -> + {var, 1, list_to_atom(lists:concat(['Arg', N]))} + end, lists:seq(1, Arity))}]}]. 
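In source form, each proxy produced by generate_function/3 is just a forwarding clause. For a hypothetical foo/2 export of a module m (both names are illustrative), the abstract forms above encode the equivalent of:

    foo(Arg1, Arg2) ->
        mock:proxy_call(m, foo, {Arg1, Arg2}).

Zero-arity exports forward through the two-argument mock:proxy_call/2 instead.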
+ +mod_to_name(Module) -> + list_to_atom(lists:concat([mock_, Module])). + +%% replace_function(FF, Forms) -> +%% replace_function(FF, Forms, []). + +%% replace_function(FF, [], Ret) -> +%% [FF|lists:reverse(Ret)]; + +%% replace_function({function,_,Name,Arity,Clauses}, [{function,Line,Name,Arity,_}|Forms], Ret) -> +%% lists:reverse(Ret) ++ [{function,Line,Name,Arity,Clauses}|Forms]; + +%% replace_function(FF, [FD|Forms], Ret) -> +%% replace_function(FF, Forms, [FD|Ret]). diff --git a/test/mock_genserver.erl b/test/mock_genserver.erl new file mode 100644 index 00000000..cde41ff5 --- /dev/null +++ b/test/mock_genserver.erl @@ -0,0 +1,209 @@ +%%%------------------------------------------------------------------- +%%% File: mock_genserver.erl +%%% @author Cliff Moon <> [] +%%% @copyright 2009 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-01-02 by Cliff Moon +%%%------------------------------------------------------------------- +-module(mock_genserver). +-author('cliff@powerset.com'). + +-behaviour(gen_server). + +-include_lib("eunit/include/eunit.hrl"). + +%% API +-export([start_link/1, stub_call/3, expects_call/3, expects_call/4, stop/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {call_stubs=[], call_expects=[], cast_expectations, info_expectations}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% @spec start_link(Reference::atom()) -> {ok,Pid} | ignore | {error,Error} +%% @doc Starts the server +%% @end +%%-------------------------------------------------------------------- +start_link(Reference) -> + gen_server:start_link(Reference, ?MODULE, [], []). + +stub_call(Server, Sym, Fun) when is_function(Fun) -> + gen_server:call(Server, {mock_stub_call, Sym, Fun}). + +expects_call(Server, Args, Fun) when is_function(Fun) -> + gen_server:call(Server, {mock_expects_call, Args, Fun}). + +expects_call(Server, Args, Fun, Times) when is_function(Fun) -> + gen_server:call(Server, {mock_expects_call, Args, Fun, Times}). + +stop(Server) -> + gen_server:call(Server, mock_stop). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @doc Initiates the server +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{}}. 
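Unlike mock, mock_genserver does not replace any code; it is an ordinary gen_server started under a name of the caller's choosing, and the code under test is pointed at that name directly. A minimal sketch, using a hypothetical registered name some_service and illustrative messages:

    mock_genserver_usage_sketch() ->
        {ok, _Pid} = mock_genserver:start_link({local, some_service}),
        %% Stub: any call tuple tagged get_state is answered by this fun.
        ok = mock_genserver:stub_call(some_service, get_state,
                                      fun({get_state, _Key}) -> {ok, empty} end),
        %% Expectation: matches {put, _} exactly once; unbound is a wildcard.
        ok = mock_genserver:expects_call(some_service, {put, unbound},
                                         fun(_Request, _Times) -> ok end, 1),
        {ok, empty} = gen_server:call(some_service, {get_state, k}),
        ok = gen_server:call(some_service, {put, <<"v">>}),
        ok = mock_genserver:stop(some_service).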
+ +%%-------------------------------------------------------------------- +%% @spec +%% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @doc Handling call messages +%% @end +%%-------------------------------------------------------------------- +handle_call({mock_stub_call, Sym, Fun}, _From, State = #state{call_stubs=Stubs}) -> + {reply, ok, State#state{call_stubs=[{Sym, Fun}|Stubs]}}; + +handle_call({mock_expects_call, Args, Fun}, _From, State = #state{call_expects=Expects}) -> + {reply, ok, State#state{call_expects=add_expectation(Args, Fun, at_least_once, Expects)}}; + +handle_call({mock_expects_call, Args, Fun, Times}, _From, State = #state{call_expects=Expects}) -> + {reply, ok, State#state{call_expects=add_expectation(Args, Fun, Times, Expects)}}; + +handle_call(mock_stop, _From, State) -> + {stop, normal, ok, State}; + +handle_call(Request, _From, State = #state{call_stubs=Stubs,call_expects=Expects}) -> + % expectations have a higher priority + case find_expectation(Request, Expects) of + {found, {_, Fun, Time}, NewExpects} -> {reply, Fun(Request, Time), State#state{call_expects=NewExpects}}; + not_found -> % look for a stub + case find_stub(Request, Stubs) of + {found, {_, Fun}} -> {reply, Fun(Request), State}; + not_found -> + {stop, {unexpected_call, Request}, State} + end + end. + +%%-------------------------------------------------------------------- +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @doc Handling cast messages +%% @end +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @doc Handling all non call/cast messages +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @spec terminate(Reason, State) -> void() +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @doc Convert process state when code is changed +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + + +add_expectation(Args, Fun, Times, Expects) -> + Expects ++ [{Args, Fun, Times}]. + +find_expectation(Request, Expects) -> + find_expectation(Request, Expects, []). 
+ +find_expectation(_Request, [], _Rest) -> + not_found; + +find_expectation(Request, [{Args, Fun, Times}|Expects], Rest) -> + MatchFun = generate_match_fun(Args), + case MatchFun(Request) of + true -> + if + Times == at_least_once -> {found, {Args, Fun, Times}, lists:reverse(Rest) ++ [{Args, Fun, Times}] ++ Expects}; + Times == 1 -> {found, {Args, Fun, Times}, lists:reverse(Rest) ++ Expects}; + true -> {found, {Args, Fun, Times}, lists:reverse(Rest) ++ [{Args, Fun, Times-1}] ++ Expects} + end; + false -> find_expectation(Request, Expects, [{Args, Fun, Times}|Rest]) + end. + +find_stub(Request, Stub) when is_tuple(Request) -> + Sym = element(1, Request), + find_stub(Sym, Stub); + +find_stub(_Sym, []) -> + not_found; + +find_stub(Sym, _Stubs) when not is_atom(Sym) -> + not_found; + +find_stub(Sym, [{Sym, Fun}|_Stubs]) -> + {found, {Sym, Fun}}; + +find_stub(Sym, [_Stub|Stubs]) -> + find_stub(Sym, Stubs). + +generate_match_fun(Args) when is_tuple(Args) -> + generate_match_fun(tuple_to_list(Args)); + +generate_match_fun(Args) when not is_list(Args) -> + generate_match_fun([Args]); + +generate_match_fun(Args) when is_list(Args) -> + Src = generate_match_fun("fun({", Args), + {ok, Tokens, _} = erl_scan:string(Src), + {ok, [Form]} = erl_parse:parse_exprs(Tokens), + {value, Fun, _} = erl_eval:expr(Form, erl_eval:new_bindings()), + Fun. + +generate_match_fun(Src, []) -> + Src ++ "}) -> true; (_) -> false end."; + +% unbound atom means you don't care about an arg +generate_match_fun(Src, [unbound|Args]) -> + if + length(Args) > 0 -> generate_match_fun(Src ++ "_,", Args); + true -> generate_match_fun(Src ++ "_", Args) + end; + +generate_match_fun(Src, [Bound|Args]) -> + Term = lists:flatten(io_lib:format("~w", [Bound])), + if + length(Args) > 0 -> generate_match_fun(Src ++ Term ++ ",", Args); + true -> generate_match_fun(Src ++ Term, Args) + end. diff --git a/test/partitions_test.erl b/test/partitions_test.erl new file mode 100644 index 00000000..20effd8a --- /dev/null +++ b/test/partitions_test.erl @@ -0,0 +1,121 @@ +%%% -*- erlang-indent-level:2 -*- +-module(partitions_test). +-author('brad@cloudant.com'). + +-include("../include/config.hrl"). +-include("../include/common.hrl"). +-include("../include/test.hrl"). + + +join_test() -> + TableA = [{a,1},{a,2},{a,3},{a,4},{a,5},{a,6},{a,7},{a,8}], + TableB = [{a,1},{a,2},{a,3},{a,4},{b,5},{b,6},{b,7},{b,8}], + TableC = [{a,1},{a,2},{a,3},{c,4},{b,5},{b,6},{b,7},{c,8}], + TableD = [{a,1},{a,2},{d,3},{c,4},{b,5},{b,6},{d,7},{c,8}], + TableE = [{a,1},{a,2},{d,3},{c,4},{b,5},{b,6},{e,7},{c,8}], + TableF = [{a,1},{a,2},{d,3},{c,4},{b,5},{b,6},{e,7},{f,8}], + TableG = [{a,1},{a,2},{d,3},{c,4},{b,5},{g,6},{e,7},{f,8}], + TableH = [{a,1},{h,2},{d,3},{c,4},{b,5},{g,6},{e,7},{f,8}], + ?assertEqual({ok,TableA}, partitions:join(a, TableA, [])), + ?assertEqual({ok,TableB}, partitions:join(b, TableA, [])), + ?assertEqual({ok,TableC}, partitions:join(c, TableB, [])), + ?assertEqual({ok,TableD}, partitions:join(d, TableC, [])), + ?assertEqual({ok,TableE}, partitions:join(e, TableD, [])), + ?assertEqual({ok,TableF}, partitions:join(f, TableE, [])), + ?assertEqual({ok,TableG}, partitions:join(g, TableF, [])), + ?assertEqual({ok,TableH}, partitions:join(h, TableG, [])), + ?assertEqual({error, "Too many nodes vs partitions", TableH}, + partitions:join(i, TableH, [])), + ok. 
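A brief aside on mock_genserver:generate_match_fun/1 from the previous file: the expectation's argument spec is rendered to a source string and evaluated into a matcher fun, with the atom unbound acting as a per-position wildcard. An expectation registered with the illustrative spec {put, unbound, 42}, for instance, compiles to roughly:

    fun({put, _, 42}) -> true;
       (_) -> false
    end.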
+ + +hints_test() -> + TableA = [{a,1},{a,2},{a,3},{a,4},{a,5},{a,6},{a,7},{a,8}], + TableB = [{a,1},{b,2},{a,3},{a,4},{a,5},{b,6},{b,7},{b,8}], + TableC = [{a,1},{a,2},{a,3},{a,4},{c,5},{c,6},{c,7},{c,8}], + TableD = [{d,1},{d,2},{d,3},{d,4},{a,5},{a,6},{a,7},{a,8}], + ?assertEqual({ok, TableB}, partitions:join(b, TableA, [2])), + ?assertEqual({ok, TableC}, partitions:join(c, TableA, [0])), + ?assertEqual({ok, TableD}, partitions:join(d, TableA, [1,2,3,4])), + ok. + + +shard_name_test() -> + ?assertEqual(<<"x000000/dbname_000000">>, + partitions:shard_name(0, <<"dbname">>)), + ok. + + +%% note: fullmaps used here +diff_same_length_test() -> + OldMap = [{a,1, type},{a,2, type},{b,3, type},{b,4, type}], + NewMap = [{a,1, type},{a,2, type},{b,3, type},{c,4, type}], + ?assertEqual([{b,c,4}], partitions:diff(OldMap, NewMap)), + ok. + + +diff_dupes_test() -> + OldMap = [{'node1@node1.boorad.local',0,primary}, + {'node2@node2.boorad.local',0,partner}, + {'node3@node3.boorad.local',0,partner}, + {'node1@node1.boorad.local',182687704666362864775460604089535377456991567872, primary}, + {'node2@node2.boorad.local',182687704666362864775460604089535377456991567872, partner}, + {'node3@node3.boorad.local',182687704666362864775460604089535377456991567872, partner}, + {'node1@node1.boorad.local',365375409332725729550921208179070754913983135744, primary}, + {'node2@node2.boorad.local',365375409332725729550921208179070754913983135744, partner}, + {'node3@node3.boorad.local',365375409332725729550921208179070754913983135744, partner}, + {'node1@node1.boorad.local',548063113999088594326381812268606132370974703616, partner}, + {'node2@node2.boorad.local',548063113999088594326381812268606132370974703616, partner}, + {'node3@node3.boorad.local',548063113999088594326381812268606132370974703616, primary}, + {'node1@node1.boorad.local',730750818665451459101842416358141509827966271488, partner}, + {'node2@node2.boorad.local',730750818665451459101842416358141509827966271488, primary}, + {'node3@node3.boorad.local',730750818665451459101842416358141509827966271488, partner}, + {'node1@node1.boorad.local',913438523331814323877303020447676887284957839360, partner}, + {'node2@node2.boorad.local',913438523331814323877303020447676887284957839360, primary}, + {'node3@node3.boorad.local',913438523331814323877303020447676887284957839360, partner}, + {'node1@node1.boorad.local',1096126227998177188652763624537212264741949407232, partner}, + {'node2@node2.boorad.local',1096126227998177188652763624537212264741949407232, primary}, + {'node3@node3.boorad.local',1096126227998177188652763624537212264741949407232, partner}, + {'node1@node1.boorad.local',1278813932664540053428224228626747642198940975104, partner}, + {'node2@node2.boorad.local',1278813932664540053428224228626747642198940975104, partner}, + {'node3@node3.boorad.local',1278813932664540053428224228626747642198940975104, primary}], + NewMap = [{'node1@node1.boorad.local',0,primary}, + {'node2@node2.boorad.local',0,partner}, + {'node3@node3.boorad.local',0,partner}, + {'node1@node1.boorad.local',182687704666362864775460604089535377456991567872, primary}, + {'node2@node2.boorad.local',182687704666362864775460604089535377456991567872, partner}, + {'node3@node3.boorad.local',182687704666362864775460604089535377456991567872, partner}, + {'node1@node1.boorad.local',365375409332725729550921208179070754913983135744, partner}, + {'node2@node2.boorad.local',365375409332725729550921208179070754913983135744, partner}, + 
{'node4@node4.boorad.local',365375409332725729550921208179070754913983135744, primary}, + {'node1@node1.boorad.local',548063113999088594326381812268606132370974703616, partner}, + {'node3@node3.boorad.local',548063113999088594326381812268606132370974703616, primary}, + {'node4@node4.boorad.local',548063113999088594326381812268606132370974703616, partner}, + {'node2@node2.boorad.local',730750818665451459101842416358141509827966271488, primary}, + {'node3@node3.boorad.local',730750818665451459101842416358141509827966271488, partner}, + {'node4@node4.boorad.local',730750818665451459101842416358141509827966271488, partner}, + {'node2@node2.boorad.local',913438523331814323877303020447676887284957839360, primary}, + {'node3@node3.boorad.local',913438523331814323877303020447676887284957839360, partner}, + {'node4@node4.boorad.local',913438523331814323877303020447676887284957839360, partner}, + {'node1@node1.boorad.local',1096126227998177188652763624537212264741949407232, partner}, + {'node2@node2.boorad.local',1096126227998177188652763624537212264741949407232, partner}, + {'node4@node4.boorad.local',1096126227998177188652763624537212264741949407232, primary}, + {'node1@node1.boorad.local',1278813932664540053428224228626747642198940975104, partner}, + {'node3@node3.boorad.local',1278813932664540053428224228626747642198940975104, primary}, + {'node4@node4.boorad.local',1278813932664540053428224228626747642198940975104, partner}], + + Diff = [{'node3@node3.boorad.local','node4@node4.boorad.local', + 365375409332725729550921208179070754913983135744}, + {'node2@node2.boorad.local','node4@node4.boorad.local', + 548063113999088594326381812268606132370974703616}, + {'node1@node1.boorad.local','node4@node4.boorad.local', + 730750818665451459101842416358141509827966271488}, + {'node1@node1.boorad.local','node4@node4.boorad.local', + 913438523331814323877303020447676887284957839360}, + {'node3@node3.boorad.local','node4@node4.boorad.local', + 1096126227998177188652763624537212264741949407232}, + {'node2@node2.boorad.local','node4@node4.boorad.local', + 1278813932664540053428224228626747642198940975104}], + + ?assertEqual(Diff, partitions:diff(OldMap, NewMap)), + ok. diff --git a/test/replication_test.erl b/test/replication_test.erl new file mode 100644 index 00000000..095e1b44 --- /dev/null +++ b/test/replication_test.erl @@ -0,0 +1,89 @@ +%%% -*- erlang-indent-level:2 -*- +-module(replication_test). +-author('brad@cloudant.com'). + +-include("../include/config.hrl"). +-include("../include/test.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-define(NODEA, {a, ["d", "1", "4"]}). +-define(NODEB, {b, ["e", "3", "1"]}). +-define(NODEC, {c, ["f", "1", "2"]}). +-define(NODED, {d, ["e", "1", "2"]}). +-define(NODEE, {e, ["e", "2", "2"]}). +-define(NODES, [?NODEA, ?NODEB, ?NODEC, ?NODED, ?NODEE]). + +%% TODO: give this some effigy love, mock configuration up all of these +%% different ways. + +metadata_level_1_test() -> + configuration:start_link(#config{n=3,r=1,w=1,q=6, + directory=?TMP_DIR, + meta=[{datacenter,roundrobin}, + {rack, roundrobin}, + {slot, roundrobin} + ]}), + Partners = replication:partners(?NODEA, + [?NODEA, ?NODEB, ?NODEC], + configuration:get_config()), + ?assertEqual([?NODEB, ?NODEC], Partners), + configuration:stop(). 
+ + +metadata_level_2_test() -> + configuration:start_link(#config{n=3,r=1,w=1,q=6, + directory=?TMP_DIR, + meta=[{datacenter,roundrobin}, + {rack, roundrobin}, + {slot, roundrobin} + ]}), + Partners = replication:partners(?NODEA, + ?NODES, + configuration:get_config()), + ?assertEqual([?NODED,?NODEE], Partners), + configuration:stop(). + + +no_metadata_test() -> + configuration:start_link(#config{n=2,r=1,w=1,q=6, + directory=?TMP_DIR, + meta=[]}), + Partners = replication:partners(a, + [a,b,c,d], + configuration:get_config()), + ?assertEqual([b], Partners), + configuration:stop(). + + +wrap_test() -> + configuration:start_link(#config{n=3,r=1,w=1,q=6, + directory=?TMP_DIR, + meta=[]}), + Wrap1Partners = replication:partners(c, + [a,b,c,d], + configuration:get_config()), + ?assertEqual([a,d], Wrap1Partners), + Wrap2Partners = replication:partners(d, + [a,b,c,d], + configuration:get_config()), + ?assertEqual([a,b], Wrap2Partners), + configuration:stop(). + + +self_test() -> + configuration:start_link(#config{n=3,r=1,w=1,q=6, + directory=?TMP_DIR, + meta=[]}), + Partners = replication:partners(a, [a], + configuration:get_config()), + ?assertEqual([], Partners), + configuration:stop(). + + +remove_self_test() -> + configuration:start_link( + #config{n=4,r=1,w=1,q=6, directory=?TMP_DIR, meta=[]}), + Partners = replication:partners(a, [a,b], configuration:get_config()), + ?assertEqual([b], Partners), + configuration:stop(). diff --git a/test/stub.erl b/test/stub.erl new file mode 100644 index 00000000..2a6173b5 --- /dev/null +++ b/test/stub.erl @@ -0,0 +1,168 @@ +%%%------------------------------------------------------------------- +%%% File: stub.erl +%%% @author Cliff Moon <> [] +%%% @copyright 2009 Cliff Moon +%%% @doc +%%% +%%% @end +%%% +%%% @since 2009-05-10 by Cliff Moon +%%%------------------------------------------------------------------- +-module(stub). +-author('cliff@powerset.com'). + +-behaviour(gen_server). + +%% API +-export([stub/3, stub/4, proxy_call/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include_lib("eunit/include/eunit.hrl"). +-include("../include/common.hrl"). + +-record(state, {old_code, module, stub, times}). + +%%==================================================================== +%% API +%%==================================================================== + +stub(Module, Function, Fun) -> + stub(Module, Function, Fun, 1). + +stub(Module, Function, Fun, Times) when is_function(Fun) -> + gen_server:start({local, name(Module, Function)}, ?MODULE, [Module, Function, Fun, Times], []). + +proxy_call(_, Name, Args) -> + {Times, Reply} = gen_server:call(Name, {proxy_call, Args}), + if + Times =< 0 -> gen_server:cast(Name, stop); + true -> ok + end, + Reply. 
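Where mock proxies every export of a module, stub replaces a single function with a fun for a fixed number of calls; once Times reaches zero, proxy_call/3 casts stop and the stub server shuts itself down. A sketch in the spirit of the commented-out multi_startup_sequence_test earlier; the node atoms and return tuple are illustrative only, and the real membership2 module is assumed to be loadable:

    stub_usage_sketch() ->
        %% Replace membership2:call_join/2 for exactly two calls.
        {ok, _Pid} = stub:stub(membership2, call_join,
                               fun(_Joining, _JoiningTo) ->
                                       {a_version, [node_a], []}
                               end, 2),
        %% Both calls below are routed through stub:proxy_call/3 to the fun above.
        {a_version, [node_a], []} = membership2:call_join(node_b, node_a),
        {a_version, [node_a], []} = membership2:call_join(node_c, node_a).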
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% @spec init(Args) -> {ok, State} |
+%%                     {ok, State, Timeout} |
+%%                     ignore               |
+%%                     {stop, Reason}
+%% @doc Initiates the server
+%% @end
+%%--------------------------------------------------------------------
+init([Module, Function, Fun, Times]) ->
+  case code:get_object_code(Module) of
+    {Module, Bin, Filename} ->
+      ?debugMsg("stubbing"),
+      stub_function(Module, Function, arity(Fun)),
+      {ok, #state{module=Module,old_code={Module,Bin,Filename},times=Times,stub=Fun}};
+    error -> {stop, ?fmt("Could not get object code for module ~p", [Module])}
+  end.
+
+%%--------------------------------------------------------------------
+%% @spec
+%% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% @doc Handling call messages
+%% @end
+%%--------------------------------------------------------------------
+handle_call({proxy_call, Args}, _From, State = #state{stub=Fun, times=Times}) ->
+  Reply = apply(Fun, tuple_to_list(Args)),
+  {reply, {Times-1, Reply}, State#state{times=Times-1}}.
+
+%%--------------------------------------------------------------------
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%%                                  {noreply, State, Timeout} |
+%%                                  {stop, Reason, State}
+%% @doc Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(stop, State) ->
+  timer:sleep(10),
+  {stop, normal, State}.
+
+%%--------------------------------------------------------------------
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%%                                   {noreply, State, Timeout} |
+%%                                   {stop, Reason, State}
+%% @doc Handling all non call/cast messages
+%% @end
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+  {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @spec terminate(Reason, State) -> void()
+%% @doc This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, #state{old_code={_Module,_Bin,_Filename}}) ->
+  ok.
+
+%%--------------------------------------------------------------------
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @doc Convert process state when code is changed
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+  {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+name(Module, Function) ->
+  list_to_atom(lists:concat([Module, Function, "stub"])).
+ +stub_function(Module, Function, Arity) -> + {_, Bin, _} = code:get_object_code(Module), + {ok, {Module,[{abstract_code,{raw_abstract_v1,Forms}}]}} = beam_lib:chunks(Bin, [abstract_code]), + ?debugMsg("replacing function"), + StubbedForms = replace_function(Module, Function, Arity, Forms), + case compile:forms(StubbedForms, [binary]) of + {ok, Module, Binary} -> code:load_binary(Module, atom_to_list(Module) ++ ".erl", Binary); + Other -> Other + end. + +arity(Fun) when is_function(Fun) -> + Props = erlang:fun_info(Fun), + proplists:get_value(arity, Props). + +replace_function(Module, Function, Arity, Forms) -> + replace_function(Module, Function, Arity, Forms, []). + +replace_function(_Module, _Function, _Arity, [], Acc) -> + lists:reverse(Acc); +replace_function(Module, Function, Arity, [{function, Line, Function, Arity, _Clauses}|Forms], Acc) -> + lists:reverse(Acc) ++ [{function, Line, Function, Arity, [ + {clause, + Line, + generate_variables(Arity), + [], + generate_expression(stub,proxy_call,Module,name(Module,Function),Arity)}]}] ++ Forms; +replace_function(Module, Function, Arity, [Form|Forms], Acc) -> + replace_function(Module, Function, Arity, Forms, [Form|Acc]). + +generate_variables(0) -> []; +generate_variables(Arity) -> + lists:map(fun(N) -> + {var, 1, list_to_atom(lists:concat(['Arg', N]))} + end, lists:seq(1, Arity)). + +generate_expression(M, F, Module, Name, 0) -> + [{call,1,{remote,1,{atom,1,M},{atom,1,F}}, [{atom,1,Module}, {atom,1,Name}]}]; +generate_expression(M, F, Module, Name, Arity) -> + [{call,1,{remote,1,{atom,1,M},{atom,1,F}}, [{atom,1,Module}, {atom,1,Name}, {tuple,1,lists:map(fun(N) -> + {var, 1, list_to_atom(lists:concat(['Arg', N]))} + end, lists:seq(1, Arity))}]}]. diff --git a/test/test_suite.erl b/test/test_suite.erl new file mode 100644 index 00000000..255ed5a9 --- /dev/null +++ b/test/test_suite.erl @@ -0,0 +1,10 @@ +-module(test_suite). + +-include_lib("eunit/include/eunit.hrl"). + +all_test_() -> + [{module, mem_utils_test}, + {module, membership2_test}, + {module, partitions_test}, + {module, replication_test} + ]. |
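test_suite:all_test_/0 is a standard EUnit test generator, so the whole suite can be run from an Erlang shell (or wired into a make target) with eunit:test/1,2, for example:

    %% Assumes the compiled src and test beams are in the code path.
    eunit:test(test_suite, [verbose]).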