author     Micah Anderson <micah@leap.se>             2014-01-15 18:13:16 +0000
committer  drebs <drebs@leap.se>                      2014-01-17 08:48:11 -0200
commit     510c6d763fba74f95ae8f894408c3658bcef4f83 (patch)
tree       d4dd0930b902cb1e5d46bea621ec83f801ea8ed6 /deps/mem3/src
parent     8bd863936ead4243f58fb99e11d1221e1af0a71e (diff)
embed dependencies that were previously pulled in by git during rebar build
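
Before this change the rebar build fetched these applications over the network. For context, a minimal sketch of the kind of rebar.config `deps` entry this commit removes the need for (the upstream URL and branch are illustrative assumptions, not taken from this repository):

    %% rebar.config (illustrative; upstream URL and branch are assumed)
    {deps, [
        {mem3, ".*", {git, "git://github.com/cloudant/mem3.git", {branch, "master"}}}
    ]}.

With the sources committed under deps/mem3/src, the build no longer needs network access to resolve this dependency.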
Diffstat (limited to 'deps/mem3/src')
-rw-r--r--  deps/mem3/src/mem3.app.src          13
-rw-r--r--  deps/mem3/src/mem3.erl             238
-rw-r--r--  deps/mem3/src/mem3_app.erl          23
-rw-r--r--  deps/mem3/src/mem3_cache.erl       118
-rw-r--r--  deps/mem3/src/mem3_httpd.erl        53
-rw-r--r--  deps/mem3/src/mem3_nodes.erl       136
-rw-r--r--  deps/mem3/src/mem3_rep.erl         144
-rw-r--r--  deps/mem3/src/mem3_rep_manager.erl 627
-rw-r--r--  deps/mem3/src/mem3_sup.erl          36
-rw-r--r--  deps/mem3/src/mem3_sync.erl        267
-rw-r--r--  deps/mem3/src/mem3_sync_event.erl   68
-rw-r--r--  deps/mem3/src/mem3_util.erl        211
12 files changed, 1934 insertions(+), 0 deletions(-)
diff --git a/deps/mem3/src/mem3.app.src b/deps/mem3/src/mem3.app.src
new file mode 100644
index 00000000..88447783
--- /dev/null
+++ b/deps/mem3/src/mem3.app.src
@@ -0,0 +1,13 @@
+{application, mem3, [
+    {description, "CouchDB Cluster Membership"},
+    {vsn, git},
+    {mod, {mem3_app, []}},
+    {registered, [
+        mem3_cache,
+        mem3_events,
+        mem3_nodes,
+        mem3_sync,
+        mem3_sup
+    ]},
+    {applications, [kernel, stdlib, sasl, crypto, mochiweb, couch, twig]}
+]}.
diff --git a/deps/mem3/src/mem3.erl b/deps/mem3/src/mem3.erl
new file mode 100644
index 00000000..c7979642
--- /dev/null
+++ b/deps/mem3/src/mem3.erl
@@ -0,0 +1,238 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3).
+
+-export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
+    choose_shards/2, n/1, dbname/1, ushards/1]).
+-export([compare_nodelists/0, compare_shards/1]).
+-export([quorum/1]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+start() ->
+    application:start(mem3).
+
+stop() ->
+    application:stop(mem3).
+
+restart() ->
+    stop(),
+    start().
+
+%% @doc Detailed report of cluster-wide membership state. Queries the state
+%%      on all member nodes and builds a dictionary with unique states as the
+%%      key and the nodes holding that state as the value. Also reports member
+%%      nodes which fail to respond and nodes which are connected but are not
+%%      cluster members. Useful for debugging.
+-spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes
+    | non_member_nodes, [node()]}].
+compare_nodelists() ->
+    Nodes = mem3:nodes(),
+    AllNodes = erlang:nodes([this, visible]),
+    {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist),
+    Dict = lists:foldl(fun({Node, Nodelist}, D) ->
+        orddict:append({cluster_nodes, Nodelist}, Node, D)
+    end, orddict:new(), Replies),
+    [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}].
+compare_shards(DbName) when is_list(DbName) ->
+    compare_shards(list_to_binary(DbName));
+compare_shards(DbName) ->
+    Nodes = mem3:nodes(),
+    {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]),
+    GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)],
+    Dict = lists:foldl(fun({Shards, Node}, D) ->
+        orddict:append(Shards, Node, D)
+    end, orddict:new(), lists:zip(Replies, GoodNodes)),
+    [{bad_nodes, BadNodes} | Dict].
+
+-spec n(DbName::iodata()) -> integer().
+n(DbName) ->
+    length(mem3:shards(DbName, <<"foo">>)).
+
+-spec nodes() -> [node()].
+nodes() ->
+    mem3_nodes:get_nodelist().
+
+node_info(Node, Key) ->
+    mem3_nodes:get_node_info(Node, Key).
+
+-spec shards(DbName::iodata()) -> [#shard{}].
+shards(DbName) when is_list(DbName) ->
+    shards(list_to_binary(DbName));
+shards(DbName) ->
+    ShardDbName =
+        list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+    case DbName of
+    ShardDbName ->
+        %% shard_db is treated as a single sharded db to support calls to db_info
+        %% and view_all_docs
+        [#shard{
+            node = node(),
+            name = ShardDbName,
+            dbname = ShardDbName,
+            range = [0, 2 bsl 31]}];
+    _ ->
+        try ets:lookup(partitions, DbName) of
+        [] ->
+            mem3_util:load_shards_from_disk(DbName);
+        Else ->
+            Else
+        catch error:badarg ->
+            mem3_util:load_shards_from_disk(DbName)
+        end
+    end.
+
+-spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+shards(DbName, DocId) when is_list(DbName) ->
+    shards(list_to_binary(DbName), DocId);
+shards(DbName, DocId) when is_list(DocId) ->
+    shards(DbName, list_to_binary(DocId));
+shards(DbName, DocId) ->
+    HashKey = mem3_util:hash(DocId),
+    Head = #shard{
+        name = '_',
+        node = '_',
+        dbname = DbName,
+        range = ['$1','$2'],
+        ref = '_'
+    },
+    Conditions = [{'=<', '$1', HashKey}, {'=<', HashKey, '$2'}],
+    try ets:select(partitions, [{Head, Conditions, ['$_']}]) of
+    [] ->
+        mem3_util:load_shards_from_disk(DbName, DocId);
+    Shards ->
+        Shards
+    catch error:badarg ->
+        mem3_util:load_shards_from_disk(DbName, DocId)
+    end.
+
+ushards(DbName) ->
+    Shards = mem3:shards(DbName),
+    Nodes = rotate_nodes(DbName, live_nodes()),
+    Buckets = bucket_by_range(Shards),
+    choose_ushards(Buckets, Nodes).
+
+rotate_nodes(DbName, Nodes) ->
+    {H, T} = lists:split(erlang:crc32(DbName) rem length(Nodes), Nodes),
+    T ++ H.
+
+live_nodes() ->
+    lists:sort([node()|erlang:nodes()]).
+
+bucket_by_range(Shards) ->
+    Buckets0 = lists:foldl(fun(#shard{range=Range}=Shard, Dict) ->
+        orddict:append(Range, Shard, Dict) end, orddict:new(), Shards),
+    {_, Buckets} = lists:unzip(Buckets0),
+    Buckets.
+
+choose_ushards(Buckets, Nodes) ->
+    choose_ushards(Buckets, Nodes, []).
+
+choose_ushards([], _, Acc) ->
+    lists:reverse(Acc);
+choose_ushards([Bucket|RestBuckets], Nodes, Acc) ->
+    #shard{node=Node} = Shard = first_match(Bucket, Bucket, Nodes),
+    choose_ushards(RestBuckets, lists:delete(Node, Nodes) ++ [Node],
+        [Shard | Acc]).
+
+first_match([], [#shard{range=Range}|_], []) ->
+    throw({range_not_available, Range});
+first_match([#shard{node=Node}=Shard|_], _, [Node|_]) ->
+    Shard;
+first_match([], Shards, [_|RestNodes]) ->
+    first_match(Shards, Shards, RestNodes);
+first_match([_|RestShards], Shards, Nodes) ->
+    first_match(RestShards, Shards, Nodes).
+
+-spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}].
+choose_shards(DbName, Options) when is_list(DbName) ->
+    choose_shards(list_to_binary(DbName), Options);
+choose_shards(DbName, Options) ->
+    try shards(DbName)
+    catch error:E when E==database_does_not_exist; E==badarg ->
+        Nodes = mem3:nodes(),
+        NodeCount = length(Nodes),
+        Zones = zones(Nodes),
+        ZoneCount = length(Zones),
+        N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
+        Q = mem3_util:to_integer(couch_util:get_value(q, Options,
+            couch_config:get("cluster", "q", "8"))),
+        Z = mem3_util:z_val(couch_util:get_value(z, Options), NodeCount, ZoneCount),
+        Suffix = couch_util:get_value(shard_suffix, Options, ""),
+        ChosenZones = lists:sublist(shuffle(Zones), Z),
+        lists:flatmap(
+            fun({Zone, N1}) ->
+                Nodes1 = nodes_in_zone(Nodes, Zone),
+                {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes1)+1), Nodes1),
+                RotatedNodes = B ++ A,
+                mem3_util:create_partition_map(DbName, erlang:min(N1,length(Nodes1)),
+                    Q, RotatedNodes, Suffix)
+            end,
+            lists:zip(ChosenZones, apportion(N, Z)))
+    end.
+
+-spec dbname(#shard{} | iodata()) -> binary().
+dbname(#shard{dbname = DbName}) ->
+    DbName;
+dbname(<<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>>) ->
+    list_to_binary(filename:rootname(binary_to_list(DbName)));
+dbname(DbName) when is_list(DbName) ->
+    dbname(list_to_binary(DbName));
+dbname(DbName) when is_binary(DbName) ->
+    DbName;
+dbname(_) ->
+    erlang:error(badarg).
+
+
+zones(Nodes) ->
+    lists:usort([mem3:node_info(Node, <<"zone">>) || Node <- Nodes]).
+
+nodes_in_zone(Nodes, Zone) ->
+    [Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)].
+
+shuffle(List) ->
+    %% Determine the log n portion then randomize the list.
+    randomize(round(math:log(length(List)) + 0.5), List).
+
+randomize(1, List) ->
+    randomize(List);
+randomize(T, List) ->
+    lists:foldl(fun(_E, Acc) -> randomize(Acc) end,
+        randomize(List), lists:seq(1, (T - 1))).
+
+randomize(List) ->
+    D = lists:map(fun(A) -> {random:uniform(), A} end, List),
+    {_, D1} = lists:unzip(lists:keysort(1, D)),
+    D1.
+
+apportion(Shares, Ways) ->
+    apportion(Shares, lists:duplicate(Ways, 0), Shares).
+
+apportion(_Shares, Acc, 0) ->
+    Acc;
+apportion(Shares, Acc, Remaining) ->
+    N = Remaining rem length(Acc),
+    [H|T] = lists:nthtail(N, Acc),
+    apportion(Shares, lists:sublist(Acc, N) ++ [H+1|T], Remaining - 1).
+
+% quorum functions
+
+quorum(#db{name=DbName}) ->
+    quorum(DbName);
+quorum(DbName) ->
+    n(DbName) div 2 + 1.
diff --git a/deps/mem3/src/mem3_app.erl b/deps/mem3/src/mem3_app.erl
new file mode 100644
index 00000000..bb27171f
--- /dev/null
+++ b/deps/mem3/src/mem3_app.erl
@@ -0,0 +1,23 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(_Type, []) ->
+    mem3_sup:start_link().
+
+stop([]) ->
+    ok.
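
Two numeric consequences of mem3.erl above are worth spelling out (a sketch: quorum/1 normally derives N from the shard table via n/1, and apportion/3 is not exported, so the lines below only reproduce the arithmetic):

    %% quorum(Db) = n(Db) div 2 + 1, i.e. a majority of the N copies:
    1> [N div 2 + 1 || N <- [1, 2, 3, 4]].
    [1,2,2,3]
    %% apportion(N, Z) spreads N copies as evenly as possible over Z zones;
    %% tracing apportion(3, 2) by hand: [0,0] -> [0,1] -> [1,1] -> [1,2].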
diff --git a/deps/mem3/src/mem3_cache.erl b/deps/mem3/src/mem3_cache.erl
new file mode 100644
index 00000000..84686b91
--- /dev/null
+++ b/deps/mem3/src/mem3_cache.erl
@@ -0,0 +1,118 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_cache).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+    code_change/3]).
+
+-export([start_link/0]).
+
+-record(state, {changes_pid}).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+    ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]),
+    {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end),
+    {ok, #state{changes_pid = Pid}}.
+
+handle_call(_Call, _From, State) ->
+    {noreply, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}},
+        #state{changes_pid=Pid} = State) ->
+    % fatal error, somebody deleted our ets table
+    {stop, ets_table_error, State};
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+    twig:log(notice, "~p changes listener died ~p", [?MODULE, Reason]),
+    Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end,
+    erlang:send_after(5000, self(), {start_listener, Seq}),
+    {noreply, State};
+handle_info({start_listener, Seq}, State) ->
+    {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+    {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #state{changes_pid=Pid}) ->
+    exit(Pid, kill),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% internal functions
+
+listen_for_changes(Since) ->
+    DbName = couch_config:get("mem3", "shard_db", "dbs"),
+    {ok, Db} = mem3_util:ensure_exists(DbName),
+    Args = #changes_args{
+        feed = "continuous",
+        since = Since,
+        heartbeat = true,
+        include_docs = true
+    },
+    ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+    ChangesFun(fun changes_callback/2).
+
+changes_callback(start, _) ->
+    {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+    exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+    DbName = couch_util:get_value(<<"id">>, Change),
+    case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+        case couch_util:get_value(<<"deleted">>, Change, false) of
+        true ->
+            ets:delete(partitions, DbName);
+        false ->
+            case couch_util:get_value(doc, Change) of
+            {error, Reason} ->
+                twig:log(error, "missing partition table for ~s: ~p", [DbName, Reason]);
+            {Doc} ->
+                ets:delete(partitions, DbName),
+                Shards = mem3_util:build_shards(DbName, Doc),
+                ets:insert(partitions, Shards),
+                [create_if_missing(Name) || #shard{name=Name, node=Node}
+                    <- Shards, Node =:= node()]
+            end
+        end
+    end,
+    {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+    {ok, nil}.
+
+create_if_missing(Name) ->
+    DbDir = couch_config:get("couchdb", "database_dir"),
+    Filename = filename:join(DbDir, ?b2l(Name) ++ ".couch"),
+    case filelib:is_regular(Filename) of
+    true ->
+        ok;
+    false ->
+        Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+        case couch_server:create(Name, Options) of
+        {ok, Db} ->
+            couch_db:close(Db);
+        Error ->
+            twig:log(error, "~p tried to create ~s, got ~p", [?MODULE, Name, Error])
+        end
+    end.
diff --git a/deps/mem3/src/mem3_httpd.erl b/deps/mem3/src/mem3_httpd.erl
new file mode 100644
index 00000000..716080f8
--- /dev/null
+++ b/deps/mem3/src/mem3_httpd.erl
@@ -0,0 +1,53 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_httpd).
+
+-export([handle_membership_req/1]).
+
+%% includes
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+handle_membership_req(#httpd{method='GET',
+        path_parts=[<<"_membership">>]} = Req) ->
+    ClusterNodes = try mem3:nodes()
+    catch _:_ -> {ok,[]} end,
+    couch_httpd:send_json(Req, {[
+        {all_nodes, lists:sort([node()|nodes()])},
+        {cluster_nodes, lists:sort(ClusterNodes)}
+    ]});
+handle_membership_req(#httpd{method='GET',
+        path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) ->
+    ClusterNodes = try mem3:nodes()
+    catch _:_ -> {ok,[]} end,
+    Shards = mem3:shards(DbName),
+    JsonShards = json_shards(Shards, dict:new()),
+    couch_httpd:send_json(Req, {[
+        {all_nodes, lists:sort([node()|nodes()])},
+        {cluster_nodes, lists:sort(ClusterNodes)},
+        {partitions, JsonShards}
+    ]}).
+
+%%
+%% internal
+%%
+
+json_shards([], AccIn) ->
+    List = dict:to_list(AccIn),
+    {lists:sort(List)};
+json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) ->
+    HexBeg = couch_util:to_hex(<<B:32/integer>>),
+    json_shards(Rest, dict:append(HexBeg, Node, AccIn)).
diff --git a/deps/mem3/src/mem3_nodes.erl b/deps/mem3/src/mem3_nodes.erl
new file mode 100644
index 00000000..0be66462
--- /dev/null
+++ b/deps/mem3/src/mem3_nodes.erl
@@ -0,0 +1,136 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_nodes).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+    code_change/3]).
+
+-export([start_link/0, get_nodelist/0, get_node_info/2]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {changes_pid, update_seq, nodes}).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_nodelist() ->
+    gen_server:call(?MODULE, get_nodelist).
+
+get_node_info(Node, Key) ->
+    gen_server:call(?MODULE, {get_node_info, Node, Key}).
+
+init([]) ->
+    {Nodes, UpdateSeq} = initialize_nodelist(),
+    {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end),
+    {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}.
+
+handle_call(get_nodelist, _From, State) ->
+    {reply, lists:sort(dict:fetch_keys(State#state.nodes)), State};
+handle_call({get_node_info, Node, Key}, _From, State) ->
+    case dict:find(Node, State#state.nodes) of
+    {ok, NodeInfo} ->
+        {reply, couch_util:get_value(Key, NodeInfo), State};
+    error ->
+        {reply, error, State}
+    end;
+handle_call({add_node, Node, NodeInfo}, _From, #state{nodes=Nodes} = State) ->
+    gen_event:notify(mem3_events, {add_node, Node}),
+    {reply, ok, State#state{nodes = dict:store(Node, NodeInfo, Nodes)}};
+handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) ->
+    gen_event:notify(mem3_events, {remove_node, Node}),
+    {reply, ok, State#state{nodes = dict:erase(Node, Nodes)}};
+handle_call(_Call, _From, State) ->
+    {noreply, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+    twig:log(notice, "~p changes listener died ~p", [?MODULE, Reason]),
+    StartSeq = State#state.update_seq,
+    Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end,
+    erlang:send_after(5000, self(), start_listener),
+    {noreply, State#state{update_seq = Seq}};
+handle_info(start_listener, #state{update_seq = Seq} = State) ->
+    {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+    {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% internal functions
+
+initialize_nodelist() ->
+    DbName = couch_config:get("mem3", "node_db", "nodes"),
+    {ok, Db} = mem3_util:ensure_exists(DbName),
+    {ok, _, {_, Nodes0}} = couch_btree:fold(Db#db.id_tree, fun first_fold/3,
+        {Db, dict:new()}, []),
+    % add self if not already present
+    case dict:find(node(), Nodes0) of
+    {ok, _} ->
+        Nodes = Nodes0;
+    error ->
+        Doc = #doc{id = couch_util:to_binary(node())},
+        {ok, _} = couch_db:update_doc(Db, Doc, []),
+        Nodes = dict:store(node(), [], Nodes0)
+    end,
+    couch_db:close(Db),
+    {Nodes, Db#db.update_seq}.
+
+first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
+    {ok, Acc};
+first_fold(#full_doc_info{deleted=true}, _, Acc) ->
+    {ok, Acc};
+first_fold(#full_doc_info{id=Id}=DocInfo, _, {Db, Dict}) ->
+    {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo),
+    {ok, {Db, dict:store(mem3_util:to_atom(Id), Props, Dict)}}.
+
+listen_for_changes(Since) ->
+    DbName = couch_config:get("mem3", "node_db", "nodes"),
+    {ok, Db} = mem3_util:ensure_exists(DbName),
+    Args = #changes_args{
+        feed = "continuous",
+        since = Since,
+        heartbeat = true,
+        include_docs = true
+    },
+    ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+    ChangesFun(fun changes_callback/2).
+
+changes_callback(start, _) ->
+    {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+    exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+    Node = couch_util:get_value(<<"id">>, Change),
+    case Node of <<"_design/", _/binary>> -> ok; _ ->
+        case couch_util:get_value(<<"deleted">>, Change, false) of
+        false ->
+            {Props} = couch_util:get_value(doc, Change),
+            gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props});
+        true ->
+            gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
+        end
+    end,
+    {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+    {ok, nil}.
diff --git a/deps/mem3/src/mem3_rep.erl b/deps/mem3/src/mem3_rep.erl
new file mode 100644
index 00000000..b68973c5
--- /dev/null
+++ b/deps/mem3/src/mem3_rep.erl
@@ -0,0 +1,144 @@
+-module(mem3_rep).
+
+-export([go/2, changes_enumerator/3, make_local_id/2]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(CTX, #user_ctx{roles = [<<"_admin">>]}).
+
+-record(acc, {revcount = 0, infos = [], seq, localid, source, target}).
+
+go(DbName, Node) when is_binary(DbName), is_atom(Node) ->
+    go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node});
+
+go(#shard{} = Source, #shard{} = Target) ->
+    LocalId = make_local_id(Source, Target),
+    case couch_db:open(Source#shard.name, [{user_ctx,?CTX}]) of
+    {ok, Db} ->
+        try
+            go(Db, Target, LocalId)
+        catch error:{not_found, no_db_file} ->
+            {error, missing_target}
+        after
+            couch_db:close(Db)
+        end;
+    {not_found, no_db_file} ->
+        {error, missing_source}
+    end.
+
+go(#db{name = DbName, seq_tree = Bt} = Db, #shard{} = Target, LocalId) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    Seq = calculate_start_seq(Db, Target, LocalId),
+    Acc0 = #acc{source=Db, target=Target, seq=Seq, localid=LocalId},
+    Fun = fun ?MODULE:changes_enumerator/3,
+    {ok, _, AccOut} = couch_btree:fold(Bt, Fun, Acc0, [{start_key, Seq + 1}]),
+    {ok, #acc{seq = LastSeq}} = replicate_batch(AccOut),
+    case couch_db:count_changes_since(Db, LastSeq) of
+    0 ->
+        ok;
+    N ->
+        exit({pending_changes, N})
+    end.
+
+make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}) ->
+    S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
+    T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
+    <<"_local/shard-sync-", S/binary, "-", T/binary>>.
+
+changes_enumerator(FullDocInfo, _, #acc{revcount = C} = Acc) when C >= 99 ->
+    #doc_info{high_seq = Seq} = couch_doc:to_doc_info(FullDocInfo),
+    {stop, Acc#acc{seq = Seq, infos = [FullDocInfo | Acc#acc.infos]}};
+
+changes_enumerator(FullDocInfo, _, #acc{revcount = C, infos = Infos} = Acc) ->
+    #doc_info{high_seq = Seq, revs = Revs} = couch_doc:to_doc_info(FullDocInfo),
+    Count = C + length(Revs),
+    {ok, Acc#acc{seq = Seq, revcount = Count, infos = [FullDocInfo | Infos]}}.
+
+replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
+    case find_missing_revs(Acc) of
+    [] ->
+        ok;
+    Missing ->
+        ok = save_on_target(Node, Name, open_docs(Acc, Missing))
+    end,
+    update_locals(Acc),
+    {ok, Acc#acc{revcount=0, infos=[]}}.
+
+find_missing_revs(Acc) ->
+    #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
+    IdsRevs = lists:map(fun(FDI) ->
+        #doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
+        {Id, [R || #rev_info{rev=R} <- RevInfos]}
+    end, Infos),
+    Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
+    rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
+
+open_docs(#acc{source=Source, infos=Infos}, Missing) ->
+    lists:flatmap(fun({Id, Revs, _}) ->
+        FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
+        open_doc_revs(Source, FDI, Revs)
+    end, Missing).
+
+save_on_target(Node, Name, Docs) ->
+    Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
+        {io_priority, {internal_repl, Name}}],
+    rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+    ok.
+
+update_locals(Acc) ->
+    #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
+    #shard{name=Name, node=Node} = Target,
+    Doc = #doc{id = Id, body = {[
+        {<<"seq">>, Seq},
+        {<<"node">>, list_to_binary(atom_to_list(Node))},
+        {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
+    ]}},
+    {ok, _} = couch_db:update_doc(Db, Doc, []),
+    Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
+    rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
+
+rexi_call(Node, MFA) ->
+    Mon = rexi_monitor:start([{rexi_server, Node}]),
+    Ref = rexi:cast(Node, MFA),
+    try
+        receive {Ref, {ok, Reply}} ->
+            Reply;
+        {Ref, Error} ->
+            erlang:error(Error);
+        {rexi_DOWN, Mon, _, Reason} ->
+            erlang:error({rexi_DOWN, Reason})
+        after 600000 ->
+            erlang:error(timeout)
+        end
+    after
+        rexi_monitor:stop(Mon)
+    end.
+
+calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
+    case couch_db:open_doc(Db, LocalId, []) of
+    {ok, #doc{body = {SProps}}} ->
+        Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
+        try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
+        #doc{body = {TProps}} ->
+            SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
+            TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
+            erlang:min(SourceSeq, TargetSeq)
+        catch error:{not_found, _} ->
+            0
+        end;
+    {not_found, _} ->
+        0
+    end.
+
+open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
+    {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
+    lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
+        couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
+    end, FoundRevs).
+
+iso8601_timestamp() ->
+    {_,_,Micro} = Now = os:timestamp(),
+    {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
+    Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
+    io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
diff --git a/deps/mem3/src/mem3_rep_manager.erl b/deps/mem3/src/mem3_rep_manager.erl
new file mode 100644
index 00000000..7b98701d
--- /dev/null
+++ b/deps/mem3/src/mem3_rep_manager.erl
@@ -0,0 +1,627 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(mem3_rep_manager).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/0, config_change/3]).
+-export([replication_started/1, replication_completed/1, replication_error/2]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+    code_change/3]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_js_functions.hrl"). + +-define(DOC_TO_REP, mem3_rep_doc_id_to_rep_id). +-define(REP_TO_STATE, mem3_rep_id_to_rep_state). +-define(DB_TO_SEQ, mem3_db_to_seq). +-define(INITIAL_WAIT, 2.5). % seconds +-define(MAX_WAIT, 600). % seconds +-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}). + +-record(state, { + db_notifier = nil, + max_retries, + scan_pid = nil, + rep_start_pids = [] +}). + +-record(rep_state, { + dbname, + doc_id, + user_ctx, + doc, + starting, + retries_left, + max_retries, + wait = ?INITIAL_WAIT +}). + +-import(couch_util, [ + get_value/2, + get_value/3, + to_binary/1 +]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +replication_started({BaseId, _} = RepId) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{dbname = DbName, doc_id = DocId} -> + update_rep_doc(DbName, DocId, [ + {<<"_replication_state">>, <<"triggered">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity), + twig:log(notice, "Document `~s` triggered replication `~s`", + [DocId, pp_rep_id(RepId)]) + end. + + +replication_completed(RepId) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{dbname = DbName, doc_id = DocId} -> + update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"completed">>}]), + ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity), + twig:log(notice, "Replication `~s` finished (triggered by document `~s`)", + [pp_rep_id(RepId), DocId]) + end. + + +replication_error({BaseId, _} = RepId, Error) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{dbname = DbName, doc_id = DocId} -> + % TODO: maybe add error reason to replication document + update_rep_doc(DbName, DocId, [ + {<<"_replication_state">>, <<"error">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity) + end. + +init(_) -> + process_flag(trap_exit, true), + net_kernel:monitor_nodes(true), + ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]), + ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]), + ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, protected]), + Server = self(), + ok = couch_config:register(fun ?MODULE:config_change/3, Server), + NotifierPid = db_update_notifier(), + ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end), + {ok, #state{ + db_notifier = NotifierPid, + scan_pid = ScanPid, + max_retries = retries_value( + couch_config:get("replicator", "max_replication_retry_count", "10")) + }}. + +config_change("replicator", "max_replication_retry_count", V) -> + ok = gen_server:cast(?MODULE, {set_max_retries, retries_value(V)}). 
+
+handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
+    NewState = try
+        process_update(State, DbName, Change)
+    catch
+    _Tag:Error ->
+        {RepProps} = get_value(doc, ChangeProps),
+        DocId = get_value(<<"_id">>, RepProps),
+        rep_db_update_error(Error, DbName, DocId),
+        State
+    end,
+    {reply, ok, NewState};
+
+handle_call({rep_started, RepId}, _From, State) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    RepState ->
+        NewRepState = RepState#rep_state{
+            starting = false,
+            retries_left = State#state.max_retries,
+            max_retries = State#state.max_retries,
+            wait = ?INITIAL_WAIT
+        },
+        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
+    end,
+    {reply, ok, State};
+
+handle_call({rep_complete, RepId}, _From, State) ->
+    true = ets:delete(?REP_TO_STATE, RepId),
+    {reply, ok, State};
+
+handle_call({rep_error, RepId, Error}, _From, State) ->
+    {reply, ok, replication_error(State, RepId, Error)};
+
+handle_call({resume_scan, DbName}, _From, State) ->
+    Since = case ets:lookup(?DB_TO_SEQ, DbName) of
+    [] -> 0;
+    [{DbName, EndSeq}] -> EndSeq
+    end,
+    Pid = changes_feed_loop(DbName, Since),
+    twig:log(debug, "Scanning ~s from update_seq ~p", [DbName, Since]),
+    {reply, ok, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
+
+handle_call({rep_db_checkpoint, DbName, EndSeq}, _From, State) ->
+    true = ets:insert(?DB_TO_SEQ, {DbName, EndSeq}),
+    {reply, ok, State};
+
+handle_call(Msg, From, State) ->
+    twig:log(error, "Replication manager received unexpected call ~p from ~p",
+        [Msg, From]),
+    {stop, {error, {unexpected_call, Msg}}, State}.
+
+
+handle_cast({set_max_retries, MaxRetries}, State) ->
+    {noreply, State#state{max_retries = MaxRetries}};
+
+handle_cast(Msg, State) ->
+    twig:log(error, "Replication manager received unexpected cast ~p", [Msg]),
+    {stop, {error, {unexpected_cast, Msg}}, State}.
+
+handle_info({nodeup, _Node}, State) ->
+    {noreply, rescan(State)};
+
+handle_info({nodedown, _Node}, State) ->
+    {noreply, rescan(State)};
+
+handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
+    twig:log(debug, "Background scan has completed.", []),
+    {noreply, State#state{scan_pid=nil}};
+
+handle_info({'EXIT', From, Reason}, #state{scan_pid = From} = State) ->
+    twig:log(error, "Background scanner died. Reason: ~p", [Reason]),
+    {stop, {scanner_died, Reason}, State};
+
+handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
+    twig:log(error, "Database update notifier died. Reason: ~p", [Reason]),
+    {stop, {db_update_notifier_died, Reason}, State};
+
+handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
+    % one of the replication start processes terminated successfully
+    {noreply, State#state{rep_start_pids = Pids -- [From]}};
+
+handle_info({'DOWN', _Ref, _, _, _}, State) ->
+    % From a db monitor created by a replication process. Ignore.
+    {noreply, State};
+
+handle_info(Msg, State) ->
+    twig:log(error, "Replication manager received unexpected message ~p", [Msg]),
+    {stop, {unexpected_msg, Msg}, State}.
+
+terminate(_Reason, State) ->
+    #state{
+        scan_pid = ScanPid,
+        rep_start_pids = StartPids,
+        db_notifier = DbNotifier
+    } = State,
+    stop_all_replications(),
+    lists:foreach(
+        fun(Pid) ->
+            catch unlink(Pid),
+            catch exit(Pid, stop)
+        end,
+        [ScanPid | StartPids]),
+    true = ets:delete(?REP_TO_STATE),
+    true = ets:delete(?DOC_TO_REP),
+    true = ets:delete(?DB_TO_SEQ),
+    couch_db_update_notifier:stop(DbNotifier).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+changes_feed_loop(DbName, Since) ->
+    Server = self(),
+    Pid = spawn_link(
+        fun() ->
+            fabric:changes(DbName, fun
+                ({change, Change}, Acc) ->
+                    case has_valid_rep_id(Change) of
+                    true ->
+                        ok = gen_server:call(
+                            Server, {rep_db_update, DbName, Change}, infinity);
+                    false ->
+                        ok
+                    end,
+                    {ok, Acc};
+                ({stop, EndSeq}, Acc) ->
+                    ok = gen_server:call(Server, {rep_db_checkpoint, DbName, EndSeq}),
+                    {ok, Acc};
+                (_, Acc) ->
+                    {ok, Acc}
+                end,
+                nil,
+                #changes_args{
+                    include_docs = true,
+                    feed = "normal",
+                    since = Since,
+                    filter = main_only,
+                    timeout = infinity,
+                    db_open_options = [sys_db]
+                }
+            )
+        end),
+    Pid.
+
+has_valid_rep_id({Change}) ->
+    has_valid_rep_id(get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+    false;
+has_valid_rep_id(_Else) ->
+    true.
+
+
+db_update_notifier() ->
+    Server = self(),
+    IsReplicatorDbFun = is_replicator_db_fun(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({_, DbName}) ->
+            case IsReplicatorDbFun(DbName) of
+            true ->
+                ok = gen_server:call(Server, {resume_scan, mem3:dbname(DbName)});
+            _ ->
+                ok
+            end
+        end
+    ),
+    Notifier.
+
+rescan(#state{scan_pid = nil} = State) ->
+    true = ets:delete_all_objects(?DB_TO_SEQ),
+    Server = self(),
+    NewScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
+    State#state{scan_pid = NewScanPid};
+rescan(#state{scan_pid = ScanPid} = State) ->
+    unlink(ScanPid),
+    exit(ScanPid, exit),
+    rescan(State#state{scan_pid = nil}).
+
+process_update(State, DbName, {Change}) ->
+    {RepProps} = JsonRepDoc = get_value(doc, Change),
+    DocId = get_value(<<"_id">>, RepProps),
+    case {owner(DbName, DocId), get_value(deleted, Change, false)} of
+    {false, _} ->
+        replication_complete(DocId),
+        State;
+    {true, true} ->
+        rep_doc_deleted(DocId),
+        State;
+    {true, false} ->
+        case get_value(<<"_replication_state">>, RepProps) of
+        undefined ->
+            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+        <<"triggered">> ->
+            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+        <<"completed">> ->
+            replication_complete(DocId),
+            State;
+        <<"error">> ->
+            case ets:lookup(?DOC_TO_REP, DocId) of
+            [] ->
+                maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+            _ ->
+                State
+            end
+        end
+    end.
+
+
+rep_db_update_error(Error, DbName, DocId) ->
+    case Error of
+    {bad_rep_doc, Reason} ->
+        ok;
+    _ ->
+        Reason = to_binary(Error)
+    end,
+    twig:log(error, "Replication manager, error processing document `~s`: ~s",
+        [DocId, Reason]),
+    update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"error">>}]).
+
+
+rep_user_ctx({RepDoc}) ->
+    case get_value(<<"user_ctx">>, RepDoc) of
+    undefined ->
+        #user_ctx{};
+    {UserCtx} ->
+        #user_ctx{
+            name = get_value(<<"name">>, UserCtx, null),
+            roles = get_value(<<"roles">>, UserCtx, [])
+        }
+    end.
+
+
+maybe_start_replication(State, DbName, DocId, RepDoc) ->
+    UserCtx = rep_user_ctx(RepDoc),
+    {BaseId, _} = RepId = make_rep_id(RepDoc, UserCtx),
+    case rep_state(RepId) of
+    nil ->
+        RepState = #rep_state{
+            dbname = DbName,
+            doc_id = DocId,
+            user_ctx = UserCtx,
+            doc = RepDoc,
+            starting = true,
+            retries_left = State#state.max_retries,
+            max_retries = State#state.max_retries
+        },
+        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+        true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+        twig:log(notice, "Attempting to start replication `~s` (document `~s`).",
+            [pp_rep_id(RepId), DocId]),
+        Server = self(),
+        Pid = spawn_link(fun() ->
+            start_replication(Server, RepDoc, RepId, UserCtx, 0)
+        end),
+        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
+    #rep_state{doc_id = DocId} ->
+        State;
+    #rep_state{starting = false, dbname = DbName, doc_id = OtherDocId} ->
+        twig:log(notice, "The replication specified by the document `~s` was already"
+            " triggered by the document `~s`", [DocId, OtherDocId]),
+        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
+        State;
+    #rep_state{starting = true, dbname = DbName, doc_id = OtherDocId} ->
+        twig:log(notice, "The replication specified by the document `~s` is already"
+            " being triggered by the document `~s`", [DocId, OtherDocId]),
+        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
+        State
+    end.
+
+
+make_rep_id(RepDoc, UserCtx) ->
+    try
+        couch_rep:make_replication_id(RepDoc, UserCtx)
+    catch
+    throw:{error, Reason} ->
+        throw({bad_rep_doc, Reason});
+    Tag:Err ->
+        throw({bad_rep_doc, to_binary({Tag, Err})})
+    end.
+
+
+maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
+    case get_value(<<"_replication_id">>, RepProps) of
+    RepId ->
+        ok;
+    _ ->
+        update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}])
+    end.
+
+
+start_replication(Server, RepDoc, RepId, UserCtx, Wait) ->
+    ok = timer:sleep(Wait * 1000),
+    case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx, ?MODULE)) of
+    Pid when is_pid(Pid) ->
+        ok = gen_server:call(Server, {rep_started, RepId}, infinity),
+        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+    Error ->
+        replication_error(RepId, Error)
+    end.
+
+
+replication_complete(DocId) ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, RepId}] ->
+        case rep_state(RepId) of
+        nil ->
+            couch_rep:end_replication(RepId);
+        #rep_state{} ->
+            ok
+        end,
+        true = ets:delete(?DOC_TO_REP, DocId);
+    _ ->
+        ok
+    end.
+
+
+rep_doc_deleted(DocId) ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, RepId}] ->
+        couch_rep:end_replication(RepId),
+        true = ets:delete(?REP_TO_STATE, RepId),
+        true = ets:delete(?DOC_TO_REP, DocId),
+        twig:log(notice, "Stopped replication `~s` because replication document `~s`"
+            " was deleted", [pp_rep_id(RepId), DocId]);
+    [] ->
+        ok
+    end.
+
+
+replication_error(State, RepId, Error) ->
+    case rep_state(RepId) of
+    nil ->
+        State;
+    RepState ->
+        maybe_retry_replication(RepId, RepState, Error, State)
+    end.
+
+maybe_retry_replication(RepId, #rep_state{retries_left = 0} = RepState, Error, State) ->
+    #rep_state{
+        doc_id = DocId,
+        max_retries = MaxRetries
+    } = RepState,
+    couch_rep:end_replication(RepId),
+    true = ets:delete(?REP_TO_STATE, RepId),
+    true = ets:delete(?DOC_TO_REP, DocId),
+    twig:log(error, "Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nReached maximum retry attempts (~p).",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
+    State;
+
+maybe_retry_replication(RepId, RepState, Error, State) ->
+    #rep_state{
+        doc_id = DocId,
+        user_ctx = UserCtx,
+        doc = RepDoc
+    } = RepState,
+    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
+    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
+    twig:log(error, "Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nRestarting replication in ~p seconds.",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
+    Server = self(),
+    Pid = spawn_link(fun() ->
+        start_replication(Server, RepDoc, RepId, UserCtx, Wait)
+    end),
+    State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
+
+stop_all_replications() ->
+    twig:log(notice, "Stopping all ongoing replications.", []),
+    ets:foldl(
+        fun({_, RepId}, _) ->
+            couch_rep:end_replication(RepId)
+        end,
+        ok, ?DOC_TO_REP),
+    true = ets:delete_all_objects(?REP_TO_STATE),
+    true = ets:delete_all_objects(?DOC_TO_REP),
+    true = ets:delete_all_objects(?DB_TO_SEQ).
+
+update_rep_doc(RepDbName, RepDocId, KVs) when is_binary(RepDocId) ->
+    spawn_link(fun() ->
+        try
+            case fabric:open_doc(mem3:dbname(RepDbName), RepDocId, []) of
+            {ok, LatestRepDoc} ->
+                update_rep_doc(RepDbName, LatestRepDoc, KVs);
+            _ ->
+                ok
+            end
+        catch throw:conflict ->
+            % Shouldn't happen, as by default only the role _replicator can
+            % update replication documents.
+            twig:log(error, "Conflict error when updating replication document `~s`."
+                " Retrying.", [RepDocId]),
+            ok = timer:sleep(5),
+            update_rep_doc(RepDbName, RepDocId, KVs)
+        end
+    end);
+
+update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+    NewRepDocBody = lists:foldl(
+        fun({<<"_replication_state">> = K, State} = KV, Body) ->
+            case get_value(K, Body) of
+            State ->
+                Body;
+            _ ->
+                Body1 = lists:keystore(K, 1, Body, KV),
+                lists:keystore(
+                    <<"_replication_state_time">>, 1, Body1,
+                    {<<"_replication_state_time">>, timestamp()})
+            end;
+        ({K, _V} = KV, Body) ->
+            lists:keystore(K, 1, Body, KV)
+        end,
+        RepDocBody, KVs),
+    case NewRepDocBody of
+    RepDocBody ->
+        ok;
+    _ ->
+        % Might not succeed - when the replication doc is deleted right
+        % before this update (not an error, ignore).
+        spawn_link(fun() ->
+            fabric:update_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}}, [?CTX])
+        end)
+    end.
+
+
+% RFC3339 timestamps.
+% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+timestamp() ->
+    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
+    UTime = erlang:universaltime(),
+    LocalTime = calendar:universal_time_to_local_time(UTime),
+    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
+        calendar:datetime_to_gregorian_seconds(UTime),
+    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
+    iolist_to_binary(
+        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
+            [Year, Month, Day, Hour, Min, Sec,
+                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
+
+zone(Hr, Min) when Hr >= 0, Min >= 0 ->
+    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
+zone(Hr, Min) ->
+    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
+
+% pretty-print replication id
+pp_rep_id({Base, Extension}) ->
+    Base ++ Extension.
+
+
+rep_state(RepId) ->
+    case ets:lookup(?REP_TO_STATE, RepId) of
+    [{RepId, RepState}] ->
+        RepState;
+    [] ->
+        nil
+    end.
+
+
+error_reason({error, Reason}) ->
+    Reason;
+error_reason(Reason) ->
+    Reason.
+
+retries_value("infinity") ->
+    infinity;
+retries_value(Value) ->
+    list_to_integer(Value).
+
+state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
+    Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
+    case Left of
+    infinity ->
+        State#rep_state{wait = Wait2};
+    _ ->
+        State#rep_state{retries_left = Left - 1, wait = Wait2}
+    end.
+
+scan_all_dbs(Server) when is_pid(Server) ->
+    {ok, Db} = mem3_util:ensure_exists(
+        couch_config:get("mem3", "shard_db", "dbs")),
+    ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db),
+    IsReplicatorDbFun = is_replicator_db_fun(),
+    ChangesFun(fun({change, {Change}, _}, _) ->
+        DbName = couch_util:get_value(<<"id">>, Change),
+        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+            case couch_util:get_value(<<"deleted">>, Change, false) of
+            true ->
+                ok;
+            false ->
+                IsReplicatorDbFun(DbName) andalso
+                    gen_server:call(Server, {resume_scan, DbName})
+            end
+        end;
+    (_, _) -> ok
+    end),
+    couch_db:close(Db).
+
+is_replicator_db_fun() ->
+    {ok, RegExp} = re:compile("^([a-z][a-z0-9\\_\\$()\\+\\-\\/]*/)?_replicator$"),
+    fun(DbName) ->
+        match =:= re:run(mem3:dbname(DbName), RegExp, [{capture,none}])
+    end.
+
+owner(DbName, DocId) ->
+    Shards = mem3:shards(DbName, DocId),
+    Nodes = [node()|nodes()],
+    LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, Nodes)],
+    [#shard{node=Node}] = lists:usort(fun(#shard{name=A}, #shard{name=B}) ->
+        A =< B end, LiveShards),
+    node() =:= Node.
diff --git a/deps/mem3/src/mem3_sup.erl b/deps/mem3/src/mem3_sup.erl
new file mode 100644
index 00000000..07b9498b
--- /dev/null
+++ b/deps/mem3/src/mem3_sup.erl
@@ -0,0 +1,36 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(_Args) ->
+    Children = [
+        child(mem3_events),
+        child(mem3_cache),
+        child(mem3_nodes),
+        child(mem3_sync),
+        child(mem3_rep_manager)
+    ],
+    {ok, {{one_for_one,10,1}, Children}}.
+
+child(mem3_events) ->
+    MFA = {gen_event, start_link, [{local, mem3_events}]},
+    {mem3_events, MFA, permanent, 1000, worker, dynamic};
+child(Child) ->
+    {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.
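
A note on the wiring above: mem3_sup starts mem3_events as a bare gen_event manager (the child with `dynamic` modules), and the other processes attach handlers or publish through it at runtime. A minimal sketch using only calls that appear elsewhere in this patch:

    %% mem3_sync:init/1 attaches its handler to the shared manager:
    gen_event:add_handler(mem3_events, mem3_sync_event, []),
    %% and mem3_nodes broadcasts membership changes through it:
    gen_event:notify(mem3_events, {add_node, Node}),
    gen_event:notify(mem3_events, {remove_node, Node}).

The `dynamic` modules field in the mem3_events child spec reflects that the set of handler modules is not known statically.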
diff --git a/deps/mem3/src/mem3_sync.erl b/deps/mem3/src/mem3_sync.erl
new file mode 100644
index 00000000..191a98c6
--- /dev/null
+++ b/deps/mem3/src/mem3_sync.erl
@@ -0,0 +1,267 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+    code_change/3]).
+
+-export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
+    remove_node/1, initial_sync/1]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {
+    active = [],
+    count = 0,
+    limit,
+    dict = dict:new(),
+    waiting = [],
+    update_notifier
+}).
+
+-record(job, {name, node, count=nil, pid=nil}).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_active() ->
+    gen_server:call(?MODULE, get_active).
+
+get_queue() ->
+    gen_server:call(?MODULE, get_queue).
+
+push(#shard{name = Name}, Target) ->
+    push(Name, Target);
+push(Name, #shard{node=Node}) ->
+    push(Name, Node);
+push(Name, Node) ->
+    push(#job{name = Name, node = Node}).
+
+push(#job{node = Node} = Job) when Node =/= node() ->
+    gen_server:cast(?MODULE, {push, Job});
+push(_) ->
+    ok.
+
+remove_node(Node) ->
+    gen_server:cast(?MODULE, {remove_node, Node}).
+
+init([]) ->
+    process_flag(trap_exit, true),
+    Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
+    gen_event:add_handler(mem3_events, mem3_sync_event, []),
+    {ok, Pid} = start_update_notifier(),
+    spawn(fun initial_sync/0),
+    {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+
+handle_call(get_active, _From, State) ->
+    {reply, State#state.active, State};
+
+handle_call(get_queue, _From, State) ->
+    {reply, State#state.waiting, State};
+
+handle_call(get_backlog, _From, #state{active=A, waiting=W} = State) ->
+    CA = lists:sum([C || #job{count=C} <- A, is_integer(C)]),
+    CW = lists:sum([C || #job{count=C} <- W, is_integer(C)]),
+    {reply, CA+CW, State}.
+
+handle_cast({push, DbName, Node}, State) ->
+    handle_cast({push, #job{name = DbName, node = Node}}, State);
+
+handle_cast({push, Job}, #state{count=Count, limit=Limit} = State)
+        when Count >= Limit ->
+    {noreply, add_to_queue(State, Job)};
+
+handle_cast({push, Job}, State) ->
+    #state{active = L, count = C} = State,
+    #job{name = DbName, node = Node} = Job,
+    case is_running(DbName, Node, L) of
+    true ->
+        {noreply, add_to_queue(State, Job)};
+    false ->
+        Pid = start_push_replication(Job),
+        {noreply, State#state{active=[Job#job{pid=Pid}|L], count=C+1}}
+    end;
+
+handle_cast({remove_node, Node}, #state{waiting = W0} = State) ->
+    {Alive, Dead} = lists:partition(fun(#job{node=N}) -> N =/= Node end, W0),
+    Dict = remove_entries(State#state.dict, Dead),
+    [exit(Pid, die_now) || #job{node=N, pid=Pid} <- State#state.active,
+        N =:= Node],
+    {noreply, State#state{dict = Dict, waiting = Alive}};
+
+handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
+    {Alive, Dead} = lists:partition(fun(#job{name=S}) -> S =/= Shard end, W0),
+    Dict = remove_entries(State#state.dict, Dead),
+    [exit(Pid, die_now) || #job{name=S, pid=Pid} <- State#state.active,
+        S =:= Shard],
+    {noreply, State#state{dict = Dict, waiting = Alive}}.
+
+handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
+    {ok, NewPid} = start_update_notifier(),
+    {noreply, State#state{update_notifier=NewPid}};
+
+handle_info({'EXIT', Active, normal}, State) ->
+    handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, die_now}, State) ->
+    % we forced this one ourselves, do not retry
+    handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) ->
+    % target doesn't exist, do not retry
+    handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, Reason}, State) ->
+    NewState = case lists:keyfind(Active, #job.pid, State#state.active) of
+    #job{name=OldDbName, node=OldNode} = Job ->
+        twig:log(warn, "~p ~s -> ~p ~p", [?MODULE, OldDbName, OldNode,
+            Reason]),
+        case Reason of {pending_changes, Count} ->
+            add_to_queue(State, Job#job{pid = nil, count = Count});
+        _ ->
+            timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]),
+            State
+        end;
+    false -> State end,
+    handle_replication_exit(NewState, Active);
+
+handle_info(Msg, State) ->
+    twig:log(notice, "unexpected msg at replication manager ~p", [Msg]),
+    {noreply, State}.
+
+terminate(_Reason, State) ->
+    [exit(Pid, shutdown) || #job{pid=Pid} <- State#state.active],
+    ok.
+
+code_change(_, #state{waiting = [{_,_}|_] = W, active=A} = State, _) ->
+    Waiting = [#job{name=Name, node=Node} || {Name,Node} <- W],
+    Active = [#job{name=Name, node=Node, pid=Pid} || {Name,Node,Pid} <- A],
+    {ok, State#state{active = Active, waiting = Waiting}};
+
+code_change(_, State, _) ->
+    {ok, State}.
+
+handle_replication_exit(#state{waiting=[]} = State, Pid) ->
+    NewActive = lists:keydelete(Pid, #job.pid, State#state.active),
+    {noreply, State#state{active=NewActive, count=length(NewActive)}};
+handle_replication_exit(State, Pid) ->
+    #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+    Active1 = lists:keydelete(Pid, #job.pid, Active),
+    Count = length(Active1),
+    NewState = if Count < Limit ->
+        case next_replication(Active1, Waiting) of
+        nil -> % all waiting replications are also active
+            State#state{active = Active1, count = Count};
+        {#job{name=DbName, node=Node} = Job, StillWaiting} ->
+            NewPid = start_push_replication(Job),
+            State#state{
+                active = [Job#job{pid = NewPid} | Active1],
+                count = Count+1,
+                dict = dict:erase({DbName,Node}, D),
+                waiting = StillWaiting
+            }
+        end;
+    true ->
+        State#state{active = Active1, count=Count}
+    end,
+    {noreply, NewState}.
+
+start_push_replication(#job{name=Name, node=Node}) ->
+    spawn_link(mem3_rep, go, [Name, Node]).
+
+add_to_queue(State, #job{name=DbName, node=Node} = Job) ->
+    #state{dict=D, waiting=Waiting} = State,
+    case dict:is_key({DbName, Node}, D) of
+    true ->
+        State;
+    false ->
+        twig:log(debug, "adding ~s -> ~p to mem3_sync queue", [DbName, Node]),
+        State#state{
+            dict = dict:store({DbName,Node}, ok, D),
+            waiting = Waiting ++ [Job]
+        }
+    end.
+
+sync_nodes_and_dbs() ->
+    Db1 = couch_config:get("mem3", "node_db", "nodes"),
+    Db2 = couch_config:get("mem3", "shard_db", "dbs"),
+    Db3 = couch_config:get("couch_httpd_auth", "authentication_db", "_users"),
+    Dbs = [Db1, Db2, Db3],
+    Nodes = mem3:nodes(),
+    Live = nodes(),
+    [[push(?l2b(Db), N) || Db <- Dbs] || N <- Nodes, lists:member(N, Live)].
+
+initial_sync() ->
+    [net_kernel:connect_node(Node) || Node <- mem3:nodes()],
+    sync_nodes_and_dbs(),
+    initial_sync(nodes()).
+
+initial_sync(Live) ->
+    Self = node(),
+    {ok, AllDbs} = fabric:all_dbs(),
+    lists:foreach(fun(Db) ->
+        LocalShards = [S || #shard{node=N} = S <- mem3:shards(Db), N =:= Self],
+        lists:foreach(fun(#shard{name=ShardName}) ->
+            Targets = [S || #shard{node=N, name=Name} = S <- mem3:shards(Db),
+                N =/= Self, Name =:= ShardName],
+            [?MODULE:push(ShardName, N) || #shard{node=N} <- Targets,
+                lists:member(N, Live)]
+        end, LocalShards)
+    end, AllDbs).
+
+start_update_notifier() ->
+    Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+    Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+    Db3 = ?l2b(couch_config:get("couch_httpd_auth", "authentication_db",
+        "_users")),
+    couch_db_update_notifier:start_link(fun
+        ({updated, Db}) when Db == Db1; Db == Db2; Db == Db3 ->
+            Nodes = mem3:nodes(),
+            Live = nodes(),
+            [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
+        ({updated, <<"shards/", _/binary>> = ShardName}) ->
+            % TODO deal with split/merged partitions by comparing keyranges
+            try mem3:shards(mem3:dbname(ShardName)) of
+            Shards ->
+                Targets = [S || #shard{node=N, name=Name} = S <- Shards,
+                    N =/= node(), Name =:= ShardName],
+                Live = nodes(),
+                [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
+                    lists:member(N, Live)]
+            catch error:database_does_not_exist ->
+                ok
+            end;
+        ({deleted, <<"shards/", _:18/binary, _/binary>> = ShardName}) ->
+            gen_server:cast(?MODULE, {remove_shard, ShardName});
+        (_) -> ok end).
+
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%%      which does not correspond to an already running replication
+-spec next_replication([#job{}], [#job{}]) -> {#job{}, [#job{}]} | nil.
+next_replication(Active, Waiting) ->
+    Fun = fun(#job{name=S, node=N}) -> is_running(S,N,Active) end,
+    case lists:splitwith(Fun, Waiting) of
+    {_, []} ->
+        nil;
+    {Running, [Job|Rest]} ->
+        {Job, Running ++ Rest}
+    end.
+
+is_running(DbName, Node, ActiveList) ->
+    [] =/= [true || #job{name=S, node=N} <- ActiveList, S=:=DbName, N=:=Node].
+
+remove_entries(Dict, Entries) ->
+    lists:foldl(fun(Entry, D) -> dict:erase(Entry, D) end, Dict, Entries).
diff --git a/deps/mem3/src/mem3_sync_event.erl b/deps/mem3/src/mem3_sync_event.erl
new file mode 100644
index 00000000..be5ab450
--- /dev/null
+++ b/deps/mem3/src/mem3_sync_event.erl
@@ -0,0 +1,68 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_event).
+-behaviour(gen_event).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+    code_change/3]).
+
+init(_) ->
+    net_kernel:monitor_nodes(true),
+    {ok, nil}.
+
+handle_event({add_node, Node}, State) when Node =/= node() ->
+    Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+    Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+    Db3 = list_to_binary(couch_config:get("couch_httpd_auth",
+        "authentication_db", "_users")),
+    [mem3_sync:push(Db, Node) || Db <- [Db1, Db2, Db3]],
+    {ok, State};
+
+handle_event({remove_node, Node}, State) ->
+    mem3_sync:remove_node(Node),
+    {ok, State};
+
+handle_event(_Event, State) ->
+    {ok, State}.
+
+handle_call(_Request, State) ->
+    {ok, ok, State}.
+
+handle_info({nodeup, Node}, State) ->
+    case lists:member(Node, mem3:nodes()) of
+    true ->
+        Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+        Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+        Db3 = list_to_binary(couch_config:get("couch_httpd_auth",
+            "authentication_db", "_users")),
+        [mem3_sync:push(Db, Node) || Db <- [Db1, Db2, Db3]],
+        mem3_sync:initial_sync([Node]);
+    false ->
+        ok
+    end,
+    {ok, State};
+
+handle_info({nodedown, Node}, State) ->
+    mem3_sync:remove_node(Node),
+    {ok, State};
+
+handle_info(_Info, State) ->
+    {ok, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
diff --git a/deps/mem3/src/mem3_util.erl b/deps/mem3/src/mem3_util.erl
new file mode 100644
index 00000000..c1b965bb
--- /dev/null
+++ b/deps/mem3/src/mem3_util.erl
@@ -0,0 +1,211 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_util).
+ +-export([hash/1, name_shard/2, create_partition_map/5, build_shards/2, + n_val/2, z_val/3, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1, + load_shards_from_disk/1, load_shards_from_disk/2, shard_info/1, + ensure_exists/1, open_db_doc/1]). + +-export([create_partition_map/4, name_shard/1]). +-deprecated({create_partition_map, 4, eventually}). +-deprecated({name_shard, 1, eventually}). + +-define(RINGTOP, 2 bsl 31). % CRC32 space + +-include("mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +hash(Item) when is_binary(Item) -> + erlang:crc32(Item); +hash(Item) -> + erlang:crc32(term_to_binary(Item)). + +name_shard(Shard) -> + name_shard(Shard, ""). + +name_shard(#shard{dbname = DbName, range=[B,E]} = Shard, Suffix) -> + Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-", + couch_util:to_hex(<<E:32/integer>>), "/", DbName, Suffix], + Shard#shard{name = ?l2b(Name)}. + +create_partition_map(DbName, N, Q, Nodes) -> + create_partition_map(DbName, N, Q, Nodes, ""). + +create_partition_map(DbName, N, Q, Nodes, Suffix) -> + UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []), + Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]), + Shards1 = attach_nodes(Shards0, [], Nodes, []), + [name_shard(S#shard{dbname=DbName}, Suffix) || S <- Shards1]. + +make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP -> + Acc; +make_key_ranges(Increment, Start, Acc) -> + case Start + 2*Increment of + X when X > ?RINGTOP -> + End = ?RINGTOP - 1; + _ -> + End = Start + Increment - 1 + end, + make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]). + +attach_nodes([], Acc, _, _) -> + lists:reverse(Acc); +attach_nodes(Shards, Acc, [], UsedNodes) -> + attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []); +attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) -> + attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]). + +open_db_doc(DocId) -> + DbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")), + {ok, Db} = couch_db:open(DbName, []), + try couch_db:open_doc(Db, DocId, []) after couch_db:close(Db) end. + +write_db_doc(Doc) -> + DbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")), + write_db_doc(DbName, Doc, true). + +write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) -> + {ok, Db} = couch_db:open(DbName, []), + try couch_db:open_doc(Db, Id, []) of + {ok, #doc{body = Body}} -> + % the doc is already in the desired state, we're done here + ok; + {not_found, _} when ShouldMutate -> + try couch_db:update_doc(Db, Doc, []) of + {ok, _} -> + ok + catch conflict -> + % check to see if this was a replication race or a different edit + write_db_doc(DbName, Doc, false) + end; + _ -> + % the doc already exists in a different state + conflict + after + couch_db:close(Db) + end. + +delete_db_doc(DocId) -> + DbName = ?l2b(couch_config:get("mem3", "shard_db", "dbs")), + delete_db_doc(DbName, DocId, true). + +delete_db_doc(DbName, DocId, ShouldMutate) -> + {ok, Db} = couch_db:open(DbName, []), + {ok, Revs} = couch_db:open_doc_revs(Db, DocId, all, []), + try [Doc#doc{deleted=true} || {ok, #doc{deleted=false}=Doc} <- Revs] of + [] -> + not_found; + Docs when ShouldMutate -> + try couch_db:update_docs(Db, Docs, []) of + {ok, _} -> + ok + catch conflict -> + % check to see if this was a replication race or if leafs survived + delete_db_doc(DbName, DocId, false) + end; + _ -> + % we have live leafs that we aren't allowed to delete. let's bail + conflict + after + couch_db:close(Db) + end. 
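Before build_shards below, a worked example of the ring arithmetic in make_key_ranges/3 above (values assume ?RINGTOP = 2 bsl 31, i.e. 2^32, the full CRC32 space, as defined in this module). With Q = 3 the increment is 2^32 div 3 = 1431655765, and make_key_ranges(1431655765, 0, []) accumulates, in reverse traversal order since each range is prepended:

    [#shard{range = [2863311530, 4294967295]},  % End forced to ?RINGTOP - 1
     #shard{range = [1431655765, 2863311529]},
     #shard{range = [0,          1431655764]}]

The Start + 2*Increment > ?RINGTOP guard lets the final range absorb the division remainder, so the ring always closes at exactly 2^32 - 1 whether or not Q divides the keyspace evenly. create_partition_map/5 then duplicates each range N times and attach_nodes/4 deals nodes onto the copies round-robin, recycling the node list whenever it runs out.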
+
+build_shards(DbName, DocProps) ->
+    {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
+    Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+    lists:flatmap(fun({Node, Ranges}) ->
+        lists:map(fun(Range) ->
+            [B,E] = string:tokens(?b2l(Range), "-"),
+            Beg = httpd_util:hexlist_to_integer(B),
+            End = httpd_util:hexlist_to_integer(E),
+            name_shard(#shard{
+                dbname = DbName,
+                node = to_atom(Node),
+                range = [Beg, End]
+            }, Suffix)
+        end, Ranges)
+    end, ByNode).
+
+to_atom(Node) when is_binary(Node) ->
+    list_to_atom(binary_to_list(Node));
+to_atom(Node) when is_atom(Node) ->
+    Node.
+
+to_integer(N) when is_integer(N) ->
+    N;
+to_integer(N) when is_binary(N) ->
+    list_to_integer(binary_to_list(N));
+to_integer(N) when is_list(N) ->
+    list_to_integer(N).
+
+n_val(undefined, NodeCount) ->
+    n_val(couch_config:get("cluster", "n", "3"), NodeCount);
+n_val(N, NodeCount) when is_list(N) ->
+    n_val(list_to_integer(N), NodeCount);
+n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount ->
+    twig:log(error, "Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
+    NodeCount;
+n_val(N, _) when N < 1 ->
+    1;
+n_val(N, _) ->
+    N.
+
+z_val(undefined, NodeCount, ZoneCount) ->
+    z_val(couch_config:get("cluster", "z", "3"), NodeCount, ZoneCount);
+z_val(N, NodeCount, ZoneCount) when is_list(N) ->
+    z_val(list_to_integer(N), NodeCount, ZoneCount);
+z_val(N, NodeCount, ZoneCount) when N > NodeCount orelse N > ZoneCount ->
+    twig:log(error, "Request to create Z=~p DB but only ~p node(s) and ~p zone(s)",
+        [N, NodeCount, ZoneCount]),
+    erlang:min(NodeCount, ZoneCount);
+z_val(N, _, _) when N < 1 ->
+    1;
+z_val(N, _, _) ->
+    N.
+
+load_shards_from_disk(DbName) when is_binary(DbName) ->
+    X = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+    {ok, Db} = couch_db:open(X, []),
+    try load_shards_from_db(Db, DbName) after couch_db:close(Db) end.
+
+load_shards_from_db(#db{} = ShardDb, DbName) ->
+    case couch_db:open_doc(ShardDb, DbName, []) of
+    {ok, #doc{body = {Props}}} ->
+        twig:log(notice, "dbs cache miss for ~s", [DbName]),
+        build_shards(DbName, Props);
+    {not_found, _} ->
+        erlang:error(database_does_not_exist, ?b2l(DbName))
+    end.
+
+load_shards_from_disk(DbName, DocId) ->
+    Shards = load_shards_from_disk(DbName),
+    HashKey = hash(DocId),
+    [S || #shard{range = [B,E]} = S <- Shards, B < HashKey, HashKey =< E].
+
+shard_info(DbName) ->
+    [{n, mem3:n(DbName)},
+     {q, length(mem3:shards(DbName)) div mem3:n(DbName)}].
+
+ensure_exists(DbName) when is_list(DbName) ->
+    ensure_exists(list_to_binary(DbName));
+ensure_exists(DbName) ->
+    Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+    case couch_db:open(DbName, Options) of
+    {ok, Db} ->
+        {ok, Db};
+    _ ->
+        couch_server:create(DbName, Options)
+    end.
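To close the loop on build_shards/2 and load_shards_from_db/2, a sketch of the shard-map document body they consume, written as the EJSON that couch_db:open_doc/3 returns (the database and node names are hypothetical; only the by_node and shard_suffix fields are read here, shard_suffix defaulting to ""):

    Props = [
        {<<"by_node">>, {[
            {<<"dbcore@node1.example">>,
                [<<"00000000-7fffffff">>, <<"80000000-ffffffff">>]},
            {<<"dbcore@node2.example">>,
                [<<"00000000-7fffffff">>, <<"80000000-ffffffff">>]}
        ]}}
    ],
    %% Each "beg-end" entry is a pair of 8-digit hex bounds; build_shards/2
    %% splits it on "-", converts both halves with
    %% httpd_util:hexlist_to_integer/1, and names the resulting shards:
    [#shard{name = <<"shards/00000000-7fffffff/mydb">>} | _]
        = build_shards(<<"mydb">>, Props).

load_shards_from_disk/2 then selects from such a list by hashing the partition key with erlang:crc32/1 and keeping the shards whose [B,E] range brackets the hash.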