diff options
-rw-r--r-- | include/chunk_size.hrl | 1 | ||||
-rw-r--r-- | include/common.hrl | 41 | ||||
-rw-r--r-- | include/config.hrl | 24 | ||||
-rw-r--r-- | include/dmerkle.hrl | 14 | ||||
-rw-r--r-- | include/test.hrl | 13 | ||||
-rw-r--r-- | src/bootstrap_manager.erl | 261 | ||||
-rw-r--r-- | src/bootstrap_receiver.erl | 121 | ||||
-rw-r--r-- | src/cluster_ops.erl | 264 | ||||
-rw-r--r-- | src/configuration.erl | 100 | ||||
-rw-r--r-- | src/dynomite_couch_api.erl | 140 | ||||
-rw-r--r-- | src/dynomite_couch_storage.erl | 41 | ||||
-rw-r--r-- | src/lib_misc.erl | 235 | ||||
-rw-r--r-- | src/mem_utils.erl | 129 | ||||
-rw-r--r-- | src/membership2.erl | 686 | ||||
-rw-r--r-- | src/node.erl | 39 | ||||
-rw-r--r-- | src/replication.erl | 165 |
16 files changed, 0 insertions, 2274 deletions
diff --git a/include/chunk_size.hrl b/include/chunk_size.hrl deleted file mode 100644 index f9906b5f..00000000 --- a/include/chunk_size.hrl +++ /dev/null @@ -1 +0,0 @@ --define(CHUNK_SIZE, 5120). diff --git a/include/common.hrl b/include/common.hrl deleted file mode 100644 index 2299950d..00000000 --- a/include/common.hrl +++ /dev/null @@ -1,41 +0,0 @@ - --include_lib("eunit/include/eunit.hrl"). - --define(fmt(Msg, Args), lists:flatten(io_lib:format(Msg, Args))). --define(infoFmt(Msg, Args), error_logger:info_msg(Msg, Args)). --define(infoMsg(Msg), error_logger:info_msg(Msg)). - - -%% from couch_db.hrl --ifndef(LOG_DEBUG). --define(LOG_DEBUG(Format, Args), - showroom_log:message(debug, Format, Args)). --endif. - --ifndef(LOG_INFO). --define(LOG_INFO(Format, Args), - showroom_log:message(info, Format, Args)). --endif. - --ifndef(LOG_ERROR). --define(LOG_ERROR(Format, Args), - showroom_log:message(error, Format, Args)). --endif. - -%% -define(PMAP(F,L), lists:map(F,L)). --define(PMAP(F,L), showroom_utils:pmap(F,L)). - - -%% -%% membership2 (in here for separate testing module) -%% - --define(VERSION,2). - --record(membership, {header=?VERSION, - node, - nodes, - partitions, - version, - fullmap - }). diff --git a/include/config.hrl b/include/config.hrl deleted file mode 100644 index 20983d26..00000000 --- a/include/config.hrl +++ /dev/null @@ -1,24 +0,0 @@ - --ifndef(CONFIG_HRL). --define(CONFIG_HRL, true). -%we don't want to turn protocol buffers on by default, since the library is not included -%it should be very easy for new users to start up an instance --record(config, {n=3, - r=1, - w=1, - q=6, - directory, - web_port, - text_port=11222, - storage_mod=dets_storage, - blocksize=4096, - thrift_port=9200, - pb_port=undefined, - buffered_writes=undefined, - cache=undefined, - cache_size=1048576, - hash_module=partitions, - meta=[] - }). - --endif. diff --git a/include/dmerkle.hrl b/include/dmerkle.hrl deleted file mode 100644 index b4fe2a08..00000000 --- a/include/dmerkle.hrl +++ /dev/null @@ -1,14 +0,0 @@ --define(DMERKLE_VERSION, 2). --define(STATIC_HEADER, 93). - --define(d_from_blocksize(BlockSize), trunc((BlockSize - 17)/16)). --define(pointers_from_blocksize(BlockSize), (lib_misc:ceiling(math:log(BlockSize)/math:log(2)) - 3)). --define(pointer_for_size(Size, BlockSize), (if Size =< 16 -> 1; Size =< BlockSize -> ?pointers_from_blocksize(Size); true -> last end)). --define(size_for_pointer(N), (2 bsl (N+2))). --define(headersize_from_blocksize(BlockSize), (?STATIC_HEADER + ?pointers_from_blocksize(BlockSize) * 8)). --define(aligned(Ptr, HeaderSize, BlockSize), (((Ptr - (HeaderSize)) rem BlockSize) == 0)). --define(block(Ptr, HeaderSize, BlockSize), ((Ptr - (HeaderSize)) div BlockSize)). - --record(node, {m=0, keys=[], children=[], offset=eof}). --record(leaf, {m=0, values=[], offset=eof}). --record(free, {offset,size=0,pointer=0}). diff --git a/include/test.hrl b/include/test.hrl deleted file mode 100644 index 38fb850f..00000000 --- a/include/test.hrl +++ /dev/null @@ -1,13 +0,0 @@ --define(TMP_DIR, "../../../tmp/lib"). - --define(TMP_FILE, fun(File) -> - filename:join(?TMP_DIR, File) - end). - -%% priv_dir() -> -%% Dir = filename:join([t:config(priv_dir), "data", atom_to_list(?MODULE), pid_to_list(self())]), -%% filelib:ensure_dir(filename:join([Dir, atom_to_list(?MODULE)])), -%% Dir. - -%% priv_file(File) -> -%% filename:join(priv_dir(), File). diff --git a/src/bootstrap_manager.erl b/src/bootstrap_manager.erl deleted file mode 100644 index f1303223..00000000 --- a/src/bootstrap_manager.erl +++ /dev/null @@ -1,261 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File: bootstrap_manager.erl -%%% @author Cliff Moon <> [] -%%% @copyright 2009 Cliff Moon -%%% @doc This is the bootstrap manager for a cluster. -%%% -%%% @end -%%% -%%% @since 2009-07-29 by Cliff Moon -%%%------------------------------------------------------------------- --module(bootstrap_manager). --author('cliff@powerset.com'). --author('brad@cloudant.com'). - --behaviour(gen_server). - -%% API --export([start_bootstrap/3, end_bootstrap/1, - start_link/3, start/3, stop/0, - start_transfers/0, transfers/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {transfer_list, nodes, transfers, futurefullmap}). --record(transfer, {partition, receivers, rate=0, status=starting}). - --include("../include/config.hrl"). --include("../include/common.hrl"). - -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% @spec start_link() -> {ok,Pid} | ignore | {error,Error} -%% @doc Starts the server -%% @end -%%-------------------------------------------------------------------- -start_bootstrap(State=#membership{node=Node, nodes=Nodes}, - OldFullMap, NewFullMap) -> - case partitions:diff(OldFullMap, NewFullMap) of - [] -> - % no difference in pmaps - {NewFullMap, State#membership{fullmap=NewFullMap}}; - TransferList when is_list(TransferList) -> - ?LOG_DEBUG("~nBootstrap~nNode : ~p~nTransferList :~n~p~n", - [Node, partitions:pp_diff(TransferList)]), - case start_link(TransferList, Nodes, NewFullMap) of - {ok, _Pid} -> - start_transfers(); - Other -> throw(Other) - end, - - % bootstrap has some stuff to do (async), so just give the state - % passed in for now. end_bootstrap will be called with the resulting - % state when it completes - {OldFullMap, State}; - Other -> - % probably occurs b/c T (# of nodes) < N currently. - % more nodes joining should help avoid this error. - ?LOG_ERROR("no_bootstrap - Other: ~p", [Other]), - {NewFullMap, State#membership{fullmap=NewFullMap}} - end. - - -end_bootstrap(#state{futurefullmap=FutureFullMap}) -> - end_bootstrap(FutureFullMap); - -end_bootstrap(NewFullMap) -> - gen_server:call(membership, {newfullmap, NewFullMap}), - stop(). - - -start(TransferList, Nodes, FutureFullMap) -> - gen_server:start({global, bootstrap_manager}, ?MODULE, - [TransferList, Nodes, FutureFullMap], []). - - -start_link(TransferList, Nodes, FutureFullMap) -> - gen_server:start_link({global, bootstrap_manager}, ?MODULE, - [TransferList, Nodes, FutureFullMap], []). - - -stop() -> - gen_server:cast({global, bootstrap_manager}, stop). - - -start_transfers() -> - gen_server:cast({global, bootstrap_manager}, start_transfers). - - -transfers() -> - gen_server:call({global, bootstrap_manager}, transfers). - - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% @spec init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% @doc Initiates the server -%% @end -%%-------------------------------------------------------------------- -init([TransferList, Nodes, FutureFullMap]) -> - process_flag(trap_exit, true), - {ok, #state{transfer_list=TransferList,nodes=Nodes, - futurefullmap=FutureFullMap}}. - - -%%-------------------------------------------------------------------- -%% @spec -%% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% @doc Handling call messages -%% @end -%%-------------------------------------------------------------------- -handle_call(average_transfer_rate, _From, - State=#state{transfers=Transfers}) -> - {Sum, Cardinality} = ets:foldl( - fun(#transfer{rate=Rate}, {Sum, Cardinality}) -> - {Sum+Rate,Cardinality+1} - end, {0, 0}, Transfers), - AverageRate = Sum / Cardinality, - {reply, AverageRate, State}; - -handle_call(aggregate_transfer_rate, _From, - State=#state{transfers=Transfers}) -> - Sum = ets:foldl(fun(#transfer{rate=Rate}, Sum) -> - Rate + Sum - end, 0, Transfers), - {reply, Sum, State}; - -handle_call(transfers, _From, - State=#state{transfers=Transfers}) -> - {reply, {ok, ets:tab2list(Transfers)}, State}; - -%% at least reply that this 'catch-all' was ignored -handle_call(_Request, _From, State) -> - {reply, ignored, State}. - - -%%-------------------------------------------------------------------- -%% @spec handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @doc Handling cast messages -%% @end -%%-------------------------------------------------------------------- -handle_cast(stop, State) -> - {stop, normal, State}; - -handle_cast(start_transfers, - State=#state{transfer_list=TransferList}) -> - Transfers = start_transfers(TransferList, State), - {noreply, State#state{transfers=Transfers}}; - -handle_cast(_Msg, State) -> - {noreply, State}. - - -%%-------------------------------------------------------------------- -%% @spec handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @doc Handling all non call/cast messages -%% @end -%%-------------------------------------------------------------------- - -handle_info({receiver_done, FromNode, _ToNode, Partition, DbName, Receiver}, - State = #state{transfers=Transfers}) -> - %% TODO use bring_online & ToNode? instead of waiting until end & installing - %% NewFullMap into mem2 - - %% handle the old file - membership2:decommission_part(FromNode, Partition, DbName), - - %% remove from Transfers table - case ets:lookup(Transfers, Partition) of - [Transfer] = [#transfer{receivers=Receivers}] -> - NewReceivers = lists:delete(Receiver, Receivers), - if - length(NewReceivers) == 0 -> ets:delete(Transfers, Partition); - true -> ets:insert(Transfers, Transfer#transfer{receivers=NewReceivers}) - end; - _ -> ok - end, - case ets:first(Transfers) of - '$end_of_table' -> - end_bootstrap(State), - {noreply, State}; - _ -> {noreply, State} - end; - -handle_info(_Info, State) -> - {noreply, State}. - - -%%-------------------------------------------------------------------- -%% @spec terminate(Reason, State) -> void() -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%% @end -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - - -%%-------------------------------------------------------------------- -%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} -%% @doc Convert process state when code is changed -%% @end -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- -start_transfers([], State) -> - no_transfers, % no diff in pmaps, so no transfers - end_bootstrap(State); - -start_transfers(Diff, State=#state{nodes=Nodes}) -> - case showroom_db:all_databases("") of - {ok, AllDbs} when length(AllDbs) > 0 -> - start_transfers(Diff, Nodes, configuration:get_config(), AllDbs, - ets:new(transfers, [public, set, {keypos, 2}])); - {ok, []} -> end_bootstrap(State); % no databases, so bootstrap not needed - Other -> throw(Other) % problem getting list of dbs - end. - - -start_transfers([], _, _, _, Transfers) -> - Transfers; - -start_transfers([{FromNode, ToNode, Partition} | Diff], Nodes, Config, - AllDbs, Transfers) -> - membership2:take_offline(FromNode, Partition), - Receivers = lists:map( - fun(DbName) -> - {ok, Receiver} = - bootstrap_receiver:start_link(FromNode, ToNode, Partition, - DbName, 10000, self()), - Receiver - end, AllDbs), - % NOTE: by using AllDbs, we are omitting .deleted.couch files - ets:insert(Transfers, #transfer{partition=Partition, - receivers=Receivers}), - start_transfers(Diff, Nodes, Config, AllDbs, Transfers). diff --git a/src/bootstrap_receiver.erl b/src/bootstrap_receiver.erl deleted file mode 100644 index 3b4907cb..00000000 --- a/src/bootstrap_receiver.erl +++ /dev/null @@ -1,121 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File: bootstrap_receiver.erl -%%% @author Brad Anderson <brad@cloudant.com> -%%% @copyright 2009 Brad Anderson -%%% @doc -%%% -%%% @end -%%% -%%% @since 2009-09-22 by Brad Anderson -%%%------------------------------------------------------------------- --module(bootstrap_receiver). --author('brad@cloudant.com'). - --include("../include/config.hrl"). --include("../include/common.hrl"). - -%% API --export([start_link/6, loop/6, fetch_shard/5]). - - -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% @spec -%% @doc -%% @end -%%-------------------------------------------------------------------- -start_link(FromNode, ToNode, Partition, DbName, Timeout, Manager) -> - Pid = proc_lib:spawn_link(ToNode, bootstrap_receiver, loop, - [FromNode, Partition, DbName, Timeout, Manager, - self()]), - sync_wait(Pid, Timeout). - - -loop(FromNode, Partition, DbName, Timeout, Manager, Parent) -> - proc_lib:init_ack(Parent, {ok, self()}), - fetch_shard(FromNode, Partition, DbName, Timeout, Manager). - - -%% @doc run at "ToNode" via spawn_link -fetch_shard(FromNode, Partition, DbName, Timeout, Manager) -> - Directory = couch_config:get("couchdb", "database_dir"), - [_NodeName, Hostname] = string:tokens(atom_to_list(FromNode), "@"), - SrcFile = binary_to_list(partitions:shard_name(Partition, DbName)), - DestFile = showroom_utils:full_filename(Partition, DbName, Directory), - Authn = fetch_authn(), - Port = fetch_port(), - Url = lists:concat(["http://", Authn, Hostname, Port, "/", SrcFile, - ".couch"]), - Options = [{save_response_to_file, DestFile}, - {inactivity_timeout, Timeout}], - case filelib:ensure_dir(DestFile) of - ok -> ok; - {error, eexist} -> ok; % duh! - Other -> throw(Other) - end, - ?LOG_DEBUG("~n" - "Directory: ~p~n" - "Hostname : ~p~n" - "SrcFile : ~p~n" - "DestFile : ~p~n" - "Url : ~p~n" - "Options : ~p~n" - , [Directory, Hostname, SrcFile, DestFile, Url, Options]), - case ibrowse:send_req(Url, [], get, [], Options, infinity) of - {ok, "200", _Headers, Body} -> - ?LOG_DEBUG("~nBootstrap ibrowse req Body: ~p~n", [Body]), - Manager ! {receiver_done, FromNode, node(), Partition, DbName, - self()}; - Error -> - ?LOG_ERROR("~nBootstrap ibrowse req Error: ~p~n", [Error]), - throw(Error) - end. - - -%%==================================================================== -%% Internal functions -%%==================================================================== - - -%% from proc_lib.erl in otp r13b01 -sync_wait(Pid, Timeout) -> - receive - {ack, Pid, Return} -> - Return; - {'EXIT', Pid, Reason} -> - {error, Reason} - after Timeout -> - unlink(Pid), - exit(Pid, kill), - flush(Pid), - {error, timeout} - end. - - -flush(Pid) -> - receive - {'EXIT', Pid, _} -> - true - after 0 -> - true - end. - - -fetch_authn() -> - User = couch_config:get("shard_moving", "user", ""), - Pass = couch_config:get("shard_moving", "pass", ""), - if - length(User) > 0 andalso length(Pass) > 0 -> - lists:concat([User, ":", Pass, "@"]); - true -> "" - end. - - -fetch_port() -> - Port = couch_config:get("shard_moving", "port", "8080"), - if - Port =:= "80" -> ""; - true -> lists:concat([":", Port]) - end. diff --git a/src/cluster_ops.erl b/src/cluster_ops.erl deleted file mode 100644 index 72bba92f..00000000 --- a/src/cluster_ops.erl +++ /dev/null @@ -1,264 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File: cluster_ops.erl -%%% @author Brad Anderson <brad@cloudant.com> [http://cloudant.com] -%%% @copyright 2009 Brad Anderson -%%% @doc -%%% -%%% @end -%%% -%%% @since 2009-07-21 by Brad Anderson -%%%------------------------------------------------------------------- --module(cluster_ops). --author('brad@cloudant.com'). - -%% API --export([key_lookup/3, key_lookup/5, - all_parts/4, - some_parts/4, some_parts/5, - quorum_from_each_part/3]). - --include("../include/common.hrl"). --include("../include/config.hrl"). - --include("../include/profile.hrl"). - - -%%==================================================================== -%% API -%%==================================================================== - -%% @doc Get to the proper shard on N nodes by key lookup -%% -%% This fun uses quorum constants from config -key_lookup(Key, {M,F,A}, Access) -> - N = list_to_integer(couch_config:get("cluster", "n", "3")), - key_lookup(Key, {M,F,A}, Access, get_const(Access), N). - - -%% @doc Get to the proper shard on N nodes by key lookup -%% -%% This fun uses a provided quorum constant, possibly from request, -%% possibly from config -key_lookup(Key, {M,F,A}, Access, Const, N) -> - NodeParts = membership2:nodeparts_for_key(Key), - {ResolveFun, NotFoundFun} = case Access of - r -> {fun resolve_read/1, fun resolve_not_found/2}; - w -> {fun resolve_write/1, fun(_,_) -> {false, notused, []} end} - end, - MapFun = fun({Node,Part}) -> - try - rpc:call(Node, M, F, [[Part | A]]) - catch Class:Exception -> - {error, Class, Exception} - end - end, - {GoodReplies, Bad} = pcall(MapFun, NodeParts, N), - if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end, - Good = lists:map(fun strip_ok/1, GoodReplies), - final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access). - - -%% @doc Do op on all shards (and maybe even replication partners) -all_parts({M,F,A}, Access, AndPartners, ResolveFun) -> - NodePartList = membership2:all_nodes_parts(AndPartners), - MapFun = fun({Node, Part}) -> - try - rpc:call(Node, M, F, [[Part | A]]) - catch Class:Exception -> - {error, Class, Exception} - end - end, - Replies = ?PMAP(MapFun, NodePartList), - {Good, Bad} = lists:partition(fun valid/1, Replies), - final_all_parts(Good, Bad, length(NodePartList), ResolveFun, Access). - - -%% @doc Do op on some shards, depending on list of keys sent in. -%% -%% This fun uses quorum constants from config -some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access) -> - some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access, get_const(Access)). - - -%% @doc Do op on some shards, depending on list of keys sent in. -%% -%% This fun uses a provided quorum constant, possibly from request, -%% possibly from config -some_parts(KeyFun, SeqsKVPairs, {M,F,A}, _Access, Const) -> - TaskFun = fun({{Node,Part}, Values}) -> - try - rpc:call(Node, M, F, [[Part | [Values | A]]]) - catch Class:Exception -> - {error, Class, Exception} - end - end, - - % get tasks per node that are part / values for that partition - DistTasks = get_dist_tasks(KeyFun, SeqsKVPairs), - - % With the distributed tasklist in hand, do the tasks per partition. - % For each partition, do the work on all nodes/parts. - TaskReplies = ?PMAP(TaskFun, DistTasks), - {GoodReplies, Bad} = lists:partition(fun valid/1, TaskReplies), - if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end, - Good = lists:map(fun strip_ok/1, GoodReplies), - final_some_parts(Good, Bad, Const). - - -quorum_from_each_part({M,F,A}, Access, ResolveFun) -> - Const = get_const(Access), - {_, Parts} = lists:unzip(membership2:partitions()), - PartsMapFun = fun(Part) -> - Nodes = membership2:nodes_for_part(Part), - NodesMapFun = fun(Node) -> rpc:call(Node, M, F, [[Part | A]]) end, - {GoodReplies,BadReplies} = pcall(NodesMapFun, Nodes, Const), - Good1 = lists:map(fun strip_ok/1, GoodReplies), - Bad1 = case length(Good1) >= Const of - true -> []; - false -> BadReplies - end, - {Good1,Bad1} - end, - Results1 = ?PMAP(PartsMapFun, Parts), - {Good,Bad} = lists:foldl(fun({G,B}, {GAcc,BAcc}) -> - {lists:append(G,GAcc),lists:append(B,BAcc)} - end, {[],[]}, Results1), - if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end, - final_quorum_from_each_part(Good, Bad, length(Parts), ResolveFun, Access). - - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access) -> - {NotFound, Return, Reasons} = NotFoundFun(Bad, Const), - if - length(Good) >= Const -> {ok, ResolveFun(Good)}; - NotFound -> {ok, Return, Reasons}; - true -> error_message(Good, Bad, N, Const, Access) - end. - - -final_all_parts(Good, Bad, Total, ResolveFun, Access) -> - case length(Good) =:= Total of - true -> {ok, ResolveFun(Good)}; - _ -> error_message(Good, Bad, Total, Total, Access) - end. - - -final_some_parts(Good, _Bad, Const) -> - Good1 = lists:flatten(Good), - {Seqs, _} = lists:unzip(Good1), - {ResG,ResB} = - lists:foldl( - fun(Seq, {AccG,AccB}) -> - Vals = proplists:get_all_values(Seq, Good1), - case length(Vals) >= Const of - true -> {[{Seq, Vals}|AccG],AccB}; - _ -> {AccG, [{Seq, Vals}|AccB]} - end - end, {[],[]}, lists:usort(Seqs)), - case length(ResB) of - 0 -> {ok, ResG}; - _ -> {error, ResB} - end. - - -final_quorum_from_each_part(Good, Bad, Total, ResolveFun, Access) -> - case length(Good) =:= Total of - true -> {ok, ResolveFun(Good)}; - _ -> error_message(Good, Bad, Total, Total, Access) - end. - - -resolve_read([First|Responses]) -> - case First of - not_found -> not_found; - _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses) - end. - - -resolve_write([First|Responses]) -> - case First of - not_found -> not_found; - _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses) - end. - - -resolve_not_found(Bad, R) -> - {NotFoundCnt, DeletedCnt, OtherReasons} = - lists:foldl(fun({Error,Reason}, {NotFoundAcc, DeletedAcc, ReasonAcc}) -> - case {Error,Reason} of - {not_found, {_Clock, [missing|_Rest]}} -> - {NotFoundAcc+1, DeletedAcc, ReasonAcc}; - {not_found, {_Clock, [deleted|_Rest]}} -> - {NotFoundAcc, DeletedAcc+1, ReasonAcc}; - _ -> - {NotFoundAcc, DeletedAcc, [Reason|ReasonAcc]} - end - end, {0, 0, []}, Bad), - % TODO: is the comparison to R good here, or should it be N-R? - if - NotFoundCnt >= R -> {true, {not_found, missing}, OtherReasons}; - DeletedCnt >= R -> {true, {not_found, deleted}, OtherReasons}; - true -> {false, other, OtherReasons} - end. - - -error_message(Good, Bad, N, T, Access) -> - Msg = list_to_atom(lists:concat([atom_to_list(Access), "_quorum_not_met"])), - ?LOG_ERROR("~p~nSuccess on ~p of ~p servers. Needed ~p. Errors: ~w" - , [Msg, length(Good), N, T, Bad]), - [{error, Msg}, {good, Good}, {bad, Bad}]. - - -pcall(MapFun, Servers, Const) -> - Replies = lib_misc:pmap(MapFun, Servers, Const), - lists:partition(fun valid/1, Replies). - - -valid({ok, _}) -> true; -valid(ok) -> true; -valid(_) -> false. - - -strip_ok({ok, Val}) -> Val; -strip_ok(Val) -> Val. - - -%% @spec get_dist_tasks(KeyFun::function(), KVPairs::list()) -> -%% [{{Node::node(), Part::integer()}, SeqVals}] -%% Type - ordered | ?? -%% SeqVals - [{Seq, Val}] -%% @doc builds a distributed task list of nodes with a list of shard/values. -%% This looks like a dict structure -%% but is a list so we can use ?PMAP with the results -%% @end -get_dist_tasks(KeyFun, SeqsKVPairs) -> - NPSV = lists:flatmap(fun({_,KVPair} = Elem) -> - [{NP, Elem} || NP <- membership2:nodeparts_for_key(KeyFun(KVPair))] - end, SeqsKVPairs), - group_by_key(NPSV). - -group_by_key([]) -> - []; -group_by_key(List) -> - [{FirstK,FirstV} | Rest] = lists:keysort(1,List), - Acc0 = {FirstK, [FirstV], []}, - FoldFun = fun({K,V}, {K,Vs,Acc}) -> - {K, [V|Vs], Acc}; - ({NewKey,V}, {OldKey,Vs,Acc}) -> - {NewKey, [V], [{OldKey,Vs}|Acc]} - end, - {LastK, LastVs, Acc} = lists:foldl(FoldFun, Acc0, Rest), - [{LastK, LastVs} | Acc]. - -get_const(r) -> - list_to_integer(couch_config:get("cluster", "r", "2")); -get_const(w) -> - list_to_integer(couch_config:get("cluster", "w", "2")); -get_const(r1) -> - 1; -get_const(Other) -> - throw({bad_access_term, Other}). diff --git a/src/configuration.erl b/src/configuration.erl deleted file mode 100644 index db44e83c..00000000 --- a/src/configuration.erl +++ /dev/null @@ -1,100 +0,0 @@ -%%% -*- erlang-indent-level:2 -*- -%%%------------------------------------------------------------------- -%%% File: configuration.erl -%%% @author Cliff Moon <cliff@powerset.com> -%%% @author Brad Anderson <brad@cloudant.com> -%%% @copyright 2008 Cliff Moon -%%% @doc -%%% This module keeps Dynomite source relatively unchanged, but -%%% reads from couchdb config stuffs -%%% @end -%%% -%%% @since 2008-07-18 by Cliff Moon -%%%------------------------------------------------------------------- --module(configuration). --author('cliff@powerset.com'). --author('brad@cloudant.com'). - -%%-behaviour(gen_server). - -%% API --export([start_link/1, get_config/1, get_config/0, set_config/1, stop/0]). - --include_lib("eunit/include/eunit.hrl"). - --include("../include/config.hrl"). --include("../include/common.hrl"). - --define(SERVER, couch_config). --define(i2l(V), integer_to_list(V)). --define(l2i(V), list_to_integer(V)). - - -%% ----------------------------------------------------------------- -%% API -%% ----------------------------------------------------------------- - -%% @doc starts couch_config gen_server if it's not already started -start_link(DynomiteConfig) -> - couch_config_event:start_link(), - couch_config:start_link([]), - set_config(DynomiteConfig). - - -%% @doc get the config for a remote node -get_config(Node) -> - ClusterConfig = rpc:call(Node, couch_config, get, ["cluster"]), - Directory = rpc:call(Node, couch_config, get, ["couchdb", "database_dir"]), - couch2dynomite_config(ClusterConfig, Directory). - - -%% @doc get the config for the local node -get_config() -> - get_config(node()). - - -%% @doc given a Dynomite config record, put the values into the Couch config -set_config(DynomiteConfig) -> - dynomite2couch_config(DynomiteConfig). - - -%% @doc stop the config server (nothing to do until after couch_config refactor) -stop() -> - couch_config:stop(). - - -%% ----------------------------------------------------------------- -%% Internal functions -%% ----------------------------------------------------------------- - -%% @doc turn a couch config proplist into a dynomite configuration record -couch2dynomite_config(ClusterConfig, Directory) -> - Q = ?l2i(couch_util:get_value("q", ClusterConfig, "3")), - R = ?l2i(couch_util:get_value("r", ClusterConfig, "2")), - W = ?l2i(couch_util:get_value("w", ClusterConfig, "1")), - N = ?l2i(couch_util:get_value("n", ClusterConfig, "4")), - %% use couch's database_dir here, to avoid /tmp/data not existing - Webport = ?l2i(couch_util:get_value("webport", ClusterConfig, "8080")), - Meta = couch_util:get_value("meta", ClusterConfig, []), - StorageMod = couch_util:get_value("storage_mod", ClusterConfig, []), - #config{q=Q, r=R, w=W, n=N, directory=Directory, web_port=Webport, - meta=Meta, storage_mod=StorageMod}. - - -%% @doc workhorse for set_config/1 above -dynomite2couch_config(DynomiteConfig) -> - couch_config:set("cluster", "q", ?i2l(DynomiteConfig#config.q), false), - couch_config:set("cluster", "r", ?i2l(DynomiteConfig#config.r), false), - couch_config:set("cluster", "w", ?i2l(DynomiteConfig#config.w), false), - couch_config:set("cluster", "n", ?i2l(DynomiteConfig#config.n), false), - couch_config:set("couchdb", "database_dir", DynomiteConfig#config.directory, - false), - couch_config:set("cluster", "webport", - case DynomiteConfig#config.web_port of - undefined -> "8080"; - _ -> ?i2l(DynomiteConfig#config.web_port) - end, false), - couch_config:set("cluster", "meta", DynomiteConfig#config.meta, false), - couch_config:set("cluster", "storage_mod", - DynomiteConfig#config.storage_mod, false), - ok. diff --git a/src/dynomite_couch_api.erl b/src/dynomite_couch_api.erl deleted file mode 100644 index 554b84f6..00000000 --- a/src/dynomite_couch_api.erl +++ /dev/null @@ -1,140 +0,0 @@ -%% This is a Dynomite plugin for calling the CouchDB raw Erlang API -%% -%% Most calls will have come from any of the web endpoints to execute -%% these functions on the proper node for the key(s). - --module(dynomite_couch_api). --author('brad@cloudant.com'). - --export([create_db/1, delete_db/1, get/1, put/1, - bulk_docs/1, missing_revs/1, get_db_info/1, get_view_group_info/1, - ensure_full_commit/1 - ]). - --include("../../couch/src/couch_db.hrl"). --include("../include/common.hrl"). - - -%%-------------------------------------------------------------------- -%% @spec create_db([Part, DbName, Options]) -> {ok,Db} | {error,Error} -%% Description: Creates the database shard. -%%-------------------------------------------------------------------- -create_db([Part, DbName, Options]) -> - case couch_server:create(partitions:shard_name(Part, DbName), Options) of - {ok, Shard} -> - couch_db:close(Shard), - ok; - Error -> Error - end. - - -%%-------------------------------------------------------------------- -%% @spec delete_db([Part, DbName, Options]) -> {ok,deleted} | {error,Error} -%% Description: Deletes the database shard. -%%-------------------------------------------------------------------- -delete_db([Part, DbName, Options]) -> - couch_server:delete(partitions:shard_name(Part, DbName), Options). - - -get([Part, Db, DocId, Revs, Options]) -> - case showroom_db:open_shard(node(), Part, Db) of - {ok, Shard} -> - {Status, Doc} = couch_api:open_doc(Shard, DocId, Revs, Options), - showroom_db:close_shard(Shard), - {Status, {[], [Doc]}}; - Error -> - Error - end. - - -put([Part, Db, Doc, Options]) -> - case showroom_db:open_shard(node(), Part, Db) of - {ok, Shard} -> - {Status, NewRev} = couch_db:update_doc(Shard, Doc, Options), - showroom_db:close_shard(Shard), - {Status, [NewRev]}; - Error -> - Error - end. - - -bulk_docs([Part, SeqsDocs, Db, Options, Type]) -> - {Seqs, Docs} = lists:unzip(SeqsDocs), - case Docs of - [] -> {ok, []}; - _ -> - case showroom_db:open_shard(node(), Part, Db) of - {ok, Shard} -> - {ok, Results1} = couch_db:update_docs(Shard, Docs, Options, Type), - showroom_db:close_shard(Shard), - Results = int_zip(Seqs, Results1), - {ok, Results}; - Error -> - Error - end - end. - - -missing_revs([Part, SeqsIdsRevs, Db]) -> - {_Seqs, IdsRevs} = lists:unzip(SeqsIdsRevs), - case IdsRevs of - [] -> {ok, []}; - _ -> - case showroom_db:open_shard(node(), Part, Db) of - {ok, Shard} -> - {ok, Results1} = couch_db:get_missing_revs(Shard, IdsRevs), - showroom_db:close_shard(Shard), - {ok, Results1}; - Error -> - Error - end - end. - - -get_db_info([Part, Db]) -> - case showroom_db:open_shard(node(), Part, Db) of - {ok, Shard} -> - {Status, Info} = couch_db:get_db_info(Shard), - showroom_db:close_shard(Shard), - {Status, {[], Info}}; - Error -> - Error - end. - -get_view_group_info([Part, Db, DesignId]) -> - case showroom_db:open_shard(node(), Part, Db) of - {ok, Shard} -> - {ok, EmptyGroup} = showroom_view:build_skeleton_view_group(Db, DesignId), - <<"S", ShardName/binary>> = Shard#db.name, - {ok, Pid} = gen_server:call(couch_view, {get_group_server, - ShardName, EmptyGroup}), - {ok, Info} = couch_view_group:request_group_info(Pid), - showroom_db:close_shard(Shard), - {ok, {[], Info}}; - Error -> - Error - end. - - -ensure_full_commit([Part, Db]) -> - case showroom_db:open_shard(node(), Part, Db) of - {ok, Shard} -> - {Status, Info} = couch_db:ensure_full_commit(Shard), - showroom_db:close_shard(Shard), - {Status, {[], Info}}; - Error -> - Error - end. - - -%% ======================= -%% internal -%% ======================= - -int_zip(Seqs, Docs) when length(Seqs) == length(Docs) -> - lists:zip(Seqs, Docs); -int_zip(_Seqs, []) -> - []; -int_zip(Seqs, Docs) -> - ?debugFmt("~nWTF? int_zip~nSeqs: ~p~nDocs: ~p~n", [Seqs, Docs]), - []. diff --git a/src/dynomite_couch_storage.erl b/src/dynomite_couch_storage.erl deleted file mode 100644 index 4fd21b80..00000000 --- a/src/dynomite_couch_storage.erl +++ /dev/null @@ -1,41 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File: dynomite_couch_storage.erl -%%% @author Brad Anderson -%%% @copyright 2009 Brad Anderson -%%% @doc -%%% -%%% @end -%%% -%%% @since 2009-07-14 -%%%------------------------------------------------------------------- --module(dynomite_couch_storage). --author('brad@cloudant.com'). - -%% API --export([name/1, open/2, close/1, create/2]). -%% , close/1, get/2, put/4, has_key/2, delete/2, fold/3 - --include_lib("../include/common.hrl"). - -%% -record(row, {key, context, values}). - -%%==================================================================== -%% API -%%==================================================================== - -name(Boundary) -> - showroom_utils:int_to_hexstr(Boundary). - -open(Directory, Name) -> -%% ?debugFmt("~nDirectory: ~p~nName : ~p~n", [Directory,Name]), - {ok, {Directory, Name}}. - -close(_Table) -> ok. - -create(_Directory, _Name) -> - ok. - - -%%==================================================================== -%% Internal functions -%%==================================================================== diff --git a/src/lib_misc.erl b/src/lib_misc.erl deleted file mode 100644 index f5449295..00000000 --- a/src/lib_misc.erl +++ /dev/null @@ -1,235 +0,0 @@ --module(lib_misc). - --define(OFFSET_BASIS, 2166136261). --define(FNV_PRIME, 16777619). - --export([rm_rf/1, pmap/3, succ/1, fast_acc/3, hash/1, hash/2, fnv/1, - nthdelete/2, zero_split/1, nthreplace/3, rand_str/1, position/2, - shuffle/1, floor/1, ceiling/1, time_to_epoch_int/1, - time_to_epoch_float/1, now_int/0, now_float/0, byte_size/1, listify/1, - reverse_bits/1]). - --include("../include/config.hrl"). --include("../include/profile.hrl"). - - -rm_rf(Name) when is_list(Name) -> - case filelib:is_dir(Name) of - false -> - file:delete(Name); - true -> - case file:list_dir(Name) of - {ok, Filenames} -> - lists:foreach(fun rm_rf/1, [ filename:join(Name, F) || F <- Filenames]), - file:del_dir(Name); - {error, Reason} -> error_logger:info_msg("rm_rf failed because ~p~n", [Reason]) - end - end. - -zero_split(Bin) -> - zero_split(0, Bin). - -zero_split(N, Bin) when N > erlang:byte_size(Bin) -> Bin; - -zero_split(N, Bin) -> - case Bin of - <<_:N/binary, 0:8, _/binary>> -> split_binary(Bin, N); - _ -> zero_split(N+1, Bin) - end. - -rand_str(N) -> - lists:map(fun(_I) -> - random:uniform(26) + $a - 1 - end, lists:seq(1,N)). - -nthreplace(N, E, List) -> - lists:sublist(List, N-1) ++ [E] ++ lists:nthtail(N, List). - -nthdelete(N, List) -> - nthdelete(N, List, []). - -nthdelete(0, List, Ret) -> - lists:reverse(Ret) ++ List; - -nthdelete(_, [], Ret) -> - lists:reverse(Ret); - -nthdelete(1, [_E|L], Ret) -> - nthdelete(0, L, Ret); - -nthdelete(N, [E|L], Ret) -> - nthdelete(N-1, L, [E|Ret]). - -floor(X) -> - T = erlang:trunc(X), - case (X - T) of - Neg when Neg < 0 -> T - 1; - Pos when Pos > 0 -> T; - _ -> T - end. - -ceiling(X) -> - T = erlang:trunc(X), - case (X - T) of - Neg when Neg < 0 -> T; - Pos when Pos > 0 -> T + 1; - _ -> T - end. - -succ([]) -> - []; - -succ(Str) -> - succ_int(lists:reverse(Str), []). - -succ_int([Char|Str], Acc) -> - if - Char >= $z -> succ_int(Str, [$a|Acc]); - true -> lists:reverse(lists:reverse([Char+1|Acc]) ++ Str) - end. - -fast_acc(_, Acc, 0) -> Acc; - -fast_acc(Fun, Acc, N) -> - fast_acc(Fun, Fun(Acc), N-1). - -shuffle(List) when is_list(List) -> - [ N || {_R,N} <- lists:keysort(1, [{random:uniform(),X} || X <- List]) ]. - -pmap(Fun, List, ReturnNum) -> - N = if - ReturnNum > length(List) -> length(List); - true -> ReturnNum - end, - SuperParent = self(), - SuperRef = erlang:make_ref(), - Ref = erlang:make_ref(), - %% we spawn an intermediary to collect the results - %% this is so that there will be no leaked messages sitting in our mailbox - Parent = spawn(fun() -> - L = gather(N, length(List), Ref, []), - SuperParent ! {SuperRef, pmap_sort(List, L)} - end), - Pids = [spawn(fun() -> - Parent ! {Ref, {Elem, (catch Fun(Elem))}} - end) || Elem <- List], - Ret = receive - {SuperRef, Ret1} -> Ret1 - end, - % i think we need to cleanup here. - lists:foreach(fun(P) -> exit(P, die) end, Pids), - Ret. - -pmap_sort(Original, Results) -> - pmap_sort([], Original, lists:reverse(Results)). - -% pmap_sort(Sorted, [], _) -> lists:reverse(Sorted); -pmap_sort(Sorted, _, []) -> lists:reverse(Sorted); -pmap_sort(Sorted, [E|Original], Results) -> - case lists:keytake(E, 1, Results) of - {value, {E, Val}, Rest} -> pmap_sort([Val|Sorted], Original, Rest); - false -> pmap_sort(Sorted, Original, Results) - end. - -gather(_, Max, _, L) when length(L) == Max -> L; -gather(0, _, _, L) -> L; -gather(N, Max, Ref, L) -> - receive - {Ref, {Elem, {not_found, Ret}}} -> gather(N, Max, Ref, [{Elem, {not_found, Ret}}|L]); - {Ref, {Elem, {badrpc, Ret}}} -> gather(N, Max, Ref, [{Elem, {badrpc, Ret}}|L]); - {Ref, {Elem, {'EXIT', Ret}}} -> gather(N, Max, Ref, [{Elem, {'EXIT', Ret}}|L]); - {Ref, Ret} -> gather(N-1, Max, Ref, [Ret|L]) - end. - -get_hash_module(#config{hash_module=HashModule}) -> - HashModule. - -hash(Term) -> - HashModule = get_hash_module(configuration:get_config()), - ?prof(hash), - R = HashModule:hash(Term), - ?forp(hash), - R. - -hash(Term, Seed) -> - HashModule = get_hash_module(configuration:get_config()), - ?prof(hash), - R = HashModule:hash(Term, Seed), - ?forp(hash), - R. - -%32 bit fnv. magic numbers ahoy -fnv(Term) when is_binary(Term) -> - fnv_int(?OFFSET_BASIS, 0, Term); - -fnv(Term) -> - fnv_int(?OFFSET_BASIS, 0, term_to_binary(Term)). - -fnv_int(Hash, ByteOffset, Bin) when erlang:byte_size(Bin) == ByteOffset -> - Hash; - -fnv_int(Hash, ByteOffset, Bin) -> - <<_:ByteOffset/binary, Octet:8, _/binary>> = Bin, - Xord = Hash bxor Octet, - fnv_int((Xord * ?FNV_PRIME) rem (2 bsl 31), ByteOffset+1, Bin). - -position(Predicate, List) when is_function(Predicate) -> - position(Predicate, List, 1); - -position(E, List) -> - position(E, List, 1). - -position(Predicate, [], _N) when is_function(Predicate) -> false; - -position(Predicate, [E|List], N) when is_function(Predicate) -> - case Predicate(E) of - true -> N; - false -> position(Predicate, List, N+1) - end; - -position(_, [], _) -> false; - -position(E, [E|_List], N) -> N; - -position(E, [_|List], N) -> position(E, List, N+1). - -now_int() -> - time_to_epoch_int(now()). - -now_float() -> - time_to_epoch_float(now()). - -time_to_epoch_int(Time) when is_integer(Time) or is_float(Time) -> - Time; - -time_to_epoch_int({Mega,Sec,_}) -> - Mega * 1000000 + Sec. - -time_to_epoch_float(Time) when is_integer(Time) or is_float(Time) -> - Time; - -time_to_epoch_float({Mega,Sec,Micro}) -> - Mega * 1000000 + Sec + Micro / 1000000. - -byte_size(List) when is_list(List) -> - lists:foldl(fun(El, Acc) -> Acc + lib_misc:byte_size(El) end, 0, List); - -byte_size(Term) -> - erlang:byte_size(Term). - -listify(List) when is_list(List) -> - List; - -listify(El) -> [El]. - -reverse_bits(V) when is_integer(V) -> - % swap odd and even bits - V1 = ((V bsr 1) band 16#55555555) bor (((V band 16#55555555) bsl 1) band 16#ffffffff), - % swap consecutive pairs - V2 = ((V1 bsr 2) band 16#33333333) bor (((V1 band 16#33333333) bsl 2) band 16#ffffffff), - % swap nibbles ... - V3 = ((V2 bsr 4) band 16#0F0F0F0F) bor (((V2 band 16#0F0F0F0F) bsl 4) band 16#ffffffff), - % swap bytes - V4 = ((V3 bsr 8) band 16#00FF00FF) bor (((V3 band 16#00FF00FF) bsl 8) band 16#ffffffff), - % swap 2-byte long pairs - ((V4 bsr 16) band 16#ffffffff) bor ((V4 bsl 16) band 16#ffffffff). diff --git a/src/mem_utils.erl b/src/mem_utils.erl deleted file mode 100644 index ffefd5cb..00000000 --- a/src/mem_utils.erl +++ /dev/null @@ -1,129 +0,0 @@ --module(mem_utils). - --export([fix_mappings/3, get_remote_fullmap/1, join_type/3, pmap_from_full/1, - nodeparts_up/1, remove_partition/3, use_persistent/2, - was_i_nodedown/2]). - --include("../include/common.hrl"). - -join_type(Node, Fullmap, Options) -> - case proplists:get_value(replace, Options) of - undefined -> - case lists:filter(fun({N,_P,_T}) -> N =:= Node end, Fullmap) of - [] -> new; - _ -> rejoin - end; - OldNode when is_atom(OldNode) -> - % not a particularly strong guard, but will have to do - {replace, OldNode}; - _ -> new - end. - - -%% @doc return a {PMap, Fullmap} tuple that has corrections for -%% down, rejoining, or replacing Node -fix_mappings(nodedown, Node, OldFullmap) -> - fix_mappings_fold(fun({N,P,T}, AccIn) -> - case {N,T} of - {Node, {nodedown, Type}} -> - % already marked as nodedown, so leave it - [{N,P, {nodedown, Type}} | AccIn]; - {Node, _} -> - % mark it as nodedown - [{N,P, {nodedown, T}} | AccIn]; - _ -> [{N,P,T} | AccIn] - end - end, [], OldFullmap); - -fix_mappings(rejoin, Node, OldFullmap) -> - fix_mappings_fold(fun({N,P,{nodedown,T}}, AccIn) when N =:= Node -> - [{N,P,T} | AccIn]; - (NPT, AccIn) -> [NPT | AccIn] - end, [], OldFullmap); - -fix_mappings(replace, {OldNode, NewNode}, OldFullmap) -> - fix_mappings_fold(fun({N,P,T}, AccIn) -> - case {N, T} of - {OldNode, {nodedown,T1}} -> [{NewNode,P,T1} | AccIn]; - {OldNode, _} -> [{NewNode,P,T} | AccIn]; - _ -> [{N,P,T} | AccIn] - end - end, [], OldFullmap). - - -fix_mappings_fold(Fun, Acc0, OldFullmap) -> - NewFullmap = lists:foldl(Fun, Acc0, OldFullmap), - NewPMap = pmap_from_full(NewFullmap), - {NewPMap, NewFullmap}. - - -%% @doc create a PMap (primary nodes only) from provided Fullmap -%% If a primary node is down, a partner will be supplied -pmap_from_full(Fullmap) -> - NodePartList = nodeparts_up(Fullmap), - lists:keysort(2,lists:foldl(fun({N,P,T}, AccIn) -> - case T of - primary -> [{N,P} | AccIn]; - {nodedown, primary} -> - NewNode = case lists:delete(N, - membership2:nodes_for_part(P, NodePartList)) of - [First|_] -> First; - [] -> N % wtf, are all partners down too? - end, - [{NewNode,P} | AccIn]; - _ -> AccIn - end - end, [], Fullmap)). - - -nodeparts_up(Fullmap) -> - lists:foldl(fun({_N,_P,{nodedown,_}}, AccIn) -> AccIn; - ({N,P,_T}, AccIn) -> [{N,P} | AccIn] - end, [], Fullmap). - - - -%% @doc if Node is in the Fullmap as {nodedown,_} return true -was_i_nodedown(Node, Fullmap) -> - lists:member(yes, lists:map(fun({N,_P,{nodedown,_T}}) -> - case N of - Node -> yes; - _ -> no - end; - (_) -> no - end, Fullmap)). - - -remove_partition(FullMap, Node, Partition) -> - case lists:filter( - fun({N,P,_Type}) -> N =:= Node andalso P =:= Partition end, - FullMap) of - [Elem|_] -> - lists:delete(Elem, FullMap); - Other -> - ?LOG_ERROR("~nNo partition to remove: ~p~n" - "Node: ~p~nPartition: ~p~n", [Other, Node, Partition]), - FullMap - end. - - -use_persistent(_PartnersPlus, undefined) -> - false; - -use_persistent(PartnersPlus, _PersistentParts) -> - % get a fullmap from a partner - % this may need rework for network partitions, as you could get a bad - % fullmap from another node that was partitioned w/ this one :\ - RemoteFullmap = get_remote_fullmap(PartnersPlus), - % return opposite of was_i_nodedown - not mem_utils:was_i_nodedown(node(), RemoteFullmap). - - -get_remote_fullmap([]) -> - []; % no remote fullmap available, so return empty list - -get_remote_fullmap([Node|Rest]) -> - case gen_server:call({membership, Node}, fullmap) of - {ok, Fullmap} -> Fullmap; - _ -> get_remote_fullmap(Rest) - end. diff --git a/src/membership2.erl b/src/membership2.erl deleted file mode 100644 index 4c4780c3..00000000 --- a/src/membership2.erl +++ /dev/null @@ -1,686 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File: membership2.erl -%%% @author Cliff Moon <cliff@powerset.com> [] -%%% @copyright 2009 Cliff Moon -%%% @doc -%%% -%%% @end -%%% -%%% @since 2009-05-04 by Cliff Moon -%%%------------------------------------------------------------------- --module(membership2). --author('cliff@powerset.com'). --author('brad@cloudant.com'). - --behaviour(gen_server). - -%% API --export([start_link/2, start_link/3, stop/1, check_nodes/0, - partitions/0, partition_for_key/1, fullmap/0, - all_nodes_parts/1, clock/0, - nodes/0, nodeparts_for_key/1, nodes_for_part/1, nodes_for_part/2, - nodes_for_shard/1, nodes_down/0, - parts_for_node/1, - take_offline/2, bring_online/2, - decommission_part/3, pp_fullmap/0, snafu/1, snafu/3]). - - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%% includes --include("../include/config.hrl"). --include("../include/common.hrl"). --include("../include/profile.hrl"). --include_lib("eunit/include/eunit.hrl"). - -%%==================================================================== -%% API -%%==================================================================== -%% @doc Starts the server -%% @end -%%-------------------------------------------------------------------- - -start_link(Node, Nodes) -> - start_link(Node, Nodes, []). - - -start_link(Node, Nodes, Args) -> - gen_server:start_link({local, membership}, ?MODULE, [Node, Nodes, Args], []). - - -stop(Server) -> - gen_server:cast(Server, stop). - - -%% @doc for when things have really gone south. Install a new state on all -%% nodes, given a filename, or node list, partition map, and fullmap. -%% @end -snafu(Filename) -> - NewState = case file:consult(Filename) of - {ok, [Terms]} -> - Terms; - Error -> - throw(Error) - end, - #membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap} = NewState, - snafu(Nodes, PMap, Fullmap). - - -snafu(Nodes, PMap, Fullmap) -> - NewState = #membership{node=node(), nodes=Nodes, - partitions=PMap, fullmap=Fullmap, version=vector_clock:create(dbcore)}, - update_ets(ets_name(node()), NewState), - fire_gossip(node(), Nodes, NewState), - save(NewState). - - -check_nodes() -> - ErlangNodes = lists:usort([node() | erlang:nodes()]), - {ok, MemNodeList} = membership2:nodes(), - MemNodes = lists:usort(MemNodeList), - {PMapNodeList, _PMapPartList} = lists:unzip(partitions()), - PMapNodes = lists:usort(PMapNodeList), - case ErlangNodes =:= MemNodes andalso - ErlangNodes =:= PMapNodes andalso - MemNodes =:= PMapNodes of - true -> true; - _ -> - Msg = "membership: Node Lists do not match.~n" - "Erlang Nodes : ~p~n" - "Membership Nodes : ~p~n" - "PMap Nodes : ~p~n", - Lst = [ErlangNodes, MemNodes, PMapNodes], - showroom_log:message(error, Msg, Lst), - io:format(Msg, Lst), - false - end. - - -%% @doc retrieve the primary partition map. This is a list of partitions and -%% their corresponding primary node, no replication partner nodes. -partitions() -> - ets_pmap(). - - -%% @doc retrieve the full partition map, like above, but including replication -%% partner nodes. List should number 2^Q * N -fullmap() -> - lists:keysort(2, ets_fullmap()). - - -%% @doc pretty-print the full partition map (sorted by node, then part) -pp_fullmap() -> - lists:foreach( - fun({N,P}) -> - io:format("~-60s ~s~n", [N, showroom_utils:int_to_hexstr(P)]) - end, - lists:sort(membership2:all_nodes_parts(true))). - - -%% @doc get the current vector clock from membership state -clock() -> - gen_server:call(membership, clock). - - -%% @doc get the list of cluster nodes (according to membership module) -%% This may differ from erlang:nodes() -nodes() -> - gen_server:call(membership, nodes). - - -%% @doc get all the responsible nodes for a given partition, including -%% replication partner nodes -nodes_for_part(Part) -> - nodes_for_part(Part, all_nodes_parts(true)). - - -nodes_for_part(Part, NodePartList) -> - Filtered = lists:filter(fun({_N, P}) -> P =:= Part end, NodePartList), - {Nodes, _Parts} = lists:unzip(Filtered), - lists:usort(Nodes). - - -nodes_for_shard(ShardName) when is_binary(ShardName) -> - nodes_for_shard(binary_to_list(ShardName)); - -nodes_for_shard(ShardName) when is_list(ShardName) -> - HexPart = case string:rchr(ShardName, $_) + 1 of - 1 -> ShardName; - Last -> string:substr(ShardName, Last) - end, - Int = showroom_utils:hexstr_to_int(HexPart), - {_, Parts} = lists:unzip(membership2:partitions()), - nodes_for_part(partitions:int_to_partition(Int, Parts)). - - -%% @doc get all the responsible nodes and partitions for a given key, including -%% nodes/parts on replication partner nodes -nodeparts_for_key(Key) -> - int_node_parts_for_key(Key). - - -%% @doc get a list of all the nodes marked down in this node's fullmap -nodes_down() -> - Downs = lists:foldl(fun({N,_P,{nodedown, _T}}, AccIn) -> [N|AccIn]; - (_, AccIn) -> AccIn end, [], fullmap()), - lists:usort(Downs). - - -%% @doc return the partition responsible for the given Key -partition_for_key(Key) -> - Config = configuration:get_config(), - Hash = lib_misc:hash(Key), - partitions:hash_to_partition(Hash, Config#config.q). - - -%% @doc return the partitions that reside on a given node -parts_for_node(Node) -> - lists:sort(lists:foldl(fun({N,P,_Type}, AccIn) -> - case N of - Node -> [P | AccIn]; - _ -> AccIn - end - end, [], fullmap())). - - -%% @doc get all the nodes and partitions in the cluster. Depending on the -%% AllPartners param, you get only primary nodes or replication partner -%% nodes, as well. -%% No nodes/parts currently down are returned. -all_nodes_parts(false) -> - ets_pmap(); -all_nodes_parts(true) -> - mem_utils:nodeparts_up(ets_fullmap()). - - -%% @doc If a local storage server exists for this partition it will be taken -%% out of rotation until put back in. -%% @end -take_offline(Node, Partition) when Node =:= node() -> - gen_server:call(membership, {take_offline, Partition}); - -take_offline(Node, Partition)-> - gen_server:call({membership, Node}, {take_offline, Partition}). - - -%% @doc Brings a storage server that has been taken offline back online. -%% @end -bring_online(Node, Partition) -> - showroom_log:message(debug, "membership: bring_online Node: ~p Partition: ~p", - [Node, Partition]), - gen_server:call({membership, Node}, {bring_online, Partition}). - - -%% @doc cleans up the remaining .couch shard/partition file after it has been -%% moved to a new node. -decommission_part(Node, Part, DbName) -> - gen_server:cast({membership, Node}, {decommission, Part, DbName}). - - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% @spec init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% @doc Initiates the server -%% @end -%%-------------------------------------------------------------------- -init([Node, Nodes, Args]) -> - process_flag(trap_exit,true), - showroom_log:message(info, "membership: membership server starting...", []), - Options = lists:flatten(Args), - showroom_log:message(info, "membership: options ~p", [Options]), - net_kernel:monitor_nodes(true), - Config = configuration:get_config(), - PersistentState=#membership{partitions=PersistentParts} = load(Node), - PartnersPlus = replication:partners_plus(Node, Nodes), - State = - case mem_utils:use_persistent(PartnersPlus, PersistentParts) of - false -> - showroom_log:message(info, "membership: not using persisted state", []), - % didn't find persistent state on disk or this node was nodedown - % so we don't want to use persisted state - PartialNodes = lists:usort(Nodes), - {NewVersion, RemoteNodes, NewPMap1, NewFullMap1} = - join_to(Node, PartnersPlus, Options), - NewWorldNodes = lists:usort(PartialNodes ++ RemoteNodes), - NewPMap = case NewPMap1 of - [] -> partitions:create_partitions(Config#config.q, Node, - NewWorldNodes); - _ -> NewPMap1 - end, - NewFullMap = case NewFullMap1 of - [] -> make_all_nodes_parts(NewPMap); - _ -> NewFullMap1 - end, - #membership{ - node=Node, - nodes=NewWorldNodes, - partitions=lists:keysort(2,NewPMap), - % version=vector_clock:increment(dbcore, NewVersion), - version=NewVersion, - fullmap=NewFullMap}; - _ -> - % found persistent state on disk - showroom_log:message(info, "membership: using persisted state", []), - case Options of - [] -> ok; - _ -> - showroom_log:message(info, "membership: options ~p ignored.", [Options]) - end, - %% fire gossip even if state comes from disk - fire_gossip(Node, Nodes, PersistentState), - PersistentState - end, - save(State), - % ets table is an optimization for cluster_ops performance - Ets = ets:new(ets_name(Node), [public, set, named_table]), - update_ets(Ets, State), - {ok, State}. - - -%%-------------------------------------------------------------------- -%% @spec -%% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% @doc Handling call messages -%% @end -%%-------------------------------------------------------------------- - -%% join -handle_call({join, JoiningNode, Options}, _From, - State = #membership{version=Version, node=Node, nodes=Nodes, - partitions=Partitions, fullmap=OldFullMap}) -> - JoinType = mem_utils:join_type(JoiningNode, OldFullMap, Options), - showroom_log:message(alert, "membership: node ~p wants to join, type '~p'", - [JoiningNode, JoinType]), - {PMap, NewFullmap} = case JoinType of - rejoin -> - mem_utils:fix_mappings(rejoin, JoiningNode, OldFullMap); - {replace, OldNode} -> - mem_utils:fix_mappings(replace, {OldNode, JoiningNode}, OldFullMap); - new -> - Hints = proplists:get_value(hints, Options), - PMap1 = case partitions:join(JoiningNode, Partitions, Hints) of - {ok, Table} -> Table; - {error, Error, _Table} -> throw({join_error, Error}) - end, - Fullmap1 = make_all_nodes_parts(PMap1), - {PMap1, Fullmap1} - end, - WorldNodes = lists:usort(Nodes ++ [JoiningNode]), - NewVersion = vector_clock:increment(dbcore, Version), - NewState1 = State#membership{nodes=WorldNodes, partitions=PMap, - version=NewVersion}, - {Fullmap, NewState2} = case proplists:get_value(bootstrap, Options) of - true -> - % join not complete until bootstrap finishes, - % so this NewState isn't the final (i.e. NewState1 will be installed) - showroom_log:message(info, "membership: bootstrap process starting", []), - bootstrap_manager:start_bootstrap(NewState1, OldFullMap, NewFullmap); - _ -> - % no bootstrap, so install NewFullmap now - showroom_log:message(info, "membership: no bootstrap", []), - {NewFullmap, NewState1#membership{fullmap=NewFullmap}} - end, - save(NewState2), - update_ets(ets_name(node()), NewState2), - notify(node_join, [JoiningNode]), - fire_gossip(Node, WorldNodes, NewState2), - % If we're bootstrapping, then the join is not complete. - % So return FullMap for now. bootstrap_manager:end_bootstrap will fix it - {reply, {ok, NewVersion, WorldNodes, PMap, Fullmap}, NewState2}; - -%% clock -handle_call(clock, _From, State = #membership{version=Version}) -> - {reply, Version, State}; - -%% state -handle_call(state, _From, State) -> - {reply, State, State}; - -%% newfullmap -handle_call({newfullmap, NewFullMap}, _From, - State = #membership{node=Node, nodes=Nodes, version=Version}) -> - NewVersion = vector_clock:increment(dbcore, Version), - NewState = State#membership{version=NewVersion, fullmap=NewFullMap}, - save(NewState), - update_ets(ets_name(node()), NewState), - fire_gossip(Node, Nodes, NewState), - {reply, installed, NewState}; - -%% partitions -handle_call(partitions, _From, State = #membership{partitions=Parts}) -> - {reply, {ok, Parts}, State}; - -%% fullmap -handle_call(fullmap, _From, State = #membership{fullmap=FullMap}) -> - {reply, {ok, FullMap}, State}; - -%% nodes -handle_call(nodes, _From, State = #membership{nodes=Nodes}) -> - {reply, {ok, Nodes}, State}; - -%% take_offline -handle_call({take_offline, Partition}, _From, - State = #membership{node=Node, nodes=Nodes, fullmap=OldFullMap}) -> - showroom_log:message(info, "membership: take_offline Node: ~p Partition: ~p", - [Node, Partition]), - NewFullMap = mem_utils:remove_partition(OldFullMap, Node, Partition), - NewState = State#membership{fullmap=NewFullMap}, - fire_gossip(Node, Nodes, NewState), - update_ets(ets_name(node()), NewState), - {reply, {offline, Node, Partition}, NewState}; - -%% at least reply that this 'catch-all' was ignored -handle_call(_Request, _From, State) -> - {reply, ignored, State}. - - -%%-------------------------------------------------------------------- -%% @spec handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @doc Handling cast messages -%% @end -%%-------------------------------------------------------------------- - -handle_cast({gossip, RemoteState = #membership{node=RemoteNode}}, - LocalState = #membership{node=_Me}) -> - showroom_log:message(info, "membership: received gossip from ~p", - [RemoteNode]), - {MergeType, MergedState = #membership{nodes=_MergedNodes}} = - merge_state(RemoteState, LocalState), - case MergeType of - equal -> {noreply, MergedState}; - merged -> - showroom_log:message(info, "membership: merged new gossip", []), - % fire_gossip(Me, MergedNodes, MergedState), - update_ets(ets_name(node()), MergedState), - save(MergedState), - {noreply, MergedState} - end; - -% decommission -% renaming for now, until case 1245 can be completed -handle_cast({decommission, Part, DbName}, State) -> - {{Y,Mon,D}, {H,Min,S}} = calendar:universal_time(), - Directory = couch_config:get("couchdb", "database_dir"), - OrigFilename = showroom_utils:full_filename(Part, DbName, Directory), - Moved = lists:flatten(io_lib:format(".~w~2.10.0B~2.10.0B." ++ - "~2.10.0B~2.10.0B~2.10.0B.moved.couch", [Y,Mon,D,H,Min,S])), - % Note: this MovedFilename bit below gives weird results: - % ["/Users/brad/dev/erlang/dbcore/tmp/lib/x800000/test_800000", - % ".20091001.162640.moved.couch"] but list/string behavior handles it. - MovedFilename = lists:map(fun(E) -> binary_to_list(E) end, - re:replace(OrigFilename, "\.couch", Moved, [])), - ok = file:rename(OrigFilename, MovedFilename), - {noreply, State}. - - -%% @doc handle nodedown messages because we have -%% net_kernel:monitor_nodes(true) -handle_info({nodedown, Node}, - State = #membership{nodes=OldNodes, fullmap=OldFullmap, - version=OldVersion}) -> - showroom_log:message(alert, "membership: nodedown from ~p", [Node]), - case lists:member(Node, OldNodes) of - true -> - notify(nodedown, [Node]), - % clean up membership state - Nodes = lists:delete(Node, OldNodes), - {PMap, Fullmap} = mem_utils:fix_mappings(nodedown, Node, OldFullmap), - % Do we increment clock here? w/o gossip? - % This is happening near simultaneously on the other nodes, too :\ - % Only reason to increment is persisted clock on down node will be older - % when it returns - Version = vector_clock:increment(dbcore, OldVersion), - NewState = State#membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap, - version=Version}, - update_ets(ets_name(node()), NewState), - save(NewState), - {noreply, NewState}; - _ -> {noreply, State} - end; - -%% @doc handle nodeup messages because we have -%% net_kernel:monitor_nodes(true) -handle_info({nodeup, Node}, State) -> - showroom_log:message(alert, "membership: nodeup Node: ~p", [Node]), - {noreply, State}; - -handle_info(Info, State) -> - showroom_log:message(info, "membership: handle_info Info: ~p", [Info]), - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @spec terminate(Reason, State) -> void() -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%% @end -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - -%% 0.5.6 to 0.5.7 -code_change(184380560337424323902805568963460261434, State, _Extra) -> - backup_old_config_file(), - % update State to the new version - {membership, _Hdr, Node, Nodes, PMap, Version} = State, - NewState = #membership{ - node = Node, - nodes = Nodes, - partitions = PMap, - version = Version, - fullmap = make_all_nodes_parts(PMap) - }, - save(NewState), - % also create new ets table - Ets = ets:new(ets_name(Node), [public, set, named_table]), - update_ets(Ets, NewState), - {ok, NewState}; - -%% 0.8.8 to 0.9.0 -code_change(239470595681156900105628017899543243419, State, _Extra) -> - net_kernel:monitor_nodes(true), - {ok, State}; - -code_change(OldVsn, State, _Extra) -> - io:format("Unknown Old Version!~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]), - {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- - -backup_old_config_file() -> - Config = configuration:get_config(), - FileName = filename:join([Config#config.directory, - lists:concat([node:name(node()), ".state"])]), - BackupName = filename:join([Config#config.directory, - lists:concat([node:name(node()), ".state.bak"])]), - file:copy(FileName, BackupName). - - -%% return State from membership file -load(Node) -> - Config = configuration:get_config(), - case file:consult(filename:join([Config#config.directory, - lists:concat([node:name(Node), ".state"])])) of - {error, Reason} -> - showroom_log:message(info, "membership: could not load state: ~p~n", - [Reason]), - #membership{nodes=[]}; - {ok, [Terms]} -> - Terms - end. - - -%% save the State to a file -save(State) -> - Config = configuration:get_config(), - Filename = filename:join([Config#config.directory, - lists:concat([node:name(State#membership.node), ".state"])]), - {ok, File} = file:open(Filename, [binary, write]), - io:format(File, "~w.~n", [State]), - file:close(File). - - -%% joining is bi-directional, as opposed to gossip which is unidirectional -%% we want to collect the list of known nodes to compute the partition map -%% which isn't necessarily the same as the list of running nodes -join_to(Node, Partners, Options) -> - join_to(Node, Partners, - {vector_clock:create(dbcore), [], [], []}, Options). - - -%% @doc join this node to one of its partners (or PartnersPlus if no partners -%% are available). -join_to(_, [], {Version, World, PMap, FullMap}, _Options) -> - {Version, World, PMap, FullMap}; - -join_to(Node, [Partner|Rest], {Version, World, PMap, FullMap}, Options) -> - case call_join(Partner, Node, Options) of - {ok, RemoteVersion, NewNodes, NewPMap, NewFullMap} -> - {vector_clock:merge(Version, RemoteVersion), - lists:usort(World ++ NewNodes), - NewPMap, - NewFullMap}; - Other -> - showroom_log:message(info, "membership: join_to Other: ~p~n", [Other]), - join_to(Node, Rest, {Version, World, PMap, FullMap}, Options) - end. - - -%% @doc make the join call to Remote node (usually a partner of Node) -call_join(Remote, Node, Options) -> - showroom_log:message(info, "membership: call_join From: ~p To: ~p", - [Node, Remote]), - catch gen_server:call({membership, node:name(Remote)}, - {join, Node, Options}). - - -merge_state(_RemoteState=#membership{version=RemoteVersion, nodes=RemoteNodes, - partitions=RemotePMap, - fullmap=RemoteFullMap}, - LocalState=#membership{version=LocalVersion, nodes=LocalNodes, - partitions=LocalPMap, - fullmap=LocalFullMap}) -> - case vector_clock:equals(RemoteVersion, LocalVersion) of - true -> - {equal, LocalState}; - false -> - % Note, we're matching MergedVersion from these funs. - % They should be the same. - {MergedVersion, MergedNodes} = - merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes), - {MergedVersion, MergedPMap} = - merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap), - {MergedVersion, MergedFullMap} = - merge_fullmaps(RemoteVersion, RemoteFullMap, - LocalVersion, LocalFullMap), - - % notify of arrivals & departures - Arrived = MergedNodes -- LocalNodes, - notify(node_join, Arrived), - % Departed = LocalNodes -- MergedNodes, - % notify(node_leave, Departed), - - {merged, LocalState#membership{version=MergedVersion, nodes=MergedNodes, - partitions=MergedPMap, - fullmap=MergedFullMap}} - end. - - -merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes) -> - {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteNodes}, - {LocalVersion, LocalNodes}), - {MergedVersion, lists:usort(Merged)}. - - -merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap) -> - {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemotePMap}, - {LocalVersion, LocalPMap}), - {MergedVersion, lists:ukeysort(2, Merged)}. - - -merge_fullmaps(RemoteVersion, RemoteFullMap, LocalVersion, LocalFullMap) -> - {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteFullMap}, - {LocalVersion, LocalFullMap}), - {MergedVersion, lists:usort(Merged)}. - - -notify(Type, Nodes) -> - lists:foreach(fun(Node) -> - gen_event:notify(membership_events, {Type, Node}) - end, Nodes). - - -%% @doc fires a gossip message (membership state) to partners nodes in the -%% cluster. -%% @end -fire_gossip(Me, WorldNodes, Gossip) -> - % GossipPartners = partners_plus(Me, WorldNodes), - % random experiment, gossip with all nodes, not just partners_plus - GossipPartners = lists:delete(Me, WorldNodes), - lists:foreach(fun(TargetNode) -> - showroom_log:message(info, "membership: firing gossip from ~p to ~p", - [Me, TargetNode]), - gen_server:cast({membership, TargetNode}, {gossip, Gossip}) - end, GossipPartners). - - -%% @doc construct a table with all partitions, with the primary node and all -%% replication partner nodes as well. -make_all_nodes_parts(PMap) -> - {Nodes, _Parts} = lists:unzip(PMap), - NodeParts = lists:flatmap( - fun({Node,Part}) -> - Partners = replication:partners(Node, lists:usort(Nodes)), - PartnerList = [{Partner, Part, partner} || Partner <- Partners], - [{Node, Part, primary} | PartnerList] - end, PMap), - NodeParts. - - -%% @doc for the given key, return a list of {Node,Part} tuples. Nodes are both -%% primary and replication partner nodes, and should number N. -int_node_parts_for_key(Key) -> - Config = configuration:get_config(), - Hash = lib_misc:hash(Key), - Part = partitions:hash_to_partition(Hash, Config#config.q), - NodePartList = all_nodes_parts(true), - lists:filter(fun({_N,P}) -> P =:= Part end, NodePartList). - - -%% ets table helper functions -ets_name(Node) -> - list_to_atom(lists:concat(["mem_", atom_to_list(Node)])). - - -update_ets(Table, #membership{partitions=PMap, fullmap=FullMap}) -> - ets:insert(Table, {pmap, PMap}), - ets:insert(Table, {fullmap, FullMap}), - ok. - - -ets_pmap() -> - [{pmap, PMap}] = ets:lookup(ets_name(node()), pmap), - PMap. - - -ets_fullmap() -> - [{fullmap, FullMap}] = ets:lookup(ets_name(node()), fullmap), - FullMap. diff --git a/src/node.erl b/src/node.erl deleted file mode 100644 index 9a9c82c1..00000000 --- a/src/node.erl +++ /dev/null @@ -1,39 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File: node.erl -%%% @author Cliff Moon <> [] -%%% @copyright 2009 Cliff Moon -%%% @doc -%%% -%%% @end -%%% -%%% @since 2009-05-11 by Cliff Moon -%%%------------------------------------------------------------------- --module(node). --author('cliff@powerset.com'). - -%% API --export([name/1, attributes/1]). - --include("../include/common.hrl"). - -%% -ifdef(TEST). -%% -include("../etest/node_test.erl"). -%% -endif. - -%%==================================================================== -%% API -%%==================================================================== - -name(Name) when is_atom(Name) -> - Name; -name(Node) when is_tuple(Node) -> - element(1, Node); -name(Node) -> - Node. - -attributes(Name) when is_atom(Name) -> - []; -attributes(Node) when is_tuple(Node) -> - element(2, Node); -attributes(_) -> - []. diff --git a/src/replication.erl b/src/replication.erl deleted file mode 100644 index 96be0ad3..00000000 --- a/src/replication.erl +++ /dev/null @@ -1,165 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File: replication.erl -%%% @author Brad Anderson <brad@cloudant.com> [http://www.cloudant.com] -%%% @copyright 2009 Brad Anderson -%%% @doc -%%% -%%% @end -%%% -%%% @since 2009-06-14 by Brad Anderson -%%%------------------------------------------------------------------- --module(replication). --author('brad@cloudant.com'). - -%% API --export([partners/2, partners/3, partners_plus/2]). - --include_lib("eunit/include/eunit.hrl"). --include("../include/config.hrl"). --include("../include/common.hrl"). - - -%%==================================================================== -%% API -%%==================================================================== - -partners(Node, Nodes) -> - partners(Node, Nodes, configuration:get_config()). - - -%%-------------------------------------------------------------------- -%% @spec partners(Node::atom(), Nodes::list(), Config::config()) -> -%% list() -%% @doc returns the list of all replication partners for the specified node -%% @end -%%-------------------------------------------------------------------- -partners(Node, Nodes, Config) -> - N = Config#config.n, - Meta = Config#config.meta, - pick_partners(Meta, Node, Nodes, [], N - 1). - - -%% return a list of live/up Partners, and if all Partners are down, -%% walk the ring to get one other remote node and return it. -partners_plus(Node, Nodes) -> - Partners = partners(Node, Nodes), - PartnersDown = lists:subtract(Partners, erlang:nodes()), - PartnersUp = lists:subtract(Partners, PartnersDown), - case PartnersUp of - [] -> - TargetNodes = target_list(Node, Nodes), - NonPartners = lists:subtract(TargetNodes, - lists:flatten([Node, Partners])), - walk_ring(NonPartners); - _ -> - %% at least one partner is up, so gossip w/ them - PartnersUp - end. - - -%%==================================================================== -%% Internal functions -%%==================================================================== - -%% @spec pick_partners(proplist(), Node::dynomite_node(), [Node], [Node], -%% integer()) -> list() -%% @doc iterate through N-1 partner picks, returning the resulting list sorted -pick_partners(_Meta, Node, _Nodes, Acc, 0) -> - lists:sort(lists:delete(Node, Acc)); -pick_partners(Meta, Node, Nodes, Acc, Count) -> - Partner = pick_partner(Meta, Node, Nodes, Acc, 1), - NewNodes = lists:filter(fun(Elem) -> - case Elem of - no_partner_found -> false; - Partner -> false; - _ -> true - end - end, Nodes), - NewAcc = case Partner of - no_partner_found -> Acc; - _ -> [Partner|Acc] - end, - pick_partners(Meta, Node, NewNodes, NewAcc, Count-1). - - -%% @spec pick_partner(proplist(), Node::dynomite_node(), [Node], [Node], -%% integer()) -> Node::dynomite_node() -%% @doc pick a specific replication partner at the given level -pick_partner([], Node, Nodes, _Acc, 1) -> - %% handle the no metadata situation - %% Note: This clause must be before the Level > length(Meta) guarded clause - target_key(node:name(Node), lists:map(fun node:name/1, Nodes), roundrobin); - -pick_partner(Meta, _Node, _Nodes, Acc, Level) when Level > length(Meta) -> - Acc; - -pick_partner(Meta, Node, Nodes, Acc, Level) -> - MetaDict = meta_dict(Nodes, Level, dict:new()), - NodeKey = lists:sublist(node:attributes(Node), Level), - Keys = dict:fetch_keys(MetaDict), - {_MetaName, Strategy} = lists:nth(Level, Meta), - TargetKey = target_key(NodeKey, Keys, Strategy), - Candidates = dict:fetch(TargetKey, MetaDict), - case length(Candidates) of - 0 -> - %% didn't find a candidate - no_partner_found; - 1 -> - %% found only one candidate, return it - [Partner] = Candidates, - Partner; - _ -> - pick_partner(Meta, Node, Nodes, Acc, Level + 1) - end. - - -%% @doc construct a dict that holds the key of metadata values so far (up to -%% the current level, and dynomite_node() list as the value. This is used -%% to select a partner in pick_partner/5 -%% @end -meta_dict([], _Level, Dict) -> - Dict; - -meta_dict([Node|Rest], Level, Dict) -> - Key = lists:sublist(node:attributes(Node), Level), - DictNew = dict:append(Key, Node, Dict), - meta_dict(Rest, Level, DictNew). - - -%% @spec target_key(term(), list(), Strategy::atom()) -> term() -%% @doc given the key and keys, sort the list of keys based on stragety (i.e. -%% for roundrobin, sort them, put the NodeKey on the end of the list, and -%% then return the head of the list as the target. -%% @end -%% TODO: moar strategies other than roundrobin? -target_key(NodeKey, Keys, roundrobin) -> - SortedKeys = lists:sort(Keys), - TargetKey = case target_list(NodeKey, SortedKeys) of - [] -> no_partner_found; - [Key|_Rest] -> Key - end, - TargetKey. - - -%% @spec target_list(term(), list()) -> list() -%% @doc split the list of keys into 'lessthan NodeKey', NodeKey, and 'greaterthan -%% Nodekey' and then put the lessthan section on the end of the list -%% @end -target_list(_NodeKey, []) -> - []; -target_list(NodeKey, Keys) -> - {A, [NodeKey|B]} = lists:splitwith(fun(K) -> K /= NodeKey end, Keys), - lists:append([B, A, [NodeKey]]). - - -walk_ring([]) -> - %% TODO: should we be more forceful here and throw? not for now - showroom_log:message(info, - "~p:walk_ring/1 - could not find node for gossip", [?MODULE]), - []; - -walk_ring([Node|Rest]) -> - case lists:member(Node, erlang:nodes()) of - true -> [Node]; - _ -> walk_ring(Rest) - end. |