summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBrad Anderson <brad@cloudant.com>2010-05-28 15:52:42 -0400
committerBrad Anderson <brad@cloudant.com>2010-05-28 15:52:42 -0400
commit4c6b7c7c12ba03e5b50d7379ab14cb0ba0037965 (patch)
treee352a307daa3daf988258ee7fabb614d904962d6 /src
parent819db8cbda15dc8f78831a8705b3a2ad468343ef (diff)
remove dynomite cruft
Diffstat (limited to 'src')
-rw-r--r--src/bootstrap_manager.erl261
-rw-r--r--src/bootstrap_receiver.erl121
-rw-r--r--src/cluster_ops.erl264
-rw-r--r--src/configuration.erl100
-rw-r--r--src/dynomite_couch_api.erl140
-rw-r--r--src/dynomite_couch_storage.erl41
-rw-r--r--src/lib_misc.erl235
-rw-r--r--src/mem_utils.erl129
-rw-r--r--src/membership2.erl686
-rw-r--r--src/node.erl39
-rw-r--r--src/replication.erl165
11 files changed, 0 insertions, 2181 deletions
diff --git a/src/bootstrap_manager.erl b/src/bootstrap_manager.erl
deleted file mode 100644
index f1303223..00000000
--- a/src/bootstrap_manager.erl
+++ /dev/null
@@ -1,261 +0,0 @@
-%%%-------------------------------------------------------------------
-%%% File: bootstrap_manager.erl
-%%% @author Cliff Moon <> []
-%%% @copyright 2009 Cliff Moon
-%%% @doc This is the bootstrap manager for a cluster.
-%%%
-%%% @end
-%%%
-%%% @since 2009-07-29 by Cliff Moon
-%%%-------------------------------------------------------------------
--module(bootstrap_manager).
--author('cliff@powerset.com').
--author('brad@cloudant.com').
-
--behaviour(gen_server).
-
-%% API
--export([start_bootstrap/3, end_bootstrap/1,
- start_link/3, start/3, stop/0,
- start_transfers/0, transfers/0]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--record(state, {transfer_list, nodes, transfers, futurefullmap}).
--record(transfer, {partition, receivers, rate=0, status=starting}).
-
--include("../include/config.hrl").
--include("../include/common.hrl").
-
-%%====================================================================
-%% API
-%%====================================================================
-%%--------------------------------------------------------------------
-%% @spec start_link() -> {ok,Pid} | ignore | {error,Error}
-%% @doc Starts the server
-%% @end
-%%--------------------------------------------------------------------
-start_bootstrap(State=#membership{node=Node, nodes=Nodes},
- OldFullMap, NewFullMap) ->
- case partitions:diff(OldFullMap, NewFullMap) of
- [] ->
- % no difference in pmaps
- {NewFullMap, State#membership{fullmap=NewFullMap}};
- TransferList when is_list(TransferList) ->
- ?LOG_DEBUG("~nBootstrap~nNode : ~p~nTransferList :~n~p~n",
- [Node, partitions:pp_diff(TransferList)]),
- case start_link(TransferList, Nodes, NewFullMap) of
- {ok, _Pid} ->
- start_transfers();
- Other -> throw(Other)
- end,
-
- % bootstrap has some stuff to do (async), so just give the state
- % passed in for now. end_bootstrap will be called with the resulting
- % state when it completes
- {OldFullMap, State};
- Other ->
- % probably occurs b/c T (# of nodes) < N currently.
- % more nodes joining should help avoid this error.
- ?LOG_ERROR("no_bootstrap - Other: ~p", [Other]),
- {NewFullMap, State#membership{fullmap=NewFullMap}}
- end.
-
-
-end_bootstrap(#state{futurefullmap=FutureFullMap}) ->
- end_bootstrap(FutureFullMap);
-
-end_bootstrap(NewFullMap) ->
- gen_server:call(membership, {newfullmap, NewFullMap}),
- stop().
-
-
-start(TransferList, Nodes, FutureFullMap) ->
- gen_server:start({global, bootstrap_manager}, ?MODULE,
- [TransferList, Nodes, FutureFullMap], []).
-
-
-start_link(TransferList, Nodes, FutureFullMap) ->
- gen_server:start_link({global, bootstrap_manager}, ?MODULE,
- [TransferList, Nodes, FutureFullMap], []).
-
-
-stop() ->
- gen_server:cast({global, bootstrap_manager}, stop).
-
-
-start_transfers() ->
- gen_server:cast({global, bootstrap_manager}, start_transfers).
-
-
-transfers() ->
- gen_server:call({global, bootstrap_manager}, transfers).
-
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% @spec init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% @doc Initiates the server
-%% @end
-%%--------------------------------------------------------------------
-init([TransferList, Nodes, FutureFullMap]) ->
- process_flag(trap_exit, true),
- {ok, #state{transfer_list=TransferList,nodes=Nodes,
- futurefullmap=FutureFullMap}}.
-
-
-%%--------------------------------------------------------------------
-%% @spec
-%% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% @doc Handling call messages
-%% @end
-%%--------------------------------------------------------------------
-handle_call(average_transfer_rate, _From,
- State=#state{transfers=Transfers}) ->
- {Sum, Cardinality} = ets:foldl(
- fun(#transfer{rate=Rate}, {Sum, Cardinality}) ->
- {Sum+Rate,Cardinality+1}
- end, {0, 0}, Transfers),
- AverageRate = Sum / Cardinality,
- {reply, AverageRate, State};
-
-handle_call(aggregate_transfer_rate, _From,
- State=#state{transfers=Transfers}) ->
- Sum = ets:foldl(fun(#transfer{rate=Rate}, Sum) ->
- Rate + Sum
- end, 0, Transfers),
- {reply, Sum, State};
-
-handle_call(transfers, _From,
- State=#state{transfers=Transfers}) ->
- {reply, {ok, ets:tab2list(Transfers)}, State};
-
-%% at least reply that this 'catch-all' was ignored
-handle_call(_Request, _From, State) ->
- {reply, ignored, State}.
-
-
-%%--------------------------------------------------------------------
-%% @spec handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% @doc Handling cast messages
-%% @end
-%%--------------------------------------------------------------------
-handle_cast(stop, State) ->
- {stop, normal, State};
-
-handle_cast(start_transfers,
- State=#state{transfer_list=TransferList}) ->
- Transfers = start_transfers(TransferList, State),
- {noreply, State#state{transfers=Transfers}};
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-
-%%--------------------------------------------------------------------
-%% @spec handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% @doc Handling all non call/cast messages
-%% @end
-%%--------------------------------------------------------------------
-
-handle_info({receiver_done, FromNode, _ToNode, Partition, DbName, Receiver},
- State = #state{transfers=Transfers}) ->
- %% TODO use bring_online & ToNode? instead of waiting until end & installing
- %% NewFullMap into mem2
-
- %% handle the old file
- membership2:decommission_part(FromNode, Partition, DbName),
-
- %% remove from Transfers table
- case ets:lookup(Transfers, Partition) of
- [Transfer] = [#transfer{receivers=Receivers}] ->
- NewReceivers = lists:delete(Receiver, Receivers),
- if
- length(NewReceivers) == 0 -> ets:delete(Transfers, Partition);
- true -> ets:insert(Transfers, Transfer#transfer{receivers=NewReceivers})
- end;
- _ -> ok
- end,
- case ets:first(Transfers) of
- '$end_of_table' ->
- end_bootstrap(State),
- {noreply, State};
- _ -> {noreply, State}
- end;
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-
-%%--------------------------------------------------------------------
-%% @spec terminate(Reason, State) -> void()
-%% @doc This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%% @end
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
- ok.
-
-
-%%--------------------------------------------------------------------
-%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% @doc Convert process state when code is changed
-%% @end
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-start_transfers([], State) ->
- no_transfers, % no diff in pmaps, so no transfers
- end_bootstrap(State);
-
-start_transfers(Diff, State=#state{nodes=Nodes}) ->
- case showroom_db:all_databases("") of
- {ok, AllDbs} when length(AllDbs) > 0 ->
- start_transfers(Diff, Nodes, configuration:get_config(), AllDbs,
- ets:new(transfers, [public, set, {keypos, 2}]));
- {ok, []} -> end_bootstrap(State); % no databases, so bootstrap not needed
- Other -> throw(Other) % problem getting list of dbs
- end.
-
-
-start_transfers([], _, _, _, Transfers) ->
- Transfers;
-
-start_transfers([{FromNode, ToNode, Partition} | Diff], Nodes, Config,
- AllDbs, Transfers) ->
- membership2:take_offline(FromNode, Partition),
- Receivers = lists:map(
- fun(DbName) ->
- {ok, Receiver} =
- bootstrap_receiver:start_link(FromNode, ToNode, Partition,
- DbName, 10000, self()),
- Receiver
- end, AllDbs),
- % NOTE: by using AllDbs, we are omitting .deleted.couch files
- ets:insert(Transfers, #transfer{partition=Partition,
- receivers=Receivers}),
- start_transfers(Diff, Nodes, Config, AllDbs, Transfers).
diff --git a/src/bootstrap_receiver.erl b/src/bootstrap_receiver.erl
deleted file mode 100644
index 3b4907cb..00000000
--- a/src/bootstrap_receiver.erl
+++ /dev/null
@@ -1,121 +0,0 @@
-%%%-------------------------------------------------------------------
-%%% File: bootstrap_receiver.erl
-%%% @author Brad Anderson <brad@cloudant.com>
-%%% @copyright 2009 Brad Anderson
-%%% @doc
-%%%
-%%% @end
-%%%
-%%% @since 2009-09-22 by Brad Anderson
-%%%-------------------------------------------------------------------
--module(bootstrap_receiver).
--author('brad@cloudant.com').
-
--include("../include/config.hrl").
--include("../include/common.hrl").
-
-%% API
--export([start_link/6, loop/6, fetch_shard/5]).
-
-
-%%====================================================================
-%% API
-%%====================================================================
-%%--------------------------------------------------------------------
-%% @spec
-%% @doc
-%% @end
-%%--------------------------------------------------------------------
-start_link(FromNode, ToNode, Partition, DbName, Timeout, Manager) ->
- Pid = proc_lib:spawn_link(ToNode, bootstrap_receiver, loop,
- [FromNode, Partition, DbName, Timeout, Manager,
- self()]),
- sync_wait(Pid, Timeout).
-
-
-loop(FromNode, Partition, DbName, Timeout, Manager, Parent) ->
- proc_lib:init_ack(Parent, {ok, self()}),
- fetch_shard(FromNode, Partition, DbName, Timeout, Manager).
-
-
-%% @doc run at "ToNode" via spawn_link
-fetch_shard(FromNode, Partition, DbName, Timeout, Manager) ->
- Directory = couch_config:get("couchdb", "database_dir"),
- [_NodeName, Hostname] = string:tokens(atom_to_list(FromNode), "@"),
- SrcFile = binary_to_list(partitions:shard_name(Partition, DbName)),
- DestFile = showroom_utils:full_filename(Partition, DbName, Directory),
- Authn = fetch_authn(),
- Port = fetch_port(),
- Url = lists:concat(["http://", Authn, Hostname, Port, "/", SrcFile,
- ".couch"]),
- Options = [{save_response_to_file, DestFile},
- {inactivity_timeout, Timeout}],
- case filelib:ensure_dir(DestFile) of
- ok -> ok;
- {error, eexist} -> ok; % duh!
- Other -> throw(Other)
- end,
- ?LOG_DEBUG("~n"
- "Directory: ~p~n"
- "Hostname : ~p~n"
- "SrcFile : ~p~n"
- "DestFile : ~p~n"
- "Url : ~p~n"
- "Options : ~p~n"
- , [Directory, Hostname, SrcFile, DestFile, Url, Options]),
- case ibrowse:send_req(Url, [], get, [], Options, infinity) of
- {ok, "200", _Headers, Body} ->
- ?LOG_DEBUG("~nBootstrap ibrowse req Body: ~p~n", [Body]),
- Manager ! {receiver_done, FromNode, node(), Partition, DbName,
- self()};
- Error ->
- ?LOG_ERROR("~nBootstrap ibrowse req Error: ~p~n", [Error]),
- throw(Error)
- end.
-
-
-%%====================================================================
-%% Internal functions
-%%====================================================================
-
-
-%% from proc_lib.erl in otp r13b01
-sync_wait(Pid, Timeout) ->
- receive
- {ack, Pid, Return} ->
- Return;
- {'EXIT', Pid, Reason} ->
- {error, Reason}
- after Timeout ->
- unlink(Pid),
- exit(Pid, kill),
- flush(Pid),
- {error, timeout}
- end.
-
-
-flush(Pid) ->
- receive
- {'EXIT', Pid, _} ->
- true
- after 0 ->
- true
- end.
-
-
-fetch_authn() ->
- User = couch_config:get("shard_moving", "user", ""),
- Pass = couch_config:get("shard_moving", "pass", ""),
- if
- length(User) > 0 andalso length(Pass) > 0 ->
- lists:concat([User, ":", Pass, "@"]);
- true -> ""
- end.
-
-
-fetch_port() ->
- Port = couch_config:get("shard_moving", "port", "8080"),
- if
- Port =:= "80" -> "";
- true -> lists:concat([":", Port])
- end.
diff --git a/src/cluster_ops.erl b/src/cluster_ops.erl
deleted file mode 100644
index 72bba92f..00000000
--- a/src/cluster_ops.erl
+++ /dev/null
@@ -1,264 +0,0 @@
-%%%-------------------------------------------------------------------
-%%% File: cluster_ops.erl
-%%% @author Brad Anderson <brad@cloudant.com> [http://cloudant.com]
-%%% @copyright 2009 Brad Anderson
-%%% @doc
-%%%
-%%% @end
-%%%
-%%% @since 2009-07-21 by Brad Anderson
-%%%-------------------------------------------------------------------
--module(cluster_ops).
--author('brad@cloudant.com').
-
-%% API
--export([key_lookup/3, key_lookup/5,
- all_parts/4,
- some_parts/4, some_parts/5,
- quorum_from_each_part/3]).
-
--include("../include/common.hrl").
--include("../include/config.hrl").
-
--include("../include/profile.hrl").
-
-
-%%====================================================================
-%% API
-%%====================================================================
-
-%% @doc Get to the proper shard on N nodes by key lookup
-%%
-%% This fun uses quorum constants from config
-key_lookup(Key, {M,F,A}, Access) ->
- N = list_to_integer(couch_config:get("cluster", "n", "3")),
- key_lookup(Key, {M,F,A}, Access, get_const(Access), N).
-
-
-%% @doc Get to the proper shard on N nodes by key lookup
-%%
-%% This fun uses a provided quorum constant, possibly from request,
-%% possibly from config
-key_lookup(Key, {M,F,A}, Access, Const, N) ->
- NodeParts = membership2:nodeparts_for_key(Key),
- {ResolveFun, NotFoundFun} = case Access of
- r -> {fun resolve_read/1, fun resolve_not_found/2};
- w -> {fun resolve_write/1, fun(_,_) -> {false, notused, []} end}
- end,
- MapFun = fun({Node,Part}) ->
- try
- rpc:call(Node, M, F, [[Part | A]])
- catch Class:Exception ->
- {error, Class, Exception}
- end
- end,
- {GoodReplies, Bad} = pcall(MapFun, NodeParts, N),
- if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
- Good = lists:map(fun strip_ok/1, GoodReplies),
- final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access).
-
-
-%% @doc Do op on all shards (and maybe even replication partners)
-all_parts({M,F,A}, Access, AndPartners, ResolveFun) ->
- NodePartList = membership2:all_nodes_parts(AndPartners),
- MapFun = fun({Node, Part}) ->
- try
- rpc:call(Node, M, F, [[Part | A]])
- catch Class:Exception ->
- {error, Class, Exception}
- end
- end,
- Replies = ?PMAP(MapFun, NodePartList),
- {Good, Bad} = lists:partition(fun valid/1, Replies),
- final_all_parts(Good, Bad, length(NodePartList), ResolveFun, Access).
-
-
-%% @doc Do op on some shards, depending on list of keys sent in.
-%%
-%% This fun uses quorum constants from config
-some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access) ->
- some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access, get_const(Access)).
-
-
-%% @doc Do op on some shards, depending on list of keys sent in.
-%%
-%% This fun uses a provided quorum constant, possibly from request,
-%% possibly from config
-some_parts(KeyFun, SeqsKVPairs, {M,F,A}, _Access, Const) ->
- TaskFun = fun({{Node,Part}, Values}) ->
- try
- rpc:call(Node, M, F, [[Part | [Values | A]]])
- catch Class:Exception ->
- {error, Class, Exception}
- end
- end,
-
- % get tasks per node that are part / values for that partition
- DistTasks = get_dist_tasks(KeyFun, SeqsKVPairs),
-
- % With the distributed tasklist in hand, do the tasks per partition.
- % For each partition, do the work on all nodes/parts.
- TaskReplies = ?PMAP(TaskFun, DistTasks),
- {GoodReplies, Bad} = lists:partition(fun valid/1, TaskReplies),
- if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
- Good = lists:map(fun strip_ok/1, GoodReplies),
- final_some_parts(Good, Bad, Const).
-
-
-quorum_from_each_part({M,F,A}, Access, ResolveFun) ->
- Const = get_const(Access),
- {_, Parts} = lists:unzip(membership2:partitions()),
- PartsMapFun = fun(Part) ->
- Nodes = membership2:nodes_for_part(Part),
- NodesMapFun = fun(Node) -> rpc:call(Node, M, F, [[Part | A]]) end,
- {GoodReplies,BadReplies} = pcall(NodesMapFun, Nodes, Const),
- Good1 = lists:map(fun strip_ok/1, GoodReplies),
- Bad1 = case length(Good1) >= Const of
- true -> [];
- false -> BadReplies
- end,
- {Good1,Bad1}
- end,
- Results1 = ?PMAP(PartsMapFun, Parts),
- {Good,Bad} = lists:foldl(fun({G,B}, {GAcc,BAcc}) ->
- {lists:append(G,GAcc),lists:append(B,BAcc)}
- end, {[],[]}, Results1),
- if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
- final_quorum_from_each_part(Good, Bad, length(Parts), ResolveFun, Access).
-
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access) ->
- {NotFound, Return, Reasons} = NotFoundFun(Bad, Const),
- if
- length(Good) >= Const -> {ok, ResolveFun(Good)};
- NotFound -> {ok, Return, Reasons};
- true -> error_message(Good, Bad, N, Const, Access)
- end.
-
-
-final_all_parts(Good, Bad, Total, ResolveFun, Access) ->
- case length(Good) =:= Total of
- true -> {ok, ResolveFun(Good)};
- _ -> error_message(Good, Bad, Total, Total, Access)
- end.
-
-
-final_some_parts(Good, _Bad, Const) ->
- Good1 = lists:flatten(Good),
- {Seqs, _} = lists:unzip(Good1),
- {ResG,ResB} =
- lists:foldl(
- fun(Seq, {AccG,AccB}) ->
- Vals = proplists:get_all_values(Seq, Good1),
- case length(Vals) >= Const of
- true -> {[{Seq, Vals}|AccG],AccB};
- _ -> {AccG, [{Seq, Vals}|AccB]}
- end
- end, {[],[]}, lists:usort(Seqs)),
- case length(ResB) of
- 0 -> {ok, ResG};
- _ -> {error, ResB}
- end.
-
-
-final_quorum_from_each_part(Good, Bad, Total, ResolveFun, Access) ->
- case length(Good) =:= Total of
- true -> {ok, ResolveFun(Good)};
- _ -> error_message(Good, Bad, Total, Total, Access)
- end.
-
-
-resolve_read([First|Responses]) ->
- case First of
- not_found -> not_found;
- _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
- end.
-
-
-resolve_write([First|Responses]) ->
- case First of
- not_found -> not_found;
- _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
- end.
-
-
-resolve_not_found(Bad, R) ->
- {NotFoundCnt, DeletedCnt, OtherReasons} =
- lists:foldl(fun({Error,Reason}, {NotFoundAcc, DeletedAcc, ReasonAcc}) ->
- case {Error,Reason} of
- {not_found, {_Clock, [missing|_Rest]}} ->
- {NotFoundAcc+1, DeletedAcc, ReasonAcc};
- {not_found, {_Clock, [deleted|_Rest]}} ->
- {NotFoundAcc, DeletedAcc+1, ReasonAcc};
- _ ->
- {NotFoundAcc, DeletedAcc, [Reason|ReasonAcc]}
- end
- end, {0, 0, []}, Bad),
- % TODO: is the comparison to R good here, or should it be N-R?
- if
- NotFoundCnt >= R -> {true, {not_found, missing}, OtherReasons};
- DeletedCnt >= R -> {true, {not_found, deleted}, OtherReasons};
- true -> {false, other, OtherReasons}
- end.
-
-
-error_message(Good, Bad, N, T, Access) ->
- Msg = list_to_atom(lists:concat([atom_to_list(Access), "_quorum_not_met"])),
- ?LOG_ERROR("~p~nSuccess on ~p of ~p servers. Needed ~p. Errors: ~w"
- , [Msg, length(Good), N, T, Bad]),
- [{error, Msg}, {good, Good}, {bad, Bad}].
-
-
-pcall(MapFun, Servers, Const) ->
- Replies = lib_misc:pmap(MapFun, Servers, Const),
- lists:partition(fun valid/1, Replies).
-
-
-valid({ok, _}) -> true;
-valid(ok) -> true;
-valid(_) -> false.
-
-
-strip_ok({ok, Val}) -> Val;
-strip_ok(Val) -> Val.
-
-
-%% @spec get_dist_tasks(KeyFun::function(), KVPairs::list()) ->
-%% [{{Node::node(), Part::integer()}, SeqVals}]
-%% Type - ordered | ??
-%% SeqVals - [{Seq, Val}]
-%% @doc builds a distributed task list of nodes with a list of shard/values.
-%% This looks like a dict structure
-%% but is a list so we can use ?PMAP with the results
-%% @end
-get_dist_tasks(KeyFun, SeqsKVPairs) ->
- NPSV = lists:flatmap(fun({_,KVPair} = Elem) ->
- [{NP, Elem} || NP <- membership2:nodeparts_for_key(KeyFun(KVPair))]
- end, SeqsKVPairs),
- group_by_key(NPSV).
-
-group_by_key([]) ->
- [];
-group_by_key(List) ->
- [{FirstK,FirstV} | Rest] = lists:keysort(1,List),
- Acc0 = {FirstK, [FirstV], []},
- FoldFun = fun({K,V}, {K,Vs,Acc}) ->
- {K, [V|Vs], Acc};
- ({NewKey,V}, {OldKey,Vs,Acc}) ->
- {NewKey, [V], [{OldKey,Vs}|Acc]}
- end,
- {LastK, LastVs, Acc} = lists:foldl(FoldFun, Acc0, Rest),
- [{LastK, LastVs} | Acc].
-
-get_const(r) ->
- list_to_integer(couch_config:get("cluster", "r", "2"));
-get_const(w) ->
- list_to_integer(couch_config:get("cluster", "w", "2"));
-get_const(r1) ->
- 1;
-get_const(Other) ->
- throw({bad_access_term, Other}).
diff --git a/src/configuration.erl b/src/configuration.erl
deleted file mode 100644
index db44e83c..00000000
--- a/src/configuration.erl
+++ /dev/null
@@ -1,100 +0,0 @@
-%%% -*- erlang-indent-level:2 -*-
-%%%-------------------------------------------------------------------
-%%% File: configuration.erl
-%%% @author Cliff Moon <cliff@powerset.com>
-%%% @author Brad Anderson <brad@cloudant.com>
-%%% @copyright 2008 Cliff Moon
-%%% @doc
-%%% This module keeps Dynomite source relatively unchanged, but
-%%% reads from couchdb config stuffs
-%%% @end
-%%%
-%%% @since 2008-07-18 by Cliff Moon
-%%%-------------------------------------------------------------------
--module(configuration).
--author('cliff@powerset.com').
--author('brad@cloudant.com').
-
-%%-behaviour(gen_server).
-
-%% API
--export([start_link/1, get_config/1, get_config/0, set_config/1, stop/0]).
-
--include_lib("eunit/include/eunit.hrl").
-
--include("../include/config.hrl").
--include("../include/common.hrl").
-
--define(SERVER, couch_config).
--define(i2l(V), integer_to_list(V)).
--define(l2i(V), list_to_integer(V)).
-
-
-%% -----------------------------------------------------------------
-%% API
-%% -----------------------------------------------------------------
-
-%% @doc starts couch_config gen_server if it's not already started
-start_link(DynomiteConfig) ->
- couch_config_event:start_link(),
- couch_config:start_link([]),
- set_config(DynomiteConfig).
-
-
-%% @doc get the config for a remote node
-get_config(Node) ->
- ClusterConfig = rpc:call(Node, couch_config, get, ["cluster"]),
- Directory = rpc:call(Node, couch_config, get, ["couchdb", "database_dir"]),
- couch2dynomite_config(ClusterConfig, Directory).
-
-
-%% @doc get the config for the local node
-get_config() ->
- get_config(node()).
-
-
-%% @doc given a Dynomite config record, put the values into the Couch config
-set_config(DynomiteConfig) ->
- dynomite2couch_config(DynomiteConfig).
-
-
-%% @doc stop the config server (nothing to do until after couch_config refactor)
-stop() ->
- couch_config:stop().
-
-
-%% -----------------------------------------------------------------
-%% Internal functions
-%% -----------------------------------------------------------------
-
-%% @doc turn a couch config proplist into a dynomite configuration record
-couch2dynomite_config(ClusterConfig, Directory) ->
- Q = ?l2i(couch_util:get_value("q", ClusterConfig, "3")),
- R = ?l2i(couch_util:get_value("r", ClusterConfig, "2")),
- W = ?l2i(couch_util:get_value("w", ClusterConfig, "1")),
- N = ?l2i(couch_util:get_value("n", ClusterConfig, "4")),
- %% use couch's database_dir here, to avoid /tmp/data not existing
- Webport = ?l2i(couch_util:get_value("webport", ClusterConfig, "8080")),
- Meta = couch_util:get_value("meta", ClusterConfig, []),
- StorageMod = couch_util:get_value("storage_mod", ClusterConfig, []),
- #config{q=Q, r=R, w=W, n=N, directory=Directory, web_port=Webport,
- meta=Meta, storage_mod=StorageMod}.
-
-
-%% @doc workhorse for set_config/1 above
-dynomite2couch_config(DynomiteConfig) ->
- couch_config:set("cluster", "q", ?i2l(DynomiteConfig#config.q), false),
- couch_config:set("cluster", "r", ?i2l(DynomiteConfig#config.r), false),
- couch_config:set("cluster", "w", ?i2l(DynomiteConfig#config.w), false),
- couch_config:set("cluster", "n", ?i2l(DynomiteConfig#config.n), false),
- couch_config:set("couchdb", "database_dir", DynomiteConfig#config.directory,
- false),
- couch_config:set("cluster", "webport",
- case DynomiteConfig#config.web_port of
- undefined -> "8080";
- _ -> ?i2l(DynomiteConfig#config.web_port)
- end, false),
- couch_config:set("cluster", "meta", DynomiteConfig#config.meta, false),
- couch_config:set("cluster", "storage_mod",
- DynomiteConfig#config.storage_mod, false),
- ok.
diff --git a/src/dynomite_couch_api.erl b/src/dynomite_couch_api.erl
deleted file mode 100644
index 554b84f6..00000000
--- a/src/dynomite_couch_api.erl
+++ /dev/null
@@ -1,140 +0,0 @@
-%% This is a Dynomite plugin for calling the CouchDB raw Erlang API
-%%
-%% Most calls will have come from any of the web endpoints to execute
-%% these functions on the proper node for the key(s).
-
--module(dynomite_couch_api).
--author('brad@cloudant.com').
-
--export([create_db/1, delete_db/1, get/1, put/1,
- bulk_docs/1, missing_revs/1, get_db_info/1, get_view_group_info/1,
- ensure_full_commit/1
- ]).
-
--include("../../couch/src/couch_db.hrl").
--include("../include/common.hrl").
-
-
-%%--------------------------------------------------------------------
-%% @spec create_db([Part, DbName, Options]) -> {ok,Db} | {error,Error}
-%% Description: Creates the database shard.
-%%--------------------------------------------------------------------
-create_db([Part, DbName, Options]) ->
- case couch_server:create(partitions:shard_name(Part, DbName), Options) of
- {ok, Shard} ->
- couch_db:close(Shard),
- ok;
- Error -> Error
- end.
-
-
-%%--------------------------------------------------------------------
-%% @spec delete_db([Part, DbName, Options]) -> {ok,deleted} | {error,Error}
-%% Description: Deletes the database shard.
-%%--------------------------------------------------------------------
-delete_db([Part, DbName, Options]) ->
- couch_server:delete(partitions:shard_name(Part, DbName), Options).
-
-
-get([Part, Db, DocId, Revs, Options]) ->
- case showroom_db:open_shard(node(), Part, Db) of
- {ok, Shard} ->
- {Status, Doc} = couch_api:open_doc(Shard, DocId, Revs, Options),
- showroom_db:close_shard(Shard),
- {Status, {[], [Doc]}};
- Error ->
- Error
- end.
-
-
-put([Part, Db, Doc, Options]) ->
- case showroom_db:open_shard(node(), Part, Db) of
- {ok, Shard} ->
- {Status, NewRev} = couch_db:update_doc(Shard, Doc, Options),
- showroom_db:close_shard(Shard),
- {Status, [NewRev]};
- Error ->
- Error
- end.
-
-
-bulk_docs([Part, SeqsDocs, Db, Options, Type]) ->
- {Seqs, Docs} = lists:unzip(SeqsDocs),
- case Docs of
- [] -> {ok, []};
- _ ->
- case showroom_db:open_shard(node(), Part, Db) of
- {ok, Shard} ->
- {ok, Results1} = couch_db:update_docs(Shard, Docs, Options, Type),
- showroom_db:close_shard(Shard),
- Results = int_zip(Seqs, Results1),
- {ok, Results};
- Error ->
- Error
- end
- end.
-
-
-missing_revs([Part, SeqsIdsRevs, Db]) ->
- {_Seqs, IdsRevs} = lists:unzip(SeqsIdsRevs),
- case IdsRevs of
- [] -> {ok, []};
- _ ->
- case showroom_db:open_shard(node(), Part, Db) of
- {ok, Shard} ->
- {ok, Results1} = couch_db:get_missing_revs(Shard, IdsRevs),
- showroom_db:close_shard(Shard),
- {ok, Results1};
- Error ->
- Error
- end
- end.
-
-
-get_db_info([Part, Db]) ->
- case showroom_db:open_shard(node(), Part, Db) of
- {ok, Shard} ->
- {Status, Info} = couch_db:get_db_info(Shard),
- showroom_db:close_shard(Shard),
- {Status, {[], Info}};
- Error ->
- Error
- end.
-
-get_view_group_info([Part, Db, DesignId]) ->
- case showroom_db:open_shard(node(), Part, Db) of
- {ok, Shard} ->
- {ok, EmptyGroup} = showroom_view:build_skeleton_view_group(Db, DesignId),
- <<"S", ShardName/binary>> = Shard#db.name,
- {ok, Pid} = gen_server:call(couch_view, {get_group_server,
- ShardName, EmptyGroup}),
- {ok, Info} = couch_view_group:request_group_info(Pid),
- showroom_db:close_shard(Shard),
- {ok, {[], Info}};
- Error ->
- Error
- end.
-
-
-ensure_full_commit([Part, Db]) ->
- case showroom_db:open_shard(node(), Part, Db) of
- {ok, Shard} ->
- {Status, Info} = couch_db:ensure_full_commit(Shard),
- showroom_db:close_shard(Shard),
- {Status, {[], Info}};
- Error ->
- Error
- end.
-
-
-%% =======================
-%% internal
-%% =======================
-
-int_zip(Seqs, Docs) when length(Seqs) == length(Docs) ->
- lists:zip(Seqs, Docs);
-int_zip(_Seqs, []) ->
- [];
-int_zip(Seqs, Docs) ->
- ?debugFmt("~nWTF? int_zip~nSeqs: ~p~nDocs: ~p~n", [Seqs, Docs]),
- [].
diff --git a/src/dynomite_couch_storage.erl b/src/dynomite_couch_storage.erl
deleted file mode 100644
index 4fd21b80..00000000
--- a/src/dynomite_couch_storage.erl
+++ /dev/null
@@ -1,41 +0,0 @@
-%%%-------------------------------------------------------------------
-%%% File: dynomite_couch_storage.erl
-%%% @author Brad Anderson
-%%% @copyright 2009 Brad Anderson
-%%% @doc
-%%%
-%%% @end
-%%%
-%%% @since 2009-07-14
-%%%-------------------------------------------------------------------
--module(dynomite_couch_storage).
--author('brad@cloudant.com').
-
-%% API
--export([name/1, open/2, close/1, create/2]).
-%% , close/1, get/2, put/4, has_key/2, delete/2, fold/3
-
--include_lib("../include/common.hrl").
-
-%% -record(row, {key, context, values}).
-
-%%====================================================================
-%% API
-%%====================================================================
-
-name(Boundary) ->
- showroom_utils:int_to_hexstr(Boundary).
-
-open(Directory, Name) ->
-%% ?debugFmt("~nDirectory: ~p~nName : ~p~n", [Directory,Name]),
- {ok, {Directory, Name}}.
-
-close(_Table) -> ok.
-
-create(_Directory, _Name) ->
- ok.
-
-
-%%====================================================================
-%% Internal functions
-%%====================================================================
diff --git a/src/lib_misc.erl b/src/lib_misc.erl
deleted file mode 100644
index f5449295..00000000
--- a/src/lib_misc.erl
+++ /dev/null
@@ -1,235 +0,0 @@
--module(lib_misc).
-
--define(OFFSET_BASIS, 2166136261).
--define(FNV_PRIME, 16777619).
-
--export([rm_rf/1, pmap/3, succ/1, fast_acc/3, hash/1, hash/2, fnv/1,
- nthdelete/2, zero_split/1, nthreplace/3, rand_str/1, position/2,
- shuffle/1, floor/1, ceiling/1, time_to_epoch_int/1,
- time_to_epoch_float/1, now_int/0, now_float/0, byte_size/1, listify/1,
- reverse_bits/1]).
-
--include("../include/config.hrl").
--include("../include/profile.hrl").
-
-
-rm_rf(Name) when is_list(Name) ->
- case filelib:is_dir(Name) of
- false ->
- file:delete(Name);
- true ->
- case file:list_dir(Name) of
- {ok, Filenames} ->
- lists:foreach(fun rm_rf/1, [ filename:join(Name, F) || F <- Filenames]),
- file:del_dir(Name);
- {error, Reason} -> error_logger:info_msg("rm_rf failed because ~p~n", [Reason])
- end
- end.
-
-zero_split(Bin) ->
- zero_split(0, Bin).
-
-zero_split(N, Bin) when N > erlang:byte_size(Bin) -> Bin;
-
-zero_split(N, Bin) ->
- case Bin of
- <<_:N/binary, 0:8, _/binary>> -> split_binary(Bin, N);
- _ -> zero_split(N+1, Bin)
- end.
-
-rand_str(N) ->
- lists:map(fun(_I) ->
- random:uniform(26) + $a - 1
- end, lists:seq(1,N)).
-
-nthreplace(N, E, List) ->
- lists:sublist(List, N-1) ++ [E] ++ lists:nthtail(N, List).
-
-nthdelete(N, List) ->
- nthdelete(N, List, []).
-
-nthdelete(0, List, Ret) ->
- lists:reverse(Ret) ++ List;
-
-nthdelete(_, [], Ret) ->
- lists:reverse(Ret);
-
-nthdelete(1, [_E|L], Ret) ->
- nthdelete(0, L, Ret);
-
-nthdelete(N, [E|L], Ret) ->
- nthdelete(N-1, L, [E|Ret]).
-
-floor(X) ->
- T = erlang:trunc(X),
- case (X - T) of
- Neg when Neg < 0 -> T - 1;
- Pos when Pos > 0 -> T;
- _ -> T
- end.
-
-ceiling(X) ->
- T = erlang:trunc(X),
- case (X - T) of
- Neg when Neg < 0 -> T;
- Pos when Pos > 0 -> T + 1;
- _ -> T
- end.
-
-succ([]) ->
- [];
-
-succ(Str) ->
- succ_int(lists:reverse(Str), []).
-
-succ_int([Char|Str], Acc) ->
- if
- Char >= $z -> succ_int(Str, [$a|Acc]);
- true -> lists:reverse(lists:reverse([Char+1|Acc]) ++ Str)
- end.
-
-fast_acc(_, Acc, 0) -> Acc;
-
-fast_acc(Fun, Acc, N) ->
- fast_acc(Fun, Fun(Acc), N-1).
-
-shuffle(List) when is_list(List) ->
- [ N || {_R,N} <- lists:keysort(1, [{random:uniform(),X} || X <- List]) ].
-
-pmap(Fun, List, ReturnNum) ->
- N = if
- ReturnNum > length(List) -> length(List);
- true -> ReturnNum
- end,
- SuperParent = self(),
- SuperRef = erlang:make_ref(),
- Ref = erlang:make_ref(),
- %% we spawn an intermediary to collect the results
- %% this is so that there will be no leaked messages sitting in our mailbox
- Parent = spawn(fun() ->
- L = gather(N, length(List), Ref, []),
- SuperParent ! {SuperRef, pmap_sort(List, L)}
- end),
- Pids = [spawn(fun() ->
- Parent ! {Ref, {Elem, (catch Fun(Elem))}}
- end) || Elem <- List],
- Ret = receive
- {SuperRef, Ret1} -> Ret1
- end,
- % i think we need to cleanup here.
- lists:foreach(fun(P) -> exit(P, die) end, Pids),
- Ret.
-
-pmap_sort(Original, Results) ->
- pmap_sort([], Original, lists:reverse(Results)).
-
-% pmap_sort(Sorted, [], _) -> lists:reverse(Sorted);
-pmap_sort(Sorted, _, []) -> lists:reverse(Sorted);
-pmap_sort(Sorted, [E|Original], Results) ->
- case lists:keytake(E, 1, Results) of
- {value, {E, Val}, Rest} -> pmap_sort([Val|Sorted], Original, Rest);
- false -> pmap_sort(Sorted, Original, Results)
- end.
-
-gather(_, Max, _, L) when length(L) == Max -> L;
-gather(0, _, _, L) -> L;
-gather(N, Max, Ref, L) ->
- receive
- {Ref, {Elem, {not_found, Ret}}} -> gather(N, Max, Ref, [{Elem, {not_found, Ret}}|L]);
- {Ref, {Elem, {badrpc, Ret}}} -> gather(N, Max, Ref, [{Elem, {badrpc, Ret}}|L]);
- {Ref, {Elem, {'EXIT', Ret}}} -> gather(N, Max, Ref, [{Elem, {'EXIT', Ret}}|L]);
- {Ref, Ret} -> gather(N-1, Max, Ref, [Ret|L])
- end.
-
-get_hash_module(#config{hash_module=HashModule}) ->
- HashModule.
-
-hash(Term) ->
- HashModule = get_hash_module(configuration:get_config()),
- ?prof(hash),
- R = HashModule:hash(Term),
- ?forp(hash),
- R.
-
-hash(Term, Seed) ->
- HashModule = get_hash_module(configuration:get_config()),
- ?prof(hash),
- R = HashModule:hash(Term, Seed),
- ?forp(hash),
- R.
-
-%32 bit fnv. magic numbers ahoy
-fnv(Term) when is_binary(Term) ->
- fnv_int(?OFFSET_BASIS, 0, Term);
-
-fnv(Term) ->
- fnv_int(?OFFSET_BASIS, 0, term_to_binary(Term)).
-
-fnv_int(Hash, ByteOffset, Bin) when erlang:byte_size(Bin) == ByteOffset ->
- Hash;
-
-fnv_int(Hash, ByteOffset, Bin) ->
- <<_:ByteOffset/binary, Octet:8, _/binary>> = Bin,
- Xord = Hash bxor Octet,
- fnv_int((Xord * ?FNV_PRIME) rem (2 bsl 31), ByteOffset+1, Bin).
-
-position(Predicate, List) when is_function(Predicate) ->
- position(Predicate, List, 1);
-
-position(E, List) ->
- position(E, List, 1).
-
-position(Predicate, [], _N) when is_function(Predicate) -> false;
-
-position(Predicate, [E|List], N) when is_function(Predicate) ->
- case Predicate(E) of
- true -> N;
- false -> position(Predicate, List, N+1)
- end;
-
-position(_, [], _) -> false;
-
-position(E, [E|_List], N) -> N;
-
-position(E, [_|List], N) -> position(E, List, N+1).
-
-now_int() ->
- time_to_epoch_int(now()).
-
-now_float() ->
- time_to_epoch_float(now()).
-
-time_to_epoch_int(Time) when is_integer(Time) or is_float(Time) ->
- Time;
-
-time_to_epoch_int({Mega,Sec,_}) ->
- Mega * 1000000 + Sec.
-
-time_to_epoch_float(Time) when is_integer(Time) or is_float(Time) ->
- Time;
-
-time_to_epoch_float({Mega,Sec,Micro}) ->
- Mega * 1000000 + Sec + Micro / 1000000.
-
-byte_size(List) when is_list(List) ->
- lists:foldl(fun(El, Acc) -> Acc + lib_misc:byte_size(El) end, 0, List);
-
-byte_size(Term) ->
- erlang:byte_size(Term).
-
-listify(List) when is_list(List) ->
- List;
-
-listify(El) -> [El].
-
-reverse_bits(V) when is_integer(V) ->
- % swap odd and even bits
- V1 = ((V bsr 1) band 16#55555555) bor (((V band 16#55555555) bsl 1) band 16#ffffffff),
- % swap consecutive pairs
- V2 = ((V1 bsr 2) band 16#33333333) bor (((V1 band 16#33333333) bsl 2) band 16#ffffffff),
- % swap nibbles ...
- V3 = ((V2 bsr 4) band 16#0F0F0F0F) bor (((V2 band 16#0F0F0F0F) bsl 4) band 16#ffffffff),
- % swap bytes
- V4 = ((V3 bsr 8) band 16#00FF00FF) bor (((V3 band 16#00FF00FF) bsl 8) band 16#ffffffff),
- % swap 2-byte long pairs
- ((V4 bsr 16) band 16#ffffffff) bor ((V4 bsl 16) band 16#ffffffff).
diff --git a/src/mem_utils.erl b/src/mem_utils.erl
deleted file mode 100644
index ffefd5cb..00000000
--- a/src/mem_utils.erl
+++ /dev/null
@@ -1,129 +0,0 @@
--module(mem_utils).
-
--export([fix_mappings/3, get_remote_fullmap/1, join_type/3, pmap_from_full/1,
- nodeparts_up/1, remove_partition/3, use_persistent/2,
- was_i_nodedown/2]).
-
--include("../include/common.hrl").
-
-join_type(Node, Fullmap, Options) ->
- case proplists:get_value(replace, Options) of
- undefined ->
- case lists:filter(fun({N,_P,_T}) -> N =:= Node end, Fullmap) of
- [] -> new;
- _ -> rejoin
- end;
- OldNode when is_atom(OldNode) ->
- % not a particularly strong guard, but will have to do
- {replace, OldNode};
- _ -> new
- end.
-
-
-%% @doc return a {PMap, Fullmap} tuple that has corrections for
-%% down, rejoining, or replacing Node
-fix_mappings(nodedown, Node, OldFullmap) ->
- fix_mappings_fold(fun({N,P,T}, AccIn) ->
- case {N,T} of
- {Node, {nodedown, Type}} ->
- % already marked as nodedown, so leave it
- [{N,P, {nodedown, Type}} | AccIn];
- {Node, _} ->
- % mark it as nodedown
- [{N,P, {nodedown, T}} | AccIn];
- _ -> [{N,P,T} | AccIn]
- end
- end, [], OldFullmap);
-
-fix_mappings(rejoin, Node, OldFullmap) ->
- fix_mappings_fold(fun({N,P,{nodedown,T}}, AccIn) when N =:= Node ->
- [{N,P,T} | AccIn];
- (NPT, AccIn) -> [NPT | AccIn]
- end, [], OldFullmap);
-
-fix_mappings(replace, {OldNode, NewNode}, OldFullmap) ->
- fix_mappings_fold(fun({N,P,T}, AccIn) ->
- case {N, T} of
- {OldNode, {nodedown,T1}} -> [{NewNode,P,T1} | AccIn];
- {OldNode, _} -> [{NewNode,P,T} | AccIn];
- _ -> [{N,P,T} | AccIn]
- end
- end, [], OldFullmap).
-
-
-fix_mappings_fold(Fun, Acc0, OldFullmap) ->
- NewFullmap = lists:foldl(Fun, Acc0, OldFullmap),
- NewPMap = pmap_from_full(NewFullmap),
- {NewPMap, NewFullmap}.
-
-
-%% @doc create a PMap (primary nodes only) from provided Fullmap
-%% If a primary node is down, a partner will be supplied
-pmap_from_full(Fullmap) ->
- NodePartList = nodeparts_up(Fullmap),
- lists:keysort(2,lists:foldl(fun({N,P,T}, AccIn) ->
- case T of
- primary -> [{N,P} | AccIn];
- {nodedown, primary} ->
- NewNode = case lists:delete(N,
- membership2:nodes_for_part(P, NodePartList)) of
- [First|_] -> First;
- [] -> N % wtf, are all partners down too?
- end,
- [{NewNode,P} | AccIn];
- _ -> AccIn
- end
- end, [], Fullmap)).
-
-
-nodeparts_up(Fullmap) ->
- lists:foldl(fun({_N,_P,{nodedown,_}}, AccIn) -> AccIn;
- ({N,P,_T}, AccIn) -> [{N,P} | AccIn]
- end, [], Fullmap).
-
-
-
-%% @doc if Node is in the Fullmap as {nodedown,_} return true
-was_i_nodedown(Node, Fullmap) ->
- lists:member(yes, lists:map(fun({N,_P,{nodedown,_T}}) ->
- case N of
- Node -> yes;
- _ -> no
- end;
- (_) -> no
- end, Fullmap)).
-
-
-remove_partition(FullMap, Node, Partition) ->
- case lists:filter(
- fun({N,P,_Type}) -> N =:= Node andalso P =:= Partition end,
- FullMap) of
- [Elem|_] ->
- lists:delete(Elem, FullMap);
- Other ->
- ?LOG_ERROR("~nNo partition to remove: ~p~n"
- "Node: ~p~nPartition: ~p~n", [Other, Node, Partition]),
- FullMap
- end.
-
-
-use_persistent(_PartnersPlus, undefined) ->
- false;
-
-use_persistent(PartnersPlus, _PersistentParts) ->
- % get a fullmap from a partner
- % this may need rework for network partitions, as you could get a bad
- % fullmap from another node that was partitioned w/ this one :\
- RemoteFullmap = get_remote_fullmap(PartnersPlus),
- % return opposite of was_i_nodedown
- not mem_utils:was_i_nodedown(node(), RemoteFullmap).
-
-
-get_remote_fullmap([]) ->
- []; % no remote fullmap available, so return empty list
-
-get_remote_fullmap([Node|Rest]) ->
- case gen_server:call({membership, Node}, fullmap) of
- {ok, Fullmap} -> Fullmap;
- _ -> get_remote_fullmap(Rest)
- end.
diff --git a/src/membership2.erl b/src/membership2.erl
deleted file mode 100644
index 4c4780c3..00000000
--- a/src/membership2.erl
+++ /dev/null
@@ -1,686 +0,0 @@
-%%%-------------------------------------------------------------------
-%%% File: membership2.erl
-%%% @author Cliff Moon <cliff@powerset.com> []
-%%% @copyright 2009 Cliff Moon
-%%% @doc
-%%%
-%%% @end
-%%%
-%%% @since 2009-05-04 by Cliff Moon
-%%%-------------------------------------------------------------------
--module(membership2).
--author('cliff@powerset.com').
--author('brad@cloudant.com').
-
--behaviour(gen_server).
-
-%% API
--export([start_link/2, start_link/3, stop/1, check_nodes/0,
- partitions/0, partition_for_key/1, fullmap/0,
- all_nodes_parts/1, clock/0,
- nodes/0, nodeparts_for_key/1, nodes_for_part/1, nodes_for_part/2,
- nodes_for_shard/1, nodes_down/0,
- parts_for_node/1,
- take_offline/2, bring_online/2,
- decommission_part/3, pp_fullmap/0, snafu/1, snafu/3]).
-
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
-%% includes
--include("../include/config.hrl").
--include("../include/common.hrl").
--include("../include/profile.hrl").
--include_lib("eunit/include/eunit.hrl").
-
-%%====================================================================
-%% API
-%%====================================================================
-%% @doc Starts the server
-%% @end
-%%--------------------------------------------------------------------
-
-start_link(Node, Nodes) ->
- start_link(Node, Nodes, []).
-
-
-start_link(Node, Nodes, Args) ->
- gen_server:start_link({local, membership}, ?MODULE, [Node, Nodes, Args], []).
-
-
-stop(Server) ->
- gen_server:cast(Server, stop).
-
-
-%% @doc for when things have really gone south. Install a new state on all
-%% nodes, given a filename, or node list, partition map, and fullmap.
-%% @end
-snafu(Filename) ->
- NewState = case file:consult(Filename) of
- {ok, [Terms]} ->
- Terms;
- Error ->
- throw(Error)
- end,
- #membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap} = NewState,
- snafu(Nodes, PMap, Fullmap).
-
-
-snafu(Nodes, PMap, Fullmap) ->
- NewState = #membership{node=node(), nodes=Nodes,
- partitions=PMap, fullmap=Fullmap, version=vector_clock:create(dbcore)},
- update_ets(ets_name(node()), NewState),
- fire_gossip(node(), Nodes, NewState),
- save(NewState).
-
-
-check_nodes() ->
- ErlangNodes = lists:usort([node() | erlang:nodes()]),
- {ok, MemNodeList} = membership2:nodes(),
- MemNodes = lists:usort(MemNodeList),
- {PMapNodeList, _PMapPartList} = lists:unzip(partitions()),
- PMapNodes = lists:usort(PMapNodeList),
- case ErlangNodes =:= MemNodes andalso
- ErlangNodes =:= PMapNodes andalso
- MemNodes =:= PMapNodes of
- true -> true;
- _ ->
- Msg = "membership: Node Lists do not match.~n"
- "Erlang Nodes : ~p~n"
- "Membership Nodes : ~p~n"
- "PMap Nodes : ~p~n",
- Lst = [ErlangNodes, MemNodes, PMapNodes],
- showroom_log:message(error, Msg, Lst),
- io:format(Msg, Lst),
- false
- end.
-
-
-%% @doc retrieve the primary partition map. This is a list of partitions and
-%% their corresponding primary node, no replication partner nodes.
-partitions() ->
- ets_pmap().
-
-
-%% @doc retrieve the full partition map, like above, but including replication
-%% partner nodes. List should number 2^Q * N
-fullmap() ->
- lists:keysort(2, ets_fullmap()).
-
-
-%% @doc pretty-print the full partition map (sorted by node, then part)
-pp_fullmap() ->
- lists:foreach(
- fun({N,P}) ->
- io:format("~-60s ~s~n", [N, showroom_utils:int_to_hexstr(P)])
- end,
- lists:sort(membership2:all_nodes_parts(true))).
-
-
-%% @doc get the current vector clock from membership state
-clock() ->
- gen_server:call(membership, clock).
-
-
-%% @doc get the list of cluster nodes (according to membership module)
-%% This may differ from erlang:nodes()
-nodes() ->
- gen_server:call(membership, nodes).
-
-
-%% @doc get all the responsible nodes for a given partition, including
-%% replication partner nodes
-nodes_for_part(Part) ->
- nodes_for_part(Part, all_nodes_parts(true)).
-
-
-nodes_for_part(Part, NodePartList) ->
- Filtered = lists:filter(fun({_N, P}) -> P =:= Part end, NodePartList),
- {Nodes, _Parts} = lists:unzip(Filtered),
- lists:usort(Nodes).
-
-
-nodes_for_shard(ShardName) when is_binary(ShardName) ->
- nodes_for_shard(binary_to_list(ShardName));
-
-nodes_for_shard(ShardName) when is_list(ShardName) ->
- HexPart = case string:rchr(ShardName, $_) + 1 of
- 1 -> ShardName;
- Last -> string:substr(ShardName, Last)
- end,
- Int = showroom_utils:hexstr_to_int(HexPart),
- {_, Parts} = lists:unzip(membership2:partitions()),
- nodes_for_part(partitions:int_to_partition(Int, Parts)).
-
-
-%% @doc get all the responsible nodes and partitions for a given key, including
-%% nodes/parts on replication partner nodes
-nodeparts_for_key(Key) ->
- int_node_parts_for_key(Key).
-
-
-%% @doc get a list of all the nodes marked down in this node's fullmap
-nodes_down() ->
- Downs = lists:foldl(fun({N,_P,{nodedown, _T}}, AccIn) -> [N|AccIn];
- (_, AccIn) -> AccIn end, [], fullmap()),
- lists:usort(Downs).
-
-
-%% @doc return the partition responsible for the given Key
-partition_for_key(Key) ->
- Config = configuration:get_config(),
- Hash = lib_misc:hash(Key),
- partitions:hash_to_partition(Hash, Config#config.q).
-
-
-%% @doc return the partitions that reside on a given node
-parts_for_node(Node) ->
- lists:sort(lists:foldl(fun({N,P,_Type}, AccIn) ->
- case N of
- Node -> [P | AccIn];
- _ -> AccIn
- end
- end, [], fullmap())).
-
-
-%% @doc get all the nodes and partitions in the cluster. Depending on the
-%% AllPartners param, you get only primary nodes or replication partner
-%% nodes, as well.
-%% No nodes/parts currently down are returned.
-all_nodes_parts(false) ->
- ets_pmap();
-all_nodes_parts(true) ->
- mem_utils:nodeparts_up(ets_fullmap()).
-
-
-%% @doc If a local storage server exists for this partition it will be taken
-%% out of rotation until put back in.
-%% @end
-take_offline(Node, Partition) when Node =:= node() ->
- gen_server:call(membership, {take_offline, Partition});
-
-take_offline(Node, Partition)->
- gen_server:call({membership, Node}, {take_offline, Partition}).
-
-
-%% @doc Brings a storage server that has been taken offline back online.
-%% @end
-bring_online(Node, Partition) ->
- showroom_log:message(debug, "membership: bring_online Node: ~p Partition: ~p",
- [Node, Partition]),
- gen_server:call({membership, Node}, {bring_online, Partition}).
-
-
-%% @doc cleans up the remaining .couch shard/partition file after it has been
-%% moved to a new node.
-decommission_part(Node, Part, DbName) ->
- gen_server:cast({membership, Node}, {decommission, Part, DbName}).
-
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% @spec init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% @doc Initiates the server
-%% @end
-%%--------------------------------------------------------------------
-init([Node, Nodes, Args]) ->
- process_flag(trap_exit,true),
- showroom_log:message(info, "membership: membership server starting...", []),
- Options = lists:flatten(Args),
- showroom_log:message(info, "membership: options ~p", [Options]),
- net_kernel:monitor_nodes(true),
- Config = configuration:get_config(),
- PersistentState=#membership{partitions=PersistentParts} = load(Node),
- PartnersPlus = replication:partners_plus(Node, Nodes),
- State =
- case mem_utils:use_persistent(PartnersPlus, PersistentParts) of
- false ->
- showroom_log:message(info, "membership: not using persisted state", []),
- % didn't find persistent state on disk or this node was nodedown
- % so we don't want to use persisted state
- PartialNodes = lists:usort(Nodes),
- {NewVersion, RemoteNodes, NewPMap1, NewFullMap1} =
- join_to(Node, PartnersPlus, Options),
- NewWorldNodes = lists:usort(PartialNodes ++ RemoteNodes),
- NewPMap = case NewPMap1 of
- [] -> partitions:create_partitions(Config#config.q, Node,
- NewWorldNodes);
- _ -> NewPMap1
- end,
- NewFullMap = case NewFullMap1 of
- [] -> make_all_nodes_parts(NewPMap);
- _ -> NewFullMap1
- end,
- #membership{
- node=Node,
- nodes=NewWorldNodes,
- partitions=lists:keysort(2,NewPMap),
- % version=vector_clock:increment(dbcore, NewVersion),
- version=NewVersion,
- fullmap=NewFullMap};
- _ ->
- % found persistent state on disk
- showroom_log:message(info, "membership: using persisted state", []),
- case Options of
- [] -> ok;
- _ ->
- showroom_log:message(info, "membership: options ~p ignored.", [Options])
- end,
- %% fire gossip even if state comes from disk
- fire_gossip(Node, Nodes, PersistentState),
- PersistentState
- end,
- save(State),
- % ets table is an optimization for cluster_ops performance
- Ets = ets:new(ets_name(Node), [public, set, named_table]),
- update_ets(Ets, State),
- {ok, State}.
-
-
-%%--------------------------------------------------------------------
-%% @spec
-%% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% @doc Handling call messages
-%% @end
-%%--------------------------------------------------------------------
-
-%% join
-handle_call({join, JoiningNode, Options}, _From,
- State = #membership{version=Version, node=Node, nodes=Nodes,
- partitions=Partitions, fullmap=OldFullMap}) ->
- JoinType = mem_utils:join_type(JoiningNode, OldFullMap, Options),
- showroom_log:message(alert, "membership: node ~p wants to join, type '~p'",
- [JoiningNode, JoinType]),
- {PMap, NewFullmap} = case JoinType of
- rejoin ->
- mem_utils:fix_mappings(rejoin, JoiningNode, OldFullMap);
- {replace, OldNode} ->
- mem_utils:fix_mappings(replace, {OldNode, JoiningNode}, OldFullMap);
- new ->
- Hints = proplists:get_value(hints, Options),
- PMap1 = case partitions:join(JoiningNode, Partitions, Hints) of
- {ok, Table} -> Table;
- {error, Error, _Table} -> throw({join_error, Error})
- end,
- Fullmap1 = make_all_nodes_parts(PMap1),
- {PMap1, Fullmap1}
- end,
- WorldNodes = lists:usort(Nodes ++ [JoiningNode]),
- NewVersion = vector_clock:increment(dbcore, Version),
- NewState1 = State#membership{nodes=WorldNodes, partitions=PMap,
- version=NewVersion},
- {Fullmap, NewState2} = case proplists:get_value(bootstrap, Options) of
- true ->
- % join not complete until bootstrap finishes,
- % so this NewState isn't the final (i.e. NewState1 will be installed)
- showroom_log:message(info, "membership: bootstrap process starting", []),
- bootstrap_manager:start_bootstrap(NewState1, OldFullMap, NewFullmap);
- _ ->
- % no bootstrap, so install NewFullmap now
- showroom_log:message(info, "membership: no bootstrap", []),
- {NewFullmap, NewState1#membership{fullmap=NewFullmap}}
- end,
- save(NewState2),
- update_ets(ets_name(node()), NewState2),
- notify(node_join, [JoiningNode]),
- fire_gossip(Node, WorldNodes, NewState2),
- % If we're bootstrapping, then the join is not complete.
- % So return FullMap for now. bootstrap_manager:end_bootstrap will fix it
- {reply, {ok, NewVersion, WorldNodes, PMap, Fullmap}, NewState2};
-
-%% clock
-handle_call(clock, _From, State = #membership{version=Version}) ->
- {reply, Version, State};
-
-%% state
-handle_call(state, _From, State) ->
- {reply, State, State};
-
-%% newfullmap
-handle_call({newfullmap, NewFullMap}, _From,
- State = #membership{node=Node, nodes=Nodes, version=Version}) ->
- NewVersion = vector_clock:increment(dbcore, Version),
- NewState = State#membership{version=NewVersion, fullmap=NewFullMap},
- save(NewState),
- update_ets(ets_name(node()), NewState),
- fire_gossip(Node, Nodes, NewState),
- {reply, installed, NewState};
-
-%% partitions
-handle_call(partitions, _From, State = #membership{partitions=Parts}) ->
- {reply, {ok, Parts}, State};
-
-%% fullmap
-handle_call(fullmap, _From, State = #membership{fullmap=FullMap}) ->
- {reply, {ok, FullMap}, State};
-
-%% nodes
-handle_call(nodes, _From, State = #membership{nodes=Nodes}) ->
- {reply, {ok, Nodes}, State};
-
-%% take_offline
-handle_call({take_offline, Partition}, _From,
- State = #membership{node=Node, nodes=Nodes, fullmap=OldFullMap}) ->
- showroom_log:message(info, "membership: take_offline Node: ~p Partition: ~p",
- [Node, Partition]),
- NewFullMap = mem_utils:remove_partition(OldFullMap, Node, Partition),
- NewState = State#membership{fullmap=NewFullMap},
- fire_gossip(Node, Nodes, NewState),
- update_ets(ets_name(node()), NewState),
- {reply, {offline, Node, Partition}, NewState};
-
-%% at least reply that this 'catch-all' was ignored
-handle_call(_Request, _From, State) ->
- {reply, ignored, State}.
-
-
-%%--------------------------------------------------------------------
-%% @spec handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% @doc Handling cast messages
-%% @end
-%%--------------------------------------------------------------------
-
-handle_cast({gossip, RemoteState = #membership{node=RemoteNode}},
- LocalState = #membership{node=_Me}) ->
- showroom_log:message(info, "membership: received gossip from ~p",
- [RemoteNode]),
- {MergeType, MergedState = #membership{nodes=_MergedNodes}} =
- merge_state(RemoteState, LocalState),
- case MergeType of
- equal -> {noreply, MergedState};
- merged ->
- showroom_log:message(info, "membership: merged new gossip", []),
- % fire_gossip(Me, MergedNodes, MergedState),
- update_ets(ets_name(node()), MergedState),
- save(MergedState),
- {noreply, MergedState}
- end;
-
-% decommission
-% renaming for now, until case 1245 can be completed
-handle_cast({decommission, Part, DbName}, State) ->
- {{Y,Mon,D}, {H,Min,S}} = calendar:universal_time(),
- Directory = couch_config:get("couchdb", "database_dir"),
- OrigFilename = showroom_utils:full_filename(Part, DbName, Directory),
- Moved = lists:flatten(io_lib:format(".~w~2.10.0B~2.10.0B." ++
- "~2.10.0B~2.10.0B~2.10.0B.moved.couch", [Y,Mon,D,H,Min,S])),
- % Note: this MovedFilename bit below gives weird results:
- % ["/Users/brad/dev/erlang/dbcore/tmp/lib/x800000/test_800000",
- % ".20091001.162640.moved.couch"] but list/string behavior handles it.
- MovedFilename = lists:map(fun(E) -> binary_to_list(E) end,
- re:replace(OrigFilename, "\.couch", Moved, [])),
- ok = file:rename(OrigFilename, MovedFilename),
- {noreply, State}.
-
-
-%% @doc handle nodedown messages because we have
-%% net_kernel:monitor_nodes(true)
-handle_info({nodedown, Node},
- State = #membership{nodes=OldNodes, fullmap=OldFullmap,
- version=OldVersion}) ->
- showroom_log:message(alert, "membership: nodedown from ~p", [Node]),
- case lists:member(Node, OldNodes) of
- true ->
- notify(nodedown, [Node]),
- % clean up membership state
- Nodes = lists:delete(Node, OldNodes),
- {PMap, Fullmap} = mem_utils:fix_mappings(nodedown, Node, OldFullmap),
- % Do we increment clock here? w/o gossip?
- % This is happening near simultaneously on the other nodes, too :\
- % Only reason to increment is persisted clock on down node will be older
- % when it returns
- Version = vector_clock:increment(dbcore, OldVersion),
- NewState = State#membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap,
- version=Version},
- update_ets(ets_name(node()), NewState),
- save(NewState),
- {noreply, NewState};
- _ -> {noreply, State}
- end;
-
-%% @doc handle nodeup messages because we have
-%% net_kernel:monitor_nodes(true)
-handle_info({nodeup, Node}, State) ->
- showroom_log:message(alert, "membership: nodeup Node: ~p", [Node]),
- {noreply, State};
-
-handle_info(Info, State) ->
- showroom_log:message(info, "membership: handle_info Info: ~p", [Info]),
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% @spec terminate(Reason, State) -> void()
-%% @doc This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%% @end
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
- ok.
-
-%% 0.5.6 to 0.5.7
-code_change(184380560337424323902805568963460261434, State, _Extra) ->
- backup_old_config_file(),
- % update State to the new version
- {membership, _Hdr, Node, Nodes, PMap, Version} = State,
- NewState = #membership{
- node = Node,
- nodes = Nodes,
- partitions = PMap,
- version = Version,
- fullmap = make_all_nodes_parts(PMap)
- },
- save(NewState),
- % also create new ets table
- Ets = ets:new(ets_name(Node), [public, set, named_table]),
- update_ets(Ets, NewState),
- {ok, NewState};
-
-%% 0.8.8 to 0.9.0
-code_change(239470595681156900105628017899543243419, State, _Extra) ->
- net_kernel:monitor_nodes(true),
- {ok, State};
-
-code_change(OldVsn, State, _Extra) ->
- io:format("Unknown Old Version!~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
- {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-
-backup_old_config_file() ->
- Config = configuration:get_config(),
- FileName = filename:join([Config#config.directory,
- lists:concat([node:name(node()), ".state"])]),
- BackupName = filename:join([Config#config.directory,
- lists:concat([node:name(node()), ".state.bak"])]),
- file:copy(FileName, BackupName).
-
-
-%% return State from membership file
-load(Node) ->
- Config = configuration:get_config(),
- case file:consult(filename:join([Config#config.directory,
- lists:concat([node:name(Node), ".state"])])) of
- {error, Reason} ->
- showroom_log:message(info, "membership: could not load state: ~p~n",
- [Reason]),
- #membership{nodes=[]};
- {ok, [Terms]} ->
- Terms
- end.
-
-
-%% save the State to a file
-save(State) ->
- Config = configuration:get_config(),
- Filename = filename:join([Config#config.directory,
- lists:concat([node:name(State#membership.node), ".state"])]),
- {ok, File} = file:open(Filename, [binary, write]),
- io:format(File, "~w.~n", [State]),
- file:close(File).
-
-
-%% joining is bi-directional, as opposed to gossip which is unidirectional
-%% we want to collect the list of known nodes to compute the partition map
-%% which isn't necessarily the same as the list of running nodes
-join_to(Node, Partners, Options) ->
- join_to(Node, Partners,
- {vector_clock:create(dbcore), [], [], []}, Options).
-
-
-%% @doc join this node to one of its partners (or PartnersPlus if no partners
-%% are available).
-join_to(_, [], {Version, World, PMap, FullMap}, _Options) ->
- {Version, World, PMap, FullMap};
-
-join_to(Node, [Partner|Rest], {Version, World, PMap, FullMap}, Options) ->
- case call_join(Partner, Node, Options) of
- {ok, RemoteVersion, NewNodes, NewPMap, NewFullMap} ->
- {vector_clock:merge(Version, RemoteVersion),
- lists:usort(World ++ NewNodes),
- NewPMap,
- NewFullMap};
- Other ->
- showroom_log:message(info, "membership: join_to Other: ~p~n", [Other]),
- join_to(Node, Rest, {Version, World, PMap, FullMap}, Options)
- end.
-
-
-%% @doc make the join call to Remote node (usually a partner of Node)
-call_join(Remote, Node, Options) ->
- showroom_log:message(info, "membership: call_join From: ~p To: ~p",
- [Node, Remote]),
- catch gen_server:call({membership, node:name(Remote)},
- {join, Node, Options}).
-
-
-merge_state(_RemoteState=#membership{version=RemoteVersion, nodes=RemoteNodes,
- partitions=RemotePMap,
- fullmap=RemoteFullMap},
- LocalState=#membership{version=LocalVersion, nodes=LocalNodes,
- partitions=LocalPMap,
- fullmap=LocalFullMap}) ->
- case vector_clock:equals(RemoteVersion, LocalVersion) of
- true ->
- {equal, LocalState};
- false ->
- % Note, we're matching MergedVersion from these funs.
- % They should be the same.
- {MergedVersion, MergedNodes} =
- merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes),
- {MergedVersion, MergedPMap} =
- merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap),
- {MergedVersion, MergedFullMap} =
- merge_fullmaps(RemoteVersion, RemoteFullMap,
- LocalVersion, LocalFullMap),
-
- % notify of arrivals & departures
- Arrived = MergedNodes -- LocalNodes,
- notify(node_join, Arrived),
- % Departed = LocalNodes -- MergedNodes,
- % notify(node_leave, Departed),
-
- {merged, LocalState#membership{version=MergedVersion, nodes=MergedNodes,
- partitions=MergedPMap,
- fullmap=MergedFullMap}}
- end.
-
-
-merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes) ->
- {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteNodes},
- {LocalVersion, LocalNodes}),
- {MergedVersion, lists:usort(Merged)}.
-
-
-merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap) ->
- {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemotePMap},
- {LocalVersion, LocalPMap}),
- {MergedVersion, lists:ukeysort(2, Merged)}.
-
-
-merge_fullmaps(RemoteVersion, RemoteFullMap, LocalVersion, LocalFullMap) ->
- {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteFullMap},
- {LocalVersion, LocalFullMap}),
- {MergedVersion, lists:usort(Merged)}.
-
-
-notify(Type, Nodes) ->
- lists:foreach(fun(Node) ->
- gen_event:notify(membership_events, {Type, Node})
- end, Nodes).
-
-
-%% @doc fires a gossip message (membership state) to partners nodes in the
-%% cluster.
-%% @end
-fire_gossip(Me, WorldNodes, Gossip) ->
- % GossipPartners = partners_plus(Me, WorldNodes),
- % random experiment, gossip with all nodes, not just partners_plus
- GossipPartners = lists:delete(Me, WorldNodes),
- lists:foreach(fun(TargetNode) ->
- showroom_log:message(info, "membership: firing gossip from ~p to ~p",
- [Me, TargetNode]),
- gen_server:cast({membership, TargetNode}, {gossip, Gossip})
- end, GossipPartners).
-
-
-%% @doc construct a table with all partitions, with the primary node and all
-%% replication partner nodes as well.
-make_all_nodes_parts(PMap) ->
- {Nodes, _Parts} = lists:unzip(PMap),
- NodeParts = lists:flatmap(
- fun({Node,Part}) ->
- Partners = replication:partners(Node, lists:usort(Nodes)),
- PartnerList = [{Partner, Part, partner} || Partner <- Partners],
- [{Node, Part, primary} | PartnerList]
- end, PMap),
- NodeParts.
-
-
-%% @doc for the given key, return a list of {Node,Part} tuples. Nodes are both
-%% primary and replication partner nodes, and should number N.
-int_node_parts_for_key(Key) ->
- Config = configuration:get_config(),
- Hash = lib_misc:hash(Key),
- Part = partitions:hash_to_partition(Hash, Config#config.q),
- NodePartList = all_nodes_parts(true),
- lists:filter(fun({_N,P}) -> P =:= Part end, NodePartList).
-
-
-%% ets table helper functions
-ets_name(Node) ->
- list_to_atom(lists:concat(["mem_", atom_to_list(Node)])).
-
-
-update_ets(Table, #membership{partitions=PMap, fullmap=FullMap}) ->
- ets:insert(Table, {pmap, PMap}),
- ets:insert(Table, {fullmap, FullMap}),
- ok.
-
-
-ets_pmap() ->
- [{pmap, PMap}] = ets:lookup(ets_name(node()), pmap),
- PMap.
-
-
-ets_fullmap() ->
- [{fullmap, FullMap}] = ets:lookup(ets_name(node()), fullmap),
- FullMap.
diff --git a/src/node.erl b/src/node.erl
deleted file mode 100644
index 9a9c82c1..00000000
--- a/src/node.erl
+++ /dev/null
@@ -1,39 +0,0 @@
-%%%-------------------------------------------------------------------
-%%% File: node.erl
-%%% @author Cliff Moon <> []
-%%% @copyright 2009 Cliff Moon
-%%% @doc
-%%%
-%%% @end
-%%%
-%%% @since 2009-05-11 by Cliff Moon
-%%%-------------------------------------------------------------------
--module(node).
--author('cliff@powerset.com').
-
-%% API
--export([name/1, attributes/1]).
-
--include("../include/common.hrl").
-
-%% -ifdef(TEST).
-%% -include("../etest/node_test.erl").
-%% -endif.
-
-%%====================================================================
-%% API
-%%====================================================================
-
-name(Name) when is_atom(Name) ->
- Name;
-name(Node) when is_tuple(Node) ->
- element(1, Node);
-name(Node) ->
- Node.
-
-attributes(Name) when is_atom(Name) ->
- [];
-attributes(Node) when is_tuple(Node) ->
- element(2, Node);
-attributes(_) ->
- [].
diff --git a/src/replication.erl b/src/replication.erl
deleted file mode 100644
index 96be0ad3..00000000
--- a/src/replication.erl
+++ /dev/null
@@ -1,165 +0,0 @@
-%%%-------------------------------------------------------------------
-%%% File: replication.erl
-%%% @author Brad Anderson <brad@cloudant.com> [http://www.cloudant.com]
-%%% @copyright 2009 Brad Anderson
-%%% @doc
-%%%
-%%% @end
-%%%
-%%% @since 2009-06-14 by Brad Anderson
-%%%-------------------------------------------------------------------
--module(replication).
--author('brad@cloudant.com').
-
-%% API
--export([partners/2, partners/3, partners_plus/2]).
-
--include_lib("eunit/include/eunit.hrl").
--include("../include/config.hrl").
--include("../include/common.hrl").
-
-
-%%====================================================================
-%% API
-%%====================================================================
-
-partners(Node, Nodes) ->
- partners(Node, Nodes, configuration:get_config()).
-
-
-%%--------------------------------------------------------------------
-%% @spec partners(Node::atom(), Nodes::list(), Config::config()) ->
-%% list()
-%% @doc returns the list of all replication partners for the specified node
-%% @end
-%%--------------------------------------------------------------------
-partners(Node, Nodes, Config) ->
- N = Config#config.n,
- Meta = Config#config.meta,
- pick_partners(Meta, Node, Nodes, [], N - 1).
-
-
-%% return a list of live/up Partners, and if all Partners are down,
-%% walk the ring to get one other remote node and return it.
-partners_plus(Node, Nodes) ->
- Partners = partners(Node, Nodes),
- PartnersDown = lists:subtract(Partners, erlang:nodes()),
- PartnersUp = lists:subtract(Partners, PartnersDown),
- case PartnersUp of
- [] ->
- TargetNodes = target_list(Node, Nodes),
- NonPartners = lists:subtract(TargetNodes,
- lists:flatten([Node, Partners])),
- walk_ring(NonPartners);
- _ ->
- %% at least one partner is up, so gossip w/ them
- PartnersUp
- end.
-
-
-%%====================================================================
-%% Internal functions
-%%====================================================================
-
-%% @spec pick_partners(proplist(), Node::dynomite_node(), [Node], [Node],
-%% integer()) -> list()
-%% @doc iterate through N-1 partner picks, returning the resulting list sorted
-pick_partners(_Meta, Node, _Nodes, Acc, 0) ->
- lists:sort(lists:delete(Node, Acc));
-pick_partners(Meta, Node, Nodes, Acc, Count) ->
- Partner = pick_partner(Meta, Node, Nodes, Acc, 1),
- NewNodes = lists:filter(fun(Elem) ->
- case Elem of
- no_partner_found -> false;
- Partner -> false;
- _ -> true
- end
- end, Nodes),
- NewAcc = case Partner of
- no_partner_found -> Acc;
- _ -> [Partner|Acc]
- end,
- pick_partners(Meta, Node, NewNodes, NewAcc, Count-1).
-
-
-%% @spec pick_partner(proplist(), Node::dynomite_node(), [Node], [Node],
-%% integer()) -> Node::dynomite_node()
-%% @doc pick a specific replication partner at the given level
-pick_partner([], Node, Nodes, _Acc, 1) ->
- %% handle the no metadata situation
- %% Note: This clause must be before the Level > length(Meta) guarded clause
- target_key(node:name(Node), lists:map(fun node:name/1, Nodes), roundrobin);
-
-pick_partner(Meta, _Node, _Nodes, Acc, Level) when Level > length(Meta) ->
- Acc;
-
-pick_partner(Meta, Node, Nodes, Acc, Level) ->
- MetaDict = meta_dict(Nodes, Level, dict:new()),
- NodeKey = lists:sublist(node:attributes(Node), Level),
- Keys = dict:fetch_keys(MetaDict),
- {_MetaName, Strategy} = lists:nth(Level, Meta),
- TargetKey = target_key(NodeKey, Keys, Strategy),
- Candidates = dict:fetch(TargetKey, MetaDict),
- case length(Candidates) of
- 0 ->
- %% didn't find a candidate
- no_partner_found;
- 1 ->
- %% found only one candidate, return it
- [Partner] = Candidates,
- Partner;
- _ ->
- pick_partner(Meta, Node, Nodes, Acc, Level + 1)
- end.
-
-
-%% @doc construct a dict that holds the key of metadata values so far (up to
-%% the current level, and dynomite_node() list as the value. This is used
-%% to select a partner in pick_partner/5
-%% @end
-meta_dict([], _Level, Dict) ->
- Dict;
-
-meta_dict([Node|Rest], Level, Dict) ->
- Key = lists:sublist(node:attributes(Node), Level),
- DictNew = dict:append(Key, Node, Dict),
- meta_dict(Rest, Level, DictNew).
-
-
-%% @spec target_key(term(), list(), Strategy::atom()) -> term()
-%% @doc given the key and keys, sort the list of keys based on stragety (i.e.
-%% for roundrobin, sort them, put the NodeKey on the end of the list, and
-%% then return the head of the list as the target.
-%% @end
-%% TODO: moar strategies other than roundrobin?
-target_key(NodeKey, Keys, roundrobin) ->
- SortedKeys = lists:sort(Keys),
- TargetKey = case target_list(NodeKey, SortedKeys) of
- [] -> no_partner_found;
- [Key|_Rest] -> Key
- end,
- TargetKey.
-
-
-%% @spec target_list(term(), list()) -> list()
-%% @doc split the list of keys into 'lessthan NodeKey', NodeKey, and 'greaterthan
-%% Nodekey' and then put the lessthan section on the end of the list
-%% @end
-target_list(_NodeKey, []) ->
- [];
-target_list(NodeKey, Keys) ->
- {A, [NodeKey|B]} = lists:splitwith(fun(K) -> K /= NodeKey end, Keys),
- lists:append([B, A, [NodeKey]]).
-
-
-walk_ring([]) ->
- %% TODO: should we be more forceful here and throw? not for now
- showroom_log:message(info,
- "~p:walk_ring/1 - could not find node for gossip", [?MODULE]),
- [];
-
-walk_ring([Node|Rest]) ->
- case lists:member(Node, erlang:nodes()) of
- true -> [Node];
- _ -> walk_ring(Rest)
- end.