author    Micah Anderson <micah@leap.se>  2014-01-15 18:13:16 +0000
committer drebs <drebs@leap.se>           2014-01-17 08:48:11 -0200
commit    510c6d763fba74f95ae8f894408c3658bcef4f83 (patch)
tree      d4dd0930b902cb1e5d46bea621ec83f801ea8ed6 /deps/mem3/src
parent    8bd863936ead4243f58fb99e11d1221e1af0a71e (diff)
embed dependencies that were previously pulled in by git during rebar build
Diffstat (limited to 'deps/mem3/src')
-rw-r--r--  deps/mem3/src/mem3.app.src           13
-rw-r--r--  deps/mem3/src/mem3.erl              238
-rw-r--r--  deps/mem3/src/mem3_app.erl           23
-rw-r--r--  deps/mem3/src/mem3_cache.erl        118
-rw-r--r--  deps/mem3/src/mem3_httpd.erl         53
-rw-r--r--  deps/mem3/src/mem3_nodes.erl        136
-rw-r--r--  deps/mem3/src/mem3_rep.erl          144
-rw-r--r--  deps/mem3/src/mem3_rep_manager.erl  627
-rw-r--r--  deps/mem3/src/mem3_sup.erl           36
-rw-r--r--  deps/mem3/src/mem3_sync.erl         267
-rw-r--r--  deps/mem3/src/mem3_sync_event.erl    68
-rw-r--r--  deps/mem3/src/mem3_util.erl         211
12 files changed, 1934 insertions, 0 deletions
diff --git a/deps/mem3/src/mem3.app.src b/deps/mem3/src/mem3.app.src
new file mode 100644
index 00000000..88447783
--- /dev/null
+++ b/deps/mem3/src/mem3.app.src
@@ -0,0 +1,13 @@
+{application, mem3, [
+ {description, "CouchDB Cluster Membership"},
+ {vsn, git},
+ {mod, {mem3_app, []}},
+ {registered, [
+ mem3_cache,
+ mem3_events,
+ mem3_nodes,
+ mem3_sync,
+ mem3_sup
+ ]},
+ {applications, [kernel, stdlib, sasl, crypto, mochiweb, couch, twig]}
+]}.
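
The app resource file above lists the locally registered processes and the applications that must already be running before mem3 starts; {vsn, git} is a rebar convention that fills in the version string from git metadata at build time, which ties this file to the rebar build mentioned in the commit message.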
diff --git a/deps/mem3/src/mem3.erl b/deps/mem3/src/mem3.erl
new file mode 100644
index 00000000..c7979642
--- /dev/null
+++ b/deps/mem3/src/mem3.erl
@@ -0,0 +1,238 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3).
+
+-export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
+ choose_shards/2, n/1, dbname/1, ushards/1]).
+-export([compare_nodelists/0, compare_shards/1]).
+-export([quorum/1]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+start() ->
+ application:start(mem3).
+
+stop() ->
+ application:stop(mem3).
+
+restart() ->
+ stop(),
+ start().
+
+%% @doc Detailed report of cluster-wide membership state. Queries the state
+%% on all member nodes and builds a dictionary with unique states as the
+%% key and the nodes holding that state as the value. Also reports member
+%% nodes which fail to respond and nodes which are connected but are not
+%% cluster members. Useful for debugging.
+-spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes
+ | non_member_nodes, [node()]}].
+compare_nodelists() ->
+ Nodes = mem3:nodes(),
+ AllNodes = erlang:nodes([this, visible]),
+ {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist),
+ Dict = lists:foldl(fun({Node, Nodelist}, D) ->
+ orddict:append({cluster_nodes, Nodelist}, Node, D)
+ end, orddict:new(), Replies),
+ [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}].
+compare_shards(DbName) when is_list(DbName) ->
+ compare_shards(list_to_binary(DbName));
+compare_shards(DbName) ->
+ Nodes = mem3:nodes(),
+ {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]),
+ GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)],
+ Dict = lists:foldl(fun({Shards, Node}, D) ->
+ orddict:append(Shards, Node, D)
+ end, orddict:new(), lists:zip(Replies, GoodNodes)),
+ [{bad_nodes, BadNodes} | Dict].
+
+-spec n(DbName::iodata()) -> integer().
+n(DbName) ->
+ length(mem3:shards(DbName, <<"foo">>)).
+
+-spec nodes() -> [node()].
+nodes() ->
+ mem3_nodes:get_nodelist().
+
+node_info(Node, Key) ->
+ mem3_nodes:get_node_info(Node, Key).
+
+-spec shards(DbName::iodata()) -> [#shard{}].
+shards(DbName) when is_list(DbName) ->
+ shards(list_to_binary(DbName));
+shards(DbName) ->
+ ShardDbName =
+ list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ case DbName of
+ ShardDbName ->
+ %% shard_db is treated as a single sharded db to support calls to db_info
+ %% and view_all_docs
+ [#shard{
+ node = node(),
+ name = ShardDbName,
+ dbname = ShardDbName,
+ range = [0, 2 bsl 31]}];
+ _ ->
+ try ets:lookup(partitions, DbName) of
+ [] ->
+ mem3_util:load_shards_from_disk(DbName);
+ Else ->
+ Else
+ catch error:badarg ->
+ mem3_util:load_shards_from_disk(DbName)
+ end
+ end.
+
+-spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+shards(DbName, DocId) when is_list(DbName) ->
+ shards(list_to_binary(DbName), DocId);
+shards(DbName, DocId) when is_list(DocId) ->
+ shards(DbName, list_to_binary(DocId));
+shards(DbName, DocId) ->
+ HashKey = mem3_util:hash(DocId),
+ Head = #shard{
+ name = '_',
+ node = '_',
+ dbname = DbName,
+ range = ['$1','$2'],
+ ref = '_'
+ },
+ Conditions = [{'=<', '$1', HashKey}, {'=<', HashKey, '$2'}],
+ try ets:select(partitions, [{Head, Conditions, ['$_']}]) of
+ [] ->
+ mem3_util:load_shards_from_disk(DbName, DocId);
+ Shards ->
+ Shards
+ catch error:badarg ->
+ mem3_util:load_shards_from_disk(DbName, DocId)
+ end.
+
+ushards(DbName) ->
+ Shards = mem3:shards(DbName),
+ Nodes = rotate_nodes(DbName, live_nodes()),
+ Buckets = bucket_by_range(Shards),
+ choose_ushards(Buckets, Nodes).
+
+rotate_nodes(DbName, Nodes) ->
+ {H, T} = lists:split(erlang:crc32(DbName) rem length(Nodes), Nodes),
+ T ++ H.
+
+live_nodes() ->
+ lists:sort([node()|erlang:nodes()]).
+
+bucket_by_range(Shards) ->
+ Buckets0 = lists:foldl(fun(#shard{range=Range}=Shard, Dict) ->
+ orddict:append(Range, Shard, Dict) end, orddict:new(), Shards),
+ {_, Buckets} = lists:unzip(Buckets0),
+ Buckets.
+
+choose_ushards(Buckets, Nodes) ->
+ choose_ushards(Buckets, Nodes, []).
+
+choose_ushards([], _, Acc) ->
+ lists:reverse(Acc);
+choose_ushards([Bucket|RestBuckets], Nodes, Acc) ->
+ #shard{node=Node} = Shard = first_match(Bucket, Bucket, Nodes),
+ choose_ushards(RestBuckets, lists:delete(Node, Nodes) ++ [Node],
+ [Shard | Acc]).
+
+first_match([], [#shard{range=Range}|_], []) ->
+ throw({range_not_available, Range});
+first_match([#shard{node=Node}=Shard|_], _, [Node|_]) ->
+ Shard;
+first_match([], Shards, [_|RestNodes]) ->
+ first_match(Shards, Shards, RestNodes);
+first_match([_|RestShards], Shards, Nodes) ->
+ first_match(RestShards, Shards, Nodes).
+
+-spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}].
+choose_shards(DbName, Options) when is_list(DbName) ->
+ choose_shards(list_to_binary(DbName), Options);
+choose_shards(DbName, Options) ->
+ try shards(DbName)
+ catch error:E when E==database_does_not_exist; E==badarg ->
+ Nodes = mem3:nodes(),
+ NodeCount = length(Nodes),
+ Zones = zones(Nodes),
+ ZoneCount = length(Zones),
+ N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
+ Q = mem3_util:to_integer(couch_util:get_value(q, Options,
+ couch_config:get("cluster", "q", "8"))),
+ Z = mem3_util:z_val(couch_util:get_value(z, Options), NodeCount, ZoneCount),
+ Suffix = couch_util:get_value(shard_suffix, Options, ""),
+ ChosenZones = lists:sublist(shuffle(Zones), Z),
+ lists:flatmap(
+ fun({Zone, N1}) ->
+ Nodes1 = nodes_in_zone(Nodes, Zone),
+ {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes1)+1), Nodes1),
+ RotatedNodes = B ++ A,
+ mem3_util:create_partition_map(DbName, erlang:min(N1,length(Nodes1)),
+ Q, RotatedNodes, Suffix)
+ end,
+ lists:zip(ChosenZones, apportion(N, Z)))
+ end.
+
+-spec dbname(#shard{} | iodata()) -> binary().
+dbname(#shard{dbname = DbName}) ->
+ DbName;
+dbname(<<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>>) ->
+ list_to_binary(filename:rootname(binary_to_list(DbName)));
+dbname(DbName) when is_list(DbName) ->
+ dbname(list_to_binary(DbName));
+dbname(DbName) when is_binary(DbName) ->
+ DbName;
+dbname(_) ->
+ erlang:error(badarg).
+
+
+zones(Nodes) ->
+ lists:usort([mem3:node_info(Node, <<"zone">>) || Node <- Nodes]).
+
+nodes_in_zone(Nodes, Zone) ->
+ [Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)].
+
+shuffle(List) ->
+ %% Determine the log n portion then randomize the list.
+ randomize(round(math:log(length(List)) + 0.5), List).
+
+randomize(1, List) ->
+ randomize(List);
+randomize(T, List) ->
+ lists:foldl(fun(_E, Acc) -> randomize(Acc) end,
+ randomize(List), lists:seq(1, (T - 1))).
+
+randomize(List) ->
+ D = lists:map(fun(A) -> {random:uniform(), A} end, List),
+ {_, D1} = lists:unzip(lists:keysort(1, D)),
+ D1.
+
+apportion(Shares, Ways) ->
+ apportion(Shares, lists:duplicate(Ways, 0), Shares).
+
+apportion(_Shares, Acc, 0) ->
+ Acc;
+apportion(Shares, Acc, Remaining) ->
+ N = Remaining rem length(Acc),
+ [H|T] = lists:nthtail(N, Acc),
+ apportion(Shares, lists:sublist(Acc, N) ++ [H+1|T], Remaining - 1).
+
+% quorum functions
+
+quorum(#db{name=DbName}) ->
+ quorum(DbName);
+quorum(DbName) ->
+ n(DbName) div 2 + 1.
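
mem3 places a document by hashing its id with CRC32 and picking the shard ranges that cover the hash; quorum/1 is then simply n div 2 + 1 over the copies of a range (n = 3 gives a quorum of 2). A minimal, self-contained sketch of the same ring arithmetic, assuming nothing from mem3 itself (module and function names here are illustrative):

    -module(ring_sketch).
    -export([key_ranges/1, ranges_for/2]).

    %% Parenthesized because bsl binds looser than div in Erlang.
    -define(RINGTOP, (2 bsl 31)).  % CRC32 space, as in mem3_util

    %% Split the ring into Q equal ranges, mirroring make_key_ranges/3.
    key_ranges(Q) ->
        Inc = ?RINGTOP div Q,
        [{I * Inc, (I + 1) * Inc - 1} || I <- lists:seq(0, Q - 1)].

    %% Select the range covering the CRC32 hash of DocId, which is what
    %% shards/2 does with its ets match spec over #shard{} records.
    ranges_for(DocId, Ranges) ->
        Hash = erlang:crc32(DocId),
        [{S, E} || {S, E} <- Ranges, S =< Hash, Hash =< E].

For example, with Q = 8 the ring splits into eight ranges of 2^29 keys each, and every doc id hashes into exactly one of them.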
diff --git a/deps/mem3/src/mem3_app.erl b/deps/mem3/src/mem3_app.erl
new file mode 100644
index 00000000..bb27171f
--- /dev/null
+++ b/deps/mem3/src/mem3_app.erl
@@ -0,0 +1,23 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(_Type, []) ->
+ mem3_sup:start_link().
+
+stop([]) ->
+ ok.
diff --git a/deps/mem3/src/mem3_cache.erl b/deps/mem3/src/mem3_cache.erl
new file mode 100644
index 00000000..84686b91
--- /dev/null
+++ b/deps/mem3/src/mem3_cache.erl
@@ -0,0 +1,118 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_cache).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0]).
+
+-record(state, {changes_pid}).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end),
+ {ok, #state{changes_pid = Pid}}.
+
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}},
+ #state{changes_pid=Pid} = State) ->
+ % fatal error, somebody deleted our ets table
+ {stop, ets_table_error, State};
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ twig:log(notice, "~p changes listener died ~p", [?MODULE, Reason]),
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end,
+ erlang:send_after(5000, self(), {start_listener, Seq}),
+ {noreply, State};
+handle_info({start_listener, Seq}, State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #state{changes_pid=Pid}) ->
+ exit(Pid, kill),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% internal functions
+
+listen_for_changes(Since) ->
+ DbName = couch_config:get("mem3", "shard_db", "dbs"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+ case couch_util:get_value(<<"deleted">>, Change, false) of
+ true ->
+ ets:delete(partitions, DbName);
+ false ->
+ case couch_util:get_value(doc, Change) of
+ {error, Reason} ->
+ twig:log(error, "missing partition table for ~s: ~p", [DbName, Reason]);
+ {Doc} ->
+ ets:delete(partitions, DbName),
+ Shards = mem3_util:build_shards(DbName, Doc),
+ ets:insert(partitions, Shards),
+ [create_if_missing(Name) || #shard{name=Name, node=Node}
+ <- Shards, Node =:= node()]
+ end
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
+
+create_if_missing(Name) ->
+ DbDir = couch_config:get("couchdb", "database_dir"),
+ Filename = filename:join(DbDir, ?b2l(Name) ++ ".couch"),
+ case filelib:is_regular(Filename) of
+ true ->
+ ok;
+ false ->
+ Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+ case couch_server:create(Name, Options) of
+ {ok, Db} ->
+ couch_db:close(Db);
+ Error ->
+ twig:log(error, "~p tried to create ~s, got ~p", [?MODULE, Name, Error])
+ end
+ end.
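
The cache above is an ets bag keyed on #shard.dbname: one row per shard makes a database's whole partition map retrievable in a single lookup, and the changes listener deletes and reinserts a database's rows whenever its entry in the dbs db changes. A toy sketch of that bag-table pattern, with everything renamed so it is clearly not mem3 API:

    -module(bag_sketch).
    -export([demo/0]).

    -record(shard, {name, node, dbname}).

    demo() ->
        T = ets:new(partitions_demo, [bag, {keypos, #shard.dbname}]),
        true = ets:insert(T, #shard{name = <<"s0">>, node = n1, dbname = <<"db">>}),
        true = ets:insert(T, #shard{name = <<"s1">>, node = n2, dbname = <<"db">>}),
        ets:lookup(T, <<"db">>).  % returns both #shard{} rows for <<"db">>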
diff --git a/deps/mem3/src/mem3_httpd.erl b/deps/mem3/src/mem3_httpd.erl
new file mode 100644
index 00000000..716080f8
--- /dev/null
+++ b/deps/mem3/src/mem3_httpd.erl
@@ -0,0 +1,53 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_httpd).
+
+-export([handle_membership_req/1]).
+
+%% includes
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>]} = Req) ->
+ ClusterNodes = try mem3:nodes()
+ catch _:_ -> {ok,[]} end,
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)}
+ ]});
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) ->
+ ClusterNodes = try mem3:nodes()
+ catch _:_ -> {ok,[]} end,
+ Shards = mem3:shards(DbName),
+ JsonShards = json_shards(Shards, dict:new()),
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)},
+ {partitions, JsonShards}
+ ]}).
+
+%%
+%% internal
+%%
+
+json_shards([], AccIn) ->
+ List = dict:to_list(AccIn),
+ {lists:sort(List)};
+json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) ->
+ HexBeg = couch_util:to_hex(<<B:32/integer>>),
+ json_shards(Rest, dict:append(HexBeg, Node, AccIn)).
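
couch_httpd:send_json/2 takes an EJSON term, so both clauses build nested {Proplist} tuples. For a hypothetical two-node cluster, the term sent for GET /_membership would look roughly like:

    {[
        {all_nodes, ['node1@example.com', 'node2@example.com']},
        {cluster_nodes, ['node1@example.com', 'node2@example.com']}
    ]}

The parts variant adds a partitions member produced by json_shards/2, mapping each range's start offset (as hex) to the nodes holding a copy.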
diff --git a/deps/mem3/src/mem3_nodes.erl b/deps/mem3/src/mem3_nodes.erl
new file mode 100644
index 00000000..0be66462
--- /dev/null
+++ b/deps/mem3/src/mem3_nodes.erl
@@ -0,0 +1,136 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_nodes).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_nodelist/0, get_node_info/2]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {changes_pid, update_seq, nodes}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_nodelist() ->
+ gen_server:call(?MODULE, get_nodelist).
+
+get_node_info(Node, Key) ->
+ gen_server:call(?MODULE, {get_node_info, Node, Key}).
+
+init([]) ->
+ {Nodes, UpdateSeq} = initialize_nodelist(),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end),
+ {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}.
+
+handle_call(get_nodelist, _From, State) ->
+ {reply, lists:sort(dict:fetch_keys(State#state.nodes)), State};
+handle_call({get_node_info, Node, Key}, _From, State) ->
+ case dict:find(Node, State#state.nodes) of
+ {ok, NodeInfo} ->
+ {reply, couch_util:get_value(Key, NodeInfo), State};
+ error ->
+ {reply, error, State}
+ end;
+handle_call({add_node, Node, NodeInfo}, _From, #state{nodes=Nodes} = State) ->
+ gen_event:notify(mem3_events, {add_node, Node}),
+ {reply, ok, State#state{nodes = dict:store(Node, NodeInfo, Nodes)}};
+handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) ->
+ gen_event:notify(mem3_events, {remove_node, Node}),
+ {reply, ok, State#state{nodes = dict:erase(Node, Nodes)}};
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ twig:log(notice, "~p changes listener died ~p", [?MODULE, Reason]),
+ StartSeq = State#state.update_seq,
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end,
+ erlang:send_after(5000, self(), start_listener),
+ {noreply, State#state{update_seq = Seq}};
+handle_info(start_listener, #state{update_seq = Seq} = State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% internal functions
+
+initialize_nodelist() ->
+ DbName = couch_config:get("mem3", "node_db", "nodes"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ {ok, _, {_, Nodes0}} = couch_btree:fold(Db#db.id_tree, fun first_fold/3,
+ {Db, dict:new()}, []),
+ % add self if not already present
+ case dict:find(node(), Nodes0) of
+ {ok, _} ->
+ Nodes = Nodes0;
+ error ->
+ Doc = #doc{id = couch_util:to_binary(node())},
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ Nodes = dict:store(node(), [], Nodes0)
+ end,
+ couch_db:close(Db),
+ {Nodes, Db#db.update_seq}.
+
+first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{deleted=true}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{id=Id}=DocInfo, _, {Db, Dict}) ->
+ {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo),
+ {ok, {Db, dict:store(mem3_util:to_atom(Id), Props, Dict)}}.
+
+listen_for_changes(Since) ->
+ DbName = couch_config:get("mem3", "node_db", "nodes"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ Node = couch_util:get_value(<<"id">>, Change),
+ case Node of <<"_design/", _/binary>> -> ok; _ ->
+ case couch_util:get_value(<<"deleted">>, Change, false) of
+ false ->
+ {Props} = couch_util:get_value(doc, Change),
+ gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props});
+ true ->
+ gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
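
Membership is persisted as one document per node in the node_db (default "nodes"): the doc id is the node name and the body carries metadata such as the zone consulted by mem3:choose_shards/2. A hypothetical member document, for illustration only:

    #doc{
        id = <<"node3@example.com">>,
        body = {[ {<<"zone">>, <<"us-east">>} ]}
    }

Given that document, mem3:node_info('node3@example.com', <<"zone">>) returns <<"us-east">>.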
diff --git a/deps/mem3/src/mem3_rep.erl b/deps/mem3/src/mem3_rep.erl
new file mode 100644
index 00000000..b68973c5
--- /dev/null
+++ b/deps/mem3/src/mem3_rep.erl
@@ -0,0 +1,144 @@
+-module(mem3_rep).
+
+-export([go/2, changes_enumerator/3, make_local_id/2]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(CTX, #user_ctx{roles = [<<"_admin">>]}).
+
+-record(acc, {revcount = 0, infos = [], seq, localid, source, target}).
+
+go(DbName, Node) when is_binary(DbName), is_atom(Node) ->
+ go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node});
+
+go(#shard{} = Source, #shard{} = Target) ->
+ LocalId = make_local_id(Source, Target),
+ case couch_db:open(Source#shard.name, [{user_ctx,?CTX}]) of
+ {ok, Db} ->
+ try
+ go(Db, Target, LocalId)
+ catch error:{not_found, no_db_file} ->
+ {error, missing_target}
+ after
+ couch_db:close(Db)
+ end;
+ {not_found, no_db_file} ->
+ {error, missing_source}
+ end.
+
+go(#db{name = DbName, seq_tree = Bt} = Db, #shard{} = Target, LocalId) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ Seq = calculate_start_seq(Db, Target, LocalId),
+ Acc0 = #acc{source=Db, target=Target, seq=Seq, localid=LocalId},
+ Fun = fun ?MODULE:changes_enumerator/3,
+ {ok, _, AccOut} = couch_btree:fold(Bt, Fun, Acc0, [{start_key, Seq + 1}]),
+ {ok, #acc{seq = LastSeq}} = replicate_batch(AccOut),
+ case couch_db:count_changes_since(Db, LastSeq) of
+ 0 ->
+ ok;
+ N ->
+ exit({pending_changes, N})
+ end.
+
+make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}) ->
+ S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
+ T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
+ <<"_local/shard-sync-", S/binary, "-", T/binary>>.
+
+changes_enumerator(FullDocInfo, _, #acc{revcount = C} = Acc) when C >= 99 ->
+ #doc_info{high_seq = Seq} = couch_doc:to_doc_info(FullDocInfo),
+ {stop, Acc#acc{seq = Seq, infos = [FullDocInfo | Acc#acc.infos]}};
+
+changes_enumerator(FullDocInfo, _, #acc{revcount = C, infos = Infos} = Acc) ->
+ #doc_info{high_seq = Seq, revs = Revs} = couch_doc:to_doc_info(FullDocInfo),
+ Count = C + length(Revs),
+ {ok, Acc#acc{seq = Seq, revcount = Count, infos = [FullDocInfo | Infos]}}.
+
+replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
+ case find_missing_revs(Acc) of
+ [] ->
+ ok;
+ Missing ->
+ ok = save_on_target(Node, Name, open_docs(Acc, Missing))
+ end,
+ update_locals(Acc),
+ {ok, Acc#acc{revcount=0, infos=[]}}.
+
+find_missing_revs(Acc) ->
+ #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
+ IdsRevs = lists:map(fun(FDI) ->
+ #doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
+ {Id, [R || #rev_info{rev=R} <- RevInfos]}
+ end, Infos),
+ Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
+ rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
+
+open_docs(#acc{source=Source, infos=Infos}, Missing) ->
+ lists:flatmap(fun({Id, Revs, _}) ->
+ FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
+ open_doc_revs(Source, FDI, Revs)
+ end, Missing).
+
+save_on_target(Node, Name, Docs) ->
+ Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
+ {io_priority, {internal_repl, Name}}],
+ rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+ ok.
+
+update_locals(Acc) ->
+ #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
+ #shard{name=Name, node=Node} = Target,
+ Doc = #doc{id = Id, body = {[
+ {<<"seq">>, Seq},
+ {<<"node">>, list_to_binary(atom_to_list(Node))},
+ {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
+ ]}},
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
+ rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
+
+rexi_call(Node, MFA) ->
+ Mon = rexi_monitor:start([{rexi_server, Node}]),
+ Ref = rexi:cast(Node, MFA),
+ try
+ receive {Ref, {ok, Reply}} ->
+ Reply;
+ {Ref, Error} ->
+ erlang:error(Error);
+ {rexi_DOWN, Mon, _, Reason} ->
+ erlang:error({rexi_DOWN, Reason})
+ after 600000 ->
+ erlang:error(timeout)
+ end
+ after
+ rexi_monitor:stop(Mon)
+ end.
+
+calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
+ case couch_db:open_doc(Db, LocalId, []) of
+ {ok, #doc{body = {SProps}}} ->
+ Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
+ try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
+ #doc{body = {TProps}} ->
+ SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
+ TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
+ erlang:min(SourceSeq, TargetSeq)
+ catch error:{not_found, _} ->
+ 0
+ end;
+ {not_found, _} ->
+ 0
+ end.
+
+open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
+ {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
+ lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
+ couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
+ end, FoundRevs).
+
+iso8601_timestamp() ->
+ {_,_,Micro} = Now = os:timestamp(),
+ {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
+ Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
+ io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
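
Internal replication checkpoints under a _local doc whose id is derived from hashes of the two node names, so the id is the same on source and target and calculate_start_seq/3 can restart from the minimum of the two recorded sequences. The checkpoint written by update_locals/1 has roughly this shape (all values illustrative):

    #doc{
        %% S and T are base64url-encoded md5s of the two node names
        id = <<"_local/shard-sync-", S/binary, "-", T/binary>>,
        body = {[
            {<<"seq">>, 1042},
            {<<"node">>, <<"node2@example.com">>},
            {<<"timestamp">>, <<"2014-01-15T18:13:16.123456Z">>}
        ]}
    }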
diff --git a/deps/mem3/src/mem3_rep_manager.erl b/deps/mem3/src/mem3_rep_manager.erl
new file mode 100644
index 00000000..7b98701d
--- /dev/null
+++ b/deps/mem3/src/mem3_rep_manager.erl
@@ -0,0 +1,627 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(mem3_rep_manager).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/0, config_change/3]).
+-export([replication_started/1, replication_completed/1, replication_error/2]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_js_functions.hrl").
+
+-define(DOC_TO_REP, mem3_rep_doc_id_to_rep_id).
+-define(REP_TO_STATE, mem3_rep_id_to_rep_state).
+-define(DB_TO_SEQ, mem3_db_to_seq).
+-define(INITIAL_WAIT, 2.5). % seconds
+-define(MAX_WAIT, 600). % seconds
+-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
+
+-record(state, {
+ db_notifier = nil,
+ max_retries,
+ scan_pid = nil,
+ rep_start_pids = []
+}).
+
+-record(rep_state, {
+ dbname,
+ doc_id,
+ user_ctx,
+ doc,
+ starting,
+ retries_left,
+ max_retries,
+ wait = ?INITIAL_WAIT
+}).
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3,
+ to_binary/1
+]).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+replication_started({BaseId, _} = RepId) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{dbname = DbName, doc_id = DocId} ->
+ update_rep_doc(DbName, DocId, [
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+ twig:log(notice, "Document `~s` triggered replication `~s`",
+ [DocId, pp_rep_id(RepId)])
+ end.
+
+
+replication_completed(RepId) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{dbname = DbName, doc_id = DocId} ->
+ update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"completed">>}]),
+ ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+ twig:log(notice, "Replication `~s` finished (triggered by document `~s`)",
+ [pp_rep_id(RepId), DocId])
+ end.
+
+
+replication_error({BaseId, _} = RepId, Error) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{dbname = DbName, doc_id = DocId} ->
+ % TODO: maybe add error reason to replication document
+ update_rep_doc(DbName, DocId, [
+ {<<"_replication_state">>, <<"error">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
+ end.
+
+init(_) ->
+ process_flag(trap_exit, true),
+ net_kernel:monitor_nodes(true),
+ ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
+ ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
+ ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, protected]),
+ Server = self(),
+ ok = couch_config:register(fun ?MODULE:config_change/3, Server),
+ NotifierPid = db_update_notifier(),
+ ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
+ {ok, #state{
+ db_notifier = NotifierPid,
+ scan_pid = ScanPid,
+ max_retries = retries_value(
+ couch_config:get("replicator", "max_replication_retry_count", "10"))
+ }}.
+
+config_change("replicator", "max_replication_retry_count", V) ->
+ ok = gen_server:cast(?MODULE, {set_max_retries, retries_value(V)}).
+
+handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
+ NewState = try
+ process_update(State, DbName, Change)
+ catch
+ _Tag:Error ->
+ {RepProps} = get_value(doc, ChangeProps),
+ DocId = get_value(<<"_id">>, RepProps),
+ rep_db_update_error(Error, DbName, DocId),
+ State
+ end,
+ {reply, ok, NewState};
+
+handle_call({rep_started, RepId}, _From, State) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ RepState ->
+ NewRepState = RepState#rep_state{
+ starting = false,
+ retries_left = State#state.max_retries,
+ max_retries = State#state.max_retries,
+ wait = ?INITIAL_WAIT
+ },
+ true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
+ end,
+ {reply, ok, State};
+
+handle_call({rep_complete, RepId}, _From, State) ->
+ true = ets:delete(?REP_TO_STATE, RepId),
+ {reply, ok, State};
+
+handle_call({rep_error, RepId, Error}, _From, State) ->
+ {reply, ok, replication_error(State, RepId, Error)};
+
+handle_call({resume_scan, DbName}, _From, State) ->
+ Since = case ets:lookup(?DB_TO_SEQ, DbName) of
+ [] -> 0;
+ [{DbName, EndSeq}] -> EndSeq
+ end,
+ Pid = changes_feed_loop(DbName, Since),
+ twig:log(debug, "Scanning ~s from update_seq ~p", [DbName, Since]),
+ {reply, ok, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
+
+handle_call({rep_db_checkpoint, DbName, EndSeq}, _From, State) ->
+ true = ets:insert(?DB_TO_SEQ, {DbName, EndSeq}),
+ {reply, ok, State};
+
+handle_call(Msg, From, State) ->
+ twig:log(error, "Replication manager received unexpected call ~p from ~p",
+ [Msg, From]),
+ {stop, {error, {unexpected_call, Msg}}, State}.
+
+
+handle_cast({set_max_retries, MaxRetries}, State) ->
+ {noreply, State#state{max_retries = MaxRetries}};
+
+handle_cast(Msg, State) ->
+ twig:log(error, "Replication manager received unexpected cast ~p", [Msg]),
+ {stop, {error, {unexpected_cast, Msg}}, State}.
+
+handle_info({nodeup, _Node}, State) ->
+ {noreply, rescan(State)};
+
+handle_info({nodedown, _Node}, State) ->
+ {noreply, rescan(State)};
+
+handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
+ twig:log(debug, "Background scan has completed.", []),
+ {noreply, State#state{scan_pid=nil}};
+
+handle_info({'EXIT', From, Reason}, #state{scan_pid = From} = State) ->
+ twig:log(error, "Background scanner died. Reason: ~p", [Reason]),
+ {stop, {scanner_died, Reason}, State};
+
+handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
+ twig:log(error, "Database update notifier died. Reason: ~p", [Reason]),
+ {stop, {db_update_notifier_died, Reason}, State};
+
+handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
+ % one of the replication start processes terminated successfully
+ {noreply, State#state{rep_start_pids = Pids -- [From]}};
+
+handle_info({'DOWN', _Ref, _, _, _}, State) ->
+ % From a db monitor created by a replication process. Ignore.
+ {noreply, State};
+
+handle_info(Msg, State) ->
+ twig:log(error, "Replication manager received unexpected message ~p", [Msg]),
+ {stop, {unexpected_msg, Msg}, State}.
+
+terminate(_Reason, State) ->
+ #state{
+ scan_pid = ScanPid,
+ rep_start_pids = StartPids,
+ db_notifier = DbNotifier
+ } = State,
+ stop_all_replications(),
+ lists:foreach(
+ fun(Pid) ->
+ catch unlink(Pid),
+ catch exit(Pid, stop)
+ end,
+ [ScanPid | StartPids]),
+ true = ets:delete(?REP_TO_STATE),
+ true = ets:delete(?DOC_TO_REP),
+ true = ets:delete(?DB_TO_SEQ),
+ couch_db_update_notifier:stop(DbNotifier).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+changes_feed_loop(DbName, Since) ->
+ Server = self(),
+ Pid = spawn_link(
+ fun() ->
+ fabric:changes(DbName, fun
+ ({change, Change}, Acc) ->
+ case has_valid_rep_id(Change) of
+ true ->
+ ok = gen_server:call(
+ Server, {rep_db_update, DbName, Change}, infinity);
+ false ->
+ ok
+ end,
+ {ok, Acc};
+ ({stop, EndSeq}, Acc) ->
+ ok = gen_server:call(Server, {rep_db_checkpoint, DbName, EndSeq}),
+ {ok, Acc};
+ (_, Acc) ->
+ {ok, Acc}
+ end,
+ nil,
+ #changes_args{
+ include_docs = true,
+ feed = "normal",
+ since = Since,
+ filter = main_only,
+ timeout = infinity,
+ db_open_options = [sys_db]
+ }
+ )
+ end),
+ Pid.
+
+has_valid_rep_id({Change}) ->
+ has_valid_rep_id(get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+ false;
+has_valid_rep_id(_Else) ->
+ true.
+
+
+db_update_notifier() ->
+ Server = self(),
+ IsReplicatorDbFun = is_replicator_db_fun(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({_, DbName}) ->
+ case IsReplicatorDbFun(DbName) of
+ true ->
+ ok = gen_server:call(Server, {resume_scan, mem3:dbname(DbName)});
+ _ ->
+ ok
+ end
+ end
+ ),
+ Notifier.
+
+rescan(#state{scan_pid = nil} = State) ->
+ true = ets:delete_all_objects(?DB_TO_SEQ),
+ Server = self(),
+ NewScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
+ State#state{scan_pid = NewScanPid};
+rescan(#state{scan_pid = ScanPid} = State) ->
+ unlink(ScanPid),
+ exit(ScanPid, exit),
+ rescan(State#state{scan_pid = nil}).
+
+process_update(State, DbName, {Change}) ->
+ {RepProps} = JsonRepDoc = get_value(doc, Change),
+ DocId = get_value(<<"_id">>, RepProps),
+ case {owner(DbName, DocId), get_value(deleted, Change, false)} of
+ {false, _} ->
+ replication_complete(DocId),
+ State;
+ {true, true} ->
+ rep_doc_deleted(DocId),
+ State;
+ {true, false} ->
+ case get_value(<<"_replication_state">>, RepProps) of
+ undefined ->
+ maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+ <<"triggered">> ->
+ maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+ <<"completed">> ->
+ replication_complete(DocId),
+ State;
+ <<"error">> ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [] ->
+ maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+ _ ->
+ State
+ end
+ end
+ end.
+
+
+rep_db_update_error(Error, DbName, DocId) ->
+ case Error of
+ {bad_rep_doc, Reason} ->
+ ok;
+ _ ->
+ Reason = to_binary(Error)
+ end,
+ twig:log(error, "Replication manager, error processing document `~s`: ~s",
+ [DocId, Reason]),
+ update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"error">>}]).
+
+
+rep_user_ctx({RepDoc}) ->
+ case get_value(<<"user_ctx">>, RepDoc) of
+ undefined ->
+ #user_ctx{};
+ {UserCtx} ->
+ #user_ctx{
+ name = get_value(<<"name">>, UserCtx, null),
+ roles = get_value(<<"roles">>, UserCtx, [])
+ }
+ end.
+
+
+maybe_start_replication(State, DbName, DocId, RepDoc) ->
+ UserCtx = rep_user_ctx(RepDoc),
+ {BaseId, _} = RepId = make_rep_id(RepDoc, UserCtx),
+ case rep_state(RepId) of
+ nil ->
+ RepState = #rep_state{
+ dbname = DbName,
+ doc_id = DocId,
+ user_ctx = UserCtx,
+ doc = RepDoc,
+ starting = true,
+ retries_left = State#state.max_retries,
+ max_retries = State#state.max_retries
+ },
+ true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+ true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+ twig:log(notice, "Attempting to start replication `~s` (document `~s`).",
+ [pp_rep_id(RepId), DocId]),
+ Server = self(),
+ Pid = spawn_link(fun() ->
+ start_replication(Server, RepDoc, RepId, UserCtx, 0)
+ end),
+ State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
+ #rep_state{doc_id = DocId} ->
+ State;
+ #rep_state{starting = false, dbname = DbName, doc_id = OtherDocId} ->
+ twig:log(notice, "The replication specified by the document `~s` was already"
+ " triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
+ State;
+ #rep_state{starting = true, dbname = DbName, doc_id = OtherDocId} ->
+ twig:log(notice, "The replication specified by the document `~s` is already"
+ " being triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
+ State
+ end.
+
+
+make_rep_id(RepDoc, UserCtx) ->
+ try
+ couch_rep:make_replication_id(RepDoc, UserCtx)
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end.
+
+
+maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
+ case get_value(<<"_replication_id">>, RepProps) of
+ RepId ->
+ ok;
+ _ ->
+ update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}])
+ end.
+
+
+start_replication(Server, RepDoc, RepId, UserCtx, Wait) ->
+ ok = timer:sleep(Wait * 1000),
+ case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx, ?MODULE)) of
+ Pid when is_pid(Pid) ->
+ ok = gen_server:call(Server, {rep_started, RepId}, infinity),
+ couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+ Error ->
+ replication_error(RepId, Error)
+ end.
+
+
+replication_complete(DocId) ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [{DocId, RepId}] ->
+ case rep_state(RepId) of
+ nil ->
+ couch_rep:end_replication(RepId);
+ #rep_state{} ->
+ ok
+ end,
+ true = ets:delete(?DOC_TO_REP, DocId);
+ _ ->
+ ok
+ end.
+
+
+rep_doc_deleted(DocId) ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [{DocId, RepId}] ->
+ couch_rep:end_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
+ twig:log(notice, "Stopped replication `~s` because replication document `~s`"
+ " was deleted", [pp_rep_id(RepId), DocId]);
+ [] ->
+ ok
+ end.
+
+
+replication_error(State, RepId, Error) ->
+ case rep_state(RepId) of
+ nil ->
+ State;
+ RepState ->
+ maybe_retry_replication(RepId, RepState, Error, State)
+ end.
+
+maybe_retry_replication(RepId, #rep_state{retries_left = 0} = RepState, Error, State) ->
+ #rep_state{
+ doc_id = DocId,
+ max_retries = MaxRetries
+ } = RepState,
+ couch_rep:end_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
+ twig:log(error, "Error in replication `~s` (triggered by document `~s`): ~s"
+ "~nReached maximum retry attempts (~p).",
+ [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
+ State;
+
+maybe_retry_replication(RepId, RepState, Error, State) ->
+ #rep_state{
+ doc_id = DocId,
+ user_ctx = UserCtx,
+ doc = RepDoc
+ } = RepState,
+ #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
+ true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
+ twig:log(error, "Error in replication `~s` (triggered by document `~s`): ~s"
+ "~nRestarting replication in ~p seconds.",
+ [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
+ Server = self(),
+ Pid = spawn_link(fun() ->
+ start_replication(Server, RepDoc, RepId, UserCtx, Wait)
+ end),
+ State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
+
+stop_all_replications() ->
+ twig:log(notice, "Stopping all ongoing replications.", []),
+ ets:foldl(
+ fun({_, RepId}, _) ->
+ couch_rep:end_replication(RepId)
+ end,
+ ok, ?DOC_TO_REP),
+ true = ets:delete_all_objects(?REP_TO_STATE),
+ true = ets:delete_all_objects(?DOC_TO_REP),
+ true = ets:delete_all_objects(?DB_TO_SEQ).
+
+update_rep_doc(RepDbName, RepDocId, KVs) when is_binary(RepDocId) ->
+ spawn_link(fun() ->
+ try
+ case fabric:open_doc(mem3:dbname(RepDbName), RepDocId, []) of
+ {ok, LatestRepDoc} ->
+ update_rep_doc(RepDbName, LatestRepDoc, KVs);
+ _ ->
+ ok
+ end
+ catch throw:conflict ->
+ % Shouldn't happen, as by default only the role _replicator can
+ % update replication documents.
+ twig:log(error, "Conflict error when updating replication document `~s`."
+ " Retrying.", [RepDocId]),
+ ok = timer:sleep(5),
+ update_rep_doc(RepDbName, RepDocId, KVs)
+ end
+ end);
+
+update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+ NewRepDocBody = lists:foldl(
+ fun({<<"_replication_state">> = K, State} = KV, Body) ->
+ case get_value(K, Body) of
+ State ->
+ Body;
+ _ ->
+ Body1 = lists:keystore(K, 1, Body, KV),
+ lists:keystore(
+ <<"_replication_state_time">>, 1, Body1,
+ {<<"_replication_state_time">>, timestamp()})
+ end;
+ ({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody, KVs),
+ case NewRepDocBody of
+ RepDocBody ->
+ ok;
+ _ ->
+ % Might not succeed - when the replication doc is deleted right
+ % before this update (not an error, ignore).
+ spawn_link(fun() ->
+ fabric:update_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}}, [?CTX])
+ end)
+ end.
+
+
+% RFC3339 timestamps.
+% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+timestamp() ->
+ {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
+ UTime = erlang:universaltime(),
+ LocalTime = calendar:universal_time_to_local_time(UTime),
+ DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
+ calendar:datetime_to_gregorian_seconds(UTime),
+ zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
+ iolist_to_binary(
+ io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
+ [Year, Month, Day, Hour, Min, Sec,
+ zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
+
+zone(Hr, Min) when Hr >= 0, Min >= 0 ->
+ io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
+zone(Hr, Min) ->
+ io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
+
+% pretty-print replication id
+pp_rep_id({Base, Extension}) ->
+ Base ++ Extension.
+
+
+rep_state(RepId) ->
+ case ets:lookup(?REP_TO_STATE, RepId) of
+ [{RepId, RepState}] ->
+ RepState;
+ [] ->
+ nil
+ end.
+
+
+error_reason({error, Reason}) ->
+ Reason;
+error_reason(Reason) ->
+ Reason.
+
+retries_value("infinity") ->
+ infinity;
+retries_value(Value) ->
+ list_to_integer(Value).
+
+state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
+ Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
+ case Left of
+ infinity ->
+ State#rep_state{wait = Wait2};
+ _ ->
+ State#rep_state{retries_left = Left - 1, wait = Wait2}
+ end.
+
+scan_all_dbs(Server) when is_pid(Server) ->
+ {ok, Db} = mem3_util:ensure_exists(
+ couch_config:get("mem3", "shard_db", "dbs")),
+ ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db),
+ IsReplicatorDbFun = is_replicator_db_fun(),
+ ChangesFun(fun({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+ case couch_util:get_value(<<"deleted">>, Change, false) of
+ true ->
+ ok;
+ false ->
+ IsReplicatorDbFun(DbName) andalso
+ gen_server:call(Server, {resume_scan, DbName})
+ end
+ end;
+ (_, _) -> ok
+ end),
+ couch_db:close(Db).
+
+is_replicator_db_fun() ->
+ {ok, RegExp} = re:compile("^([a-z][a-z0-9\\_\\$()\\+\\-\\/]*/)?_replicator$"),
+ fun(DbName) ->
+ match =:= re:run(mem3:dbname(DbName), RegExp, [{capture,none}])
+ end.
+
+owner(DbName, DocId) ->
+ Shards = mem3:shards(DbName, DocId),
+ Nodes = [node()|nodes()],
+ LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, Nodes)],
+ [#shard{node=Node}] = lists:usort(fun(#shard{name=A}, #shard{name=B}) ->
+ A =< B end, LiveShards),
+ node() =:= Node.
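
The manager is driven entirely by documents: process_update/3 starts a job when a doc has no _replication_state, a stale triggered state, or an error state with no running job; it stamps the doc triggered/completed/error as the job progresses, and owner/2 ensures only the node hosting the first live shard copy of the doc acts on it. A hypothetical replication document mid-flight, for illustration:

    {[
        {<<"_id">>, <<"my_rep">>},
        {<<"source">>, <<"http://node1.example.com/somedb">>},
        {<<"target">>, <<"http://node2.example.com/somedb">>},
        {<<"_replication_state">>, <<"triggered">>},
        {<<"_replication_state_time">>, <<"2014-01-15T18:13:16+00:00">>},
        {<<"_replication_id">>, <<"b1946ac92492d2347c6235b4d2611184">>}
    ]}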
diff --git a/deps/mem3/src/mem3_sup.erl b/deps/mem3/src/mem3_sup.erl
new file mode 100644
index 00000000..07b9498b
--- /dev/null
+++ b/deps/mem3/src/mem3_sup.erl
@@ -0,0 +1,36 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(_Args) ->
+ Children = [
+ child(mem3_events),
+ child(mem3_cache),
+ child(mem3_nodes),
+ child(mem3_sync),
+ child(mem3_rep_manager)
+ ],
+ {ok, {{one_for_one,10,1}, Children}}.
+
+child(mem3_events) ->
+ MFA = {gen_event, start_link, [{local, mem3_events}]},
+ {mem3_events, MFA, permanent, 1000, worker, dynamic};
+child(Child) ->
+ {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.
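
child/1 expands to an ordinary supervisor child spec; child(mem3_cache), for example, evaluates to:

    {mem3_cache, {mem3_cache, start_link, []}, permanent, 1000, worker, [mem3_cache]}

so each worker is restarted individually (one_for_one), and the supervisor gives up if it sees more than 10 restarts within 1 second.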
diff --git a/deps/mem3/src/mem3_sync.erl b/deps/mem3/src/mem3_sync.erl
new file mode 100644
index 00000000..191a98c6
--- /dev/null
+++ b/deps/mem3/src/mem3_sync.erl
@@ -0,0 +1,267 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
+ remove_node/1, initial_sync/1]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {
+ active = [],
+ count = 0,
+ limit,
+ dict = dict:new(),
+ waiting = [],
+ update_notifier
+}).
+
+-record(job, {name, node, count=nil, pid=nil}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_active() ->
+ gen_server:call(?MODULE, get_active).
+
+get_queue() ->
+ gen_server:call(?MODULE, get_queue).
+
+push(#shard{name = Name}, Target) ->
+ push(Name, Target);
+push(Name, #shard{node=Node}) ->
+ push(Name, Node);
+push(Name, Node) ->
+ push(#job{name = Name, node = Node}).
+
+push(#job{node = Node} = Job) when Node =/= node() ->
+ gen_server:cast(?MODULE, {push, Job});
+push(_) ->
+ ok.
+
+remove_node(Node) ->
+ gen_server:cast(?MODULE, {remove_node, Node}).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
+ gen_event:add_handler(mem3_events, mem3_sync_event, []),
+ {ok, Pid} = start_update_notifier(),
+ spawn(fun initial_sync/0),
+ {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+
+handle_call(get_active, _From, State) ->
+ {reply, State#state.active, State};
+
+handle_call(get_queue, _From, State) ->
+ {reply, State#state.waiting, State};
+
+handle_call(get_backlog, _From, #state{active=A, waiting=W} = State) ->
+ CA = lists:sum([C || #job{count=C} <- A, is_integer(C)]),
+ CW = lists:sum([C || #job{count=C} <- W, is_integer(C)]),
+ {reply, CA+CW, State}.
+
+handle_cast({push, DbName, Node}, State) ->
+ handle_cast({push, #job{name = DbName, node = Node}}, State);
+
+handle_cast({push, Job}, #state{count=Count, limit=Limit} = State)
+ when Count >= Limit ->
+ {noreply, add_to_queue(State, Job)};
+
+handle_cast({push, Job}, State) ->
+ #state{active = L, count = C} = State,
+ #job{name = DbName, node = Node} = Job,
+ case is_running(DbName, Node, L) of
+ true ->
+ {noreply, add_to_queue(State, Job)};
+ false ->
+ Pid = start_push_replication(Job),
+ {noreply, State#state{active=[Job#job{pid=Pid}|L], count=C+1}}
+ end;
+
+handle_cast({remove_node, Node}, #state{waiting = W0} = State) ->
+ {Alive, Dead} = lists:partition(fun(#job{node=N}) -> N =/= Node end, W0),
+ Dict = remove_entries(State#state.dict, Dead),
+ [exit(Pid, die_now) || #job{node=N, pid=Pid} <- State#state.active,
+ N =:= Node],
+ {noreply, State#state{dict = Dict, waiting = Alive}};
+
+handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
+ {Alive, Dead} = lists:partition(fun(#job{name=S}) -> S =/= Shard end, W0),
+ Dict = remove_entries(State#state.dict, Dead),
+ [exit(Pid, die_now) || #job{name=S, pid=Pid} <- State#state.active,
+ S =:= Shard],
+ {noreply, State#state{dict = Dict, waiting = Alive}}.
+
+handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
+ {ok, NewPid} = start_update_notifier(),
+ {noreply, State#state{update_notifier=NewPid}};
+
+handle_info({'EXIT', Active, normal}, State) ->
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, die_now}, State) ->
+ % we forced this one ourselves, do not retry
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) ->
+ % target doesn't exist, do not retry
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, Reason}, State) ->
+ NewState = case lists:keyfind(Active, #job.pid, State#state.active) of
+ #job{name=OldDbName, node=OldNode} = Job ->
+ twig:log(warn, "~p ~s -> ~p ~p", [?MODULE, OldDbName, OldNode,
+ Reason]),
+ case Reason of {pending_changes, Count} ->
+ add_to_queue(State, Job#job{pid = nil, count = Count});
+ _ ->
+ timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]),
+ State
+ end;
+ false -> State end,
+ handle_replication_exit(NewState, Active);
+
+handle_info(Msg, State) ->
+ twig:log(notice, "unexpected msg at replication manager ~p", [Msg]),
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ [exit(Pid, shutdown) || #job{pid=Pid} <- State#state.active],
+ ok.
+
+code_change(_, #state{waiting = [{_,_}|_] = W, active=A} = State, _) ->
+ Waiting = [#job{name=Name, node=Node} || {Name,Node} <- W],
+ Active = [#job{name=Name, node=Node, pid=Pid} || {Name,Node,Pid} <- A],
+ {ok, State#state{active = Active, waiting = Waiting}};
+
+code_change(_, State, _) ->
+ {ok, State}.
+
+handle_replication_exit(#state{waiting=[]} = State, Pid) ->
+ NewActive = lists:keydelete(Pid, #job.pid, State#state.active),
+ {noreply, State#state{active=NewActive, count=length(NewActive)}};
+handle_replication_exit(State, Pid) ->
+ #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+ Active1 = lists:keydelete(Pid, #job.pid, Active),
+ Count = length(Active1),
+ NewState = if Count < Limit ->
+ case next_replication(Active1, Waiting) of
+ nil -> % all waiting replications are also active
+ State#state{active = Active1, count = Count};
+ {#job{name=DbName, node=Node} = Job, StillWaiting} ->
+ NewPid = start_push_replication(Job),
+ State#state{
+ active = [Job#job{pid = NewPid} | Active1],
+ count = Count+1,
+ dict = dict:erase({DbName,Node}, D),
+ waiting = StillWaiting
+ }
+ end;
+ true ->
+ State#state{active = Active1, count=Count}
+ end,
+ {noreply, NewState}.
+
+start_push_replication(#job{name=Name, node=Node}) ->
+ spawn_link(mem3_rep, go, [Name, Node]).
+
+add_to_queue(State, #job{name=DbName, node=Node} = Job) ->
+ #state{dict=D, waiting=Waiting} = State,
+ case dict:is_key({DbName, Node}, D) of
+ true ->
+ State;
+ false ->
+ twig:log(debug, "adding ~s -> ~p to mem3_sync queue", [DbName, Node]),
+ State#state{
+ dict = dict:store({DbName,Node}, ok, D),
+ waiting = Waiting ++ [Job]
+ }
+ end.
+
+sync_nodes_and_dbs() ->
+ Db1 = couch_config:get("mem3", "node_db", "nodes"),
+ Db2 = couch_config:get("mem3", "shard_db", "dbs"),
+ Db3 = couch_config:get("couch_httpd_auth", "authentication_db", "_users"),
+ Dbs = [Db1, Db2, Db3],
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [[push(?l2b(Db), N) || Db <- Dbs] || N <- Nodes, lists:member(N, Live)].
+
+initial_sync() ->
+ [net_kernel:connect_node(Node) || Node <- mem3:nodes()],
+ sync_nodes_and_dbs(),
+ initial_sync(nodes()).
+
+initial_sync(Live) ->
+ Self = node(),
+ {ok, AllDbs} = fabric:all_dbs(),
+ lists:foreach(fun(Db) ->
+ LocalShards = [S || #shard{node=N} = S <- mem3:shards(Db), N =:= Self],
+ lists:foreach(fun(#shard{name=ShardName}) ->
+ Targets = [S || #shard{node=N, name=Name} = S <- mem3:shards(Db),
+ N =/= Self, Name =:= ShardName],
+ [?MODULE:push(ShardName, N) || #shard{node=N} <- Targets,
+ lists:member(N, Live)]
+ end, LocalShards)
+ end, AllDbs).
+
+start_update_notifier() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ Db3 = ?l2b(couch_config:get("couch_httpd_auth", "authentication_db",
+ "_users")),
+ couch_db_update_notifier:start_link(fun
+ ({updated, Db}) when Db == Db1; Db == Db2; Db == Db3 ->
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
+ ({updated, <<"shards/", _/binary>> = ShardName}) ->
+ % TODO deal with split/merged partitions by comparing keyranges
+ try mem3:shards(mem3:dbname(ShardName)) of
+ Shards ->
+ Targets = [S || #shard{node=N, name=Name} = S <- Shards,
+ N =/= node(), Name =:= ShardName],
+ Live = nodes(),
+ [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
+ lists:member(N, Live)]
+ catch error:database_does_not_exist ->
+ ok
+ end;
+ ({deleted, <<"shards/", _:18/binary, _/binary>> = ShardName}) ->
+ gen_server:cast(?MODULE, {remove_shard, ShardName});
+ (_) -> ok end).
+
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%% which does not correspond to an already running replication
+-spec next_replication([#job{}], [#job{}]) -> {#job{}, [#job{}]} | nil.
+next_replication(Active, Waiting) ->
+ Fun = fun(#job{name=S, node=N}) -> is_running(S,N,Active) end,
+ case lists:splitwith(Fun, Waiting) of
+ {_, []} ->
+ nil;
+ {Running, [Job|Rest]} ->
+ {Job, Running ++ Rest}
+ end.
+
+is_running(DbName, Node, ActiveList) ->
+ [] =/= [true || #job{name=S, node=N} <- ActiveList, S=:=DbName, N=:=Node].
+
+remove_entries(Dict, Entries) ->
+ lists:foldl(fun(Entry, D) -> dict:erase(Entry, D) end, Dict, Entries).
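
Jobs enter through push/1,2, are deduplicated on {DbName, Node} via the dict field, and run at most sync_concurrency (default 10) at a time; failed pushes are retried after 5 seconds unless the exit reason says the target shard is missing. Enqueueing a push by hand from a shell would look like this (shard path and node name are made up):

    mem3_sync:push(<<"shards/00000000-1fffffff/mydb.1389802396">>,
                   'node2@example.com').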
diff --git a/deps/mem3/src/mem3_sync_event.erl b/deps/mem3/src/mem3_sync_event.erl
new file mode 100644
index 00000000..be5ab450
--- /dev/null
+++ b/deps/mem3/src/mem3_sync_event.erl
@@ -0,0 +1,68 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_event).
+-behaviour(gen_event).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+init(_) ->
+ net_kernel:monitor_nodes(true),
+ {ok, nil}.
+
+handle_event({add_node, Node}, State) when Node =/= node() ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ Db3 = list_to_binary(couch_config:get("couch_httpd_auth",
+ "authentication_db", "_users")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2, Db3]],
+ {ok, State};
+
+handle_event({remove_node, Node}, State) ->
+ mem3_sync:remove_node(Node),
+ {ok, State};
+
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, ok, State}.
+
+handle_info({nodeup, Node}, State) ->
+ case lists:member(Node, mem3:nodes()) of
+ true ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ Db3 = list_to_binary(couch_config:get("couch_httpd_auth",
+ "authentication_db", "_users")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2, Db3]],
+ mem3_sync:initial_sync([Node]);
+ false ->
+ ok
+ end,
+ {ok, State};
+
+handle_info({nodedown, Node}, State) ->
+ mem3_sync:remove_node(Node),
+ {ok, State};
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/deps/mem3/src/mem3_util.erl b/deps/mem3/src/mem3_util.erl
new file mode 100644
index 00000000..c1b965bb
--- /dev/null
+++ b/deps/mem3/src/mem3_util.erl
@@ -0,0 +1,211 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_util).
+
+-export([hash/1, name_shard/2, create_partition_map/5, build_shards/2,
+ n_val/2, z_val/3, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
+ load_shards_from_disk/1, load_shards_from_disk/2, shard_info/1,
+ ensure_exists/1, open_db_doc/1]).
+
+-export([create_partition_map/4, name_shard/1]).
+-deprecated({create_partition_map, 4, eventually}).
+-deprecated({name_shard, 1, eventually}).
+
+-define(RINGTOP, 2 bsl 31). % 2^32, the full CRC32 key space
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+hash(Item) when is_binary(Item) ->
+ erlang:crc32(Item);
+hash(Item) ->
+ erlang:crc32(term_to_binary(Item)).
+
+name_shard(Shard) ->
+ name_shard(Shard, "").
+
+name_shard(#shard{dbname = DbName, range=[B,E]} = Shard, Suffix) ->
+ Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+ couch_util:to_hex(<<E:32/integer>>), "/", DbName, Suffix],
+ Shard#shard{name = ?l2b(Name)}.
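+%% Example (illustrative): name_shard(#shard{dbname = <<"testdb">>,
+%% range = [0, 16#3fffffff]}, ".1300000000") names the shard
+%% <<"shards/00000000-3fffffff/testdb.1300000000">>.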
+
+create_partition_map(DbName, N, Q, Nodes) ->
+ create_partition_map(DbName, N, Q, Nodes, "").
+
+create_partition_map(DbName, N, Q, Nodes, Suffix) ->
+ UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
+ Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
+ Shards1 = attach_nodes(Shards0, [], Nodes, []),
+ [name_shard(S#shard{dbname=DbName}, Suffix) || S <- Shards1].
+
+make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
+ Acc;
+make_key_ranges(Increment, Start, Acc) ->
+ case Start + 2*Increment of
+ X when X > ?RINGTOP ->
+ End = ?RINGTOP - 1;
+ _ ->
+ End = Start + Increment - 1
+ end,
+ make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
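+%% Example (illustrative): with Q = 4 this yields the four ranges
+%% [00000000-3fffffff], [40000000-7fffffff], [80000000-bfffffff] and
+%% [c0000000-ffffffff]; when ?RINGTOP does not divide evenly by Q the
+%% final range absorbs the remainder, so the ring is always fully covered.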
+
+attach_nodes([], Acc, _, _) ->
+ lists:reverse(Acc);
+attach_nodes(Shards, Acc, [], UsedNodes) ->
+ attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
+attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
+ attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
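+%% attach_nodes/4 deals nodes out round-robin, so the N adjacent copies of
+%% each range land on distinct nodes as long as N does not exceed the node
+%% count (n_val/2 below clamps N accordingly).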
+
+open_db_doc(DocId) ->
+ DbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ {ok, Db} = couch_db:open(DbName, []),
+ try couch_db:open_doc(Db, DocId, []) after couch_db:close(Db) end.
+
+write_db_doc(Doc) ->
+ DbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ write_db_doc(DbName, Doc, true).
+
+write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ try couch_db:open_doc(Db, Id, []) of
+ {ok, #doc{body = Body}} ->
+ % the doc is already in the desired state, we're done here
+ ok;
+ {not_found, _} when ShouldMutate ->
+ try couch_db:update_doc(Db, Doc, []) of
+ {ok, _} ->
+ ok
+ catch conflict ->
+ % check to see if this was a replication race or a different edit
+ write_db_doc(DbName, Doc, false)
+ end;
+ _ ->
+ % the doc already exists in a different state
+ conflict
+ after
+ couch_db:close(Db)
+ end.
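+%% write_db_doc/3 retries at most once on conflict: if the competing write
+%% left the doc in the desired state the conflict was a benign replication
+%% race and the retry returns ok; otherwise the retry reports conflict
+%% rather than mutating again.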
+
+delete_db_doc(DocId) ->
+ DbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ delete_db_doc(DbName, DocId, true).
+
+delete_db_doc(DbName, DocId, ShouldMutate) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ {ok, Revs} = couch_db:open_doc_revs(Db, DocId, all, []),
+ try [Doc#doc{deleted=true} || {ok, #doc{deleted=false}=Doc} <- Revs] of
+ [] ->
+ not_found;
+ Docs when ShouldMutate ->
+ try couch_db:update_docs(Db, Docs, []) of
+ {ok, _} ->
+ ok
+ catch conflict ->
+ % check to see if this was a replication race or if leaves survived
+ delete_db_doc(DbName, DocId, false)
+ end;
+ _ ->
+ % we have live leaves that we aren't allowed to delete, so bail
+ conflict
+ after
+ couch_db:close(Db)
+ end.
+
+build_shards(DbName, DocProps) ->
+ {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
+ Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+ lists:flatmap(fun({Node, Ranges}) ->
+ lists:map(fun(Range) ->
+ [B,E] = string:tokens(?b2l(Range), "-"),
+ Beg = httpd_util:hexlist_to_integer(B),
+ End = httpd_util:hexlist_to_integer(E),
+ name_shard(#shard{
+ dbname = DbName,
+ node = to_atom(Node),
+ range = [Beg, End]
+ }, Suffix)
+ end, Ranges)
+ end, ByNode).
+
+to_atom(Node) when is_binary(Node) ->
+ list_to_atom(binary_to_list(Node));
+to_atom(Node) when is_atom(Node) ->
+ Node.
+
+to_integer(N) when is_integer(N) ->
+ N;
+to_integer(N) when is_binary(N) ->
+ list_to_integer(binary_to_list(N));
+to_integer(N) when is_list(N) ->
+ list_to_integer(N).
+
+n_val(undefined, NodeCount) ->
+ n_val(couch_config:get("cluster", "n", "3"), NodeCount);
+n_val(N, NodeCount) when is_list(N) ->
+ n_val(list_to_integer(N), NodeCount);
+n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount ->
+ twig:log(error, "Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
+ NodeCount;
+n_val(N, _) when N < 1 ->
+ 1;
+n_val(N, _) ->
+ N.
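+%% Examples (illustrative): n_val("3", 5) -> 3; n_val(5, 3) -> 3 (with an
+%% error logged); n_val(0, 3) -> 1; n_val(undefined, NodeCount) reads
+%% cluster/n from the config, defaulting to 3.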
+
+z_val(undefined, NodeCount, ZoneCount) ->
+ z_val(couch_config:get("cluster", "z", "3"), NodeCount, ZoneCount);
+z_val(N, NodeCount, ZoneCount) when is_list(N) ->
+ z_val(list_to_integer(N), NodeCount, ZoneCount);
+z_val(N, NodeCount, ZoneCount) when N > NodeCount orelse N > ZoneCount ->
+ twig:log(error, "Request to create Z=~p DB but only ~p nodes(s) and ~p zone(s)",
+ [N, NodeCount, ZoneCount]),
+ erlang:min(NodeCount, ZoneCount);
+z_val(N, _, _) when N < 1 ->
+ 1;
+z_val(N, _, _) ->
+ N.
+
+load_shards_from_disk(DbName) when is_binary(DbName) ->
+ ShardDbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ {ok, Db} = couch_db:open(ShardDbName, []),
+ try load_shards_from_db(Db, DbName) after couch_db:close(Db) end.
+
+load_shards_from_db(#db{} = ShardDb, DbName) ->
+ case couch_db:open_doc(ShardDb, DbName, []) of
+ {ok, #doc{body = {Props}}} ->
+ twig:log(notice, "dbs cache miss for ~s", [DbName]),
+ build_shards(DbName, Props);
+ {not_found, _} ->
+ erlang:error(database_does_not_exist, ?b2l(DbName))
+ end.
+
+load_shards_from_disk(DbName, DocId) ->
+ Shards = load_shards_from_disk(DbName),
+ HashKey = hash(DocId),
+ % ranges are inclusive at both ends, so a key equal to B belongs here too
+ [S || #shard{range = [B,E]} = S <- Shards, B =< HashKey, HashKey =< E].
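+%% Example (illustrative): in a Q = 8 db a doc id hashing to 16#20000000
+%% falls in the [20000000-3fffffff] range, so that shard's copies are the
+%% ones returned.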
+
+shard_info(DbName) ->
+ [{n, mem3:n(DbName)},
+ {q, length(mem3:shards(DbName)) div mem3:n(DbName)}].
+
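+%% Open DbName with admin privileges, creating the db first if it is missing.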
+ensure_exists(DbName) when is_list(DbName) ->
+ ensure_exists(list_to_binary(DbName));
+ensure_exists(DbName) ->
+ Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+ case couch_db:open(DbName, Options) of
+ {ok, Db} ->
+ {ok, Db};
+ _ ->
+ couch_server:create(DbName, Options)
+ end.