Diffstat (limited to 'deps/mem3')
-rw-r--r--  deps/mem3/README.md                   |   33
-rw-r--r--  deps/mem3/include/mem3.hrl            |   44
-rwxr-xr-x  deps/mem3/rebar                       |  bin (0 -> 100732 bytes)
-rw-r--r--  deps/mem3/rebar.config                |   17
-rw-r--r--  deps/mem3/src/mem3.app.src            |   13
-rw-r--r--  deps/mem3/src/mem3.erl                |  238
-rw-r--r--  deps/mem3/src/mem3_app.erl            |   23
-rw-r--r--  deps/mem3/src/mem3_cache.erl          |  118
-rw-r--r--  deps/mem3/src/mem3_httpd.erl          |   53
-rw-r--r--  deps/mem3/src/mem3_nodes.erl          |  136
-rw-r--r--  deps/mem3/src/mem3_rep.erl            |  144
-rw-r--r--  deps/mem3/src/mem3_rep_manager.erl    |  627
-rw-r--r--  deps/mem3/src/mem3_sup.erl            |   36
-rw-r--r--  deps/mem3/src/mem3_sync.erl           |  267
-rw-r--r--  deps/mem3/src/mem3_sync_event.erl     |   68
-rw-r--r--  deps/mem3/src/mem3_util.erl           |  211
-rw-r--r--  deps/mem3/test/01-config-default.ini  |    2
-rw-r--r--  deps/mem3/test/mem3_util_test.erl     |  154
18 files changed, 2184 insertions, 0 deletions
diff --git a/deps/mem3/README.md b/deps/mem3/README.md
new file mode 100644
index 00000000..7f6998bb
--- /dev/null
+++ b/deps/mem3/README.md
@@ -0,0 +1,33 @@
+## mem3
+
+Mem3 is the node membership application for clustered [CouchDB][1]. It is used in [BigCouch][2] and tracks two very important things for the cluster:
+
+ 1. member nodes
+ 2. node/partition mappings for each database
+
+Both the nodes and partitions are tracked in node-local couch databases. Partitions are heavily used, so an ETS cache is also maintained for low-latency lookups. The nodes and partitions are synchronized via continuous CouchDB replication, which serves as 'gossip' in Dynamo parlance. The partitions ETS cache is kept in sync based on membership and database event listeners.
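+
+For example, shard lookups are served from the ETS cache once it is warm (a hypothetical shell session; the database name is illustrative):
+
+    1> mem3:shards(<<"mydb">>).        %% all shards of mydb
+    2> mem3:shards(<<"mydb">>, Id).    %% only the shards owning document Id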
+
+A very important point to make here is that BigCouch does not necessarily divide each database into equal partitions across the nodes of a cluster. For instance, in a 20-node cluster, you may need to create a small database with very few documents. For efficiency, you might create that database with Q=4 and keep the default of N=3. That yields only 12 partitions in total, so 8 nodes will hold none of the data for this database. Given this feature, we even out partition usage across the cluster by altering the 'start' node for each database's partitions.
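+
+For example, such a database could be created via `mem3:choose_shards/2`, which reads the `n` and `q` values from its options (a sketch; the database name is illustrative):
+
+    mem3:choose_shards(<<"small_db">>, [{n, 3}, {q, 4}]).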
+
+Splitting and merging partitions is an immature feature of the system and will require attention in the near term. We believe we can implement both operations and perform them while the database remains online.
+
+### Getting Started
+
+Mem3 requires Erlang/OTP R13B03 or newer and can be built with [rebar][6], which comes bundled in the repository. Rebar needs to be able to find the `couch_db.hrl` header file; one way to accomplish this is to set `ERL_LIBS` to point to the apps
+subdirectory of a BigCouch checkout, e.g.
+
+ ERL_LIBS="/usr/local/src/bigcouch/apps" ./rebar compile
+
+### License
+[Apache 2.0][3]
+
+### Contact
+ * [http://cloudant.com][4]
+ * [info@cloudant.com][5]
+
+[1]: http://couchdb.apache.org
+[2]: http://github.com/cloudant/bigcouch
+[3]: http://www.apache.org/licenses/LICENSE-2.0.html
+[4]: http://cloudant.com
+[5]: mailto:info@cloudant.com
+[6]: http://github.com/basho/rebar
diff --git a/deps/mem3/include/mem3.hrl b/deps/mem3/include/mem3.hrl
new file mode 100644
index 00000000..04658bb3
--- /dev/null
+++ b/deps/mem3/include/mem3.hrl
@@ -0,0 +1,44 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% type specification hacked to suppress dialyzer warning re: match spec
+-record(shard, {
+ name :: binary() | '_',
+ node :: node() | '_',
+ dbname :: binary(),
+ range :: [non_neg_integer() | '$1' | '$2'],
+ ref :: reference() | 'undefined' | '_'
+}).
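+
+% The '_', '$1' and '$2' atoms let the record double as an ets match-spec
+% head, e.g. as used by mem3:shards/2:
+%
+%   Head = #shard{name='_', node='_', dbname=DbName, range=['$1','$2'], ref='_'},
+%   ets:select(partitions, [{Head, [{'=<','$1',HashKey}, {'=<',HashKey,'$2'}], ['$_']}])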
+
+%% types
+-type join_type() :: init | join | replace | leave.
+-type join_order() :: non_neg_integer().
+-type options() :: list().
+-type mem_node() :: {join_order(), node(), options()}.
+-type mem_node_list() :: [mem_node()].
+-type arg_options() :: {test, boolean()}.
+-type args() :: [] | [arg_options()].
+-type test() :: undefined | node().
+-type epoch() :: float().
+-type clock() :: {node(), epoch()}.
+-type vector_clock() :: [clock()].
+-type ping_node() :: node() | nil.
+-type gossip_fun() :: call | cast.
+
+-type part() :: #shard{}.
+-type fullmap() :: [part()].
+-type ref_part_map() :: {reference(), part()}.
+-type tref() :: reference().
+-type np() :: {node(), part()}.
+-type beg_acc() :: [integer()].
diff --git a/deps/mem3/rebar b/deps/mem3/rebar
new file mode 100755
index 00000000..30c43ba5
--- /dev/null
+++ b/deps/mem3/rebar
Binary files differ
diff --git a/deps/mem3/rebar.config b/deps/mem3/rebar.config
new file mode 100644
index 00000000..4af6b852
--- /dev/null
+++ b/deps/mem3/rebar.config
@@ -0,0 +1,17 @@
+% Copyright 2011 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{deps, [
+ {twig, ".*", {git, "https://github.com/cloudant/twig.git", {tag, "0.2.1"}}}
+]}.
diff --git a/deps/mem3/src/mem3.app.src b/deps/mem3/src/mem3.app.src
new file mode 100644
index 00000000..88447783
--- /dev/null
+++ b/deps/mem3/src/mem3.app.src
@@ -0,0 +1,13 @@
+{application, mem3, [
+ {description, "CouchDB Cluster Membership"},
+ {vsn, git},
+ {mod, {mem3_app, []}},
+ {registered, [
+ mem3_cache,
+ mem3_events,
+ mem3_nodes,
+ mem3_sync,
+ mem3_sup
+ ]},
+ {applications, [kernel, stdlib, sasl, crypto, mochiweb, couch, twig]}
+]}.
diff --git a/deps/mem3/src/mem3.erl b/deps/mem3/src/mem3.erl
new file mode 100644
index 00000000..c7979642
--- /dev/null
+++ b/deps/mem3/src/mem3.erl
@@ -0,0 +1,238 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3).
+
+-export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
+ choose_shards/2, n/1, dbname/1, ushards/1]).
+-export([compare_nodelists/0, compare_shards/1]).
+-export([quorum/1]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+start() ->
+ application:start(mem3).
+
+stop() ->
+ application:stop(mem3).
+
+restart() ->
+ stop(),
+ start().
+
+%% @doc Detailed report of cluster-wide membership state. Queries the state
+%% on all member nodes and builds a dictionary with unique states as the
+%% key and the nodes holding that state as the value. Also reports member
+%% nodes which fail to respond and nodes which are connected but are not
+%% cluster members. Useful for debugging.
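+%% Example result (node names are illustrative):
+%%   [{non_member_nodes, []},
+%%    {bad_nodes, ['n3@host']},
+%%    {{cluster_nodes, ['n1@host','n2@host','n3@host']}, ['n1@host','n2@host']}]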
+-spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes
+ | non_member_nodes, [node()]}].
+compare_nodelists() ->
+ Nodes = mem3:nodes(),
+ AllNodes = erlang:nodes([this, visible]),
+ {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist),
+ Dict = lists:foldl(fun({Node, Nodelist}, D) ->
+ orddict:append({cluster_nodes, Nodelist}, Node, D)
+ end, orddict:new(), Replies),
+ [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}].
+compare_shards(DbName) when is_list(DbName) ->
+ compare_shards(list_to_binary(DbName));
+compare_shards(DbName) ->
+ Nodes = mem3:nodes(),
+ {Replies, BadNodes} = rpc:multicall(Nodes, mem3, shards, [DbName]),
+ GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)],
+ Dict = lists:foldl(fun({Shards, Node}, D) ->
+ orddict:append(Shards, Node, D)
+ end, orddict:new(), lists:zip(Replies, GoodNodes)),
+ [{bad_nodes, BadNodes} | Dict].
+
+-spec n(DbName::iodata()) -> integer().
+n(DbName) ->
+ length(mem3:shards(DbName, <<"foo">>)).
+
+-spec nodes() -> [node()].
+nodes() ->
+ mem3_nodes:get_nodelist().
+
+node_info(Node, Key) ->
+ mem3_nodes:get_node_info(Node, Key).
+
+-spec shards(DbName::iodata()) -> [#shard{}].
+shards(DbName) when is_list(DbName) ->
+ shards(list_to_binary(DbName));
+shards(DbName) ->
+ ShardDbName =
+ list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ case DbName of
+ ShardDbName ->
+ %% shard_db is treated as a single sharded db to support calls to db_info
+ %% and view_all_docs
+ [#shard{
+ node = node(),
+ name = ShardDbName,
+ dbname = ShardDbName,
+ range = [0, 2 bsl 31]}];
+ _ ->
+ try ets:lookup(partitions, DbName) of
+ [] ->
+ mem3_util:load_shards_from_disk(DbName);
+ Else ->
+ Else
+ catch error:badarg ->
+ mem3_util:load_shards_from_disk(DbName)
+ end
+ end.
+
+-spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+shards(DbName, DocId) when is_list(DbName) ->
+ shards(list_to_binary(DbName), DocId);
+shards(DbName, DocId) when is_list(DocId) ->
+ shards(DbName, list_to_binary(DocId));
+shards(DbName, DocId) ->
+ HashKey = mem3_util:hash(DocId),
+ Head = #shard{
+ name = '_',
+ node = '_',
+ dbname = DbName,
+ range = ['$1','$2'],
+ ref = '_'
+ },
+ Conditions = [{'=<', '$1', HashKey}, {'=<', HashKey, '$2'}],
+ try ets:select(partitions, [{Head, Conditions, ['$_']}]) of
+ [] ->
+ mem3_util:load_shards_from_disk(DbName, DocId);
+ Shards ->
+ Shards
+ catch error:badarg ->
+ mem3_util:load_shards_from_disk(DbName, DocId)
+ end.
+
+ushards(DbName) ->
+ Shards = mem3:shards(DbName),
+ Nodes = rotate_nodes(DbName, live_nodes()),
+ Buckets = bucket_by_range(Shards),
+ choose_ushards(Buckets, Nodes).
+
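+%% Rotation is deterministic per database: e.g. when erlang:crc32(DbName)
+%% rem 3 is 1, rotate_nodes(DbName, [a,b,c]) yields [b,c,a].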
+rotate_nodes(DbName, Nodes) ->
+ {H, T} = lists:split(erlang:crc32(DbName) rem length(Nodes), Nodes),
+ T ++ H.
+
+live_nodes() ->
+ lists:sort([node()|erlang:nodes()]).
+
+bucket_by_range(Shards) ->
+ Buckets0 = lists:foldl(fun(#shard{range=Range}=Shard, Dict) ->
+ orddict:append(Range, Shard, Dict) end, orddict:new(), Shards),
+ {_, Buckets} = lists:unzip(Buckets0),
+ Buckets.
+
+choose_ushards(Buckets, Nodes) ->
+ choose_ushards(Buckets, Nodes, []).
+
+choose_ushards([], _, Acc) ->
+ lists:reverse(Acc);
+choose_ushards([Bucket|RestBuckets], Nodes, Acc) ->
+ #shard{node=Node} = Shard = first_match(Bucket, Bucket, Nodes),
+ choose_ushards(RestBuckets, lists:delete(Node, Nodes) ++ [Node],
+ [Shard | Acc]).
+
+first_match([], [#shard{range=Range}|_], []) ->
+ throw({range_not_available, Range});
+first_match([#shard{node=Node}=Shard|_], _, [Node|_]) ->
+ Shard;
+first_match([], Shards, [_|RestNodes]) ->
+ first_match(Shards, Shards, RestNodes);
+first_match([_|RestShards], Shards, Nodes) ->
+ first_match(RestShards, Shards, Nodes).
+
+-spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}].
+choose_shards(DbName, Options) when is_list(DbName) ->
+ choose_shards(list_to_binary(DbName), Options);
+choose_shards(DbName, Options) ->
+ try shards(DbName)
+ catch error:E when E==database_does_not_exist; E==badarg ->
+ Nodes = mem3:nodes(),
+ NodeCount = length(Nodes),
+ Zones = zones(Nodes),
+ ZoneCount = length(Zones),
+ N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
+ Q = mem3_util:to_integer(couch_util:get_value(q, Options,
+ couch_config:get("cluster", "q", "8"))),
+ Z = mem3_util:z_val(couch_util:get_value(z, Options), NodeCount, ZoneCount),
+ Suffix = couch_util:get_value(shard_suffix, Options, ""),
+ ChosenZones = lists:sublist(shuffle(Zones), Z),
+ lists:flatmap(
+ fun({Zone, N1}) ->
+ Nodes1 = nodes_in_zone(Nodes, Zone),
+ {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes1)+1), Nodes1),
+ RotatedNodes = B ++ A,
+ mem3_util:create_partition_map(DbName, erlang:min(N1,length(Nodes1)),
+ Q, RotatedNodes, Suffix)
+ end,
+ lists:zip(ChosenZones, apportion(N, Z)))
+ end.
+
+-spec dbname(#shard{} | iodata()) -> binary().
+dbname(#shard{dbname = DbName}) ->
+ DbName;
+dbname(<<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>>) ->
+ list_to_binary(filename:rootname(binary_to_list(DbName)));
+dbname(DbName) when is_list(DbName) ->
+ dbname(list_to_binary(DbName));
+dbname(DbName) when is_binary(DbName) ->
+ DbName;
+dbname(_) ->
+ erlang:error(badarg).
+
+
+zones(Nodes) ->
+ lists:usort([mem3:node_info(Node, <<"zone">>) || Node <- Nodes]).
+
+nodes_in_zone(Nodes, Zone) ->
+ [Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)].
+
+shuffle(List) ->
+ %% Determine the log n portion then randomize the list.
+ randomize(round(math:log(length(List)) + 0.5), List).
+
+randomize(1, List) ->
+ randomize(List);
+randomize(T, List) ->
+ lists:foldl(fun(_E, Acc) -> randomize(Acc) end,
+ randomize(List), lists:seq(1, (T - 1))).
+
+randomize(List) ->
+ D = lists:map(fun(A) -> {random:uniform(), A} end, List),
+ {_, D1} = lists:unzip(lists:keysort(1, D)),
+ D1.
+
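+% Distribute Shares across Ways buckets as evenly as possible,
+% e.g. apportion(3, 2) -> [1,2] and apportion(8, 3) -> [2,3,3].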
+apportion(Shares, Ways) ->
+ apportion(Shares, lists:duplicate(Ways, 0), Shares).
+
+apportion(_Shares, Acc, 0) ->
+ Acc;
+apportion(Shares, Acc, Remaining) ->
+ N = Remaining rem length(Acc),
+ [H|T] = lists:nthtail(N, Acc),
+ apportion(Shares, lists:sublist(Acc, N) ++ [H+1|T], Remaining - 1).
+
+% quorum functions
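+%
+% quorum/1 returns the number of shard copies that form a majority,
+% e.g. a database with N=3 has a quorum of 2.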
+
+quorum(#db{name=DbName}) ->
+ quorum(DbName);
+quorum(DbName) ->
+ n(DbName) div 2 + 1.
diff --git a/deps/mem3/src/mem3_app.erl b/deps/mem3/src/mem3_app.erl
new file mode 100644
index 00000000..bb27171f
--- /dev/null
+++ b/deps/mem3/src/mem3_app.erl
@@ -0,0 +1,23 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(_Type, []) ->
+ mem3_sup:start_link().
+
+stop([]) ->
+ ok.
diff --git a/deps/mem3/src/mem3_cache.erl b/deps/mem3/src/mem3_cache.erl
new file mode 100644
index 00000000..84686b91
--- /dev/null
+++ b/deps/mem3/src/mem3_cache.erl
@@ -0,0 +1,118 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_cache).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0]).
+
+-record(state, {changes_pid}).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end),
+ {ok, #state{changes_pid = Pid}}.
+
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}},
+ #state{changes_pid=Pid} = State) ->
+ % fatal error, somebody deleted our ets table
+ {stop, ets_table_error, State};
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ twig:log(notice, "~p changes listener died ~p", [?MODULE, Reason]),
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end,
+ erlang:send_after(5000, self(), {start_listener, Seq}),
+ {noreply, State};
+handle_info({start_listener, Seq}, State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #state{changes_pid=Pid}) ->
+ exit(Pid, kill),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% internal functions
+
+listen_for_changes(Since) ->
+ DbName = couch_config:get("mem3", "shard_db", "dbs"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+ case couch_util:get_value(<<"deleted">>, Change, false) of
+ true ->
+ ets:delete(partitions, DbName);
+ false ->
+ case couch_util:get_value(doc, Change) of
+ {error, Reason} ->
+ twig:log(error, "missing partition table for ~s: ~p", [DbName, Reason]);
+ {Doc} ->
+ ets:delete(partitions, DbName),
+ Shards = mem3_util:build_shards(DbName, Doc),
+ ets:insert(partitions, Shards),
+ [create_if_missing(Name) || #shard{name=Name, node=Node}
+ <- Shards, Node =:= node()]
+ end
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
+
+create_if_missing(Name) ->
+ DbDir = couch_config:get("couchdb", "database_dir"),
+ Filename = filename:join(DbDir, ?b2l(Name) ++ ".couch"),
+ case filelib:is_regular(Filename) of
+ true ->
+ ok;
+ false ->
+ Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+ case couch_server:create(Name, Options) of
+ {ok, Db} ->
+ couch_db:close(Db);
+ Error ->
+ twig:log(error, "~p tried to create ~s, got ~p", [?MODULE, Name, Error])
+ end
+ end.
diff --git a/deps/mem3/src/mem3_httpd.erl b/deps/mem3/src/mem3_httpd.erl
new file mode 100644
index 00000000..716080f8
--- /dev/null
+++ b/deps/mem3/src/mem3_httpd.erl
@@ -0,0 +1,53 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_httpd).
+
+-export([handle_membership_req/1]).
+
+%% includes
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
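+% GET /_membership reports the connected and clustered nodes, e.g.
+% (node names are illustrative):
+%
+%   {"all_nodes":["n1@host","n2@host"],
+%    "cluster_nodes":["n1@host","n2@host","n3@host"]}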
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>]} = Req) ->
+ ClusterNodes = try mem3:nodes()
+ catch _:_ -> [] end,
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)}
+ ]});
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) ->
+ ClusterNodes = try mem3:nodes()
+ catch _:_ -> [] end,
+ Shards = mem3:shards(DbName),
+ JsonShards = json_shards(Shards, dict:new()),
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)},
+ {partitions, JsonShards}
+ ]}).
+
+%%
+%% internal
+%%
+
+json_shards([], AccIn) ->
+ List = dict:to_list(AccIn),
+ {lists:sort(List)};
+json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) ->
+ HexBeg = couch_util:to_hex(<<B:32/integer>>),
+ json_shards(Rest, dict:append(HexBeg, Node, AccIn)).
diff --git a/deps/mem3/src/mem3_nodes.erl b/deps/mem3/src/mem3_nodes.erl
new file mode 100644
index 00000000..0be66462
--- /dev/null
+++ b/deps/mem3/src/mem3_nodes.erl
@@ -0,0 +1,136 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_nodes).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_nodelist/0, get_node_info/2]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {changes_pid, update_seq, nodes}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_nodelist() ->
+ gen_server:call(?MODULE, get_nodelist).
+
+get_node_info(Node, Key) ->
+ gen_server:call(?MODULE, {get_node_info, Node, Key}).
+
+init([]) ->
+ {Nodes, UpdateSeq} = initialize_nodelist(),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end),
+ {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}.
+
+handle_call(get_nodelist, _From, State) ->
+ {reply, lists:sort(dict:fetch_keys(State#state.nodes)), State};
+handle_call({get_node_info, Node, Key}, _From, State) ->
+ case dict:find(Node, State#state.nodes) of
+ {ok, NodeInfo} ->
+ {reply, couch_util:get_value(Key, NodeInfo), State};
+ error ->
+ {reply, error, State}
+ end;
+handle_call({add_node, Node, NodeInfo}, _From, #state{nodes=Nodes} = State) ->
+ gen_event:notify(mem3_events, {add_node, Node}),
+ {reply, ok, State#state{nodes = dict:store(Node, NodeInfo, Nodes)}};
+handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) ->
+ gen_event:notify(mem3_events, {remove_node, Node}),
+ {reply, ok, State#state{nodes = dict:erase(Node, Nodes)}};
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ twig:log(notice, "~p changes listener died ~p", [?MODULE, Reason]),
+ StartSeq = State#state.update_seq,
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end,
+ erlang:send_after(5000, self(), start_listener),
+ {noreply, State#state{update_seq = Seq}};
+handle_info(start_listener, #state{update_seq = Seq} = State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% internal functions
+
+initialize_nodelist() ->
+ DbName = couch_config:get("mem3", "node_db", "nodes"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ {ok, _, {_, Nodes0}} = couch_btree:fold(Db#db.id_tree, fun first_fold/3,
+ {Db, dict:new()}, []),
+ % add self if not already present
+ case dict:find(node(), Nodes0) of
+ {ok, _} ->
+ Nodes = Nodes0;
+ error ->
+ Doc = #doc{id = couch_util:to_binary(node())},
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ Nodes = dict:store(node(), [], Nodes0)
+ end,
+ couch_db:close(Db),
+ {Nodes, Db#db.update_seq}.
+
+first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{deleted=true}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{id=Id}=DocInfo, _, {Db, Dict}) ->
+ {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo),
+ {ok, {Db, dict:store(mem3_util:to_atom(Id), Props, Dict)}}.
+
+listen_for_changes(Since) ->
+ DbName = couch_config:get("mem3", "node_db", "nodes"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ Node = couch_util:get_value(<<"id">>, Change),
+ case Node of <<"_design/", _/binary>> -> ok; _ ->
+ case couch_util:get_value(<<"deleted">>, Change, false) of
+ false ->
+ {Props} = couch_util:get_value(doc, Change),
+ gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props});
+ true ->
+ gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
diff --git a/deps/mem3/src/mem3_rep.erl b/deps/mem3/src/mem3_rep.erl
new file mode 100644
index 00000000..b68973c5
--- /dev/null
+++ b/deps/mem3/src/mem3_rep.erl
@@ -0,0 +1,144 @@
+-module(mem3_rep).
+
+-export([go/2, changes_enumerator/3, make_local_id/2]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(CTX, #user_ctx{roles = [<<"_admin">>]}).
+
+-record(acc, {revcount = 0, infos = [], seq, localid, source, target}).
+
+go(DbName, Node) when is_binary(DbName), is_atom(Node) ->
+ go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node});
+
+go(#shard{} = Source, #shard{} = Target) ->
+ LocalId = make_local_id(Source, Target),
+ case couch_db:open(Source#shard.name, [{user_ctx,?CTX}]) of
+ {ok, Db} ->
+ try
+ go(Db, Target, LocalId)
+ catch error:{not_found, no_db_file} ->
+ {error, missing_target}
+ after
+ couch_db:close(Db)
+ end;
+ {not_found, no_db_file} ->
+ {error, missing_source}
+ end.
+
+go(#db{name = DbName, seq_tree = Bt} = Db, #shard{} = Target, LocalId) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ Seq = calculate_start_seq(Db, Target, LocalId),
+ Acc0 = #acc{source=Db, target=Target, seq=Seq, localid=LocalId},
+ Fun = fun ?MODULE:changes_enumerator/3,
+ {ok, _, AccOut} = couch_btree:fold(Bt, Fun, Acc0, [{start_key, Seq + 1}]),
+ {ok, #acc{seq = LastSeq}} = replicate_batch(AccOut),
+ case couch_db:count_changes_since(Db, LastSeq) of
+ 0 ->
+ ok;
+ N ->
+ exit({pending_changes, N})
+ end.
+
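+% The checkpoint doc id encodes both replication endpoints, e.g. (schematic):
+%   _local/shard-sync-<b64url(md5(SourceNode))>-<b64url(md5(TargetNode))>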
+make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}) ->
+ S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
+ T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
+ <<"_local/shard-sync-", S/binary, "-", T/binary>>.
+
+changes_enumerator(FullDocInfo, _, #acc{revcount = C} = Acc) when C >= 99 ->
+ #doc_info{high_seq = Seq} = couch_doc:to_doc_info(FullDocInfo),
+ {stop, Acc#acc{seq = Seq, infos = [FullDocInfo | Acc#acc.infos]}};
+
+changes_enumerator(FullDocInfo, _, #acc{revcount = C, infos = Infos} = Acc) ->
+ #doc_info{high_seq = Seq, revs = Revs} = couch_doc:to_doc_info(FullDocInfo),
+ Count = C + length(Revs),
+ {ok, Acc#acc{seq = Seq, revcount = Count, infos = [FullDocInfo | Infos]}}.
+
+replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
+ case find_missing_revs(Acc) of
+ [] ->
+ ok;
+ Missing ->
+ ok = save_on_target(Node, Name, open_docs(Acc, Missing))
+ end,
+ update_locals(Acc),
+ {ok, Acc#acc{revcount=0, infos=[]}}.
+
+find_missing_revs(Acc) ->
+ #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
+ IdsRevs = lists:map(fun(FDI) ->
+ #doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
+ {Id, [R || #rev_info{rev=R} <- RevInfos]}
+ end, Infos),
+ Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
+ rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
+
+open_docs(#acc{source=Source, infos=Infos}, Missing) ->
+ lists:flatmap(fun({Id, Revs, _}) ->
+ FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
+ open_doc_revs(Source, FDI, Revs)
+ end, Missing).
+
+save_on_target(Node, Name, Docs) ->
+ Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
+ {io_priority, {internal_repl, Name}}],
+ rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+ ok.
+
+update_locals(Acc) ->
+ #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
+ #shard{name=Name, node=Node} = Target,
+ Doc = #doc{id = Id, body = {[
+ {<<"seq">>, Seq},
+ {<<"node">>, list_to_binary(atom_to_list(Node))},
+ {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
+ ]}},
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
+ rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
+
+rexi_call(Node, MFA) ->
+ Mon = rexi_monitor:start([{rexi_server, Node}]),
+ Ref = rexi:cast(Node, MFA),
+ try
+ receive {Ref, {ok, Reply}} ->
+ Reply;
+ {Ref, Error} ->
+ erlang:error(Error);
+ {rexi_DOWN, Mon, _, Reason} ->
+ erlang:error({rexi_DOWN, Reason})
+ after 600000 ->
+ erlang:error(timeout)
+ end
+ after
+ rexi_monitor:stop(Mon)
+ end.
+
+calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
+ case couch_db:open_doc(Db, LocalId, []) of
+ {ok, #doc{body = {SProps}}} ->
+ Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
+ try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
+ #doc{body = {TProps}} ->
+ SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
+ TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
+ erlang:min(SourceSeq, TargetSeq)
+ catch error:{not_found, _} ->
+ 0
+ end;
+ {not_found, _} ->
+ 0
+ end.
+
+open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
+ {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
+ lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
+ couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
+ end, FoundRevs).
+
+iso8601_timestamp() ->
+ {_,_,Micro} = Now = os:timestamp(),
+ {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
+ Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
+ io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
diff --git a/deps/mem3/src/mem3_rep_manager.erl b/deps/mem3/src/mem3_rep_manager.erl
new file mode 100644
index 00000000..7b98701d
--- /dev/null
+++ b/deps/mem3/src/mem3_rep_manager.erl
@@ -0,0 +1,627 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(mem3_rep_manager).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/0, config_change/3]).
+-export([replication_started/1, replication_completed/1, replication_error/2]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_js_functions.hrl").
+
+-define(DOC_TO_REP, mem3_rep_doc_id_to_rep_id).
+-define(REP_TO_STATE, mem3_rep_id_to_rep_state).
+-define(DB_TO_SEQ, mem3_db_to_seq).
+-define(INITIAL_WAIT, 2.5). % seconds
+-define(MAX_WAIT, 600). % seconds
+-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
+
+-record(state, {
+ db_notifier = nil,
+ max_retries,
+ scan_pid = nil,
+ rep_start_pids = []
+}).
+
+-record(rep_state, {
+ dbname,
+ doc_id,
+ user_ctx,
+ doc,
+ starting,
+ retries_left,
+ max_retries,
+ wait = ?INITIAL_WAIT
+}).
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3,
+ to_binary/1
+]).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+replication_started({BaseId, _} = RepId) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{dbname = DbName, doc_id = DocId} ->
+ update_rep_doc(DbName, DocId, [
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+ twig:log(notice, "Document `~s` triggered replication `~s`",
+ [DocId, pp_rep_id(RepId)])
+ end.
+
+
+replication_completed(RepId) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{dbname = DbName, doc_id = DocId} ->
+ update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"completed">>}]),
+ ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+ twig:log(notice, "Replication `~s` finished (triggered by document `~s`)",
+ [pp_rep_id(RepId), DocId])
+ end.
+
+
+replication_error({BaseId, _} = RepId, Error) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{dbname = DbName, doc_id = DocId} ->
+ % TODO: maybe add error reason to replication document
+ update_rep_doc(DbName, DocId, [
+ {<<"_replication_state">>, <<"error">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
+ end.
+
+init(_) ->
+ process_flag(trap_exit, true),
+ net_kernel:monitor_nodes(true),
+ ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
+ ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
+ ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, protected]),
+ Server = self(),
+ ok = couch_config:register(fun ?MODULE:config_change/3, Server),
+ NotifierPid = db_update_notifier(),
+ ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
+ {ok, #state{
+ db_notifier = NotifierPid,
+ scan_pid = ScanPid,
+ max_retries = retries_value(
+ couch_config:get("replicator", "max_replication_retry_count", "10"))
+ }}.
+
+config_change("replicator", "max_replication_retry_count", V) ->
+ ok = gen_server:cast(?MODULE, {set_max_retries, retries_value(V)}).
+
+handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
+ NewState = try
+ process_update(State, DbName, Change)
+ catch
+ _Tag:Error ->
+ {RepProps} = get_value(doc, ChangeProps),
+ DocId = get_value(<<"_id">>, RepProps),
+ rep_db_update_error(Error, DbName, DocId),
+ State
+ end,
+ {reply, ok, NewState};
+
+handle_call({rep_started, RepId}, _From, State) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ RepState ->
+ NewRepState = RepState#rep_state{
+ starting = false,
+ retries_left = State#state.max_retries,
+ max_retries = State#state.max_retries,
+ wait = ?INITIAL_WAIT
+ },
+ true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
+ end,
+ {reply, ok, State};
+
+handle_call({rep_complete, RepId}, _From, State) ->
+ true = ets:delete(?REP_TO_STATE, RepId),
+ {reply, ok, State};
+
+handle_call({rep_error, RepId, Error}, _From, State) ->
+ {reply, ok, replication_error(State, RepId, Error)};
+
+handle_call({resume_scan, DbName}, _From, State) ->
+ Since = case ets:lookup(?DB_TO_SEQ, DbName) of
+ [] -> 0;
+ [{DbName, EndSeq}] -> EndSeq
+ end,
+ Pid = changes_feed_loop(DbName, Since),
+ twig:log(debug, "Scanning ~s from update_seq ~p", [DbName, Since]),
+ {reply, ok, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
+
+handle_call({rep_db_checkpoint, DbName, EndSeq}, _From, State) ->
+ true = ets:insert(?DB_TO_SEQ, {DbName, EndSeq}),
+ {reply, ok, State};
+
+handle_call(Msg, From, State) ->
+ twig:log(error, "Replication manager received unexpected call ~p from ~p",
+ [Msg, From]),
+ {stop, {error, {unexpected_call, Msg}}, State}.
+
+
+handle_cast({set_max_retries, MaxRetries}, State) ->
+ {noreply, State#state{max_retries = MaxRetries}};
+
+handle_cast(Msg, State) ->
+ twig:log(error, "Replication manager received unexpected cast ~p", [Msg]),
+ {stop, {error, {unexpected_cast, Msg}}, State}.
+
+handle_info({nodeup, _Node}, State) ->
+ {noreply, rescan(State)};
+
+handle_info({nodedown, _Node}, State) ->
+ {noreply, rescan(State)};
+
+handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
+ twig:log(debug, "Background scan has completed.", []),
+ {noreply, State#state{scan_pid=nil}};
+
+handle_info({'EXIT', From, Reason}, #state{scan_pid = From} = State) ->
+ twig:log(error, "Background scanner died. Reason: ~p", [Reason]),
+ {stop, {scanner_died, Reason}, State};
+
+handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
+ twig:log(error, "Database update notifier died. Reason: ~p", [Reason]),
+ {stop, {db_update_notifier_died, Reason}, State};
+
+handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
+ % one of the replication start processes terminated successfully
+ {noreply, State#state{rep_start_pids = Pids -- [From]}};
+
+handle_info({'DOWN', _Ref, _, _, _}, State) ->
+ % From a db monitor created by a replication process. Ignore.
+ {noreply, State};
+
+handle_info(Msg, State) ->
+ twig:log(error, "Replication manager received unexpected message ~p", [Msg]),
+ {stop, {unexpected_msg, Msg}, State}.
+
+terminate(_Reason, State) ->
+ #state{
+ scan_pid = ScanPid,
+ rep_start_pids = StartPids,
+ db_notifier = DbNotifier
+ } = State,
+ stop_all_replications(),
+ lists:foreach(
+ fun(Pid) ->
+ catch unlink(Pid),
+ catch exit(Pid, stop)
+ end,
+ [ScanPid | StartPids]),
+ true = ets:delete(?REP_TO_STATE),
+ true = ets:delete(?DOC_TO_REP),
+ true = ets:delete(?DB_TO_SEQ),
+ couch_db_update_notifier:stop(DbNotifier).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+changes_feed_loop(DbName, Since) ->
+ Server = self(),
+ Pid = spawn_link(
+ fun() ->
+ fabric:changes(DbName, fun
+ ({change, Change}, Acc) ->
+ case has_valid_rep_id(Change) of
+ true ->
+ ok = gen_server:call(
+ Server, {rep_db_update, DbName, Change}, infinity);
+ false ->
+ ok
+ end,
+ {ok, Acc};
+ ({stop, EndSeq}, Acc) ->
+ ok = gen_server:call(Server, {rep_db_checkpoint, DbName, EndSeq}),
+ {ok, Acc};
+ (_, Acc) ->
+ {ok, Acc}
+ end,
+ nil,
+ #changes_args{
+ include_docs = true,
+ feed = "normal",
+ since = Since,
+ filter = main_only,
+ timeout = infinity,
+ db_open_options = [sys_db]
+ }
+ )
+ end),
+ Pid.
+
+has_valid_rep_id({Change}) ->
+ has_valid_rep_id(get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+ false;
+has_valid_rep_id(_Else) ->
+ true.
+
+
+db_update_notifier() ->
+ Server = self(),
+ IsReplicatorDbFun = is_replicator_db_fun(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({_, DbName}) ->
+ case IsReplicatorDbFun(DbName) of
+ true ->
+ ok = gen_server:call(Server, {resume_scan, mem3:dbname(DbName)});
+ _ ->
+ ok
+ end
+ end
+ ),
+ Notifier.
+
+rescan(#state{scan_pid = nil} = State) ->
+ true = ets:delete_all_objects(?DB_TO_SEQ),
+ Server = self(),
+ NewScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
+ State#state{scan_pid = NewScanPid};
+rescan(#state{scan_pid = ScanPid} = State) ->
+ unlink(ScanPid),
+ exit(ScanPid, exit),
+ rescan(State#state{scan_pid = nil}).
+
+process_update(State, DbName, {Change}) ->
+ {RepProps} = JsonRepDoc = get_value(doc, Change),
+ DocId = get_value(<<"_id">>, RepProps),
+ case {owner(DbName, DocId), get_value(deleted, Change, false)} of
+ {false, _} ->
+ replication_complete(DocId),
+ State;
+ {true, true} ->
+ rep_doc_deleted(DocId),
+ State;
+ {true, false} ->
+ case get_value(<<"_replication_state">>, RepProps) of
+ undefined ->
+ maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+ <<"triggered">> ->
+ maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+ <<"completed">> ->
+ replication_complete(DocId),
+ State;
+ <<"error">> ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [] ->
+ maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+ _ ->
+ State
+ end
+ end
+ end.
+
+
+rep_db_update_error(Error, DbName, DocId) ->
+ case Error of
+ {bad_rep_doc, Reason} ->
+ ok;
+ _ ->
+ Reason = to_binary(Error)
+ end,
+ twig:log(error, "Replication manager, error processing document `~s`: ~s",
+ [DocId, Reason]),
+ update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"error">>}]).
+
+
+rep_user_ctx({RepDoc}) ->
+ case get_value(<<"user_ctx">>, RepDoc) of
+ undefined ->
+ #user_ctx{};
+ {UserCtx} ->
+ #user_ctx{
+ name = get_value(<<"name">>, UserCtx, null),
+ roles = get_value(<<"roles">>, UserCtx, [])
+ }
+ end.
+
+
+maybe_start_replication(State, DbName, DocId, RepDoc) ->
+ UserCtx = rep_user_ctx(RepDoc),
+ {BaseId, _} = RepId = make_rep_id(RepDoc, UserCtx),
+ case rep_state(RepId) of
+ nil ->
+ RepState = #rep_state{
+ dbname = DbName,
+ doc_id = DocId,
+ user_ctx = UserCtx,
+ doc = RepDoc,
+ starting = true,
+ retries_left = State#state.max_retries,
+ max_retries = State#state.max_retries
+ },
+ true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+ true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+ twig:log(notice, "Attempting to start replication `~s` (document `~s`).",
+ [pp_rep_id(RepId), DocId]),
+ Server = self(),
+ Pid = spawn_link(fun() ->
+ start_replication(Server, RepDoc, RepId, UserCtx, 0)
+ end),
+ State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
+ #rep_state{doc_id = DocId} ->
+ State;
+ #rep_state{starting = false, dbname = DbName, doc_id = OtherDocId} ->
+ twig:log(notice, "The replication specified by the document `~s` was already"
+ " triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
+ State;
+ #rep_state{starting = true, dbname = DbName, doc_id = OtherDocId} ->
+ twig:log(notice, "The replication specified by the document `~s` is already"
+ " being triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
+ State
+ end.
+
+
+make_rep_id(RepDoc, UserCtx) ->
+ try
+ couch_rep:make_replication_id(RepDoc, UserCtx)
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end.
+
+
+maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
+ case get_value(<<"_replication_id">>, RepProps) of
+ RepId ->
+ ok;
+ _ ->
+ update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}])
+ end.
+
+
+start_replication(Server, RepDoc, RepId, UserCtx, Wait) ->
+ ok = timer:sleep(Wait * 1000),
+ case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx, ?MODULE)) of
+ Pid when is_pid(Pid) ->
+ ok = gen_server:call(Server, {rep_started, RepId}, infinity),
+ couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+ Error ->
+ replication_error(RepId, Error)
+ end.
+
+
+replication_complete(DocId) ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [{DocId, RepId}] ->
+ case rep_state(RepId) of
+ nil ->
+ couch_rep:end_replication(RepId);
+ #rep_state{} ->
+ ok
+ end,
+ true = ets:delete(?DOC_TO_REP, DocId);
+ _ ->
+ ok
+ end.
+
+
+rep_doc_deleted(DocId) ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [{DocId, RepId}] ->
+ couch_rep:end_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
+ twig:log(notice, "Stopped replication `~s` because replication document `~s`"
+ " was deleted", [pp_rep_id(RepId), DocId]);
+ [] ->
+ ok
+ end.
+
+
+replication_error(State, RepId, Error) ->
+ case rep_state(RepId) of
+ nil ->
+ State;
+ RepState ->
+ maybe_retry_replication(RepId, RepState, Error, State)
+ end.
+
+maybe_retry_replication(RepId, #rep_state{retries_left = 0} = RepState, Error, State) ->
+ #rep_state{
+ doc_id = DocId,
+ max_retries = MaxRetries
+ } = RepState,
+ couch_rep:end_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
+ twig:log(error, "Error in replication `~s` (triggered by document `~s`): ~s"
+ "~nReached maximum retry attempts (~p).",
+ [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
+ State;
+
+maybe_retry_replication(RepId, RepState, Error, State) ->
+ #rep_state{
+ doc_id = DocId,
+ user_ctx = UserCtx,
+ doc = RepDoc
+ } = RepState,
+ #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
+ true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
+ twig:log(error, "Error in replication `~s` (triggered by document `~s`): ~s"
+ "~nRestarting replication in ~p seconds.",
+ [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
+ Server = self(),
+ Pid = spawn_link(fun() ->
+ start_replication(Server, RepDoc, RepId, UserCtx, Wait)
+ end),
+ State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
+
+stop_all_replications() ->
+ twig:log(notice, "Stopping all ongoing replications.", []),
+ ets:foldl(
+ fun({_, RepId}, _) ->
+ couch_rep:end_replication(RepId)
+ end,
+ ok, ?DOC_TO_REP),
+ true = ets:delete_all_objects(?REP_TO_STATE),
+ true = ets:delete_all_objects(?DOC_TO_REP),
+ true = ets:delete_all_objects(?DB_TO_SEQ).
+
+update_rep_doc(RepDbName, RepDocId, KVs) when is_binary(RepDocId) ->
+ spawn_link(fun() ->
+ try
+ case fabric:open_doc(mem3:dbname(RepDbName), RepDocId, []) of
+ {ok, LatestRepDoc} ->
+ update_rep_doc(RepDbName, LatestRepDoc, KVs);
+ _ ->
+ ok
+ end
+ catch throw:conflict ->
+ % Shouldn't happen, as by default only the role _replicator can
+ % update replication documents.
+ twig:log(error, "Conflict error when updating replication document `~s`."
+ " Retrying.", [RepDocId]),
+ ok = timer:sleep(5),
+ update_rep_doc(RepDbName, RepDocId, KVs)
+ end
+ end);
+
+update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+ NewRepDocBody = lists:foldl(
+ fun({<<"_replication_state">> = K, State} = KV, Body) ->
+ case get_value(K, Body) of
+ State ->
+ Body;
+ _ ->
+ Body1 = lists:keystore(K, 1, Body, KV),
+ lists:keystore(
+ <<"_replication_state_time">>, 1, Body1,
+ {<<"_replication_state_time">>, timestamp()})
+ end;
+ ({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody, KVs),
+ case NewRepDocBody of
+ RepDocBody ->
+ ok;
+ _ ->
+ % Might not succeed - when the replication doc is deleted right
+ % before this update (not an error, ignore).
+ spawn_link(fun() ->
+ fabric:update_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}}, [?CTX])
+ end)
+ end.
+
+
+% RFC3339 timestamps.
+% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+timestamp() ->
+ {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
+ UTime = erlang:universaltime(),
+ LocalTime = calendar:universal_time_to_local_time(UTime),
+ DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
+ calendar:datetime_to_gregorian_seconds(UTime),
+ iolist_to_binary(
+ io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
+ [Year, Month, Day, Hour, Min, Sec,
+ zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
+
+zone(Hr, Min) when Hr >= 0, Min >= 0 ->
+ io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
+zone(Hr, Min) ->
+ io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
+
+% pretty-print replication id
+pp_rep_id({Base, Extension}) ->
+ Base ++ Extension.
+
+
+rep_state(RepId) ->
+ case ets:lookup(?REP_TO_STATE, RepId) of
+ [{RepId, RepState}] ->
+ RepState;
+ [] ->
+ nil
+ end.
+
+
+error_reason({error, Reason}) ->
+ Reason;
+error_reason(Reason) ->
+ Reason.
+
+retries_value("infinity") ->
+ infinity;
+retries_value(Value) ->
+ list_to_integer(Value).
+
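+% Exponential backoff: the retry wait doubles after each error, starting
+% from ?INITIAL_WAIT (2.5s) and capped at ?MAX_WAIT (600s): 2.5, 5, 10, ...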
+state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
+ Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
+ case Left of
+ infinity ->
+ State#rep_state{wait = Wait2};
+ _ ->
+ State#rep_state{retries_left = Left - 1, wait = Wait2}
+ end.
+
+scan_all_dbs(Server) when is_pid(Server) ->
+ {ok, Db} = mem3_util:ensure_exists(
+ couch_config:get("mem3", "shard_db", "dbs")),
+ ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db),
+ IsReplicatorDbFun = is_replicator_db_fun(),
+ ChangesFun(fun({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+ case couch_util:get_value(<<"deleted">>, Change, false) of
+ true ->
+ ok;
+ false ->
+ IsReplicatorDbFun(DbName) andalso
+ gen_server:call(Server, {resume_scan, DbName})
+ end
+ end;
+ (_, _) -> ok
+ end),
+ couch_db:close(Db).
+
+is_replicator_db_fun() ->
+ {ok, RegExp} = re:compile("^([a-z][a-z0-9\\_\\$()\\+\\-\\/]*/)?_replicator$"),
+ fun(DbName) ->
+ match =:= re:run(mem3:dbname(DbName), RegExp, [{capture,none}])
+ end.
+
+owner(DbName, DocId) ->
+ Shards = mem3:shards(DbName, DocId),
+ Nodes = [node()|nodes()],
+ LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, Nodes)],
+ [#shard{node=Node}] = lists:usort(fun(#shard{name=A}, #shard{name=B}) ->
+ A =< B end, LiveShards),
+ node() =:= Node.
diff --git a/deps/mem3/src/mem3_sup.erl b/deps/mem3/src/mem3_sup.erl
new file mode 100644
index 00000000..07b9498b
--- /dev/null
+++ b/deps/mem3/src/mem3_sup.erl
@@ -0,0 +1,36 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(_Args) ->
+ Children = [
+ child(mem3_events),
+ child(mem3_cache),
+ child(mem3_nodes),
+ child(mem3_sync),
+ child(mem3_rep_manager)
+ ],
+ {ok, {{one_for_one,10,1}, Children}}.
+
+child(mem3_events) ->
+ MFA = {gen_event, start_link, [{local, mem3_events}]},
+ {mem3_events, MFA, permanent, 1000, worker, dynamic};
+child(Child) ->
+ {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.
diff --git a/deps/mem3/src/mem3_sync.erl b/deps/mem3/src/mem3_sync.erl
new file mode 100644
index 00000000..191a98c6
--- /dev/null
+++ b/deps/mem3/src/mem3_sync.erl
@@ -0,0 +1,267 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
+ remove_node/1, initial_sync/1]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {
+ active = [],
+ count = 0,
+ limit,
+ dict = dict:new(),
+ waiting = [],
+ update_notifier
+}).
+
+-record(job, {name, node, count=nil, pid=nil}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_active() ->
+ gen_server:call(?MODULE, get_active).
+
+get_queue() ->
+ gen_server:call(?MODULE, get_queue).
+
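+% push/2 enqueues an internal replication of a shard to a peer node, e.g.
+% (hypothetical shard and node names):
+%   push(<<"shards/00000000-7fffffff/mydb">>, 'n2@host').
+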
+push(#shard{name = Name}, Target) ->
+ push(Name, Target);
+push(Name, #shard{node=Node}) ->
+ push(Name, Node);
+push(Name, Node) ->
+ push(#job{name = Name, node = Node}).
+
+push(#job{node = Node} = Job) when Node =/= node() ->
+ gen_server:cast(?MODULE, {push, Job});
+push(_) ->
+ ok.
+
+remove_node(Node) ->
+ gen_server:cast(?MODULE, {remove_node, Node}).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
+ gen_event:add_handler(mem3_events, mem3_sync_event, []),
+ {ok, Pid} = start_update_notifier(),
+ spawn(fun initial_sync/0),
+ {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+
+handle_call(get_active, _From, State) ->
+ {reply, State#state.active, State};
+
+handle_call(get_queue, _From, State) ->
+ {reply, State#state.waiting, State};
+
+handle_call(get_backlog, _From, #state{active=A, waiting=W} = State) ->
+ CA = lists:sum([C || #job{count=C} <- A, is_integer(C)]),
+ CW = lists:sum([C || #job{count=C} <- W, is_integer(C)]),
+ {reply, CA+CW, State}.
+
+handle_cast({push, DbName, Node}, State) ->
+ handle_cast({push, #job{name = DbName, node = Node}}, State);
+
+handle_cast({push, Job}, #state{count=Count, limit=Limit} = State)
+ when Count >= Limit ->
+ {noreply, add_to_queue(State, Job)};
+
+handle_cast({push, Job}, State) ->
+ #state{active = L, count = C} = State,
+ #job{name = DbName, node = Node} = Job,
+ case is_running(DbName, Node, L) of
+ true ->
+ {noreply, add_to_queue(State, Job)};
+ false ->
+ Pid = start_push_replication(Job),
+ {noreply, State#state{active=[Job#job{pid=Pid}|L], count=C+1}}
+ end;
+
+handle_cast({remove_node, Node}, #state{waiting = W0} = State) ->
+ {Alive, Dead} = lists:partition(fun(#job{node=N}) -> N =/= Node end, W0),
+ Dict = remove_entries(State#state.dict, Dead),
+ [exit(Pid, die_now) || #job{node=N, pid=Pid} <- State#state.active,
+ N =:= Node],
+ {noreply, State#state{dict = Dict, waiting = Alive}};
+
+handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
+ {Alive, Dead} = lists:partition(fun(#job{name=S}) -> S =/= Shard end, W0),
+ Dict = remove_entries(State#state.dict, Dead),
+ [exit(Pid, die_now) || #job{name=S, pid=Pid} <- State#state.active,
+ S =:= Shard],
+ {noreply, State#state{dict = Dict, waiting = Alive}}.
+
+handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
+ {ok, NewPid} = start_update_notifier(),
+ {noreply, State#state{update_notifier=NewPid}};
+
+handle_info({'EXIT', Active, normal}, State) ->
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, die_now}, State) ->
+ % we forced this one ourselves, do not retry
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) ->
+ % target doesn't exist, do not retry
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, Reason}, State) ->
+ NewState = case lists:keyfind(Active, #job.pid, State#state.active) of
+ #job{name=OldDbName, node=OldNode} = Job ->
+ twig:log(warn, "~p ~s -> ~p ~p", [?MODULE, OldDbName, OldNode,
+ Reason]),
+ case Reason of {pending_changes, Count} ->
+ add_to_queue(State, Job#job{pid = nil, count = Count});
+ _ ->
+ timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]),
+ State
+ end;
+ false -> State end,
+ handle_replication_exit(NewState, Active);
+
+handle_info(Msg, State) ->
+ twig:log(notice, "unexpected msg at replication manager ~p", [Msg]),
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ [exit(Pid, shutdown) || #job{pid=Pid} <- State#state.active],
+ ok.
+
+code_change(_, #state{waiting = [{_,_}|_] = W, active=A} = State, _) ->
+ Waiting = [#job{name=Name, node=Node} || {Name,Node} <- W],
+ Active = [#job{name=Name, node=Node, pid=Pid} || {Name,Node,Pid} <- A],
+ {ok, State#state{active = Active, waiting = Waiting}};
+
+code_change(_, State, _) ->
+ {ok, State}.
+
+handle_replication_exit(#state{waiting=[]} = State, Pid) ->
+ NewActive = lists:keydelete(Pid, #job.pid, State#state.active),
+ {noreply, State#state{active=NewActive, count=length(NewActive)}};
+handle_replication_exit(State, Pid) ->
+ #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+ Active1 = lists:keydelete(Pid, #job.pid, Active),
+ Count = length(Active1),
+ NewState = if Count < Limit ->
+ case next_replication(Active1, Waiting) of
+ nil -> % all waiting replications are also active
+ State#state{active = Active1, count = Count};
+ {#job{name=DbName, node=Node} = Job, StillWaiting} ->
+ NewPid = start_push_replication(Job),
+ State#state{
+ active = [Job#job{pid = NewPid} | Active1],
+ count = Count+1,
+ dict = dict:erase({DbName,Node}, D),
+ waiting = StillWaiting
+ }
+ end;
+ true ->
+ State#state{active = Active1, count=Count}
+ end,
+ {noreply, NewState}.
+
+start_push_replication(#job{name=Name, node=Node}) ->
+ spawn_link(mem3_rep, go, [Name, Node]).
+
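+% The dict keyed on {DbName, Node} makes queueing idempotent: a pair that is
+% already waiting is not enqueued twice.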
+add_to_queue(State, #job{name=DbName, node=Node} = Job) ->
+ #state{dict=D, waiting=Waiting} = State,
+ case dict:is_key({DbName, Node}, D) of
+ true ->
+ State;
+ false ->
+ twig:log(debug, "adding ~s -> ~p to mem3_sync queue", [DbName, Node]),
+ State#state{
+ dict = dict:store({DbName,Node}, ok, D),
+ waiting = Waiting ++ [Job]
+ }
+ end.
+
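+% Pushes the three node-local system databases (nodes, dbs and the
+% authentication db) to every known cluster member that is currently
+% connected.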
+sync_nodes_and_dbs() ->
+ Db1 = couch_config:get("mem3", "node_db", "nodes"),
+ Db2 = couch_config:get("mem3", "shard_db", "dbs"),
+ Db3 = couch_config:get("couch_httpd_auth", "authentication_db", "_users"),
+ Dbs = [Db1, Db2, Db3],
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [[push(?l2b(Db), N) || Db <- Dbs] || N <- Nodes, lists:member(N, Live)].
+
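+% Startup sequence: connect to every known node, sync the system dbs, then
+% push each locally hosted shard to the other live nodes holding a copy.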
+initial_sync() ->
+ [net_kernel:connect_node(Node) || Node <- mem3:nodes()],
+ sync_nodes_and_dbs(),
+ initial_sync(nodes()).
+
+initial_sync(Live) ->
+ Self = node(),
+ {ok, AllDbs} = fabric:all_dbs(),
+ lists:foreach(fun(Db) ->
+ LocalShards = [S || #shard{node=N} = S <- mem3:shards(Db), N =:= Self],
+ lists:foreach(fun(#shard{name=ShardName}) ->
+ Targets = [S || #shard{node=N, name=Name} = S <- mem3:shards(Db),
+ N =/= Self, Name =:= ShardName],
+ [?MODULE:push(ShardName, N) || #shard{node=N} <- Targets,
+ lists:member(N, Live)]
+ end, LocalShards)
+ end, AllDbs).
+
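+% Routes couch db events to pushes: system-db updates fan out to all live
+% members, a shard update goes only to the other nodes hosting that shard,
+% and a shard deletion cancels its queued or running replications.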
+start_update_notifier() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ Db3 = ?l2b(couch_config:get("couch_httpd_auth", "authentication_db",
+ "_users")),
+ couch_db_update_notifier:start_link(fun
+ ({updated, Db}) when Db == Db1; Db == Db2; Db == Db3 ->
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
+ ({updated, <<"shards/", _/binary>> = ShardName}) ->
+ % TODO deal with split/merged partitions by comparing keyranges
+ try mem3:shards(mem3:dbname(ShardName)) of
+ Shards ->
+ Targets = [S || #shard{node=N, name=Name} = S <- Shards,
+ N =/= node(), Name =:= ShardName],
+ Live = nodes(),
+ [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
+ lists:member(N, Live)]
+ catch error:database_does_not_exist ->
+ ok
+ end;
+ ({deleted, <<"shards/", _:18/binary, _/binary>> = ShardName}) ->
+ gen_server:cast(?MODULE, {remove_shard, ShardName});
+ (_) -> ok end).
+
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%% which does not correspond to an already running replication
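+%%
+%% e.g. with Waiting = [a@n1, b@n2] and a@n1 already active, the result is
+%% {b@n2, [a@n1]}: the first non-running job plus the rest of the queue.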
+-spec next_replication([#job{}], [#job{}]) -> {#job{}, [#job{}]} | nil.
+next_replication(Active, Waiting) ->
+ Fun = fun(#job{name=S, node=N}) -> is_running(S,N,Active) end,
+ case lists:splitwith(Fun, Waiting) of
+ {_, []} ->
+ nil;
+ {Running, [Job|Rest]} ->
+ {Job, Running ++ Rest}
+ end.
+
+is_running(DbName, Node, ActiveList) ->
+ [] =/= [true || #job{name=S, node=N} <- ActiveList, S=:=DbName, N=:=Node].
+
+remove_entries(Dict, Entries) ->
+ lists:foldl(fun(Entry, D) -> dict:erase(Entry, D) end, Dict, Entries).
diff --git a/deps/mem3/src/mem3_sync_event.erl b/deps/mem3/src/mem3_sync_event.erl
new file mode 100644
index 00000000..be5ab450
--- /dev/null
+++ b/deps/mem3/src/mem3_sync_event.erl
@@ -0,0 +1,68 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_event).
+-behaviour(gen_event).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+init(_) ->
+ net_kernel:monitor_nodes(true),
+ {ok, nil}.
+
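+% add_node seeds a new member with the three system databases; a nodeup for
+% a known member additionally triggers mem3_sync:initial_sync/1 to catch up
+% its shards.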
+handle_event({add_node, Node}, State) when Node =/= node() ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ Db3 = list_to_binary(couch_config:get("couch_httpd_auth",
+ "authentication_db", "_users")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2, Db3]],
+ {ok, State};
+
+handle_event({remove_node, Node}, State) ->
+ mem3_sync:remove_node(Node),
+ {ok, State};
+
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, ok, State}.
+
+handle_info({nodeup, Node}, State) ->
+ case lists:member(Node, mem3:nodes()) of
+ true ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ Db3 = list_to_binary(couch_config:get("couch_httpd_auth",
+ "authentication_db", "_users")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2, Db3]],
+ mem3_sync:initial_sync([Node]);
+ false ->
+ ok
+ end,
+ {ok, State};
+
+handle_info({nodedown, Node}, State) ->
+ mem3_sync:remove_node(Node),
+ {ok, State};
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/deps/mem3/src/mem3_util.erl b/deps/mem3/src/mem3_util.erl
new file mode 100644
index 00000000..c1b965bb
--- /dev/null
+++ b/deps/mem3/src/mem3_util.erl
@@ -0,0 +1,211 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_util).
+
+-export([hash/1, name_shard/2, create_partition_map/5, build_shards/2,
+ n_val/2, z_val/3, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
+ load_shards_from_disk/1, load_shards_from_disk/2, shard_info/1,
+ ensure_exists/1, open_db_doc/1]).
+
+-export([create_partition_map/4, name_shard/1]).
+-deprecated({create_partition_map, 4, eventually}).
+-deprecated({name_shard, 1, eventually}).
+
+-define(RINGTOP, 2 bsl 31). % 2^32, the full CRC32 key space
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
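+% hash/1: binaries are hashed directly; any other term goes through
+% term_to_binary/1 first, so hash(0), hash("0"), hash(<<0>>) and hash(<<"0">>)
+% are all distinct (see hash_test in mem3_util_test).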
+hash(Item) when is_binary(Item) ->
+ erlang:crc32(Item);
+hash(Item) ->
+ erlang:crc32(term_to_binary(Item)).
+
+name_shard(Shard) ->
+ name_shard(Shard, "").
+
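+% e.g. name_shard(#shard{dbname = <<"testdb">>, range = [0,100]}, ".1234")
+% produces the name <<"shards/00000000-00000064/testdb.1234">>
+% (see name_shard_test in mem3_util_test).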
+name_shard(#shard{dbname = DbName, range=[B,E]} = Shard, Suffix) ->
+ Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+ couch_util:to_hex(<<E:32/integer>>), "/", DbName, Suffix],
+ Shard#shard{name = ?l2b(Name)}.
+
+create_partition_map(DbName, N, Q, Nodes) ->
+ create_partition_map(DbName, N, Q, Nodes, "").
+
+create_partition_map(DbName, N, Q, Nodes, Suffix) ->
+ UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
+ Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
+ Shards1 = attach_nodes(Shards0, [], Nodes, []),
+ [name_shard(S#shard{dbname=DbName}, Suffix) || S <- Shards1].
+
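+% make_key_ranges/3 walks the 2^32 CRC32 ring in steps of RINGTOP div Q.
+% e.g. Q=4 yields [00000000-3fffffff], [40000000-7fffffff],
+% [80000000-bfffffff], [c0000000-ffffffff]; the Start + 2*Increment check
+% clamps the last range to ffffffff when Q does not divide the ring evenly.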
+make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
+ Acc;
+make_key_ranges(Increment, Start, Acc) ->
+ case Start + 2*Increment of
+ X when X > ?RINGTOP ->
+ End = ?RINGTOP - 1;
+ _ ->
+ End = Start + Increment - 1
+ end,
+ make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
+
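+% Nodes are handed out round-robin and the node list restarts once exhausted,
+% so the N consecutive copies of each range land on N distinct nodes whenever
+% the cluster has at least N members.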
+attach_nodes([], Acc, _, _) ->
+ lists:reverse(Acc);
+attach_nodes(Shards, Acc, [], UsedNodes) ->
+ attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
+attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
+ attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
+
+open_db_doc(DocId) ->
+ DbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ {ok, Db} = couch_db:open(DbName, []),
+ try couch_db:open_doc(Db, DocId, []) after couch_db:close(Db) end.
+
+write_db_doc(Doc) ->
+ DbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ write_db_doc(DbName, Doc, true).
+
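+% ShouldMutate permits exactly one write attempt: on conflict we re-read, and
+% if the winning revision already has the desired body the race counts as
+% success; otherwise the second pass reports conflict instead of looping.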
+write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ try couch_db:open_doc(Db, Id, []) of
+ {ok, #doc{body = Body}} ->
+ % the doc is already in the desired state, we're done here
+ ok;
+ {not_found, _} when ShouldMutate ->
+ try couch_db:update_doc(Db, Doc, []) of
+ {ok, _} ->
+ ok
+ catch conflict ->
+ % check to see if this was a replication race or a different edit
+ write_db_doc(DbName, Doc, false)
+ end;
+ _ ->
+ % the doc already exists in a different state
+ conflict
+ after
+ couch_db:close(Db)
+ end.
+
+delete_db_doc(DocId) ->
+ DbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ delete_db_doc(DbName, DocId, true).
+
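+% Same single-retry pattern as write_db_doc/3: a conflicting delete is
+% re-checked once, and if live leaves remain someone else edited the doc,
+% so we return conflict rather than deleting their revision.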
+delete_db_doc(DbName, DocId, ShouldMutate) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ {ok, Revs} = couch_db:open_doc_revs(Db, DocId, all, []),
+ try [Doc#doc{deleted=true} || {ok, #doc{deleted=false}=Doc} <- Revs] of
+ [] ->
+ not_found;
+ Docs when ShouldMutate ->
+ try couch_db:update_docs(Db, Docs, []) of
+ {ok, _} ->
+ ok
+ catch conflict ->
+            % check to see if this was a replication race or if leaves survived
+ delete_db_doc(DbName, DocId, false)
+ end;
+ _ ->
+        % we have live leaves that we aren't allowed to delete. let's bail
+ conflict
+ after
+ couch_db:close(Db)
+ end.
+
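+% Expands the by_node section of a dbs doc into #shard{} records.
+% e.g. {<<"bigcouch@node.local">>, [<<"00000000-1fffffff">>]} becomes a
+% shard named <<"shards/00000000-1fffffff/DbName">> with range
+% [0, 536870911] (see build_shards_test).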
+build_shards(DbName, DocProps) ->
+ {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
+ Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+ lists:flatmap(fun({Node, Ranges}) ->
+ lists:map(fun(Range) ->
+ [B,E] = string:tokens(?b2l(Range), "-"),
+ Beg = httpd_util:hexlist_to_integer(B),
+ End = httpd_util:hexlist_to_integer(E),
+ name_shard(#shard{
+ dbname = DbName,
+ node = to_atom(Node),
+ range = [Beg, End]
+ }, Suffix)
+ end, Ranges)
+ end, ByNode).
+
+to_atom(Node) when is_binary(Node) ->
+ list_to_atom(binary_to_list(Node));
+to_atom(Node) when is_atom(Node) ->
+ Node.
+
+to_integer(N) when is_integer(N) ->
+ N;
+to_integer(N) when is_binary(N) ->
+ list_to_integer(binary_to_list(N));
+to_integer(N) when is_list(N) ->
+ list_to_integer(N).
+
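+% Clamps the requested replica count to [1, NodeCount]:
+% n_val(2,4) =:= 2, n_val(-1,4) =:= 1, n_val(6,4) =:= 4 (see nval_test).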
+n_val(undefined, NodeCount) ->
+ n_val(couch_config:get("cluster", "n", "3"), NodeCount);
+n_val(N, NodeCount) when is_list(N) ->
+ n_val(list_to_integer(N), NodeCount);
+n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount ->
+ twig:log(error, "Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
+ NodeCount;
+n_val(N, _) when N < 1 ->
+ 1;
+n_val(N, _) ->
+ N.
+
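+% Zone count gets the same treatment: Z is clamped so it never exceeds the
+% node count or the zone count, and never drops below 1.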
+z_val(undefined, NodeCount, ZoneCount) ->
+ z_val(couch_config:get("cluster", "z", "3"), NodeCount, ZoneCount);
+z_val(N, NodeCount, ZoneCount) when is_list(N) ->
+ z_val(list_to_integer(N), NodeCount, ZoneCount);
+z_val(N, NodeCount, ZoneCount) when N > NodeCount orelse N > ZoneCount ->
+    twig:log(error, "Request to create Z=~p DB but only ~p node(s) and ~p zone(s)",
+ [N, NodeCount, ZoneCount]),
+ erlang:min(NodeCount, ZoneCount);
+z_val(N, _, _) when N < 1 ->
+ 1;
+z_val(N, _, _) ->
+ N.
+
+load_shards_from_disk(DbName) when is_binary(DbName) ->
+ X = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ {ok, Db} = couch_db:open(X, []),
+ try load_shards_from_db(Db, DbName) after couch_db:close(Db) end.
+
+load_shards_from_db(#db{} = ShardDb, DbName) ->
+ case couch_db:open_doc(ShardDb, DbName, []) of
+ {ok, #doc{body = {Props}}} ->
+ twig:log(notice, "dbs cache miss for ~s", [DbName]),
+ build_shards(DbName, Props);
+ {not_found, _} ->
+ erlang:error(database_does_not_exist, ?b2l(DbName))
+ end.
+
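+% Returns the shards whose range covers the CRC32 hash of DocId; as written,
+% each range is treated as the half-open interval (B, E].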
+load_shards_from_disk(DbName, DocId) ->
+ Shards = load_shards_from_disk(DbName),
+ HashKey = hash(DocId),
+ [S || #shard{range = [B,E]} = S <- Shards, B < HashKey, HashKey =< E].
+
+shard_info(DbName) ->
+ [{n, mem3:n(DbName)},
+ {q, length(mem3:shards(DbName)) div mem3:n(DbName)}].
+
+ensure_exists(DbName) when is_list(DbName) ->
+ ensure_exists(list_to_binary(DbName));
+ensure_exists(DbName) ->
+ Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+ case couch_db:open(DbName, Options) of
+ {ok, Db} ->
+ {ok, Db};
+ _ ->
+ couch_server:create(DbName, Options)
+ end.
diff --git a/deps/mem3/test/01-config-default.ini b/deps/mem3/test/01-config-default.ini
new file mode 100644
index 00000000..757f7830
--- /dev/null
+++ b/deps/mem3/test/01-config-default.ini
@@ -0,0 +1,2 @@
+[cluster]
+n=3
diff --git a/deps/mem3/test/mem3_util_test.erl b/deps/mem3/test/mem3_util_test.erl
new file mode 100644
index 00000000..89c23ca6
--- /dev/null
+++ b/deps/mem3/test/mem3_util_test.erl
@@ -0,0 +1,154 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_util_test).
+
+-include("mem3.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+hash_test() ->
+ ?assertEqual(1624516141,mem3_util:hash(0)),
+ ?assertEqual(3816901808,mem3_util:hash("0")),
+ ?assertEqual(3523407757,mem3_util:hash(<<0>>)),
+ ?assertEqual(4108050209,mem3_util:hash(<<"0">>)),
+ ?assertEqual(3094724072,mem3_util:hash(zero)),
+ ok.
+
+name_shard_test() ->
+ Shard1 = #shard{},
+ ?assertError(function_clause, mem3_util:name_shard(Shard1, ".1234")),
+
+ Shard2 = #shard{dbname = <<"testdb">>, range = [0,100]},
+ #shard{name=Name2} = mem3_util:name_shard(Shard2, ".1234"),
+ ?assertEqual(<<"shards/00000000-00000064/testdb.1234">>, Name2),
+
+ ok.
+
+create_partition_map_test() ->
+ {DbName1, N1, Q1, Nodes1} = {<<"testdb1">>, 3, 4, [a,b,c,d]},
+ Map1 = mem3_util:create_partition_map(DbName1, N1, Q1, Nodes1),
+ ?assertEqual(12, length(Map1)),
+
+ {DbName2, N2, Q2, Nodes2} = {<<"testdb2">>, 1, 1, [a,b,c,d]},
+ [#shard{name=Name2,node=Node2}] = Map2 =
+ mem3_util:create_partition_map(DbName2, N2, Q2, Nodes2, ".1234"),
+ ?assertEqual(1, length(Map2)),
+ ?assertEqual(<<"shards/00000000-ffffffff/testdb2.1234">>, Name2),
+ ?assertEqual(a, Node2),
+ ok.
+
+build_shards_test() ->
+ DocProps1 =
+ [{<<"changelog">>,
+ [[<<"add">>,<<"00000000-1fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"20000000-3fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"40000000-5fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"60000000-7fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"80000000-9fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"a0000000-bfffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"c0000000-dfffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"e0000000-ffffffff">>,
+ <<"bigcouch@node.local">>]]},
+ {<<"by_node">>,
+ {[{<<"bigcouch@node.local">>,
+ [<<"00000000-1fffffff">>,<<"20000000-3fffffff">>,
+ <<"40000000-5fffffff">>,<<"60000000-7fffffff">>,
+ <<"80000000-9fffffff">>,<<"a0000000-bfffffff">>,
+ <<"c0000000-dfffffff">>,<<"e0000000-ffffffff">>]}]}},
+ {<<"by_range">>,
+ {[{<<"00000000-1fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"20000000-3fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"40000000-5fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"60000000-7fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"80000000-9fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"a0000000-bfffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"c0000000-dfffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"e0000000-ffffffff">>,[<<"bigcouch@node.local">>]}]}}],
+ Shards1 = mem3_util:build_shards(<<"testdb1">>, DocProps1),
+ ExpectedShards1 =
+ [{shard,<<"shards/00000000-1fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/20000000-3fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [536870912,1073741823],
+ undefined},
+ {shard,<<"shards/40000000-5fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},
+ {shard,<<"shards/60000000-7fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [1610612736,2147483647],
+ undefined},
+ {shard,<<"shards/80000000-9fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [2147483648,2684354559],
+ undefined},
+ {shard,<<"shards/a0000000-bfffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [2684354560,3221225471],
+ undefined},
+ {shard,<<"shards/c0000000-dfffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined},
+ {shard,<<"shards/e0000000-ffffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [3758096384,4294967295],
+ undefined}],
+ ?assertEqual(ExpectedShards1, Shards1),
+ ok.
+
+
+%% n_val tests
+
+nval_test() ->
+ ?assertEqual(2, mem3_util:n_val(2,4)),
+ ?assertEqual(1, mem3_util:n_val(-1,4)),
+ ?assertEqual(4, mem3_util:n_val(6,4)),
+ ok.
+
+config_01_setup() ->
+ Ini = filename:join([code:lib_dir(mem3, test), "01-config-default.ini"]),
+ {ok, Pid} = couch_config:start_link([Ini]),
+ Pid.
+
+config_teardown(_Pid) ->
+ couch_config:stop().
+
+n_val_test_() ->
+ {"n_val tests",
+ [
+ {setup,
+ fun config_01_setup/0,
+ fun config_teardown/1,
+ fun(Pid) ->
+ {with, Pid, [
+ fun n_val_1/1
+ ]}
+ end}
+ ]
+ }.
+
+n_val_1(_Pid) ->
+ ?assertEqual(3, mem3_util:n_val(undefined, 4)).