Diffstat (limited to 'src')
-rw-r--r--  src/Makefile                     11
-rw-r--r--  src/bootstrap_manager.erl       261
-rw-r--r--  src/bootstrap_receiver.erl      121
-rw-r--r--  src/cluster_ops.erl             282
-rw-r--r--  src/configuration.erl            99
-rw-r--r--  src/dynomite.erl                 23
-rw-r--r--  src/dynomite_app.erl            145
-rw-r--r--  src/dynomite_couch_api.erl      140
-rw-r--r--  src/dynomite_couch_storage.erl   41
-rw-r--r--  src/dynomite_http.erl            21
-rw-r--r--  src/dynomite_prof.erl           164
-rw-r--r--  src/dynomite_sup.erl             85
-rw-r--r--  src/lib_misc.erl                235
-rw-r--r--  src/mem_utils.erl               129
-rw-r--r--  src/membership2.erl             686
-rw-r--r--  src/node.erl                     39
-rw-r--r--  src/partitions.erl              334
-rw-r--r--  src/replication.erl             165
-rw-r--r--  src/vector_clock.erl             99
19 files changed, 3080 insertions, 0 deletions
diff --git a/src/Makefile b/src/Makefile
new file mode 100644
index 00000000..32aa1872
--- /dev/null
+++ b/src/Makefile
@@ -0,0 +1,11 @@
+include ../support/include.mk
+
+all: $(EBIN_FILES_NO_DOCS)
+
+doc: $(EBIN_FILES)
+
+debug:
+ $(MAKE) DEBUG=-DDEBUG
+
+clean:
+ rm -rf $(EBIN_FILES)
diff --git a/src/bootstrap_manager.erl b/src/bootstrap_manager.erl
new file mode 100644
index 00000000..f1303223
--- /dev/null
+++ b/src/bootstrap_manager.erl
@@ -0,0 +1,261 @@
+%%%-------------------------------------------------------------------
+%%% File: bootstrap_manager.erl
+%%% @author Cliff Moon <> []
+%%% @copyright 2009 Cliff Moon
+%%% @doc This is the bootstrap manager for a cluster.
+%%%
+%%% @end
+%%%
+%%% @since 2009-07-29 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(bootstrap_manager).
+-author('cliff@powerset.com').
+-author('brad@cloudant.com').
+
+-behaviour(gen_server).
+
+%% API
+-export([start_bootstrap/3, end_bootstrap/1,
+ start_link/3, start/3, stop/0,
+ start_transfers/0, transfers/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {transfer_list, nodes, transfers, futurefullmap}).
+-record(transfer, {partition, receivers, rate=0, status=starting}).
+
+-include("../include/config.hrl").
+-include("../include/common.hrl").
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec start_bootstrap(State, OldFullMap, NewFullMap) -> {Map, NewState}
+%% @doc Diffs the old and new fullmaps and, if any partitions moved,
+%%      starts the bootstrap manager and kicks off its transfers.
+%% @end
+%%--------------------------------------------------------------------
+start_bootstrap(State=#membership{node=Node, nodes=Nodes},
+ OldFullMap, NewFullMap) ->
+ case partitions:diff(OldFullMap, NewFullMap) of
+ [] ->
+ % no difference in pmaps
+ {NewFullMap, State#membership{fullmap=NewFullMap}};
+ TransferList when is_list(TransferList) ->
+ ?LOG_DEBUG("~nBootstrap~nNode : ~p~nTransferList :~n~p~n",
+ [Node, partitions:pp_diff(TransferList)]),
+ case start_link(TransferList, Nodes, NewFullMap) of
+ {ok, _Pid} ->
+ start_transfers();
+ Other -> throw(Other)
+ end,
+
+ % bootstrap has some stuff to do (async), so just give the state
+ % passed in for now. end_bootstrap will be called with the resulting
+ % state when it completes
+ {OldFullMap, State};
+ Other ->
+ % probably occurs b/c T (# of nodes) < N currently.
+ % more nodes joining should help avoid this error.
+ ?LOG_ERROR("no_bootstrap - Other: ~p", [Other]),
+ {NewFullMap, State#membership{fullmap=NewFullMap}}
+ end.
+
+
+end_bootstrap(#state{futurefullmap=FutureFullMap}) ->
+ end_bootstrap(FutureFullMap);
+
+end_bootstrap(NewFullMap) ->
+ gen_server:call(membership, {newfullmap, NewFullMap}),
+ stop().
+
+
+start(TransferList, Nodes, FutureFullMap) ->
+ gen_server:start({global, bootstrap_manager}, ?MODULE,
+ [TransferList, Nodes, FutureFullMap], []).
+
+
+start_link(TransferList, Nodes, FutureFullMap) ->
+ gen_server:start_link({global, bootstrap_manager}, ?MODULE,
+ [TransferList, Nodes, FutureFullMap], []).
+
+
+stop() ->
+ gen_server:cast({global, bootstrap_manager}, stop).
+
+
+start_transfers() ->
+ gen_server:cast({global, bootstrap_manager}, start_transfers).
+
+
+transfers() ->
+ gen_server:call({global, bootstrap_manager}, transfers).
+
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @doc Initiates the server
+%% @end
+%%--------------------------------------------------------------------
+init([TransferList, Nodes, FutureFullMap]) ->
+ process_flag(trap_exit, true),
+ {ok, #state{transfer_list=TransferList,nodes=Nodes,
+ futurefullmap=FutureFullMap}}.
+
+
+%%--------------------------------------------------------------------
+%% @spec
+%% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% @doc Handling call messages
+%% @end
+%%--------------------------------------------------------------------
+handle_call(average_transfer_rate, _From,
+ State=#state{transfers=Transfers}) ->
+  {Sum, Cardinality} = ets:foldl(
+    fun(#transfer{rate=Rate}, {SumAcc, CardAcc}) ->
+        {SumAcc+Rate, CardAcc+1}
+    end, {0, 0}, Transfers),
+  % avoid division by zero when no transfers are in flight
+  AverageRate = case Cardinality of
+    0 -> 0.0;
+    _ -> Sum / Cardinality
+  end,
+  {reply, AverageRate, State};
+
+handle_call(aggregate_transfer_rate, _From,
+ State=#state{transfers=Transfers}) ->
+ Sum = ets:foldl(fun(#transfer{rate=Rate}, Sum) ->
+ Rate + Sum
+ end, 0, Transfers),
+ {reply, Sum, State};
+
+handle_call(transfers, _From,
+ State=#state{transfers=Transfers}) ->
+ {reply, {ok, ets:tab2list(Transfers)}, State};
+
+%% at least reply that this 'catch-all' was ignored
+handle_call(_Request, _From, State) ->
+ {reply, ignored, State}.
+
+
+%%--------------------------------------------------------------------
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @doc Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(stop, State) ->
+ {stop, normal, State};
+
+handle_cast(start_transfers,
+ State=#state{transfer_list=TransferList}) ->
+ Transfers = start_transfers(TransferList, State),
+ {noreply, State#state{transfers=Transfers}};
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+
+%%--------------------------------------------------------------------
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @doc Handling all non call/cast messages
+%% @end
+%%--------------------------------------------------------------------
+
+handle_info({receiver_done, FromNode, _ToNode, Partition, DbName, Receiver},
+ State = #state{transfers=Transfers}) ->
+ %% TODO use bring_online & ToNode? instead of waiting until end & installing
+ %% NewFullMap into mem2
+
+ %% handle the old file
+ membership2:decommission_part(FromNode, Partition, DbName),
+
+ %% remove from Transfers table
+ case ets:lookup(Transfers, Partition) of
+ [Transfer] = [#transfer{receivers=Receivers}] ->
+ NewReceivers = lists:delete(Receiver, Receivers),
+ if
+ length(NewReceivers) == 0 -> ets:delete(Transfers, Partition);
+ true -> ets:insert(Transfers, Transfer#transfer{receivers=NewReceivers})
+ end;
+ _ -> ok
+ end,
+ case ets:first(Transfers) of
+ '$end_of_table' ->
+ end_bootstrap(State),
+ {noreply, State};
+ _ -> {noreply, State}
+ end;
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+
+%%--------------------------------------------------------------------
+%% @spec terminate(Reason, State) -> void()
+%% @doc This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+
+%%--------------------------------------------------------------------
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @doc Convert process state when code is changed
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+start_transfers([], State) ->
+  % no diff in pmaps, so no transfers are needed
+  end_bootstrap(State);
+
+start_transfers(Diff, State=#state{nodes=Nodes}) ->
+ case showroom_db:all_databases("") of
+ {ok, AllDbs} when length(AllDbs) > 0 ->
+ start_transfers(Diff, Nodes, configuration:get_config(), AllDbs,
+ ets:new(transfers, [public, set, {keypos, 2}]));
+ {ok, []} -> end_bootstrap(State); % no databases, so bootstrap not needed
+ Other -> throw(Other) % problem getting list of dbs
+ end.
+
+
+start_transfers([], _, _, _, Transfers) ->
+ Transfers;
+
+start_transfers([{FromNode, ToNode, Partition} | Diff], Nodes, Config,
+ AllDbs, Transfers) ->
+ membership2:take_offline(FromNode, Partition),
+ Receivers = lists:map(
+ fun(DbName) ->
+ {ok, Receiver} =
+ bootstrap_receiver:start_link(FromNode, ToNode, Partition,
+ DbName, 10000, self()),
+ Receiver
+ end, AllDbs),
+ % NOTE: by using AllDbs, we are omitting .deleted.couch files
+ ets:insert(Transfers, #transfer{partition=Partition,
+ receivers=Receivers}),
+ start_transfers(Diff, Nodes, Config, AllDbs, Transfers).
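
The manager tracks one #transfer{} record per partition in a public ets table, deleting receivers as each {receiver_done, ...} message arrives and ending the bootstrap once the table empties. A monitoring process could poll that table through the transfers/0 API; a minimal sketch (print_transfer_status/0 is a hypothetical helper, not part of this diff):

    %% Poll the bootstrap manager and print one line per in-flight
    %% partition transfer ({transfer, Part, Receivers, Rate, Status}).
    print_transfer_status() ->
      case bootstrap_manager:transfers() of
        {ok, Transfers} ->
          lists:foreach(
            fun({transfer, Part, Receivers, Rate, Status}) ->
                io:format("~p: ~p receiver(s), rate ~p, ~p~n",
                          [Part, length(Receivers), Rate, Status])
            end, Transfers);
        Error -> Error
      end.
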
diff --git a/src/bootstrap_receiver.erl b/src/bootstrap_receiver.erl
new file mode 100644
index 00000000..3b4907cb
--- /dev/null
+++ b/src/bootstrap_receiver.erl
@@ -0,0 +1,121 @@
+%%%-------------------------------------------------------------------
+%%% File: bootstrap_receiver.erl
+%%% @author Brad Anderson <brad@cloudant.com>
+%%% @copyright 2009 Brad Anderson
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-09-22 by Brad Anderson
+%%%-------------------------------------------------------------------
+-module(bootstrap_receiver).
+-author('brad@cloudant.com').
+
+-include("../include/config.hrl").
+-include("../include/common.hrl").
+
+%% API
+-export([start_link/6, loop/6, fetch_shard/5]).
+
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec
+%% @doc
+%% @end
+%%--------------------------------------------------------------------
+start_link(FromNode, ToNode, Partition, DbName, Timeout, Manager) ->
+ Pid = proc_lib:spawn_link(ToNode, bootstrap_receiver, loop,
+ [FromNode, Partition, DbName, Timeout, Manager,
+ self()]),
+ sync_wait(Pid, Timeout).
+
+
+loop(FromNode, Partition, DbName, Timeout, Manager, Parent) ->
+ proc_lib:init_ack(Parent, {ok, self()}),
+ fetch_shard(FromNode, Partition, DbName, Timeout, Manager).
+
+
+%% @doc run at "ToNode" via spawn_link
+fetch_shard(FromNode, Partition, DbName, Timeout, Manager) ->
+ Directory = couch_config:get("couchdb", "database_dir"),
+ [_NodeName, Hostname] = string:tokens(atom_to_list(FromNode), "@"),
+ SrcFile = binary_to_list(partitions:shard_name(Partition, DbName)),
+ DestFile = showroom_utils:full_filename(Partition, DbName, Directory),
+ Authn = fetch_authn(),
+ Port = fetch_port(),
+ Url = lists:concat(["http://", Authn, Hostname, Port, "/", SrcFile,
+ ".couch"]),
+ Options = [{save_response_to_file, DestFile},
+ {inactivity_timeout, Timeout}],
+ case filelib:ensure_dir(DestFile) of
+ ok -> ok;
+ {error, eexist} -> ok; % duh!
+ Other -> throw(Other)
+ end,
+ ?LOG_DEBUG("~n"
+ "Directory: ~p~n"
+ "Hostname : ~p~n"
+ "SrcFile : ~p~n"
+ "DestFile : ~p~n"
+ "Url : ~p~n"
+ "Options : ~p~n"
+ , [Directory, Hostname, SrcFile, DestFile, Url, Options]),
+ case ibrowse:send_req(Url, [], get, [], Options, infinity) of
+ {ok, "200", _Headers, Body} ->
+ ?LOG_DEBUG("~nBootstrap ibrowse req Body: ~p~n", [Body]),
+ Manager ! {receiver_done, FromNode, node(), Partition, DbName,
+ self()};
+ Error ->
+ ?LOG_ERROR("~nBootstrap ibrowse req Error: ~p~n", [Error]),
+ throw(Error)
+ end.
+
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+
+%% from proc_lib.erl in otp r13b01
+sync_wait(Pid, Timeout) ->
+ receive
+ {ack, Pid, Return} ->
+ Return;
+ {'EXIT', Pid, Reason} ->
+ {error, Reason}
+ after Timeout ->
+ unlink(Pid),
+ exit(Pid, kill),
+ flush(Pid),
+ {error, timeout}
+ end.
+
+
+flush(Pid) ->
+ receive
+ {'EXIT', Pid, _} ->
+ true
+ after 0 ->
+ true
+ end.
+
+
+fetch_authn() ->
+ User = couch_config:get("shard_moving", "user", ""),
+ Pass = couch_config:get("shard_moving", "pass", ""),
+ if
+ length(User) > 0 andalso length(Pass) > 0 ->
+ lists:concat([User, ":", Pass, "@"]);
+ true -> ""
+ end.
+
+
+fetch_port() ->
+ Port = couch_config:get("shard_moving", "port", "8080"),
+ if
+ Port =:= "80" -> "";
+ true -> lists:concat([":", Port])
+ end.
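
fetch_shard/5 pulls the shard file over plain HTTP, building the URL from the sending node's hostname plus optional credentials and port from the shard_moving config section. The composition, isolated as a hypothetical helper with its inputs passed explicitly instead of read from couch_config:

    build_shard_url(Hostname, User, Pass, Port, SrcFile) ->
      %% credentials only when both user and pass are configured
      Authn = if
        length(User) > 0 andalso length(Pass) > 0 ->
          lists:concat([User, ":", Pass, "@"]);
        true -> ""
      end,
      %% omit ":80" since it is the HTTP default
      PortStr = if Port =:= "80" -> ""; true -> lists:concat([":", Port]) end,
      lists:concat(["http://", Authn, Hostname, PortStr, "/", SrcFile,
                    ".couch"]).

    %% build_shard_url("db1.local", "admin", "secret", "5984", "shards/0/users")
    %% => "http://admin:secret@db1.local:5984/shards/0/users.couch"
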
diff --git a/src/cluster_ops.erl b/src/cluster_ops.erl
new file mode 100644
index 00000000..bd2ad83d
--- /dev/null
+++ b/src/cluster_ops.erl
@@ -0,0 +1,282 @@
+%%%-------------------------------------------------------------------
+%%% File: cluster_ops.erl
+%%% @author Brad Anderson <brad@cloudant.com> [http://cloudant.com]
+%%% @copyright 2009 Brad Anderson
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-07-21 by Brad Anderson
+%%%-------------------------------------------------------------------
+-module(cluster_ops).
+-author('brad@cloudant.com').
+
+%% API
+-export([key_lookup/3, key_lookup/5,
+ all_parts/4,
+ some_parts/4, some_parts/5,
+ quorum_from_each_part/3]).
+
+-include("../include/common.hrl").
+-include("../include/config.hrl").
+
+-include("../include/profile.hrl").
+
+
+%%====================================================================
+%% API
+%%====================================================================
+
+%% @doc Get to the proper shard on N nodes by key lookup
+%%
+%% This fun uses quorum constants from config
+key_lookup(Key, {M,F,A}, Access) ->
+ {N,_R,_W} = Consts = unpack_config(configuration:get_config()),
+ Const = get_const(Access, Consts),
+ key_lookup(Key, {M,F,A}, Access, Const, N).
+
+
+%% @doc Get to the proper shard on N nodes by key lookup
+%%
+%% This fun uses a provided quorum constant, possibly from request,
+%% possibly from config
+key_lookup(Key, {M,F,A}, Access, Const, N) ->
+ NodeParts = membership2:nodeparts_for_key(Key),
+ {ResolveFun, NotFoundFun} = case Access of
+ r -> {fun resolve_read/1, fun resolve_not_found/2};
+ w -> {fun resolve_write/1, fun(_,_) -> {false, notused, []} end}
+ end,
+ MapFun = fun({Node,Part}) ->
+ try
+ rpc:call(Node, M, F, [[Part | A]])
+ catch Class:Exception ->
+ {error, Class, Exception}
+ end
+ end,
+ {GoodReplies, Bad} = pcall(MapFun, NodeParts, Const),
+ if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
+ Good = lists:map(fun strip_ok/1, GoodReplies),
+ final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access).
+
+
+%% @doc Do op on all shards (and maybe even replication partners)
+all_parts({M,F,A}, Access, AndPartners, ResolveFun) ->
+ NodePartList = membership2:all_nodes_parts(AndPartners),
+ MapFun = fun({Node, Part}) ->
+ try
+ rpc:call(Node, M, F, [[Part | A]])
+ catch Class:Exception ->
+ {error, Class, Exception}
+ end
+ end,
+ Replies = ?PMAP(MapFun, NodePartList),
+ {Good, Bad} = lists:partition(fun valid/1, Replies),
+ final_all_parts(Good, Bad, length(NodePartList), ResolveFun, Access).
+
+
+%% @doc Do op on some shards, depending on list of keys sent in.
+%%
+%% This fun uses quorum constants from config
+some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access) ->
+ Const = get_const(Access),
+ some_parts(KeyFun, SeqsKVPairs, {M,F,A}, Access, Const).
+
+
+%% @doc Do op on some shards, depending on list of keys sent in.
+%%
+%% This fun uses a provided quorum constant, possibly from request,
+%% possibly from config
+some_parts(KeyFun, SeqsKVPairs, {M,F,A}, _Access, Const) ->
+ TaskFun = fun({{Node,Part}, Values}) ->
+ try
+ rpc:call(Node, M, F, [[Part | [Values | A]]])
+ catch Class:Exception ->
+ {error, Class, Exception}
+ end
+ end,
+
+ % get tasks per node that are part / values for that partition
+ DistTasks = get_dist_tasks(KeyFun, SeqsKVPairs),
+
+ % With the distributed tasklist in hand, do the tasks per partition.
+ % For each partition, do the work on all nodes/parts.
+ TaskReplies = ?PMAP(TaskFun, DistTasks),
+ {GoodReplies, Bad} = lists:partition(fun valid/1, TaskReplies),
+ if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
+ Good = lists:map(fun strip_ok/1, GoodReplies),
+ final_some_parts(Good, Bad, Const).
+
+
+quorum_from_each_part({M,F,A}, Access, ResolveFun) ->
+ Const = get_const(Access),
+ {_, Parts} = lists:unzip(membership2:partitions()),
+ PartsMapFun = fun(Part) ->
+ Nodes = membership2:nodes_for_part(Part),
+ NodesMapFun = fun(Node) -> rpc:call(Node, M, F, [[Part | A]]) end,
+ {GoodReplies,BadReplies} = pcall(NodesMapFun, Nodes, Const),
+ Good1 = lists:map(fun strip_ok/1, GoodReplies),
+ Bad1 = case length(Good1) >= Const of
+ true -> [];
+ false -> BadReplies
+ end,
+ {Good1,Bad1}
+ end,
+ Results1 = ?PMAP(PartsMapFun, Parts),
+ {Good,Bad} = lists:foldl(fun({G,B}, {GAcc,BAcc}) ->
+ {lists:append(G,GAcc),lists:append(B,BAcc)}
+ end, {[],[]}, Results1),
+ if length(Bad) > 0 -> ?LOG_DEBUG("~nBad: ~p~n", [Bad]); true -> ok end,
+ final_quorum_from_each_part(Good, Bad, length(Parts), ResolveFun, Access).
+
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+final_key_lookup(Good, Bad, N, Const, ResolveFun, NotFoundFun, Access) ->
+ {NotFound, Return, Reasons} = NotFoundFun(Bad, Const),
+ if
+ length(Good) >= Const -> {ok, ResolveFun(Good)};
+ NotFound -> {ok, Return, Reasons};
+ true -> error_message(Good, Bad, N, Const, Access)
+ end.
+
+
+final_all_parts(Good, Bad, Total, ResolveFun, Access) ->
+ case length(Good) =:= Total of
+ true -> {ok, ResolveFun(Good)};
+ _ -> error_message(Good, Bad, Total, Total, Access)
+ end.
+
+
+final_some_parts(Good, _Bad, Const) ->
+ Good1 = lists:flatten(Good),
+ {Seqs, _} = lists:unzip(Good1),
+ {ResG,ResB} =
+ lists:foldl(
+ fun(Seq, {AccG,AccB}) ->
+ Vals = proplists:get_all_values(Seq, Good1),
+ case length(Vals) >= Const of
+ true -> {[{Seq, Vals}|AccG],AccB};
+ _ -> {AccG, [{Seq, Vals}|AccB]}
+ end
+ end, {[],[]}, lists:usort(Seqs)),
+ case length(ResB) of
+ 0 -> {ok, ResG};
+ _ -> {error, ResB}
+ end.
+
+
+final_quorum_from_each_part(Good, Bad, Total, ResolveFun, Access) ->
+ case length(Good) =:= Total of
+ true -> {ok, ResolveFun(Good)};
+ _ -> error_message(Good, Bad, Total, Total, Access)
+ end.
+
+
+resolve_read([First|Responses]) ->
+ case First of
+ not_found -> not_found;
+ _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
+ end.
+
+
+resolve_write([First|Responses]) ->
+ case First of
+ not_found -> not_found;
+ _ -> lists:foldr(fun vector_clock:resolve/2, First, Responses)
+ end.
+
+
+resolve_not_found(Bad, R) ->
+ {NotFoundCnt, DeletedCnt, OtherReasons} =
+ lists:foldl(fun({Error,Reason}, {NotFoundAcc, DeletedAcc, ReasonAcc}) ->
+ case {Error,Reason} of
+ {not_found, {_Clock, [missing|_Rest]}} ->
+ {NotFoundAcc+1, DeletedAcc, ReasonAcc};
+ {not_found, {_Clock, [deleted|_Rest]}} ->
+ {NotFoundAcc, DeletedAcc+1, ReasonAcc};
+ _ ->
+ {NotFoundAcc, DeletedAcc, [Reason|ReasonAcc]}
+ end
+ end, {0, 0, []}, Bad),
+ % TODO: is the comparison to R good here, or should it be N-R?
+ if
+ NotFoundCnt >= R -> {true, {not_found, missing}, OtherReasons};
+ DeletedCnt >= R -> {true, {not_found, deleted}, OtherReasons};
+ true -> {false, other, OtherReasons}
+ end.
+
+
+error_message(Good, Bad, N, T, Access) ->
+ Msg = list_to_atom(lists:concat([atom_to_list(Access), "_quorum_not_met"])),
+ ?LOG_ERROR("~p~nSuccess on ~p of ~p servers. Needed ~p. Errors: ~w"
+ , [Msg, length(Good), N, T, Bad]),
+ [{error, Msg}, {good, Good}, {bad, Bad}].
+
+
+unpack_config(#config{n=N,r=R,w=W}) ->
+ {N, R, W}.
+
+
+pcall(MapFun, Servers, Const) ->
+ Replies = lib_misc:pmap(MapFun, Servers, Const),
+ lists:partition(fun valid/1, Replies).
+
+
+valid({ok, _}) -> true;
+valid(ok) -> true;
+valid(_) -> false.
+
+
+strip_ok({ok, Val}) -> Val;
+strip_ok(Val) -> Val.
+
+
+%% @spec get_dist_tasks(KeyFun::function(), KVPairs::list()) ->
+%% [{{Node::node(), Part::integer()}, SeqVals}]
+%% Type - ordered | ??
+%% SeqVals - [{Seq, Val}]
+%% @doc builds a distributed task list of nodes with a list of shard/values.
+%% This looks like a dict structure
+%% but is a list so we can use ?PMAP with the results
+%% @end
+get_dist_tasks(KeyFun, SeqsKVPairs) ->
+ %% loop thru SeqsKVPairs adding node/part to each
+ NPSV = lists:flatmap(
+ fun({Seq,KVPair}) ->
+ NodeParts = membership2:nodeparts_for_key(KeyFun(KVPair)),
+ lists:map(
+ fun(NodePart) ->
+ {NodePart, {Seq, KVPair}}
+ end, NodeParts)
+ end, SeqsKVPairs),
+ nodepart_values_list(NPSV).
+
+
+%% pile up the List by NodePart (like a dict)
+nodepart_values_list(List) ->
+ DistTasks =
+ lists:foldl(
+ fun(NodePart, AccIn) ->
+ Values = proplists:get_all_values(NodePart, List),
+ case length(Values) of
+ 0 -> AccIn;
+ _ -> [{NodePart, Values} | AccIn]
+ end
+ end, [], membership2:all_nodes_parts(true)),
+ % ?LOG_DEBUG("~nDistTasks: ~p~n", [DistTasks]),
+ DistTasks.
+
+
+get_const(Access) ->
+ get_const(Access, unpack_config(configuration:get_config())).
+
+
+get_const(Access, {_N,R,W}) ->
+ case Access of
+ r -> R;
+ w -> W;
+ r1 -> 1;
+ Other -> throw({bad_access_term, Other})
+ end.
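
Reads and writes are gated by the R/W constants unpacked from #config{}: a key lookup succeeds once Const good replies arrive, otherwise error_message/5 reports <access>_quorum_not_met. A minimal sketch of that gate, assuming illustrative constants {N,R,W} = {3,2,2} rather than values from this diff:

    quorum_met(GoodReplies, Access) ->
      Const = case Access of
        r  -> 2;  % R: read quorum
        w  -> 2;  % W: write quorum
        r1 -> 1   % first-answer read
      end,
      length(GoodReplies) >= Const.

    %% two good replies satisfy an r read; a single good reply fails a w write
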
diff --git a/src/configuration.erl b/src/configuration.erl
new file mode 100644
index 00000000..1caca5ec
--- /dev/null
+++ b/src/configuration.erl
@@ -0,0 +1,99 @@
+%%% -*- erlang-indent-level:2 -*-
+%%%-------------------------------------------------------------------
+%%% File: configuration.erl
+%%% @author Cliff Moon <cliff@powerset.com>
+%%% @author Brad Anderson <brad@cloudant.com>
+%%% @copyright 2008 Cliff Moon
+%%% @doc
+%%% This module keeps the Dynomite source relatively unchanged, but
+%%% reads its settings from the CouchDB configuration
+%%% @end
+%%%
+%%% @since 2008-07-18 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(configuration).
+-author('cliff@powerset.com').
+-author('brad@cloudant.com').
+
+%%-behaviour(gen_server).
+
+%% API
+-export([start_link/1, get_config/1, get_config/0, set_config/1, stop/0]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-include("../include/config.hrl").
+-include("../include/common.hrl").
+
+-define(SERVER, couch_config).
+-define(i2l(V), integer_to_list(V)).
+-define(l2i(V), list_to_integer(V)).
+
+
+%% -----------------------------------------------------------------
+%% API
+%% -----------------------------------------------------------------
+
+%% @doc starts couch_config gen_server if it's not already started
+start_link(DynomiteConfig) ->
+ couch_config:start_link([]),
+ set_config(DynomiteConfig).
+
+
+%% @doc get the config for a remote node
+get_config(Node) ->
+ ClusterConfig = rpc:call(Node, couch_config, get, ["cluster"]),
+ Directory = rpc:call(Node, couch_config, get, ["couchdb", "database_dir"]),
+ couch2dynomite_config(ClusterConfig, Directory).
+
+
+%% @doc get the config for the local node
+get_config() ->
+ get_config(node()).
+
+
+%% @doc given a Dynomite config record, put the values into the Couch config
+set_config(DynomiteConfig) ->
+ dynomite2couch_config(DynomiteConfig).
+
+
+%% @doc stop the config server (nothing to do until after couch_config refactor)
+stop() ->
+ couch_config:stop().
+
+
+%% -----------------------------------------------------------------
+%% Internal functions
+%% -----------------------------------------------------------------
+
+%% @doc turn a couch config proplist into a dynomite configuration record
+couch2dynomite_config(ClusterConfig, Directory) ->
+ Q = ?l2i(proplists:get_value("q", ClusterConfig, "3")),
+ R = ?l2i(proplists:get_value("r", ClusterConfig, "2")),
+ W = ?l2i(proplists:get_value("w", ClusterConfig, "1")),
+ N = ?l2i(proplists:get_value("n", ClusterConfig, "4")),
+ %% use couch's database_dir here, to avoid /tmp/data not existing
+ Webport = ?l2i(proplists:get_value("webport", ClusterConfig, "8080")),
+ Meta = proplists:get_value("meta", ClusterConfig, []),
+ StorageMod = proplists:get_value("storage_mod", ClusterConfig, []),
+ #config{q=Q, r=R, w=W, n=N, directory=Directory, web_port=Webport,
+ meta=Meta, storage_mod=StorageMod}.
+
+
+%% @doc workhorse for set_config/1 above
+dynomite2couch_config(DynomiteConfig) ->
+ couch_config:set("cluster", "q", ?i2l(DynomiteConfig#config.q), false),
+ couch_config:set("cluster", "r", ?i2l(DynomiteConfig#config.r), false),
+ couch_config:set("cluster", "w", ?i2l(DynomiteConfig#config.w), false),
+ couch_config:set("cluster", "n", ?i2l(DynomiteConfig#config.n), false),
+ couch_config:set("couchdb", "database_dir", DynomiteConfig#config.directory,
+ false),
+ couch_config:set("cluster", "webport",
+ case DynomiteConfig#config.web_port of
+ undefined -> "8080";
+ _ -> ?i2l(DynomiteConfig#config.web_port)
+ end, false),
+ couch_config:set("cluster", "meta", DynomiteConfig#config.meta, false),
+ couch_config:set("cluster", "storage_mod",
+ DynomiteConfig#config.storage_mod, false),
+ ok.
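
Because couch_config stores everything as strings, the record round-trips through the ?i2l/?l2i macros. A sketch with illustrative values (#config{} comes from include/config.hrl):

    roundtrip() ->
      Cfg = #config{q=3, r=2, w=2, n=3, directory="/var/lib/dbcore",
                    web_port=5984, meta=[], storage_mod=[]},
      ok = configuration:set_config(Cfg),
      (configuration:get_config())#config.n.  % => 3, read back via ?l2i
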
diff --git a/src/dynomite.erl b/src/dynomite.erl
new file mode 100644
index 00000000..1b9798c0
--- /dev/null
+++ b/src/dynomite.erl
@@ -0,0 +1,23 @@
+%%% @author Brad Anderson <brad@cloudant.com>
+%%% @doc convenience start/stop functions for Dynomite
+%%%
+-module(dynomite).
+-author('Brad Anderson <brad@cloudant.com>').
+
+-export([start/0, stop/0, restart/0]).
+
+
+%% @doc start Dynomite app with no args, for -s at the command-line
+start() ->
+ application:start(dynomite).
+
+
+%% @doc stops the Dynomite application
+stop() ->
+ application:stop(dynomite).
+
+
+%% @doc restart Dynomite app, with no args
+restart() ->
+ stop(),
+ start().
diff --git a/src/dynomite_app.erl b/src/dynomite_app.erl
new file mode 100644
index 00000000..6ee0b978
--- /dev/null
+++ b/src/dynomite_app.erl
@@ -0,0 +1,145 @@
+%%%-------------------------------------------------------------------
+%%% File: dynomite.erl
+%%% @author Cliff Moon <cliff@powerset.com> []
+%%% @copyright 2008 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2008-06-27 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(dynomite_app).
+-author('cliff@powerset.com').
+-author('brad@cloudant.com').
+
+-behaviour(application).
+
+-include("../include/config.hrl").
+-include("../../couch/src/couch_db.hrl").
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+-define(APPS, [crypto,sasl,mochiweb]).
+-define(DEFAULT_CLUSTER_URL, "http://localhost:5984/_cluster").
+
+%%====================================================================
+%% Application callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec start(Type, StartArgs) -> {ok, Pid} |
+%% {ok, Pid, State} |
+%% {error, Reason}
+%% @doc This function is called whenever an application
+%% is started using application:start/1,2, and should start the processes
+%% of the application. If the application is structured according to the
+%% OTP design principles as a supervision tree, this means starting the
+%% top supervisor of the tree.
+%% @end
+%%--------------------------------------------------------------------
+
+
+%% @doc start required apps, join cluster, start dynomite supervisor
+start(_Type, _StartArgs) ->
+ % get process_dict hack for startargs (i.e. not from .app file)
+ PdStartArgs = case erase(startargs) of
+ undefined ->
+ [];
+ Args ->
+ Args
+ end,
+
+ % start required apps
+ State = start_apps(),
+
+ % start dynomite supervisor
+ ok = start_node(),
+ case dynomite_sup:start_link(PdStartArgs) of
+ {ok, Supervisor} ->
+ {ok, Supervisor, State};
+ Error ->
+ Error
+ end.
+
+
+%%--------------------------------------------------------------------
+%% @spec stop(State) -> void()
+%% @doc This function is called whenever an application
+%% has stopped. It is intended to be the opposite of Module:start/2 and
+%% should do any necessary cleaning up. The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+stop({_, Sup}) ->
+  showroom_log:message(alert, "dynomite application stopped", []),
+  exit(Sup, normal),
+  ok;
+stop(_State) ->
+  % start/2 returns the started-apps list as State, so a catch-all clause
+  % is needed; the application controller stops the supervisor itself.
+  showroom_log:message(alert, "dynomite application stopped", []),
+  ok.
+
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+start_apps() ->
+ Fun = fun(App, AccIn) ->
+ Result = case application:start(App) of
+ ok ->
+ App;
+ {error, {already_started, App}} ->
+ nil;
+ _Error ->
+ exit(app_start_fail)
+ end,
+ if
+ Result =/= nil -> [App|AccIn];
+ true -> AccIn
+ end
+ end,
+ lists:foldl(Fun, [], ?APPS).
+
+
+%% @spec start_node() -> ok | {error, Reason}
+%% @doc start this node (join to dist. erlang cluster)
+start_node() ->
+ PingUrl = couch_config:get("cluster","ping", ?DEFAULT_CLUSTER_URL),
+ ?LOG_DEBUG("PingUrl: ~p", [PingUrl]),
+ Result = case get_pingnode(PingUrl, 1) of
+ {ok, PingNode} ->
+ join(PingNode);
+ _ ->
+ ?LOG_INFO("No pingnode found. Becoming single-node cluster", [])
+ end,
+ couch_api:create_db(<<"users">>, []), % all nodes have local 'users' db
+ Result.
+
+
+%% @spec get_pingnode(Url::string(), Retries::int()) -> node() |
+%% {error, Reason}
+%% @doc make a http get call to Url to get cluster information
+get_pingnode(Url, Retries) ->
+ try couch_rep_httpc:request(#http_db{url=Url, retries=Retries}) of
+ {[{<<"ping_node">>, Node}]} ->
+ {ok, list_to_atom(binary_to_list(Node))};
+ _ ->
+ {error, pingnode_not_found}
+ catch
+ _:_ ->
+ {error, pingnode_not_found}
+ end.
+
+
+join(PingNode) ->
+ if
+ node() =:= PingNode ->
+ ok; % we must be brain, so we'll take over the world
+ true ->
+ case net_adm:ping(PingNode) of
+ pong ->
+ % there is a cluster, we just joined it
+ ?LOG_DEBUG("ping successful, we're in.", []),
+ timer:sleep(1000); %% grr, what a hack, erlang. rly?
+ pang ->
+ ?LOG_ERROR("ping not successful.", []),
+ throw({cluster_error, ?l2b("cluster ping not successful")})
+ end
+ end,
+ ok.
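
get_pingnode/2 expects the decoded EJSON body of GET /_cluster (served by dynomite_http, below). The decoding step, isolated as a hypothetical helper with a made-up node name:

    %% {[{<<"ping_node">>, <<"dbcore@db1.local">>}]} => {ok, 'dbcore@db1.local'}
    decode_pingnode({[{<<"ping_node">>, Node}]}) ->
      {ok, list_to_atom(binary_to_list(Node))};
    decode_pingnode(_) ->
      {error, pingnode_not_found}.
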
diff --git a/src/dynomite_couch_api.erl b/src/dynomite_couch_api.erl
new file mode 100644
index 00000000..a5ad53c4
--- /dev/null
+++ b/src/dynomite_couch_api.erl
@@ -0,0 +1,140 @@
+%% This is a Dynomite plugin for calling the CouchDB raw Erlang API
+%%
+%% Most calls will have come from any of the web endpoints to execute
+%% these functions on the proper node for the key(s).
+
+-module(dynomite_couch_api).
+-author('brad@cloudant.com').
+
+-export([create_db/1, delete_db/1, get/1, put/1,
+ bulk_docs/1, missing_revs/1, get_db_info/1, get_view_group_info/1,
+ ensure_full_commit/1
+ ]).
+
+-include("../../couch/src/couch_db.hrl").
+-include("../include/common.hrl").
+
+
+%%--------------------------------------------------------------------
+%% @spec create_db([Part, DbName, Options]) -> {ok,Db} | {error,Error}
+%% Description: Creates the database shard.
+%%--------------------------------------------------------------------
+create_db([Part, DbName, Options]) ->
+ case couch_server:create(partitions:shard_name(Part, DbName), Options) of
+ {ok, Shard} ->
+ couch_db:close(Shard),
+ ok;
+ Error -> Error
+ end.
+
+
+%%--------------------------------------------------------------------
+%% @spec delete_db([Part, DbName, Options]) -> {ok,deleted} | {error,Error}
+%% Description: Deletes the database shard.
+%%--------------------------------------------------------------------
+delete_db([Part, DbName, Options]) ->
+ couch_server:delete(partitions:shard_name(Part, DbName), Options).
+
+
+get([Part, Db, DocId, Revs, Options]) ->
+ case showroom_db:open_shard(node(), Part, Db) of
+ {ok, Shard} ->
+ {Status, Doc} = couch_api:open_doc(Shard, DocId, Revs, Options),
+ showroom_db:close_shard(Shard),
+ {Status, {[], [Doc]}};
+ Error ->
+ Error
+ end.
+
+
+put([Part, Db, Doc = #doc{clock=Clock}, Options]) ->
+ case showroom_db:open_shard(node(), Part, Db) of
+ {ok, Shard} ->
+ {Status, NewRev} = couch_db:update_doc(Shard, Doc, Options),
+ showroom_db:close_shard(Shard),
+ {Status, {Clock, [NewRev]}};
+ Error ->
+ Error
+ end.
+
+
+bulk_docs([Part, SeqsDocs, Db, Options, Type]) ->
+ {Seqs, Docs} = lists:unzip(SeqsDocs),
+ case Docs of
+ [] -> {ok, []};
+ _ ->
+ case showroom_db:open_shard(node(), Part, Db) of
+ {ok, Shard} ->
+ {ok, Results1} = couch_db:update_docs(Shard, Docs, Options, Type),
+ showroom_db:close_shard(Shard),
+ Results = int_zip(Seqs, Results1),
+ {ok, Results};
+ Error ->
+ Error
+ end
+ end.
+
+
+missing_revs([Part, SeqsIdsRevs, Db]) ->
+ {_Seqs, IdsRevs} = lists:unzip(SeqsIdsRevs),
+ case IdsRevs of
+ [] -> {ok, []};
+ _ ->
+ case showroom_db:open_shard(node(), Part, Db) of
+ {ok, Shard} ->
+ {ok, Results1} = couch_db:get_missing_revs(Shard, IdsRevs),
+ showroom_db:close_shard(Shard),
+ {ok, Results1};
+ Error ->
+ Error
+ end
+ end.
+
+
+get_db_info([Part, Db]) ->
+ case showroom_db:open_shard(node(), Part, Db) of
+ {ok, Shard} ->
+ {Status, Info} = couch_db:get_db_info(Shard),
+ showroom_db:close_shard(Shard),
+ {Status, {[], Info}};
+ Error ->
+ Error
+ end.
+
+get_view_group_info([Part, Db, DesignId]) ->
+ case showroom_db:open_shard(node(), Part, Db) of
+ {ok, Shard} ->
+ {ok, EmptyGroup} = showroom_view:build_skeleton_view_group(Db, DesignId),
+ <<"S", ShardName/binary>> = Shard#db.name,
+ {ok, Pid} = gen_server:call(couch_view, {get_group_server,
+ ShardName, EmptyGroup}),
+ {ok, Info} = couch_view_group:request_group_info(Pid),
+ showroom_db:close_shard(Shard),
+ {ok, {[], Info}};
+ Error ->
+ Error
+ end.
+
+
+ensure_full_commit([Part, Db]) ->
+ case showroom_db:open_shard(node(), Part, Db) of
+ {ok, Shard} ->
+ {Status, Info} = couch_db:ensure_full_commit(Shard),
+ showroom_db:close_shard(Shard),
+ {Status, {[], Info}};
+ Error ->
+ Error
+ end.
+
+
+%% =======================
+%% internal
+%% =======================
+
+int_zip(Seqs, Docs) when length(Seqs) == length(Docs) ->
+ lists:zip(Seqs, Docs);
+int_zip(_Seqs, []) ->
+ [];
+int_zip(Seqs, Docs) ->
+ ?debugFmt("~nWTF? int_zip~nSeqs: ~p~nDocs: ~p~n", [Seqs, Docs]),
+ [].
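
Every function here takes a single argument list because cluster_ops applies M:F([Part | A]) on each replica. A hypothetical caller-side wiring for a clustered read (the wrapper name is made up; the quorum logic lives in cluster_ops):

    open_doc_clustered(Db, DocId, Revs, Options) ->
      %% key_lookup prepends the partition, so get/1 receives
      %% [Part, Db, DocId, Revs, Options]
      cluster_ops:key_lookup(DocId, {dynomite_couch_api, get,
                                     [Db, DocId, Revs, Options]}, r).
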
diff --git a/src/dynomite_couch_storage.erl b/src/dynomite_couch_storage.erl
new file mode 100644
index 00000000..4fd21b80
--- /dev/null
+++ b/src/dynomite_couch_storage.erl
@@ -0,0 +1,41 @@
+%%%-------------------------------------------------------------------
+%%% File: dynomite_couch_storage.erl
+%%% @author Brad Anderson
+%%% @copyright 2009 Brad Anderson
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-07-14
+%%%-------------------------------------------------------------------
+-module(dynomite_couch_storage).
+-author('brad@cloudant.com').
+
+%% API
+-export([name/1, open/2, close/1, create/2]).
+%% , close/1, get/2, put/4, has_key/2, delete/2, fold/3
+
+-include_lib("../include/common.hrl").
+
+%% -record(row, {key, context, values}).
+
+%%====================================================================
+%% API
+%%====================================================================
+
+name(Boundary) ->
+ showroom_utils:int_to_hexstr(Boundary).
+
+open(Directory, Name) ->
+%% ?debugFmt("~nDirectory: ~p~nName : ~p~n", [Directory,Name]),
+ {ok, {Directory, Name}}.
+
+close(_Table) -> ok.
+
+create(_Directory, _Name) ->
+ ok.
+
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
diff --git a/src/dynomite_http.erl b/src/dynomite_http.erl
new file mode 100644
index 00000000..8b6f7fbb
--- /dev/null
+++ b/src/dynomite_http.erl
@@ -0,0 +1,21 @@
+%%%-------------------------------------------------------------------
+%%% File : dynomite_http.erl
+%%% Author : Brad Anderson <brad@cloudant.com>
+%%% Description :
+%%%
+%%% Created : 10 Jan 2010 by Brad Anderson <brad@cloudant.com>
+%%%-------------------------------------------------------------------
+-module(dynomite_http).
+-author('Brad Anderson <brad@cloudant.com>').
+
+-include("../couch/src/couch_db.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-export([handle_cluster_info/1]).
+
+
+%% GET /_cluster
+handle_cluster_info(#httpd{method='GET', path_parts=[_]}=Req) ->
+ ClusterInfo = [{<<"ping_node">>, ?l2b(atom_to_list(node()))}],
+ showroom_log:message(info, "Cluster Info: ~p", [ClusterInfo]),
+ couch_httpd:send_json(Req, {ClusterInfo}).
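
The endpoint returns a one-field JSON object; a client can fetch it the way dynomite_app's get_pingnode does, or directly with ibrowse (URL illustrative):

    ping_cluster() ->
      case ibrowse:send_req("http://localhost:5984/_cluster", [], get) of
        {ok, "200", _Headers, Body} ->
          {ok, Body};  % e.g. {"ping_node":"dbcore@db1.local"}
        Error ->
          Error
      end.
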
diff --git a/src/dynomite_prof.erl b/src/dynomite_prof.erl
new file mode 100644
index 00000000..80c4b5b7
--- /dev/null
+++ b/src/dynomite_prof.erl
@@ -0,0 +1,164 @@
+%%%-------------------------------------------------------------------
+%%% File: dynomite_prof.erl
+%%% @author Cliff Moon <> []
+%%% @copyright 2009 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-02-15 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(dynomite_prof).
+-author('cliff@powerset.com').
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0, start_prof/1, stop_prof/1, stats/1, averages/0, balance_prof/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {ets,balance}).
+
+-record(profile, {name, count, sum}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec start_link() -> {ok,Pid} | ignore | {error,Error}
+%% @doc Starts the server
+%% @end
+%%--------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, dynomite_prof}, ?MODULE, [], []).
+
+stats(Id) ->
+ gen_server:call(dynomite_prof, {stats, Id}).
+
+balance_prof() ->
+ gen_server:cast(dynomite_prof, {balance, self(), lib_misc:now_float()}).
+
+start_prof(Id) ->
+ gen_server:cast(dynomite_prof, {start, self(), Id, lib_misc:now_float()}).
+
+stop_prof(Id) ->
+ gen_server:cast(dynomite_prof, {stop, self(), Id, lib_misc:now_float()}).
+
+averages() ->
+ gen_server:call(dynomite_prof, averages).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @doc Initiates the server
+%% @end
+%%--------------------------------------------------------------------
+init([]) ->
+ Tid = ets:new(profiling, [set, {keypos, 2}]),
+ Bal = ets:new(balance, [set]),
+ {ok, #state{ets=Tid, balance=Bal}}.
+
+%%--------------------------------------------------------------------
+%% @spec
+%% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% @doc Handling call messages
+%% @end
+%%--------------------------------------------------------------------
+handle_call({stats, Id}, _From, State = #state{ets=Ets}) ->
+ Reply = ets:lookup(Ets, Id),
+ {reply, Reply, State};
+
+handle_call(table, _From, State = #state{ets=Ets}) ->
+ {reply, Ets, State};
+
+handle_call(averages, _From, State = #state{ets=Ets,balance=Bal}) ->
+ Avgs = ets:foldl(fun(#profile{name=Name,count=Count,sum=Sum}, List) ->
+ [{Name, Sum/Count}|List]
+ end, [], Ets),
+ {_, MaxCount} = ets:foldl(fun
+ ({Pid, Count}, {_P, M}) when Count > M -> {Pid, Count};
+ (_, {P, M}) -> {P, M}
+ end, {pid, 0}, Bal),
+ Balances = ets:foldl(fun({Pid, Count}, List) ->
+ [{Pid, Count / MaxCount} | List]
+ end, [], Bal),
+ {reply, [Balances, Avgs], State}.
+
+%%--------------------------------------------------------------------
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @doc Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+handle_cast({balance, Pid, _Time}, State = #state{balance=Ets}) ->
+ case ets:lookup(Ets, Pid) of
+ [] -> ets:insert(Ets, {Pid, 1});
+ [{Pid, Count}] -> ets:insert(Ets, {Pid, Count+1})
+ end,
+ {noreply, State};
+
+handle_cast({start, Pid, Id, Time}, State = #state{ets=_Ets}) ->
+ put({Pid,Id}, Time),
+ {noreply, State};
+
+handle_cast({stop, Pid, Id, Time}, State = #state{ets=Ets}) ->
+ case get({Pid, Id}) of
+ undefined -> ok;
+ OldTime ->
+ erase({Pid, Id}),
+ increment_time(Ets, Time-OldTime, Id)
+ end,
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @doc Handling all non call/cast messages
+%% @end
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @spec terminate(Reason, State) -> void()
+%% @doc This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @doc Convert process state when code is changed
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+increment_time(Ets, Time, Id) ->
+ case ets:lookup(Ets, Id) of
+ [] -> ets:insert(Ets, #profile{name=Id,count=1,sum=Time});
+    [#profile{name=Id,count=Count,sum=Sum}] ->
+      ets:insert(Ets, #profile{name=Id, count=Count+1, sum=Sum+Time})
+ end.
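
Timing data is keyed by {Pid, Id} in the server's process dictionary between the start and stop casts, then folded into per-Id #profile{} rows. Typical use, assuming the server was started with start_link/0 (the section name my_section is arbitrary):

    profiled_work() ->
      dynomite_prof:start_prof(my_section),
      timer:sleep(10),                      % the code being measured
      dynomite_prof:stop_prof(my_section),
      dynomite_prof:averages().             % [Balances, [{my_section, AvgSeconds}]]
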
diff --git a/src/dynomite_sup.erl b/src/dynomite_sup.erl
new file mode 100644
index 00000000..f8136934
--- /dev/null
+++ b/src/dynomite_sup.erl
@@ -0,0 +1,85 @@
+%%%-------------------------------------------------------------------
+%%% File: dynomite_sup.erl
+%%% @author Cliff Moon <cliff@powerset.com> []
+%%% @copyright 2008 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2008-06-27 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(dynomite_sup).
+-author('cliff@powerset.com').
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/1]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-include("../include/config.hrl").
+
+-define(SERVER, ?MODULE).
+
+%%====================================================================
+%% API functions
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec start_link() -> {ok,Pid} | ignore | {error,Error}
+%% @doc Starts the supervisor
+%% @end
+%%--------------------------------------------------------------------
+start_link(Hints) ->
+ supervisor:start_link(?MODULE, [Hints]).
+
+%%====================================================================
+%% Supervisor callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% @spec init(Args) -> {ok, {SupFlags, [ChildSpec]}} |
+%% ignore |
+%% {error, Reason}
+%% @doc Whenever a supervisor is started using
+%% supervisor:start_link/[2,3], this function is called by the new process
+%% to find out about restart strategy, maximum restart frequency and child
+%% specifications.
+%% @end
+%%--------------------------------------------------------------------
+init(Args) ->
+ Node = node(),
+  Nodes = running_nodes() ++ [Node],
+ Membership = {membership,
+ {membership2, start_link, [Node, Nodes, Args]},
+ permanent,
+ 1000,
+ worker,
+ [membership2]},
+ MemEventMgr = {mem_event_manager,
+ {gen_event, start_link, [{local, membership_events}]},
+ permanent,
+ 1000,
+ worker,
+ []},
+ {ok, {{one_for_one,10,1}, [Membership, MemEventMgr]}}.
+
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+%% @doc get a list of running nodes visible to this local node
+running_nodes() ->
+ [Node || Node <- nodes([this,visible]), running(Node)].
+
+%% @doc monitor the membership server on Node from here
+running(Node) ->
+ Ref = erlang:monitor(process, {membership, Node}),
+ R = receive
+ {'DOWN', Ref, _, _, _} -> false
+ after 1 ->
+ true
+ end,
+  erlang:demonitor(Ref, [flush]), % flush a 'DOWN' that may race the timeout
+ R.
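
Besides the membership server, the supervisor starts a plain gen_event manager registered as membership_events, so other processes can subscribe to membership changes through it. A sketch (my_mem_handler is a hypothetical gen_event callback module):

    subscribe_to_membership() ->
      gen_event:add_handler(membership_events, my_mem_handler, []).
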
diff --git a/src/lib_misc.erl b/src/lib_misc.erl
new file mode 100644
index 00000000..f5449295
--- /dev/null
+++ b/src/lib_misc.erl
@@ -0,0 +1,235 @@
+-module(lib_misc).
+
+-define(OFFSET_BASIS, 2166136261).
+-define(FNV_PRIME, 16777619).
+
+-export([rm_rf/1, pmap/3, succ/1, fast_acc/3, hash/1, hash/2, fnv/1,
+ nthdelete/2, zero_split/1, nthreplace/3, rand_str/1, position/2,
+ shuffle/1, floor/1, ceiling/1, time_to_epoch_int/1,
+ time_to_epoch_float/1, now_int/0, now_float/0, byte_size/1, listify/1,
+ reverse_bits/1]).
+
+-include("../include/config.hrl").
+-include("../include/profile.hrl").
+
+
+rm_rf(Name) when is_list(Name) ->
+ case filelib:is_dir(Name) of
+ false ->
+ file:delete(Name);
+ true ->
+ case file:list_dir(Name) of
+ {ok, Filenames} ->
+ lists:foreach(fun rm_rf/1, [ filename:join(Name, F) || F <- Filenames]),
+ file:del_dir(Name);
+ {error, Reason} -> error_logger:info_msg("rm_rf failed because ~p~n", [Reason])
+ end
+ end.
+
+zero_split(Bin) ->
+ zero_split(0, Bin).
+
+zero_split(N, Bin) when N > erlang:byte_size(Bin) -> Bin;
+
+zero_split(N, Bin) ->
+ case Bin of
+ <<_:N/binary, 0:8, _/binary>> -> split_binary(Bin, N);
+ _ -> zero_split(N+1, Bin)
+ end.
+
+rand_str(N) ->
+ lists:map(fun(_I) ->
+ random:uniform(26) + $a - 1
+ end, lists:seq(1,N)).
+
+nthreplace(N, E, List) ->
+ lists:sublist(List, N-1) ++ [E] ++ lists:nthtail(N, List).
+
+nthdelete(N, List) ->
+ nthdelete(N, List, []).
+
+nthdelete(0, List, Ret) ->
+ lists:reverse(Ret) ++ List;
+
+nthdelete(_, [], Ret) ->
+ lists:reverse(Ret);
+
+nthdelete(1, [_E|L], Ret) ->
+ nthdelete(0, L, Ret);
+
+nthdelete(N, [E|L], Ret) ->
+ nthdelete(N-1, L, [E|Ret]).
+
+floor(X) ->
+ T = erlang:trunc(X),
+ case (X - T) of
+ Neg when Neg < 0 -> T - 1;
+ Pos when Pos > 0 -> T;
+ _ -> T
+ end.
+
+ceiling(X) ->
+ T = erlang:trunc(X),
+ case (X - T) of
+ Neg when Neg < 0 -> T;
+ Pos when Pos > 0 -> T + 1;
+ _ -> T
+ end.
+
+succ([]) ->
+ [];
+
+succ(Str) ->
+ succ_int(lists:reverse(Str), []).
+
+succ_int([Char|Str], Acc) ->
+ if
+ Char >= $z -> succ_int(Str, [$a|Acc]);
+ true -> lists:reverse(lists:reverse([Char+1|Acc]) ++ Str)
+ end.
+
+fast_acc(_, Acc, 0) -> Acc;
+
+fast_acc(Fun, Acc, N) ->
+ fast_acc(Fun, Fun(Acc), N-1).
+
+shuffle(List) when is_list(List) ->
+ [ N || {_R,N} <- lists:keysort(1, [{random:uniform(),X} || X <- List]) ].
+
+pmap(Fun, List, ReturnNum) ->
+ N = if
+ ReturnNum > length(List) -> length(List);
+ true -> ReturnNum
+ end,
+ SuperParent = self(),
+ SuperRef = erlang:make_ref(),
+ Ref = erlang:make_ref(),
+ %% we spawn an intermediary to collect the results
+ %% this is so that there will be no leaked messages sitting in our mailbox
+ Parent = spawn(fun() ->
+ L = gather(N, length(List), Ref, []),
+ SuperParent ! {SuperRef, pmap_sort(List, L)}
+ end),
+ Pids = [spawn(fun() ->
+ Parent ! {Ref, {Elem, (catch Fun(Elem))}}
+ end) || Elem <- List],
+ Ret = receive
+ {SuperRef, Ret1} -> Ret1
+ end,
+ % i think we need to cleanup here.
+ lists:foreach(fun(P) -> exit(P, die) end, Pids),
+ Ret.
+
+pmap_sort(Original, Results) ->
+ pmap_sort([], Original, lists:reverse(Results)).
+
+% pmap_sort(Sorted, [], _) -> lists:reverse(Sorted);
+pmap_sort(Sorted, _, []) -> lists:reverse(Sorted);
+pmap_sort(Sorted, [E|Original], Results) ->
+ case lists:keytake(E, 1, Results) of
+ {value, {E, Val}, Rest} -> pmap_sort([Val|Sorted], Original, Rest);
+ false -> pmap_sort(Sorted, Original, Results)
+ end.
+
+gather(_, Max, _, L) when length(L) == Max -> L;
+gather(0, _, _, L) -> L;
+gather(N, Max, Ref, L) ->
+ receive
+ {Ref, {Elem, {not_found, Ret}}} -> gather(N, Max, Ref, [{Elem, {not_found, Ret}}|L]);
+ {Ref, {Elem, {badrpc, Ret}}} -> gather(N, Max, Ref, [{Elem, {badrpc, Ret}}|L]);
+ {Ref, {Elem, {'EXIT', Ret}}} -> gather(N, Max, Ref, [{Elem, {'EXIT', Ret}}|L]);
+ {Ref, Ret} -> gather(N-1, Max, Ref, [Ret|L])
+ end.
+
+get_hash_module(#config{hash_module=HashModule}) ->
+ HashModule.
+
+hash(Term) ->
+ HashModule = get_hash_module(configuration:get_config()),
+ ?prof(hash),
+ R = HashModule:hash(Term),
+ ?forp(hash),
+ R.
+
+hash(Term, Seed) ->
+ HashModule = get_hash_module(configuration:get_config()),
+ ?prof(hash),
+ R = HashModule:hash(Term, Seed),
+ ?forp(hash),
+ R.
+
+%32 bit fnv. magic numbers ahoy
+fnv(Term) when is_binary(Term) ->
+ fnv_int(?OFFSET_BASIS, 0, Term);
+
+fnv(Term) ->
+ fnv_int(?OFFSET_BASIS, 0, term_to_binary(Term)).
+
+fnv_int(Hash, ByteOffset, Bin) when erlang:byte_size(Bin) == ByteOffset ->
+ Hash;
+
+fnv_int(Hash, ByteOffset, Bin) ->
+ <<_:ByteOffset/binary, Octet:8, _/binary>> = Bin,
+ Xord = Hash bxor Octet,
+ fnv_int((Xord * ?FNV_PRIME) rem (2 bsl 31), ByteOffset+1, Bin).
+
+position(Predicate, List) when is_function(Predicate) ->
+ position(Predicate, List, 1);
+
+position(E, List) ->
+ position(E, List, 1).
+
+position(Predicate, [], _N) when is_function(Predicate) -> false;
+
+position(Predicate, [E|List], N) when is_function(Predicate) ->
+ case Predicate(E) of
+ true -> N;
+ false -> position(Predicate, List, N+1)
+ end;
+
+position(_, [], _) -> false;
+
+position(E, [E|_List], N) -> N;
+
+position(E, [_|List], N) -> position(E, List, N+1).
+
+now_int() ->
+ time_to_epoch_int(now()).
+
+now_float() ->
+ time_to_epoch_float(now()).
+
+time_to_epoch_int(Time) when is_integer(Time) or is_float(Time) ->
+ Time;
+
+time_to_epoch_int({Mega,Sec,_}) ->
+ Mega * 1000000 + Sec.
+
+time_to_epoch_float(Time) when is_integer(Time) or is_float(Time) ->
+ Time;
+
+time_to_epoch_float({Mega,Sec,Micro}) ->
+ Mega * 1000000 + Sec + Micro / 1000000.
+
+byte_size(List) when is_list(List) ->
+ lists:foldl(fun(El, Acc) -> Acc + lib_misc:byte_size(El) end, 0, List);
+
+byte_size(Term) ->
+ erlang:byte_size(Term).
+
+listify(List) when is_list(List) ->
+ List;
+
+listify(El) -> [El].
+
+reverse_bits(V) when is_integer(V) ->
+ % swap odd and even bits
+ V1 = ((V bsr 1) band 16#55555555) bor (((V band 16#55555555) bsl 1) band 16#ffffffff),
+ % swap consecutive pairs
+ V2 = ((V1 bsr 2) band 16#33333333) bor (((V1 band 16#33333333) bsl 2) band 16#ffffffff),
+ % swap nibbles ...
+ V3 = ((V2 bsr 4) band 16#0F0F0F0F) bor (((V2 band 16#0F0F0F0F) bsl 4) band 16#ffffffff),
+ % swap bytes
+ V4 = ((V3 bsr 8) band 16#00FF00FF) bor (((V3 band 16#00FF00FF) bsl 8) band 16#ffffffff),
+ % swap 2-byte long pairs
+ ((V4 bsr 16) band 16#ffffffff) bor ((V4 bsl 16) band 16#ffffffff).
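
A few illustrative calls; pmap/3 returns results in input-list order once ReturnNum of them have been gathered, killing any stragglers:

    demo() ->
      [1, 4] = lib_misc:pmap(fun(X) -> X * X end, [1, 2], 2),
      3     = lib_misc:floor(3.2),
      4     = lib_misc:ceiling(3.2),
      "abd" = lib_misc:succ("abc"),
      true  = is_integer(lib_misc:fnv(<<"hello">>)),  % 32-bit FNV hash
      ok.
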
diff --git a/src/mem_utils.erl b/src/mem_utils.erl
new file mode 100644
index 00000000..ffefd5cb
--- /dev/null
+++ b/src/mem_utils.erl
@@ -0,0 +1,129 @@
+-module(mem_utils).
+
+-export([fix_mappings/3, get_remote_fullmap/1, join_type/3, pmap_from_full/1,
+ nodeparts_up/1, remove_partition/3, use_persistent/2,
+ was_i_nodedown/2]).
+
+-include("../include/common.hrl").
+
+join_type(Node, Fullmap, Options) ->
+ case proplists:get_value(replace, Options) of
+ undefined ->
+ case lists:filter(fun({N,_P,_T}) -> N =:= Node end, Fullmap) of
+ [] -> new;
+ _ -> rejoin
+ end;
+ OldNode when is_atom(OldNode) ->
+ % not a particularly strong guard, but will have to do
+ {replace, OldNode};
+ _ -> new
+ end.
+
+
+%% @doc return a {PMap, Fullmap} tuple that has corrections for
+%% down, rejoining, or replacing Node
+fix_mappings(nodedown, Node, OldFullmap) ->
+ fix_mappings_fold(fun({N,P,T}, AccIn) ->
+ case {N,T} of
+ {Node, {nodedown, Type}} ->
+ % already marked as nodedown, so leave it
+ [{N,P, {nodedown, Type}} | AccIn];
+ {Node, _} ->
+ % mark it as nodedown
+ [{N,P, {nodedown, T}} | AccIn];
+ _ -> [{N,P,T} | AccIn]
+ end
+ end, [], OldFullmap);
+
+fix_mappings(rejoin, Node, OldFullmap) ->
+ fix_mappings_fold(fun({N,P,{nodedown,T}}, AccIn) when N =:= Node ->
+ [{N,P,T} | AccIn];
+ (NPT, AccIn) -> [NPT | AccIn]
+ end, [], OldFullmap);
+
+fix_mappings(replace, {OldNode, NewNode}, OldFullmap) ->
+ fix_mappings_fold(fun({N,P,T}, AccIn) ->
+ case {N, T} of
+ {OldNode, {nodedown,T1}} -> [{NewNode,P,T1} | AccIn];
+ {OldNode, _} -> [{NewNode,P,T} | AccIn];
+ _ -> [{N,P,T} | AccIn]
+ end
+ end, [], OldFullmap).
+
+
+fix_mappings_fold(Fun, Acc0, OldFullmap) ->
+ NewFullmap = lists:foldl(Fun, Acc0, OldFullmap),
+ NewPMap = pmap_from_full(NewFullmap),
+ {NewPMap, NewFullmap}.
+
+
+%% @doc create a PMap (primary nodes only) from provided Fullmap
+%% If a primary node is down, a partner will be supplied
+pmap_from_full(Fullmap) ->
+ NodePartList = nodeparts_up(Fullmap),
+ lists:keysort(2,lists:foldl(fun({N,P,T}, AccIn) ->
+ case T of
+ primary -> [{N,P} | AccIn];
+ {nodedown, primary} ->
+ NewNode = case lists:delete(N,
+ membership2:nodes_for_part(P, NodePartList)) of
+ [First|_] -> First;
+ [] -> N % wtf, are all partners down too?
+ end,
+ [{NewNode,P} | AccIn];
+ _ -> AccIn
+ end
+ end, [], Fullmap)).
+
+
+nodeparts_up(Fullmap) ->
+ lists:foldl(fun({_N,_P,{nodedown,_}}, AccIn) -> AccIn;
+ ({N,P,_T}, AccIn) -> [{N,P} | AccIn]
+ end, [], Fullmap).
+
+
+
+%% @doc if Node is in the Fullmap as {nodedown,_} return true
+was_i_nodedown(Node, Fullmap) ->
+ lists:member(yes, lists:map(fun({N,_P,{nodedown,_T}}) ->
+ case N of
+ Node -> yes;
+ _ -> no
+ end;
+ (_) -> no
+ end, Fullmap)).
+
+
+remove_partition(FullMap, Node, Partition) ->
+ case lists:filter(
+ fun({N,P,_Type}) -> N =:= Node andalso P =:= Partition end,
+ FullMap) of
+ [Elem|_] ->
+ lists:delete(Elem, FullMap);
+ Other ->
+ ?LOG_ERROR("~nNo partition to remove: ~p~n"
+ "Node: ~p~nPartition: ~p~n", [Other, Node, Partition]),
+ FullMap
+ end.
+
+
+use_persistent(_PartnersPlus, undefined) ->
+ false;
+
+use_persistent(PartnersPlus, _PersistentParts) ->
+ % get a fullmap from a partner
+ % this may need rework for network partitions, as you could get a bad
+ % fullmap from another node that was partitioned w/ this one :\
+ RemoteFullmap = get_remote_fullmap(PartnersPlus),
+ % return opposite of was_i_nodedown
+ not mem_utils:was_i_nodedown(node(), RemoteFullmap).
+
+
+get_remote_fullmap([]) ->
+ []; % no remote fullmap available, so return empty list
+
+get_remote_fullmap([Node|Rest]) ->
+ case gen_server:call({membership, Node}, fullmap) of
+ {ok, Fullmap} -> Fullmap;
+ _ -> get_remote_fullmap(Rest)
+ end.
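
A sketch of the nodedown/rejoin cycle through fix_mappings/3; the node names and single partition 0 are illustrative:

    demo() ->
      Full0 = [{'a@host', 0, primary}, {'b@host', 0, partner}],
      {PMap1, Full1} = mem_utils:fix_mappings(nodedown, 'a@host', Full0),
      [{'b@host', 0}] = PMap1,  % partner promoted while 'a@host' is down
      {PMap2, _Full2} = mem_utils:fix_mappings(rejoin, 'a@host', Full1),
      [{'a@host', 0}] = PMap2,  % primary restored on rejoin
      ok.
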
diff --git a/src/membership2.erl b/src/membership2.erl
new file mode 100644
index 00000000..4c4780c3
--- /dev/null
+++ b/src/membership2.erl
@@ -0,0 +1,686 @@
+%%%-------------------------------------------------------------------
+%%% File: membership2.erl
+%%% @author Cliff Moon <cliff@powerset.com> []
+%%% @copyright 2009 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-05-04 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(membership2).
+-author('cliff@powerset.com').
+-author('brad@cloudant.com').
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/2, start_link/3, stop/1, check_nodes/0,
+ partitions/0, partition_for_key/1, fullmap/0,
+ all_nodes_parts/1, clock/0,
+ nodes/0, nodeparts_for_key/1, nodes_for_part/1, nodes_for_part/2,
+ nodes_for_shard/1, nodes_down/0,
+ parts_for_node/1,
+ take_offline/2, bring_online/2,
+ decommission_part/3, pp_fullmap/0, snafu/1, snafu/3]).
+
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% includes
+-include("../include/config.hrl").
+-include("../include/common.hrl").
+-include("../include/profile.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%%====================================================================
+%% API
+%%====================================================================
+%% @doc Starts the server
+%% @end
+%%--------------------------------------------------------------------
+
+start_link(Node, Nodes) ->
+ start_link(Node, Nodes, []).
+
+
+start_link(Node, Nodes, Args) ->
+ gen_server:start_link({local, membership}, ?MODULE, [Node, Nodes, Args], []).
+
+
+stop(Server) ->
+ gen_server:cast(Server, stop).
+
+
+%% @doc for when things have really gone south. Install a new state on all
+%% nodes, given either a filename (of a file containing a #membership record)
+%% or a node list, partition map, and fullmap.
+%% @end
+snafu(Filename) ->
+ NewState = case file:consult(Filename) of
+ {ok, [Terms]} ->
+ Terms;
+ Error ->
+ throw(Error)
+ end,
+ #membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap} = NewState,
+ snafu(Nodes, PMap, Fullmap).
+
+
+snafu(Nodes, PMap, Fullmap) ->
+ NewState = #membership{node=node(), nodes=Nodes,
+ partitions=PMap, fullmap=Fullmap, version=vector_clock:create(dbcore)},
+ update_ets(ets_name(node()), NewState),
+ fire_gossip(node(), Nodes, NewState),
+ save(NewState).
+
+
+check_nodes() ->
+ ErlangNodes = lists:usort([node() | erlang:nodes()]),
+ {ok, MemNodeList} = membership2:nodes(),
+ MemNodes = lists:usort(MemNodeList),
+ {PMapNodeList, _PMapPartList} = lists:unzip(partitions()),
+ PMapNodes = lists:usort(PMapNodeList),
+ case ErlangNodes =:= MemNodes andalso
+ ErlangNodes =:= PMapNodes andalso
+ MemNodes =:= PMapNodes of
+ true -> true;
+ _ ->
+ Msg = "membership: Node Lists do not match.~n"
+ "Erlang Nodes : ~p~n"
+ "Membership Nodes : ~p~n"
+ "PMap Nodes : ~p~n",
+ Lst = [ErlangNodes, MemNodes, PMapNodes],
+ showroom_log:message(error, Msg, Lst),
+ io:format(Msg, Lst),
+ false
+ end.
+
+
+%% @doc retrieve the primary partition map. This is a list of partitions and
+%% their corresponding primary node, no replication partner nodes.
+partitions() ->
+ ets_pmap().
+
+
+%% @doc retrieve the full partition map, like above, but including replication
+%% partner nodes. The list should contain 2^Q * N entries.
+fullmap() ->
+ lists:keysort(2, ets_fullmap()).
+
+
+%% @doc pretty-print the full partition map (sorted by node, then part)
+pp_fullmap() ->
+ lists:foreach(
+ fun({N,P}) ->
+ io:format("~-60s ~s~n", [N, showroom_utils:int_to_hexstr(P)])
+ end,
+ lists:sort(membership2:all_nodes_parts(true))).
+
+
+%% @doc get the current vector clock from membership state
+clock() ->
+ gen_server:call(membership, clock).
+
+
+%% @doc get the list of cluster nodes (according to membership module)
+%% This may differ from erlang:nodes()
+nodes() ->
+ gen_server:call(membership, nodes).
+
+
+%% @doc get all the responsible nodes for a given partition, including
+%% replication partner nodes
+nodes_for_part(Part) ->
+ nodes_for_part(Part, all_nodes_parts(true)).
+
+
+nodes_for_part(Part, NodePartList) ->
+ Filtered = lists:filter(fun({_N, P}) -> P =:= Part end, NodePartList),
+ {Nodes, _Parts} = lists:unzip(Filtered),
+ lists:usort(Nodes).
+
+
+nodes_for_shard(ShardName) when is_binary(ShardName) ->
+ nodes_for_shard(binary_to_list(ShardName));
+
+nodes_for_shard(ShardName) when is_list(ShardName) ->
+ HexPart = case string:rchr(ShardName, $_) + 1 of
+ 1 -> ShardName;
+ Last -> string:substr(ShardName, Last)
+ end,
+ Int = showroom_utils:hexstr_to_int(HexPart),
+ {_, Parts} = lists:unzip(membership2:partitions()),
+ nodes_for_part(partitions:int_to_partition(Int, Parts)).
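+
+%% Example (hypothetical shard name, hex partition "200000"):
+%%   nodes_for_shard("x200000/testdb_200000") takes the "200000" after the
+%%   last underscore, converts it back to an integer partition, and returns
+%%   the nodes responsible for it.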
+
+
+%% @doc get all the responsible nodes and partitions for a given key, including
+%% nodes/parts on replication partner nodes
+nodeparts_for_key(Key) ->
+ int_node_parts_for_key(Key).
+
+
+%% @doc get a list of all the nodes marked down in this node's fullmap
+nodes_down() ->
+ Downs = lists:foldl(fun({N,_P,{nodedown, _T}}, AccIn) -> [N|AccIn];
+ (_, AccIn) -> AccIn end, [], fullmap()),
+ lists:usort(Downs).
+
+
+%% @doc return the partition responsible for the given Key
+partition_for_key(Key) ->
+ Config = configuration:get_config(),
+ Hash = lib_misc:hash(Key),
+ partitions:hash_to_partition(Hash, Config#config.q).
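+
+%% Example (a sketch, assuming Q=2, i.e. 4 partitions): partition_for_key
+%% hashes the key into SHA-1 space and returns one of the four partition
+%% boundaries 0, 2^158, 2^159 or 3*2^158.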
+
+
+%% @doc return the partitions that reside on a given node
+parts_for_node(Node) ->
+ lists:sort(lists:foldl(fun({N,P,_Type}, AccIn) ->
+ case N of
+ Node -> [P | AccIn];
+ _ -> AccIn
+ end
+ end, [], fullmap())).
+
+
+%% @doc get all the nodes and partitions in the cluster. Depending on the
+%% AllPartners param, you get either primary nodes only (false) or
+%% replication partner nodes as well (true).
+%% Nodes/parts currently down are never returned.
+all_nodes_parts(false) ->
+ ets_pmap();
+all_nodes_parts(true) ->
+ mem_utils:nodeparts_up(ets_fullmap()).
+
+
+%% @doc If a local storage server exists for this partition it will be taken
+%% out of rotation until put back in.
+%% @end
+take_offline(Node, Partition) when Node =:= node() ->
+ gen_server:call(membership, {take_offline, Partition});
+
+take_offline(Node, Partition) ->
+ gen_server:call({membership, Node}, {take_offline, Partition}).
+
+
+%% @doc Brings a storage server that has been taken offline back online.
+%% @end
+bring_online(Node, Partition) ->
+ showroom_log:message(debug, "membership: bring_online Node: ~p Partition: ~p",
+ [Node, Partition]),
+ gen_server:call({membership, Node}, {bring_online, Partition}).
+
+
+%% @doc cleans up the remaining .couch shard/partition file after it has been
+%% moved to a new node.
+decommission_part(Node, Part, DbName) ->
+ gen_server:cast({membership, Node}, {decommission, Part, DbName}).
+
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @doc Initiates the server
+%% @end
+%%--------------------------------------------------------------------
+init([Node, Nodes, Args]) ->
+ process_flag(trap_exit,true),
+ showroom_log:message(info, "membership: membership server starting...", []),
+ Options = lists:flatten(Args),
+ showroom_log:message(info, "membership: options ~p", [Options]),
+ net_kernel:monitor_nodes(true),
+ Config = configuration:get_config(),
+ PersistentState=#membership{partitions=PersistentParts} = load(Node),
+ PartnersPlus = replication:partners_plus(Node, Nodes),
+ State =
+ case mem_utils:use_persistent(PartnersPlus, PersistentParts) of
+ false ->
+ showroom_log:message(info, "membership: not using persisted state", []),
+ % didn't find persistent state on disk or this node was nodedown
+ % so we don't want to use persisted state
+ PartialNodes = lists:usort(Nodes),
+ {NewVersion, RemoteNodes, NewPMap1, NewFullMap1} =
+ join_to(Node, PartnersPlus, Options),
+ NewWorldNodes = lists:usort(PartialNodes ++ RemoteNodes),
+ NewPMap = case NewPMap1 of
+ [] -> partitions:create_partitions(Config#config.q, Node,
+ NewWorldNodes);
+ _ -> NewPMap1
+ end,
+ NewFullMap = case NewFullMap1 of
+ [] -> make_all_nodes_parts(NewPMap);
+ _ -> NewFullMap1
+ end,
+ #membership{
+ node=Node,
+ nodes=NewWorldNodes,
+ partitions=lists:keysort(2,NewPMap),
+ % version=vector_clock:increment(dbcore, NewVersion),
+ version=NewVersion,
+ fullmap=NewFullMap};
+ _ ->
+ % found persistent state on disk
+ showroom_log:message(info, "membership: using persisted state", []),
+ case Options of
+ [] -> ok;
+ _ ->
+ showroom_log:message(info, "membership: options ~p ignored.", [Options])
+ end,
+ %% fire gossip even if state comes from disk
+ fire_gossip(Node, Nodes, PersistentState),
+ PersistentState
+ end,
+ save(State),
+ % ets table is an optimization for cluster_ops performance
+ Ets = ets:new(ets_name(Node), [public, set, named_table]),
+ update_ets(Ets, State),
+ {ok, State}.
+
+
+%%--------------------------------------------------------------------
+%% @spec
+%% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% @doc Handling call messages
+%% @end
+%%--------------------------------------------------------------------
+
+%% join
+handle_call({join, JoiningNode, Options}, _From,
+ State = #membership{version=Version, node=Node, nodes=Nodes,
+ partitions=Partitions, fullmap=OldFullMap}) ->
+ JoinType = mem_utils:join_type(JoiningNode, OldFullMap, Options),
+ showroom_log:message(alert, "membership: node ~p wants to join, type '~p'",
+ [JoiningNode, JoinType]),
+ {PMap, NewFullmap} = case JoinType of
+ rejoin ->
+ mem_utils:fix_mappings(rejoin, JoiningNode, OldFullMap);
+ {replace, OldNode} ->
+ mem_utils:fix_mappings(replace, {OldNode, JoiningNode}, OldFullMap);
+ new ->
+ Hints = proplists:get_value(hints, Options),
+ PMap1 = case partitions:join(JoiningNode, Partitions, Hints) of
+ {ok, Table} -> Table;
+ {error, Error, _Table} -> throw({join_error, Error})
+ end,
+ Fullmap1 = make_all_nodes_parts(PMap1),
+ {PMap1, Fullmap1}
+ end,
+ WorldNodes = lists:usort(Nodes ++ [JoiningNode]),
+ NewVersion = vector_clock:increment(dbcore, Version),
+ NewState1 = State#membership{nodes=WorldNodes, partitions=PMap,
+ version=NewVersion},
+ {Fullmap, NewState2} = case proplists:get_value(bootstrap, Options) of
+ true ->
+ % join not complete until bootstrap finishes,
+ % so this NewState isn't the final (i.e. NewState1 will be installed)
+ showroom_log:message(info, "membership: bootstrap process starting", []),
+ bootstrap_manager:start_bootstrap(NewState1, OldFullMap, NewFullmap);
+ _ ->
+ % no bootstrap, so install NewFullmap now
+ showroom_log:message(info, "membership: no bootstrap", []),
+ {NewFullmap, NewState1#membership{fullmap=NewFullmap}}
+ end,
+ save(NewState2),
+ update_ets(ets_name(node()), NewState2),
+ notify(node_join, [JoiningNode]),
+ fire_gossip(Node, WorldNodes, NewState2),
+ % If we're bootstrapping, then the join is not complete.
+ % So return FullMap for now. bootstrap_manager:end_bootstrap will fix it
+ {reply, {ok, NewVersion, WorldNodes, PMap, Fullmap}, NewState2};
+
+%% clock
+handle_call(clock, _From, State = #membership{version=Version}) ->
+ {reply, Version, State};
+
+%% state
+handle_call(state, _From, State) ->
+ {reply, State, State};
+
+%% newfullmap
+handle_call({newfullmap, NewFullMap}, _From,
+ State = #membership{node=Node, nodes=Nodes, version=Version}) ->
+ NewVersion = vector_clock:increment(dbcore, Version),
+ NewState = State#membership{version=NewVersion, fullmap=NewFullMap},
+ save(NewState),
+ update_ets(ets_name(node()), NewState),
+ fire_gossip(Node, Nodes, NewState),
+ {reply, installed, NewState};
+
+%% partitions
+handle_call(partitions, _From, State = #membership{partitions=Parts}) ->
+ {reply, {ok, Parts}, State};
+
+%% fullmap
+handle_call(fullmap, _From, State = #membership{fullmap=FullMap}) ->
+ {reply, {ok, FullMap}, State};
+
+%% nodes
+handle_call(nodes, _From, State = #membership{nodes=Nodes}) ->
+ {reply, {ok, Nodes}, State};
+
+%% take_offline
+handle_call({take_offline, Partition}, _From,
+ State = #membership{node=Node, nodes=Nodes, fullmap=OldFullMap}) ->
+ showroom_log:message(info, "membership: take_offline Node: ~p Partition: ~p",
+ [Node, Partition]),
+ NewFullMap = mem_utils:remove_partition(OldFullMap, Node, Partition),
+ NewState = State#membership{fullmap=NewFullMap},
+ fire_gossip(Node, Nodes, NewState),
+ update_ets(ets_name(node()), NewState),
+ {reply, {offline, Node, Partition}, NewState};
+
+%% at least reply that this 'catch-all' was ignored
+handle_call(_Request, _From, State) ->
+ {reply, ignored, State}.
+
+
+%%--------------------------------------------------------------------
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @doc Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+
+handle_cast({gossip, RemoteState = #membership{node=RemoteNode}},
+ LocalState = #membership{node=_Me}) ->
+ showroom_log:message(info, "membership: received gossip from ~p",
+ [RemoteNode]),
+ {MergeType, MergedState = #membership{nodes=_MergedNodes}} =
+ merge_state(RemoteState, LocalState),
+ case MergeType of
+ equal -> {noreply, MergedState};
+ merged ->
+ showroom_log:message(info, "membership: merged new gossip", []),
+ % fire_gossip(Me, MergedNodes, MergedState),
+ update_ets(ets_name(node()), MergedState),
+ save(MergedState),
+ {noreply, MergedState}
+ end;
+
+% decommission
+% renaming for now, until case 1245 can be completed
+handle_cast({decommission, Part, DbName}, State) ->
+ {{Y,Mon,D}, {H,Min,S}} = calendar:universal_time(),
+ Directory = couch_config:get("couchdb", "database_dir"),
+ OrigFilename = showroom_utils:full_filename(Part, DbName, Directory),
+ Moved = lists:flatten(io_lib:format(".~w~2.10.0B~2.10.0B." ++
+ "~2.10.0B~2.10.0B~2.10.0B.moved.couch", [Y,Mon,D,H,Min,S])),
+ % escape the dot so the regex matches the literal ".couch" extension, and
+ % ask re:replace/4 for a flat string rather than an iodata deep list
+ MovedFilename = re:replace(OrigFilename, "\\.couch$", Moved,
+ [{return, list}]),
+ ok = file:rename(OrigFilename, MovedFilename),
+ {noreply, State}.
+
+
+%% @doc handle nodedown messages because we have
+%% net_kernel:monitor_nodes(true)
+handle_info({nodedown, Node},
+ State = #membership{nodes=OldNodes, fullmap=OldFullmap,
+ version=OldVersion}) ->
+ showroom_log:message(alert, "membership: nodedown from ~p", [Node]),
+ case lists:member(Node, OldNodes) of
+ true ->
+ notify(nodedown, [Node]),
+ % clean up membership state
+ Nodes = lists:delete(Node, OldNodes),
+ {PMap, Fullmap} = mem_utils:fix_mappings(nodedown, Node, OldFullmap),
+ % Do we increment clock here? w/o gossip?
+ % This is happening near simultaneously on the other nodes, too :\
+ % Only reason to increment is persisted clock on down node will be older
+ % when it returns
+ Version = vector_clock:increment(dbcore, OldVersion),
+ NewState = State#membership{nodes=Nodes, partitions=PMap, fullmap=Fullmap,
+ version=Version},
+ update_ets(ets_name(node()), NewState),
+ save(NewState),
+ {noreply, NewState};
+ _ -> {noreply, State}
+ end;
+
+%% @doc handle nodeup messages because we have
+%% net_kernel:monitor_nodes(true)
+handle_info({nodeup, Node}, State) ->
+ showroom_log:message(alert, "membership: nodeup Node: ~p", [Node]),
+ {noreply, State};
+
+handle_info(Info, State) ->
+ showroom_log:message(info, "membership: handle_info Info: ~p", [Info]),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @spec terminate(Reason, State) -> void()
+%% @doc This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%% 0.5.6 to 0.5.7
+code_change(184380560337424323902805568963460261434, State, _Extra) ->
+ backup_old_config_file(),
+ % update State to the new version
+ {membership, _Hdr, Node, Nodes, PMap, Version} = State,
+ NewState = #membership{
+ node = Node,
+ nodes = Nodes,
+ partitions = PMap,
+ version = Version,
+ fullmap = make_all_nodes_parts(PMap)
+ },
+ save(NewState),
+ % also create new ets table
+ Ets = ets:new(ets_name(Node), [public, set, named_table]),
+ update_ets(Ets, NewState),
+ {ok, NewState};
+
+%% 0.8.8 to 0.9.0
+code_change(239470595681156900105628017899543243419, State, _Extra) ->
+ net_kernel:monitor_nodes(true),
+ {ok, State};
+
+code_change(OldVsn, State, _Extra) ->
+ io:format("Unknown Old Version!~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+backup_old_config_file() ->
+ Config = configuration:get_config(),
+ FileName = filename:join([Config#config.directory,
+ lists:concat([node:name(node()), ".state"])]),
+ BackupName = filename:join([Config#config.directory,
+ lists:concat([node:name(node()), ".state.bak"])]),
+ file:copy(FileName, BackupName).
+
+
+%% return State from membership file
+load(Node) ->
+ Config = configuration:get_config(),
+ case file:consult(filename:join([Config#config.directory,
+ lists:concat([node:name(Node), ".state"])])) of
+ {error, Reason} ->
+ showroom_log:message(info, "membership: could not load state: ~p~n",
+ [Reason]),
+ #membership{nodes=[]};
+ {ok, [Terms]} ->
+ Terms
+ end.
+
+
+%% save the State to a file
+save(State) ->
+ Config = configuration:get_config(),
+ Filename = filename:join([Config#config.directory,
+ lists:concat([node:name(State#membership.node), ".state"])]),
+ {ok, File} = file:open(Filename, [binary, write]),
+ io:format(File, "~w.~n", [State]),
+ file:close(File).
+
+
+%% joining is bi-directional, as opposed to gossip, which is unidirectional.
+%% We want to collect the list of known nodes to compute the partition map,
+%% which isn't necessarily the same as the list of running nodes.
+join_to(Node, Partners, Options) ->
+ join_to(Node, Partners,
+ {vector_clock:create(dbcore), [], [], []}, Options).
+
+
+%% @doc join this node to one of its partners (or PartnersPlus if no partners
+%% are available).
+join_to(_, [], {Version, World, PMap, FullMap}, _Options) ->
+ {Version, World, PMap, FullMap};
+
+join_to(Node, [Partner|Rest], {Version, World, PMap, FullMap}, Options) ->
+ case call_join(Partner, Node, Options) of
+ {ok, RemoteVersion, NewNodes, NewPMap, NewFullMap} ->
+ {vector_clock:merge(Version, RemoteVersion),
+ lists:usort(World ++ NewNodes),
+ NewPMap,
+ NewFullMap};
+ Other ->
+ showroom_log:message(info, "membership: join_to Other: ~p~n", [Other]),
+ join_to(Node, Rest, {Version, World, PMap, FullMap}, Options)
+ end.
+
+
+%% @doc make the join call to Remote node (usually a partner of Node)
+call_join(Remote, Node, Options) ->
+ showroom_log:message(info, "membership: call_join From: ~p To: ~p",
+ [Node, Remote]),
+ catch gen_server:call({membership, node:name(Remote)},
+ {join, Node, Options}).
+
+
+merge_state(_RemoteState=#membership{version=RemoteVersion, nodes=RemoteNodes,
+ partitions=RemotePMap,
+ fullmap=RemoteFullMap},
+ LocalState=#membership{version=LocalVersion, nodes=LocalNodes,
+ partitions=LocalPMap,
+ fullmap=LocalFullMap}) ->
+ case vector_clock:equals(RemoteVersion, LocalVersion) of
+ true ->
+ {equal, LocalState};
+ false ->
+ % Note, we're matching MergedVersion from these funs.
+ % They should be the same.
+ {MergedVersion, MergedNodes} =
+ merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes),
+ {MergedVersion, MergedPMap} =
+ merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap),
+ {MergedVersion, MergedFullMap} =
+ merge_fullmaps(RemoteVersion, RemoteFullMap,
+ LocalVersion, LocalFullMap),
+
+ % notify of arrivals & departures
+ Arrived = MergedNodes -- LocalNodes,
+ notify(node_join, Arrived),
+ % Departed = LocalNodes -- MergedNodes,
+ % notify(node_leave, Departed),
+
+ {merged, LocalState#membership{version=MergedVersion, nodes=MergedNodes,
+ partitions=MergedPMap,
+ fullmap=MergedFullMap}}
+ end.
+
+
+merge_nodes(RemoteVersion, RemoteNodes, LocalVersion, LocalNodes) ->
+ {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteNodes},
+ {LocalVersion, LocalNodes}),
+ {MergedVersion, lists:usort(Merged)}.
+
+
+merge_pmaps(RemoteVersion, RemotePMap, LocalVersion, LocalPMap) ->
+ {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemotePMap},
+ {LocalVersion, LocalPMap}),
+ {MergedVersion, lists:ukeysort(2, Merged)}.
+
+
+merge_fullmaps(RemoteVersion, RemoteFullMap, LocalVersion, LocalFullMap) ->
+ {MergedVersion, Merged} = vector_clock:resolve({RemoteVersion, RemoteFullMap},
+ {LocalVersion, LocalFullMap}),
+ {MergedVersion, lists:usort(Merged)}.
+
+
+notify(Type, Nodes) ->
+ lists:foreach(fun(Node) ->
+ gen_event:notify(membership_events, {Type, Node})
+ end, Nodes).
+
+
+%% @doc fires a gossip message (membership state) to partner nodes in the
+%% cluster.
+%% @end
+fire_gossip(Me, WorldNodes, Gossip) ->
+ % GossipPartners = partners_plus(Me, WorldNodes),
+ % random experiment, gossip with all nodes, not just partners_plus
+ GossipPartners = lists:delete(Me, WorldNodes),
+ lists:foreach(fun(TargetNode) ->
+ showroom_log:message(info, "membership: firing gossip from ~p to ~p",
+ [Me, TargetNode]),
+ gen_server:cast({membership, TargetNode}, {gossip, Gossip})
+ end, GossipPartners).
+
+
+%% @doc construct a table with all partitions, with the primary node and all
+%% replication partner nodes as well.
+make_all_nodes_parts(PMap) ->
+ {Nodes, _Parts} = lists:unzip(PMap),
+ NodeParts = lists:flatmap(
+ fun({Node,Part}) ->
+ Partners = replication:partners(Node, lists:usort(Nodes)),
+ PartnerList = [{Partner, Part, partner} || Partner <- Partners],
+ [{Node, Part, primary} | PartnerList]
+ end, PMap),
+ NodeParts.
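+
+%% Example (illustrative; assumes a config where N=2, so one partner each):
+%%   make_all_nodes_parts([{a,0},{b,1}]) could yield
+%%   [{a,0,primary},{b,0,partner},{b,1,primary},{a,1,partner}],
+%% i.e. each pmap entry expands to a primary row plus N-1 partner rows.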
+
+
+%% @doc for the given key, return a list of {Node,Part} tuples. Nodes are both
+%% primary and replication partner nodes; the list should contain N entries.
+int_node_parts_for_key(Key) ->
+ Config = configuration:get_config(),
+ Hash = lib_misc:hash(Key),
+ Part = partitions:hash_to_partition(Hash, Config#config.q),
+ NodePartList = all_nodes_parts(true),
+ lists:filter(fun({_N,P}) -> P =:= Part end, NodePartList).
+
+
+%% ets table helper functions
+ets_name(Node) ->
+ list_to_atom(lists:concat(["mem_", atom_to_list(Node)])).
+
+
+update_ets(Table, #membership{partitions=PMap, fullmap=FullMap}) ->
+ ets:insert(Table, {pmap, PMap}),
+ ets:insert(Table, {fullmap, FullMap}),
+ ok.
+
+
+ets_pmap() ->
+ [{pmap, PMap}] = ets:lookup(ets_name(node()), pmap),
+ PMap.
+
+
+ets_fullmap() ->
+ [{fullmap, FullMap}] = ets:lookup(ets_name(node()), fullmap),
+ FullMap.
diff --git a/src/node.erl b/src/node.erl
new file mode 100644
index 00000000..9a9c82c1
--- /dev/null
+++ b/src/node.erl
@@ -0,0 +1,39 @@
+%%%-------------------------------------------------------------------
+%%% File: node.erl
+%%% @author Cliff Moon <> []
+%%% @copyright 2009 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-05-11 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(node).
+-author('cliff@powerset.com').
+
+%% API
+-export([name/1, attributes/1]).
+
+-include("../include/common.hrl").
+
+%% -ifdef(TEST).
+%% -include("../etest/node_test.erl").
+%% -endif.
+
+%%====================================================================
+%% API
+%%====================================================================
+
+name(Name) when is_atom(Name) ->
+ Name;
+name(Node) when is_tuple(Node) ->
+ element(1, Node);
+name(Node) ->
+ Node.
+
+attributes(Name) when is_atom(Name) ->
+ [];
+attributes(Node) when is_tuple(Node) ->
+ element(2, Node);
+attributes(_) ->
+ [].
diff --git a/src/partitions.erl b/src/partitions.erl
new file mode 100644
index 00000000..942968e1
--- /dev/null
+++ b/src/partitions.erl
@@ -0,0 +1,334 @@
+%%%-------------------------------------------------------------------
+%%% File: partitions.erl
+%%% @author Cliff Moon <cliff@powerset.com> [http://www.powerset.com/]
+%%% @copyright 2008 Cliff Moon
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2008-10-12 by Cliff Moon
+%%%-------------------------------------------------------------------
+-module(partitions).
+-author('cliff@powerset.com').
+
+%% API
+-export([partition_range/1, create_partitions/3, map_partitions/2,
+ diff/2, pp_diff/1, int_to_partition/2,
+ join/3, leave/3, hash/1, hash_to_partition/2, item_to_nodepart/1,
+ shard_name/2, hash_to_hex/2]).
+
+-define(RINGTOP, trunc(math:pow(2,160)-1)). % SHA-1 space
+
+-include("../../couch/src/couch_db.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% -ifdef(TEST).
+%% -include("etest/partitions_test.erl").
+%% -endif.
+
+%%====================================================================
+%% API
+%%====================================================================
+
+partition_range(Q) ->
+ trunc( ?RINGTOP / math:pow(2,Q) ). % SHA-1 space / 2^Q
+
+create_partitions(Q, Node, _Nodes) ->
+ fresh(trunc(math:pow(2,Q)), Node).
+ % map_partitions(Table, Nodes).
+
+
+%% @spec map_partitions(Table::proplist(),Nodes::list()) -> proplist()
+%% @doc maps partitions to nodes. The resulting list should be Dynomite format,
+%% namely {Node,Part}
+%% @end
+map_partitions(Table, Nodes) ->
+ {_Nodes, Parts} = lists:unzip(Table),
+ do_map(Nodes, Parts).
+
+
+%% @doc in case Hints is undefined, turn it into a list for clauses below.
+join(Node, Table, undefined) ->
+ join(Node, Table, []);
+
+%% @spec join(node(), proplist(), list()) -> {ok, PartTable::proplist()} |
+%% {error, Error, Table::proplist()}
+%% @doc given a node, current partition table, and hints, this function returns
+%% the new partition table
+join(Node, Table, Hints) ->
+ {NodeList, Parts} = lists:unzip(Table),
+ OtherNodes = lists:delete(Node, NodeList),
+ OtherDistinctNodes = lists:usort(OtherNodes),
+ %% quick check to see if we have more nodes than partitions
+ if
+ length(Parts) == length(OtherDistinctNodes) ->
+ {error, "Too many nodes vs partitions", Table};
+ true ->
+ AlreadyPresent = length(NodeList) - length(OtherNodes),
+ Nodes = lists:usort(NodeList),
+ PartCountToTake = trunc(length(Parts) / (length(Nodes) + 1)),
+ %% calcs done, let's steal some partitions
+ {HintsTaken, NewTable} = steal_hints(Node, Table, Hints),
+ if
+ PartCountToTake - AlreadyPresent - HintsTaken > 0 ->
+ steal_partitions(Node, OtherDistinctNodes, NewTable,
+ PartCountToTake - AlreadyPresent - HintsTaken);
+ true ->
+ %% no partitions to take
+ {ok, NewTable}
+ end
+ end.
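+
+%% Example (sketch; 4 partitions, hypothetical nodes): joining n3 to
+%%   [{n1,P0},{n1,P1},{n2,P2},{n2,P3}] with no hints takes
+%% trunc(4/3) = 1 partition, stolen from whichever node holds the most.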
+
+
+%% TODO: implement me
+leave(_Node, Table, _Hints) ->
+ Table.
+
+
+diff(From, To) when length(From) =/= length(To) ->
+ {error, badlength, "Cannot diff partition maps with different lengths"};
+
+diff(From, To) ->
+ diff(sort_for_diff(From), sort_for_diff(To), []).
+
+
+pp_diff(Diff) ->
+ lists:map(
+ fun({F,T,Part}) -> {F,T,showroom_utils:int_to_hexstr(Part)} end,
+ Diff).
+
+
+%% @spec hash(term()) -> Digest::binary()
+%% @doc Showroom uses SHA-1 as its hash
+hash(Item) ->
+ crypto:sha(term_to_binary(Item)).
+
+
+%% @spec hash_to_partition(binary(), integer()) -> integer()
+%% @doc given a hashed value and Q, return the partition
+hash_to_partition(Hash, Q) ->
+ HashInt = hash_int(Hash),
+ Size = partition_range(Q),
+ Factor = (HashInt div Size),
+ Rem = (HashInt rem Size),
+ if
+ Rem > 0 -> Factor * Size;
+ true -> ((Factor-1) * Size)
+ end.
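+
+%% Worked example (assuming Q=2, so Size = 2^158): a HashInt strictly
+%% between 2^158 and 2^159 has Factor=1 and Rem>0, mapping to 1*2^158;
+%% a HashInt of exactly 2^159 has Rem=0 and maps back to 2^158.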
+
+
+hash_to_hex(Hash, Q) ->
+ Part = hash_to_partition(Hash, Q),
+ showroom_utils:int_to_hexstr(Part).
+
+
+%% @doc given an int and a list of partitions, get the first part greater
+%% than or equal to Int. Used for a hex part being turned back into an int.
+int_to_partition(Int, Parts) ->
+ Rem = lists:dropwhile(fun(E) -> E < Int end, lists:sort(Parts)),
+ case Rem of
+ [] -> 0; % wrap-around-ring case (back to 0)
+ [H|_T] -> H
+ end.
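+
+%% Examples (made-up partition list [0,4,8]):
+%%   int_to_partition(5, [0,4,8]) -> 8
+%%   int_to_partition(9, [0,4,8]) -> 0   (wrapped back around the ring)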
+
+
+%% @spec item_to_nodepart(bin()) -> {Node::node(),Part::integer()}
+%% @doc given a raw item, return the node/partition/shard
+%% name based on consistent hashing
+item_to_nodepart(Item) when is_binary(Item) ->
+ Q = list_to_integer(couch_config:get("cluster","q")),
+ Hash = hash(?b2l(Item)),
+ Part = hash_to_partition(Hash, Q),
+ {ok, Table} = membership2:partitions(),
+ lists:keyfind(Part, 2, Table);
+
+item_to_nodepart(Item) ->
+ item_to_nodepart(term_to_binary(Item)).
+
+
+%% @spec shard_name(integer(), binary()) -> binary()
+%% @doc create shard name
+shard_name(Part, DbName) ->
+ PartHex = ?l2b(showroom_utils:int_to_hexstr(Part)),
+ <<"x", PartHex/binary, "/", DbName/binary, "_", PartHex/binary>>.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+%% @doc Create a brand new table. The size and seednode are specified;
+%% initially all partitions are owned by the seednode. If NumPartitions
+%% is not much larger than the intended eventual number of
+%% participating nodes, then performance will suffer.
+%% from http://code.google.com/p/distributerl (trunk revision 4) chash:fresh/2
+%% @spec fresh(NumPartitions :: integer(), SeedNode :: node()) -> table()
+fresh(NumPartitions, SeedNode) ->
+ Increment = ?RINGTOP div NumPartitions,
+ [{SeedNode, IndexAsInt} || IndexAsInt <- lists:seq(0,(?RINGTOP-1),Increment)].
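+
+%% Example: fresh(4, n1) yields four equal ranges, all owned by the seed:
+%%   [{n1,0}, {n1,Inc}, {n1,2*Inc}, {n1,3*Inc}] where Inc = ?RINGTOP div 4.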
+
+
+%% @spec steal_hints(node(), proplist(), list( integer() )) ->
+%% {integer(), proplist()}
+%% @doc move the partitions listed in Hints over to the new owner, Node
+steal_hints(Node, Table, Hints) ->
+ steal_hints(Node, Table, Hints, 0).
+
+
+%% @doc recursive workhorse for hints mechanism, Acc is tracking how many
+%% hints/partitions were successfully moved to a new Node.
+%% @end
+steal_hints(_Node, Table, [], Acc) ->
+ {Acc, Table};
+
+steal_hints(Node, Table, [Hint|RestHints], Acc) ->
+ {Status, NewTable} = swap_node_for_part(Node, Hint, Table),
+ Acc1 = case Status of
+ ok -> Acc+1;
+ _ -> Acc
+ end,
+ steal_hints(Node, NewTable, RestHints, Acc1).
+
+
+%% @doc take a part from one of the other nodes based on most # of parts per
+%% node.
+%% @end
+%% TODO: This fun does list ops on the Table each time through. Inefficient?
+%% Hopefully not, due to small Table sizes
+steal_partitions(_Node, _OtherNodes, Table, 0) ->
+ {ok, Table};
+steal_partitions(Node, OtherNodes, Table, Count) ->
+ %% first, get a list of OtherNodes and their partition counts
+ NPCountFun = fun(N) ->
+ L = proplists:get_all_values(N, Table),
+ {N, length(lists:delete(undefined, L))}
+ end,
+ NPCounts = lists:reverse(lists:keysort(2,lists:map(NPCountFun, OtherNodes))),
+ %% grab the node that has the most partitions
+ [{TakeFrom, _PartsCount}|_RestOfTable] = NPCounts,
+ %% get the highest # partition of the TakeFrom node
+ TakeFromParts = lists:reverse(lists:sort(proplists:get_all_values(TakeFrom,
+ Table))),
+ [Part|_RestOfParts] = TakeFromParts,
+ {ok, NewTable} = swap_node_for_part(Node, Part, Table),
+ steal_partitions(Node, OtherNodes, NewTable, Count-1).
+
+
+%% @doc Make Node the owner of the partition beginning at Part.
+%% from http://code.google.com/p/distributerl (trunk revision 4) chash:update/3
+swap_node_for_part(Node, Part, Table) ->
+ case lists:keymember(Part, 2, Table) of
+ true ->
+ GapList = [{N,P} || {N,P} <- Table, P /= Part],
+ {A, B} = lists:partition(fun({_,K1}) -> K1 < Part end, GapList),
+ {ok, A ++ [{Node, Part}] ++ B};
+ false ->
+ showroom_log:message(info,
+ "'~p' partition was not found in partition table", [Part]),
+ {noswap, Table}
+ end.
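+
+%% Example: swap_node_for_part(c, 2, [{a,1},{b,2}]) -> {ok,[{a,1},{c,2}]};
+%% a Part not present in the table leaves it untouched: {noswap, Table}.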
+
+
+%% @doc get the difference between two FullPMaps
+%% lists need to be sorted by part, then node
+diff([], [], Results) ->
+ lists:reverse(remove_dupes(Results));
+
+diff([{Node,Part,_}|PartsA], [{Node,Part,_}|PartsB], Results) ->
+ diff(PartsA, PartsB, Results);
+
+diff([{NodeA,Part,_}|PartsA], [{NodeB,Part,_}|PartsB], Results) ->
+ diff(PartsA, PartsB, [{NodeA,NodeB,Part}|Results]).
+
+
+%% @doc sorts the full map for diff/3. This may change to get more accurate
+%% diff w/o dupes
+sort_for_diff(FullMap) ->
+ lists:keysort(2,lists:sort(FullMap)).
+
+
+remove_dupes(Diff) ->
+ {_,_,AllParts} = lists:unzip3(Diff),
+ Parts = lists:usort(AllParts),
+ remove_dupes_from_part(Parts, Diff, []).
+
+
+%% @doc ex: take [{a,b,1},{b,c,1}] diff and make it [{a,c,1}] so we don't go
+%% moving unnecessary shard files. 'Move partition 1 from a to b and
+%% then move partition 1 from b to c' is unnecessary. Just move it a to c.
+remove_dupes_from_part([], _Diff, Acc) ->
+ Acc;
+
+remove_dupes_from_part([Part|Rest], Diff, Acc) ->
+ PartData = lists:filter(fun({_,_,P}) -> P =:= Part end, Diff),
+ NewPartData = process_part_data(Part, PartData, PartData, PartData),
+ remove_dupes_from_part(Rest, Diff, lists:concat([NewPartData, Acc])).
+
+
+%% for one partition of the full diff, remove the dupes
+process_part_data(_Part, _PartData, [], Acc) ->
+ Acc;
+
+process_part_data(Part, PartData, [{From,To,_Part}|Rest], Acc) ->
+ case proplists:lookup(To, PartData) of
+ {To, NewTo, _Part} ->
+ Remove1 = proplists:delete(To, PartData),
+ Remove2 = proplists:delete(From, Remove1),
+ NewPartData = [{From, NewTo, Part}|Remove2],
+ %?debugFmt("~nFrom : ~p~nTo : ~p~nNewTo: ~p~n"
+ % "Remove1: ~p~nRemove2: ~p~n"
+ % "NewPartData: ~p~n"
+ % , [From, To, NewTo, Remove1, Remove2, NewPartData]),
+ process_part_data(Part, NewPartData, Rest, NewPartData);
+ none ->
+ process_part_data(Part, PartData, Rest, Acc)
+ end.
+
+
+% %% @doc from dynomite
+% diff([], [], Results) ->
+% lists:reverse(Results);
+
+% diff([{Node,Part}|PartsA], [{Node,Part}|PartsB], Results) ->
+% diff(PartsA, PartsB, Results);
+
+% diff([{NodeA,Part}|PartsA], [{NodeB,Part}|PartsB], Results) ->
+% diff(PartsA, PartsB, [{NodeA,NodeB,Part}|Results]).
+
+
+%% @doc does Node/Partition mapping based on Amazon Dynamo paper,
+%% section 6.2, strategy 3, more or less
+%% http://www.allthingsdistributed.com/2007/10/amazons_dynamo.html
+%% @end
+do_map([Node|RestNodes], Parts) ->
+ Max = length(Parts) / length([Node|RestNodes]),
+ do_map(Node, RestNodes, Parts, [], 1, Max).
+
+
+%% return final mapped list
+do_map(_,_,[],Mapped, _, _) ->
+ lists:keysort(1, Mapped);
+
+%% finish off last node, Cnt & Max no longer needed
+do_map(Node, [], [Part|RestParts], Mapped, _, _) ->
+ do_map(Node, [], RestParts, [{Node, Part}|Mapped], 0,0);
+
+%% workhorse clause, iterates through parts, until Cnt > Max, then advances to
+%% next node, wash, rinse, repeat
+do_map(Node, [NextNode|RestNodes], [Part|RestParts], Mapped, Cnt, Max) ->
+ case Cnt > Max of
+ true ->
+ do_map(NextNode, RestNodes, RestParts, [{Node, Part}|Mapped],
+ 1, Max);
+ false ->
+ do_map(Node, [NextNode|RestNodes], RestParts, [{Node, Part}|Mapped],
+ Cnt+1, Max)
+ end.
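+
+%% Example (sketch): do_map([n1,n2], [0,1,2,3]) has Max = 4/2 = 2.0; n1
+%% takes parts until Cnt exceeds Max, then n2 takes the rest:
+%%   [{n1,0},{n1,1},{n1,2},{n2,3}]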
+
+
+%% TODO: other guards
+hash_int(Hash) when is_binary(Hash) ->
+ <<IndexAsInt:160/integer>> = Hash,
+ IndexAsInt;
+hash_int(Hash) when is_integer(Hash) ->
+ Hash.
diff --git a/src/replication.erl b/src/replication.erl
new file mode 100644
index 00000000..96be0ad3
--- /dev/null
+++ b/src/replication.erl
@@ -0,0 +1,165 @@
+%%%-------------------------------------------------------------------
+%%% File: replication.erl
+%%% @author Brad Anderson <brad@cloudant.com> [http://www.cloudant.com]
+%%% @copyright 2009 Brad Anderson
+%%% @doc
+%%%
+%%% @end
+%%%
+%%% @since 2009-06-14 by Brad Anderson
+%%%-------------------------------------------------------------------
+-module(replication).
+-author('brad@cloudant.com').
+
+%% API
+-export([partners/2, partners/3, partners_plus/2]).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("../include/config.hrl").
+-include("../include/common.hrl").
+
+
+%%====================================================================
+%% API
+%%====================================================================
+
+partners(Node, Nodes) ->
+ partners(Node, Nodes, configuration:get_config()).
+
+
+%%--------------------------------------------------------------------
+%% @spec partners(Node::atom(), Nodes::list(), Config::config()) ->
+%% list()
+%% @doc returns the list of all replication partners for the specified node
+%% @end
+%%--------------------------------------------------------------------
+partners(Node, Nodes, Config) ->
+ N = Config#config.n,
+ Meta = Config#config.meta,
+ pick_partners(Meta, Node, Nodes, [], N - 1).
+
+
+%% return a list of live/up Partners, and if all Partners are down,
+%% walk the ring to get one other remote node and return it.
+partners_plus(Node, Nodes) ->
+ Partners = partners(Node, Nodes),
+ PartnersDown = lists:subtract(Partners, erlang:nodes()),
+ PartnersUp = lists:subtract(Partners, PartnersDown),
+ case PartnersUp of
+ [] ->
+ TargetNodes = target_list(Node, Nodes),
+ NonPartners = lists:subtract(TargetNodes,
+ lists:flatten([Node, Partners])),
+ walk_ring(NonPartners);
+ _ ->
+ %% at least one partner is up, so gossip w/ them
+ PartnersUp
+ end.
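+
+%% Example (sketch, hypothetical nodes): if partners(a, Nodes) = [b,c] and
+%% both are down, partners_plus(a, Nodes) walks the ring and returns the
+%% first live non-partner, e.g. [d]; if c is up it simply returns [c].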
+
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+%% @spec pick_partners(proplist(), Node::dynomite_node(), [Node], [Node],
+%% integer()) -> list()
+%% @doc iterate through N-1 partner picks, returning the resulting list sorted
+pick_partners(_Meta, Node, _Nodes, Acc, 0) ->
+ lists:sort(lists:delete(Node, Acc));
+pick_partners(Meta, Node, Nodes, Acc, Count) ->
+ Partner = pick_partner(Meta, Node, Nodes, Acc, 1),
+ NewNodes = lists:filter(fun(Elem) ->
+ case Elem of
+ no_partner_found -> false;
+ Partner -> false;
+ _ -> true
+ end
+ end, Nodes),
+ NewAcc = case Partner of
+ no_partner_found -> Acc;
+ _ -> [Partner|Acc]
+ end,
+ pick_partners(Meta, Node, NewNodes, NewAcc, Count-1).
+
+
+%% @spec pick_partner(proplist(), Node::dynomite_node(), [Node], [Node],
+%% integer()) -> Node::dynomite_node()
+%% @doc pick a specific replication partner at the given level
+pick_partner([], Node, Nodes, _Acc, 1) ->
+ %% handle the no metadata situation
+ %% Note: This clause must be before the Level > length(Meta) guarded clause
+ target_key(node:name(Node), lists:map(fun node:name/1, Nodes), roundrobin);
+
+pick_partner(Meta, _Node, _Nodes, Acc, Level) when Level > length(Meta) ->
+ Acc;
+
+pick_partner(Meta, Node, Nodes, Acc, Level) ->
+ MetaDict = meta_dict(Nodes, Level, dict:new()),
+ NodeKey = lists:sublist(node:attributes(Node), Level),
+ Keys = dict:fetch_keys(MetaDict),
+ {_MetaName, Strategy} = lists:nth(Level, Meta),
+ TargetKey = target_key(NodeKey, Keys, Strategy),
+ Candidates = dict:fetch(TargetKey, MetaDict),
+ case length(Candidates) of
+ 0 ->
+ %% didn't find a candidate
+ no_partner_found;
+ 1 ->
+ %% found only one candidate, return it
+ [Partner] = Candidates,
+ Partner;
+ _ ->
+ pick_partner(Meta, Node, Nodes, Acc, Level + 1)
+ end.
+
+
+%% @doc construct a dict that holds the key of metadata values so far (up to
+%% the current level, and dynomite_node() list as the value. This is used
+%% to select a partner in pick_partner/5
+%% @end
+meta_dict([], _Level, Dict) ->
+ Dict;
+
+meta_dict([Node|Rest], Level, Dict) ->
+ Key = lists:sublist(node:attributes(Node), Level),
+ DictNew = dict:append(Key, Node, Dict),
+ meta_dict(Rest, Level, DictNew).
+
+
+%% @spec target_key(term(), list(), Strategy::atom()) -> term()
+%% @doc given the key and keys, sort the list of keys based on strategy (e.g.
+%% for roundrobin, sort them, put the NodeKey at the end of the list, and
+%% then return the head of the list as the target).
+%% @end
+%% TODO: more strategies other than roundrobin?
+target_key(NodeKey, Keys, roundrobin) ->
+ SortedKeys = lists:sort(Keys),
+ TargetKey = case target_list(NodeKey, SortedKeys) of
+ [] -> no_partner_found;
+ [Key|_Rest] -> Key
+ end,
+ TargetKey.
+
+
+%% @spec target_list(term(), list()) -> list()
+%% @doc split the list of keys into 'less than NodeKey', NodeKey, and
+%% 'greater than NodeKey', then put the 'greater than' section first, the
+%% 'less than' section after it, and NodeKey itself at the end
+%% @end
+target_list(_NodeKey, []) ->
+ [];
+target_list(NodeKey, Keys) ->
+ {A, [NodeKey|B]} = lists:splitwith(fun(K) -> K /= NodeKey end, Keys),
+ lists:append([B, A, [NodeKey]]).
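+
+%% Example: for target_key(b, [a,b,c], roundrobin), the sorted keys give
+%% target_list(b, [a,b,c]) -> [c,a,b], so the chosen target key is c --
+%% the next key after b on the sorted ring.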
+
+
+walk_ring([]) ->
+ %% TODO: should we be more forceful here and throw? not for now
+ showroom_log:message(info,
+ "~p:walk_ring/1 - could not find node for gossip", [?MODULE]),
+ [];
+
+walk_ring([Node|Rest]) ->
+ case lists:member(Node, erlang:nodes()) of
+ true -> [Node];
+ _ -> walk_ring(Rest)
+ end.
diff --git a/src/vector_clock.erl b/src/vector_clock.erl
new file mode 100644
index 00000000..0a89d41e
--- /dev/null
+++ b/src/vector_clock.erl
@@ -0,0 +1,99 @@
+%%% @author Cliff Moon <cliff@powerset.com> []
+%%% @copyright 2008 Cliff Moon
+
+-module (vector_clock).
+-export ([create/1, truncate/1, increment/2, compare/2, resolve/2, merge/2,
+ equals/2]).
+
+%% -ifdef(TEST).
+%% -include("etest/vector_clock_test.erl").
+%% -endif.
+
+create(NodeName) -> [{NodeName, lib_misc:now_float()}].
+
+truncate(Clock) when length(Clock) > 10 ->
+ lists:nthtail(length(Clock) - 10, lists:keysort(2, Clock));
+
+truncate(Clock) -> Clock.
+
+increment(NodeName, [{NodeName, _Version}|Clocks]) ->
+ [{NodeName, lib_misc:now_float()}|Clocks];
+
+increment(NodeName, [NodeClock|Clocks]) ->
+ [NodeClock|increment(NodeName, Clocks)];
+
+increment(NodeName, []) ->
+ [{NodeName, lib_misc:now_float()}].
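+
+%% Example shell session (node names illustrative; versions are wall-clock
+%% floats from lib_misc:now_float/0, shown here abbreviated):
+%%   1> C0 = vector_clock:create(a).       % [{a,1253...}]
+%%   2> vector_clock:increment(b, C0).     % [{a,1253...},{b,1253...}]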
+
+resolve({ClockA, ValuesA}, {ClockB, ValuesB}) ->
+ case compare(ClockA, ClockB) of
+ less -> {ClockB, ValuesB};
+ greater -> {ClockA, ValuesA};
+ equal -> {ClockA, ValuesA};
+ concurrent ->
+ io:format("~nConcurrent Clocks~n"
+ "ClockA : ~p~nClockB : ~p~n"
+ "ValuesA: ~p~nValuesB: ~p~n"
+ , [ClockA, ClockB, ValuesA, ValuesB]),
+ {merge(ClockA,ClockB), ValuesA ++ ValuesB}
+ end;
+resolve(not_found, {Clock, Values}) ->
+ {Clock, Values};
+resolve({Clock, Values}, not_found) ->
+ {Clock, Values}.
+
+merge(ClockA, ClockB) ->
+ merge([], ClockA, ClockB).
+
+merge(Merged, [], ClockB) -> lists:keysort(1, Merged ++ ClockB);
+
+merge(Merged, ClockA, []) -> lists:keysort(1, Merged ++ ClockA);
+
+merge(Merged, [{NodeA, VersionA}|ClockA], ClockB) ->
+ case lists:keytake(NodeA, 1, ClockB) of
+ {value, {NodeA, VersionB}, TrunkClockB} when VersionA > VersionB ->
+ merge([{NodeA,VersionA}|Merged],ClockA,TrunkClockB);
+ {value, {NodeA, VersionB}, TrunkClockB} ->
+ merge([{NodeA,VersionB}|Merged],ClockA,TrunkClockB);
+ false ->
+ merge([{NodeA,VersionA}|Merged],ClockA,ClockB)
+ end.
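+
+%% Example (versions simplified to integers for readability):
+%%   merge([{a,2},{b,1}], [{a,1},{c,3}]) -> [{a,2},{b,1},{c,3}]
+%% i.e. the per-node maximum of the two clocks, keysorted by node.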
+
+compare(ClockA, ClockB) ->
+ AltB = less_than(ClockA, ClockB),
+ if AltB -> less; true ->
+ BltA = less_than(ClockB, ClockA),
+ if BltA -> greater; true ->
+ AeqB = equals(ClockA, ClockB),
+ if AeqB -> equal; true -> concurrent end
+ end
+ end.
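+
+%% Examples (versions simplified to integers):
+%%   compare([{a,1}], [{a,2}]) -> less
+%%   compare([{a,2},{b,1}], [{a,1},{b,2}]) -> concurrent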
+
+%% ClockA is less than ClockB if and only if ClockA[z] <= ClockB[z] for all
+%% instances z and there exists an index z' such that ClockA[z'] < ClockB[z']
+less_than(ClockA, ClockB) ->
+ ForAll = lists:all(fun({Node, VersionA}) ->
+ case lists:keysearch(Node, 1, ClockB) of
+ {value, {_NodeB, VersionB}} -> VersionA =< VersionB;
+ false -> false
+ end
+ end, ClockA),
+ Exists = lists:any(fun({NodeA, VersionA}) ->
+ case lists:keysearch(NodeA, 1, ClockB) of
+ {value, {_NodeB, VersionB}} -> VersionA /= VersionB;
+ false -> true
+ end
+ end, ClockA),
+ % length takes care of the case when ClockA is shorter than ClockB
+ ForAll and (Exists or (length(ClockA) < length(ClockB))).
+
+equals(ClockA, ClockB) ->
+ Equivalent = lists:all(fun({NodeA, VersionA}) ->
+ lists:any(fun(NodeClockB) ->
+ case NodeClockB of
+ {NodeA, VersionA} -> true;
+ _ -> false
+ end
+ end, ClockB)
+ end, ClockA),
+ Equivalent and (length(ClockA) == length(ClockB)).