diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-10-25 15:46:05 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-10-25 21:45:32 -0400 |
commit | ebac05f686b56791511cb9b599dfb5a742dcfc96 (patch) | |
tree | 00a789cd058f98fa014d927f094f9e6e9f91f6f2 /apps/mem3/src | |
parent | 952a85381ff4b5b34426000b1dee73c9e74becdd (diff) |
use get-deps to pull down individual cloudant projects
Diffstat (limited to 'apps/mem3/src')
-rw-r--r-- | apps/mem3/src/mem3.app.src | 13 | ||||
-rw-r--r-- | apps/mem3/src/mem3.erl | 121 | ||||
-rw-r--r-- | apps/mem3/src/mem3_app.erl | 23 | ||||
-rw-r--r-- | apps/mem3/src/mem3_cache.erl | 106 | ||||
-rw-r--r-- | apps/mem3/src/mem3_httpd.erl | 53 | ||||
-rw-r--r-- | apps/mem3/src/mem3_nodes.erl | 134 | ||||
-rw-r--r-- | apps/mem3/src/mem3_rep.erl | 106 | ||||
-rw-r--r-- | apps/mem3/src/mem3_sup.erl | 35 | ||||
-rw-r--r-- | apps/mem3/src/mem3_sync.erl | 229 | ||||
-rw-r--r-- | apps/mem3/src/mem3_sync_event.erl | 58 | ||||
-rw-r--r-- | apps/mem3/src/mem3_util.erl | 153 |
11 files changed, 0 insertions, 1031 deletions
diff --git a/apps/mem3/src/mem3.app.src b/apps/mem3/src/mem3.app.src deleted file mode 100644 index 38f85c68..00000000 --- a/apps/mem3/src/mem3.app.src +++ /dev/null @@ -1,13 +0,0 @@ -{application, mem3, [ - {description, "CouchDB Cluster Membership"}, - {mod, {mem3_app, []}}, - {vsn, "1.0.1"}, - {registered, [ - mem3_cache, - mem3_events, - mem3_nodes, - mem3_sync, - mem3_sup - ]}, - {applications, [kernel, stdlib, sasl, crypto, mochiweb, couch]} -]}. diff --git a/apps/mem3/src/mem3.erl b/apps/mem3/src/mem3.erl deleted file mode 100644 index 1021ee5b..00000000 --- a/apps/mem3/src/mem3.erl +++ /dev/null @@ -1,121 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3). - --export([start/0, stop/0, restart/0, nodes/0, shards/1, shards/2, - choose_shards/2, n/1]). --export([compare_nodelists/0, compare_shards/1]). - --include("mem3.hrl"). - -start() -> - application:start(mem3). - -stop() -> - application:stop(mem3). - -restart() -> - stop(), - start(). - -%% @doc Detailed report of cluster-wide membership state. Queries the state -%% on all member nodes and builds a dictionary with unique states as the -%% key and the nodes holding that state as the value. Also reports member -%% nodes which fail to respond and nodes which are connected but are not -%% cluster members. Useful for debugging. --spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes - | non_member_nodes, [node()]}]. -compare_nodelists() -> - Nodes = mem3:nodes(), - AllNodes = erlang:nodes([this, visible]), - {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist), - Dict = lists:foldl(fun({Node, Nodelist}, D) -> - orddict:append({cluster_nodes, Nodelist}, Node, D) - end, orddict:new(), Replies), - [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict]. - --spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}]. -compare_shards(DbName) when is_list(DbName) -> - compare_shards(list_to_binary(DbName)); -compare_shards(DbName) -> - Nodes = mem3:nodes(), - {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]), - GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)], - Dict = lists:foldl(fun({Shards, Node}, D) -> - orddict:append(Shards, Node, D) - end, orddict:new(), lists:zip(Replies, GoodNodes)), - [{bad_nodes, BadNodes} | Dict]. - --spec n(DbName::iodata()) -> integer(). -n(DbName) -> - length(mem3:shards(DbName, <<"foo">>)). - --spec nodes() -> [node()]. -nodes() -> - mem3_nodes:get_nodelist(). - --spec shards(DbName::iodata()) -> [#shard{}]. -shards(DbName) when is_list(DbName) -> - shards(list_to_binary(DbName)); -shards(DbName) -> - try ets:lookup(partitions, DbName) of - [] -> - mem3_util:load_shards_from_disk(DbName); - Else -> - Else - catch error:badarg -> - mem3_util:load_shards_from_disk(DbName) - end. - --spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}]. -shards(DbName, DocId) when is_list(DbName) -> - shards(list_to_binary(DbName), DocId); -shards(DbName, DocId) when is_list(DocId) -> - shards(DbName, list_to_binary(DocId)); -shards(DbName, DocId) -> - HashKey = mem3_util:hash(DocId), - Head = #shard{ - name = '_', - node = '_', - dbname = DbName, - range = ['$1','$2'], - ref = '_' - }, - Conditions = [{'<', '$1', HashKey}, {'=<', HashKey, '$2'}], - try ets:select(partitions, [{Head, Conditions, ['$_']}]) of - [] -> - mem3_util:load_shards_from_disk(DbName, DocId); - Shards -> - Shards - catch error:badarg -> - mem3_util:load_shards_from_disk(DbName, DocId) - end. - --spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}]. -choose_shards(DbName, Options) when is_list(DbName) -> - choose_shards(list_to_binary(DbName), Options); -choose_shards(DbName, Options) -> - try shards(DbName) - catch error:E when E==database_does_not_exist; E==badarg -> - Nodes = mem3:nodes(), - NodeCount = length(Nodes), - N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount), - Q = mem3_util:to_integer(couch_util:get_value(q, Options, - couch_config:get("cluster", "q", "8"))), - % rotate to a random entry in the nodelist for even distribution - {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes), - RotatedNodes = B ++ A, - mem3_util:create_partition_map(DbName, N, Q, RotatedNodes) - end. diff --git a/apps/mem3/src/mem3_app.erl b/apps/mem3/src/mem3_app.erl deleted file mode 100644 index 682c6aff..00000000 --- a/apps/mem3/src/mem3_app.erl +++ /dev/null @@ -1,23 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_app). --behaviour(application). --export([start/2, stop/1]). - -start(_Type, []) -> - mem3_sup:start_link(). - -stop([]) -> - ok. diff --git a/apps/mem3/src/mem3_cache.erl b/apps/mem3/src/mem3_cache.erl deleted file mode 100644 index 6614c29e..00000000 --- a/apps/mem3/src/mem3_cache.erl +++ /dev/null @@ -1,106 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_cache). --behaviour(gen_server). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --export([start_link/0]). - --record(state, {changes_pid}). - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -init([]) -> - ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]), - {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end), - {ok, #state{changes_pid = Pid}}. - -handle_call(_Call, _From, State) -> - {noreply, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}}, - #state{changes_pid=Pid} = State) -> - % fatal error, somebody deleted our ets table - {stop, ets_table_error, State}; -handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) -> - ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]), - Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end, - erlang:send_after(5000, self(), {start_listener, Seq}), - {noreply, State}; -handle_info({start_listener, Seq}, State) -> - {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), - {noreply, State#state{changes_pid=NewPid}}; -handle_info(_Msg, State) -> - {noreply, State}. - -terminate(_Reason, #state{changes_pid=Pid}) -> - exit(Pid, kill), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% internal functions - -listen_for_changes(Since) -> - DbName = ?l2b(couch_config:get("mem3", "db", "dbs")), - {ok, Db} = ensure_exists(DbName), - Args = #changes_args{ - feed = "continuous", - since = Since, - heartbeat = true, - include_docs = true - }, - ChangesFun = couch_changes:handle_changes(Args, nil, Db), - ChangesFun(fun changes_callback/2). - -ensure_exists(DbName) -> - Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}], - case couch_db:open(DbName, Options) of - {ok, Db} -> - {ok, Db}; - _ -> - couch_server:create(DbName, Options) - end. - -changes_callback(start, _) -> - {ok, nil}; -changes_callback({stop, EndSeq}, _) -> - exit({seq, EndSeq}); -changes_callback({change, {Change}, _}, _) -> - DbName = couch_util:get_value(<<"id">>, Change), - case couch_util:get_value(deleted, Change, false) of - true -> - ets:delete(partitions, DbName); - false -> - case couch_util:get_value(doc, Change) of - {error, Reason} -> - ?LOG_ERROR("missing partition table for ~s: ~p", [DbName, Reason]); - {Doc} -> - ets:delete(partitions, DbName), - ets:insert(partitions, mem3_util:build_shards(DbName, Doc)) - end - end, - {ok, couch_util:get_value(<<"seq">>, Change)}; -changes_callback(timeout, _) -> - {ok, nil}. diff --git a/apps/mem3/src/mem3_httpd.erl b/apps/mem3/src/mem3_httpd.erl deleted file mode 100644 index 6810562e..00000000 --- a/apps/mem3/src/mem3_httpd.erl +++ /dev/null @@ -1,53 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_httpd). - --export([handle_membership_req/1]). - -%% includes --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - - -handle_membership_req(#httpd{method='GET', - path_parts=[<<"_membership">>]} = Req) -> - ClusterNodes = try mem3:nodes() - catch _:_ -> {ok,[]} end, - couch_httpd:send_json(Req, {[ - {all_nodes, lists:sort([node()|nodes()])}, - {cluster_nodes, lists:sort(ClusterNodes)} - ]}); -handle_membership_req(#httpd{method='GET', - path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) -> - ClusterNodes = try mem3:nodes() - catch _:_ -> {ok,[]} end, - Shards = mem3:shards(DbName), - JsonShards = json_shards(Shards, dict:new()), - couch_httpd:send_json(Req, {[ - {all_nodes, lists:sort([node()|nodes()])}, - {cluster_nodes, lists:sort(ClusterNodes)}, - {partitions, JsonShards} - ]}). - -%% -%% internal -%% - -json_shards([], AccIn) -> - List = dict:to_list(AccIn), - {lists:sort(List)}; -json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) -> - HexBeg = couch_util:to_hex(<<B:32/integer>>), - json_shards(Rest, dict:append(HexBeg, Node, AccIn)). diff --git a/apps/mem3/src/mem3_nodes.erl b/apps/mem3/src/mem3_nodes.erl deleted file mode 100644 index f9320598..00000000 --- a/apps/mem3/src/mem3_nodes.erl +++ /dev/null @@ -1,134 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_nodes). --behaviour(gen_server). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --export([start_link/0, get_nodelist/0]). - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record(state, {changes_pid, update_seq, nodes}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -get_nodelist() -> - gen_server:call(?MODULE, get_nodelist). - -init([]) -> - {Nodes, UpdateSeq} = initialize_nodelist(), - {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end), - {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}. - -handle_call(get_nodelist, _From, State) -> - {reply, State#state.nodes, State}; -handle_call({add_node, Node}, _From, #state{nodes=Nodes} = State) -> - gen_event:notify(mem3_events, {add_node, Node}), - {reply, ok, State#state{nodes = lists:umerge([Node], Nodes)}}; -handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) -> - gen_event:notify(mem3_events, {remove_node, Node}), - {reply, ok, State#state{nodes = lists:delete(Node, Nodes)}}; -handle_call(_Call, _From, State) -> - {noreply, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) -> - ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]), - StartSeq = State#state.update_seq, - Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end, - erlang:send_after(5000, self(), start_listener), - {noreply, State#state{update_seq = Seq}}; -handle_info(start_listener, #state{update_seq = Seq} = State) -> - {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), - {noreply, State#state{changes_pid=NewPid}}; -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% internal functions - -initialize_nodelist() -> - DbName = couch_config:get("mem3", "nodedb", "nodes"), - {ok, Db} = ensure_exists(DbName), - {ok, _, Nodes0} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, [], []), - % add self if not already present - case lists:member(node(), Nodes0) of - true -> - Nodes = Nodes0; - false -> - Doc = #doc{id = couch_util:to_binary(node())}, - {ok, _} = couch_db:update_doc(Db, Doc, []), - Nodes = [node() | Nodes0] - end, - couch_db:close(Db), - {lists:sort(Nodes), Db#db.update_seq}. - -first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) -> - {ok, Acc}; -first_fold(#full_doc_info{deleted=true}, _, Acc) -> - {ok, Acc}; -first_fold(#full_doc_info{id=Id}, _, Acc) -> - {ok, [mem3_util:to_atom(Id) | Acc]}. - -listen_for_changes(Since) -> - DbName = ?l2b(couch_config:get("mem3", "nodedb", "nodes")), - {ok, Db} = ensure_exists(DbName), - Args = #changes_args{ - feed = "continuous", - since = Since, - heartbeat = true, - include_docs = true - }, - ChangesFun = couch_changes:handle_changes(Args, nil, Db), - ChangesFun(fun changes_callback/2). - -ensure_exists(DbName) when is_list(DbName) -> - ensure_exists(list_to_binary(DbName)); -ensure_exists(DbName) -> - Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}], - case couch_db:open(DbName, Options) of - {ok, Db} -> - {ok, Db}; - _ -> - couch_server:create(DbName, Options) - end. - -changes_callback(start, _) -> - {ok, nil}; -changes_callback({stop, EndSeq}, _) -> - exit({seq, EndSeq}); -changes_callback({change, {Change}, _}, _) -> - Node = couch_util:get_value(<<"id">>, Change), - case Node of <<"_design/", _/binary>> -> ok; _ -> - case couch_util:get_value(deleted, Change, false) of - false -> - gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node)}); - true -> - gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)}) - end - end, - {ok, couch_util:get_value(<<"seq">>, Change)}; -changes_callback(timeout, _) -> - {ok, nil}. diff --git a/apps/mem3/src/mem3_rep.erl b/apps/mem3/src/mem3_rep.erl deleted file mode 100644 index f80eeb3d..00000000 --- a/apps/mem3/src/mem3_rep.erl +++ /dev/null @@ -1,106 +0,0 @@ --module(mem3_rep). - --export([go/2, changes_enumerator/2, make_local_id/2]). - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --define(CTX, #user_ctx{roles = [<<"_admin">>]}). - --record(acc, {revcount = 0, infos = [], seq, localid, source, target}). - -go(#shard{} = Source, #shard{} = Target) -> - LocalId = make_local_id(Source, Target), - {ok, Db} = couch_db:open(Source#shard.name, [{user_ctx,?CTX}]), - try go(Db, Target, LocalId) after couch_db:close(Db) end. - -go(#db{} = Db, #shard{} = Target, LocalId) -> - Seq = calculate_start_seq(Db, Target, LocalId), - Acc0 = #acc{source=Db, target=Target, seq=Seq, localid=LocalId}, - Fun = fun ?MODULE:changes_enumerator/2, - {ok, AccOut} = couch_db:changes_since(Db, all_docs, Seq, Fun, [], Acc0), - {ok, _} = replicate_batch(AccOut). - -make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}) -> - S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))), - T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))), - <<"_local/shard-sync-", S/binary, "-", T/binary>>. - -changes_enumerator(DocInfo, #acc{revcount = C} = Acc) when C >= 99 -> - Seq = DocInfo#doc_info.high_seq, - replicate_batch(Acc#acc{seq = Seq, infos = [DocInfo | Acc#acc.infos]}); - -changes_enumerator(DocInfo, #acc{revcount = C, infos = Infos} = Acc) -> - RevCount = C + length(DocInfo#doc_info.revs), - Seq = DocInfo#doc_info.high_seq, - {ok, Acc#acc{seq = Seq, revcount = RevCount, infos = [DocInfo | Infos]}}. - -replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) -> - case find_missing_revs(Acc) of - [] -> - ok; - Missing -> - ok = save_on_target(Node, Name, open_docs(Acc#acc.source, Missing)) - end, - update_locals(Acc), - {ok, Acc#acc{revcount=0, infos=[]}}. - -find_missing_revs(Acc) -> - #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc, - IdsRevs = lists:map(fun(#doc_info{id=Id, revs=RevInfos}) -> - {Id, [R || #rev_info{rev=R} <- RevInfos]} - end, Infos), - rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}). - -open_docs(Db, Missing) -> - lists:flatmap(fun({Id, Revs, _}) -> - {ok, Docs} = couch_db:open_doc_revs(Db, Id, Revs, [latest]), - [Doc || {ok, Doc} <- Docs] - end, Missing). - -save_on_target(Node, Name, Docs) -> - Options = [replicated_changes, full_commit, {user_ctx, ?CTX}], - rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), - ok. - -update_locals(Acc) -> - #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc, - #shard{name=Name, node=Node} = Target, - Doc = #doc{id = Id, body = {[ - {<<"seq">>, Seq}, - {<<"timestamp">>, list_to_binary(iso8601_timestamp())} - ]}}, - {ok, _} = couch_db:update_doc(Db, Doc, []), - Options = [{user_ctx, ?CTX}], - rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}). - -rexi_call(Node, MFA) -> - Ref = rexi:cast(Node, MFA), - receive {Ref, {ok, Reply}} -> - Reply; - {Ref, Error} -> - erlang:error(Error) - after 60000 -> - erlang:error(timeout) - end. - -calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) -> - case couch_db:open_doc(Db, LocalId, []) of - {ok, #doc{body = {SProps}}} -> - try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, []]}) of - #doc{body = {TProps}} -> - SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0), - TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0), - erlang:min(SourceSeq, TargetSeq) - catch error:_ -> - 0 - end; - _ -> - 0 - end. - -iso8601_timestamp() -> - {_,_,Micro} = Now = os:timestamp(), - {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now), - Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ", - io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]). diff --git a/apps/mem3/src/mem3_sup.erl b/apps/mem3/src/mem3_sup.erl deleted file mode 100644 index 9f93dd39..00000000 --- a/apps/mem3/src/mem3_sup.erl +++ /dev/null @@ -1,35 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_sup). --behaviour(supervisor). --export([start_link/0, init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init(_Args) -> - Children = [ - child(mem3_events), - child(mem3_sync), - child(mem3_cache), - child(mem3_nodes) - ], - {ok, {{one_for_one,10,1}, Children}}. - -child(mem3_events) -> - MFA = {gen_event, start_link, [{local, mem3_events}]}, - {mem3_events, MFA, permanent, 1000, worker, dynamic}; -child(Child) -> - {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}. diff --git a/apps/mem3/src/mem3_sync.erl b/apps/mem3/src/mem3_sync.erl deleted file mode 100644 index a1ba4f8b..00000000 --- a/apps/mem3/src/mem3_sync.erl +++ /dev/null @@ -1,229 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_sync). --behaviour(gen_server). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]). - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record(state, { - active = [], - count = 0, - limit, - dict = dict:new(), - waiting = [], - update_notifier -}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -get_active() -> - gen_server:call(?MODULE, get_active). - -get_queue() -> - gen_server:call(?MODULE, get_queue). - -push(Db, Node) -> - gen_server:cast(?MODULE, {push, Db, Node}). - -remove_node(Node) -> - gen_server:cast(?MODULE, {remove_node, Node}). - -init([]) -> - process_flag(trap_exit, true), - Concurrency = couch_config:get("mem3", "sync_concurrency", "10"), - gen_event:add_handler(mem3_events, mem3_sync_event, []), - {ok, Pid} = start_update_notifier(), - spawn(fun initial_sync/0), - {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}. - -handle_call(get_active, _From, State) -> - {reply, State#state.active, State}; - -handle_call(get_queue, _From, State) -> - {reply, State#state.waiting, State}. - -handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State) - when Count >= Limit -> - {noreply, add_to_queue(State, DbName, Node)}; - -handle_cast({push, DbName, Node}, State) -> - #state{active = L, count = C} = State, - case is_running(DbName, Node, L) of - true -> - {noreply, add_to_queue(State, DbName, Node)}; - false -> - Pid = start_push_replication(DbName, Node), - {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}} - end; - -handle_cast({remove_node, Node}, State) -> - Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node], - Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end, - State#state.dict, [S || {S,N} <- Waiting, N =:= Node]), - {noreply, State#state{dict = Dict, waiting = Waiting}}. - -handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) -> - {ok, NewPid} = start_update_notifier(), - {noreply, State#state{update_notifier=NewPid}}; - -handle_info({'EXIT', Active, normal}, State) -> - handle_replication_exit(State, Active); - -handle_info({'EXIT', Active, Reason}, State) -> - case lists:keyfind(Active, 3, State#state.active) of - {OldDbName, OldNode, _} -> - ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName, - OldNode, Reason]), - timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]); - false -> ok end, - handle_replication_exit(State, Active); - -handle_info(Msg, State) -> - ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]), - {noreply, State}. - -terminate(_Reason, State) -> - [exit(Pid, shutdown) || {_,_,Pid} <- State#state.active], - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handle_replication_exit(#state{waiting=[]} = State, Pid) -> - NewActive = lists:keydelete(Pid, 3, State#state.active), - {noreply, State#state{active=NewActive, count=length(NewActive)}}; -handle_replication_exit(State, Pid) -> - #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State, - Active1 = lists:keydelete(Pid, 3, Active), - Count = length(Active1), - NewState = if Count < Limit -> - case next_replication(Active1, Waiting) of - nil -> % all waiting replications are also active - State#state{active = Active1, count = Count}; - {DbName, Node, StillWaiting} -> - NewPid = start_push_replication(DbName, Node), - State#state{ - active = [{DbName, Node, NewPid} | Active1], - count = Count+1, - dict = dict:erase({DbName,Node}, D), - waiting = StillWaiting - } - end; - true -> - State#state{active = Active1, count=Count} - end, - {noreply, NewState}. - -start_push_replication(DbName, Node) -> - PostBody = {[ - {<<"source">>, DbName}, - {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}}, - {<<"continuous">>, false}, - {<<"async">>, true} - ]}, - ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]), - UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]}, - case (catch couch_rep:replicate(PostBody, UserCtx)) of - Pid when is_pid(Pid) -> - link(Pid), - Pid; - {db_not_found, _Msg} -> - case couch_db:open(DbName, []) of - {ok, Db} -> - % source exists, let's (re)create the target - couch_db:close(Db), - case rpc:call(Node, couch_api, create_db, [DbName, []]) of - {ok, Target} -> - ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName, - Node]), - couch_db:close(Target), - start_push_replication(DbName, Node); - file_exists -> - start_push_replication(DbName, Node); - Error -> - ?LOG_ERROR("~p couldn't create ~s on ~p because ~p", - [?MODULE, DbName, Node, Error]), - exit(shutdown) - end; - {not_found, no_db_file} -> - % source is gone, so this is a hack to skip it - ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted", - [?MODULE, DbName, Node]), - spawn_link(fun() -> ok end) - end; - {node_not_connected, _} -> - % we'll get this one when the node rejoins - ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]), - spawn_link(fun() -> ok end); - CatchAll -> - ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]), - case lists:member(Node, nodes()) of - true -> - timer:apply_after(5000, ?MODULE, push, [DbName, Node]); - false -> - ok - end, - spawn_link(fun() -> ok end) - end. - -add_to_queue(State, DbName, Node) -> - #state{dict=D, waiting=Waiting} = State, - case dict:is_key({DbName, Node}, D) of - true -> - State; - false -> - ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]), - State#state{ - dict = dict:store({DbName,Node}, ok, D), - waiting = Waiting ++ [{DbName,Node}] - } - end. - -initial_sync() -> - Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")), - Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")), - Nodes = mem3:nodes(), - Live = nodes(), - [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)]. - -start_update_notifier() -> - Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")), - Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")), - couch_db_update_notifier:start_link(fun - ({updated, Db}) when Db == Db1; Db == Db2 -> - Nodes = mem3:nodes(), - Live = nodes(), - [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)]; - (_) -> ok end). - -%% @doc Finds the next {DbName,Node} pair in the list of waiting replications -%% which does not correspond to an already running replication --spec next_replication(list(), list()) -> {binary(),node(),list()} | nil. -next_replication(Active, Waiting) -> - case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of - {_, []} -> - nil; - {Running, [{DbName,Node}|Rest]} -> - {DbName, Node, Running ++ Rest} - end. - -is_running(DbName, Node, ActiveList) -> - [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node]. diff --git a/apps/mem3/src/mem3_sync_event.erl b/apps/mem3/src/mem3_sync_event.erl deleted file mode 100644 index 5ee93b7a..00000000 --- a/apps/mem3/src/mem3_sync_event.erl +++ /dev/null @@ -1,58 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_sync_event). --behaviour(gen_event). - --export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, - code_change/3]). - -init(_) -> - {ok, nil}. - -handle_event({add_node, Node}, State) -> - Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")), - Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")), - [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]], - {ok, State}; - -handle_event({nodeup, Node}, State) -> - case lists:member(Node, mem3:nodes()) of - true -> - Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")), - Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")), - [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]]; - false -> - ok - end, - {ok, State}; - -handle_event({Down, Node}, State) when Down == nodedown; Down == remove_node -> - mem3_sync:remove_node(Node), - {ok, State}; - -handle_event(_Event, State) -> - {ok, State}. - -handle_call(_Request, State) -> - {ok, ok, State}. - -handle_info(_Info, State) -> - {ok, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/apps/mem3/src/mem3_util.erl b/apps/mem3/src/mem3_util.erl deleted file mode 100644 index 5e866ccf..00000000 --- a/apps/mem3/src/mem3_util.erl +++ /dev/null @@ -1,153 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_util). - --export([hash/1, name_shard/1, create_partition_map/4, build_shards/2, - n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1, - load_shards_from_disk/1, load_shards_from_disk/2]). - --define(RINGTOP, 2 bsl 31). % CRC32 space - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -hash(Item) when is_binary(Item) -> - erlang:crc32(Item); -hash(Item) -> - erlang:crc32(term_to_binary(Item)). - -name_shard(#shard{dbname = DbName, range=[B,E]} = Shard) -> - Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-", - couch_util:to_hex(<<E:32/integer>>), "/", DbName], - Shard#shard{name = ?l2b(Name)}. - -create_partition_map(DbName, N, Q, Nodes) -> - UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []), - Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]), - Shards1 = attach_nodes(Shards0, [], Nodes, []), - [name_shard(S#shard{dbname=DbName}) || S <- Shards1]. - -make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP -> - Acc; -make_key_ranges(Increment, Start, Acc) -> - case Start + 2*Increment of - X when X > ?RINGTOP -> - End = ?RINGTOP - 1; - _ -> - End = Start + Increment - 1 - end, - make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]). - -attach_nodes([], Acc, _, _) -> - lists:reverse(Acc); -attach_nodes(Shards, Acc, [], UsedNodes) -> - attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []); -attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) -> - attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]). - -write_db_doc(Doc) -> - {ok, Db} = couch_db:open(<<"dbs">>, []), - try - update_db_doc(Db, Doc) - catch conflict -> - ok % assume this is a race with another shard on this node - after - couch_db:close(Db) - end. - -update_db_doc(Db, #doc{id=Id, body=Body} = Doc) -> - case couch_db:open_doc(Db, Id, []) of - {not_found, _} -> - {ok, _} = couch_db:update_doc(Db, Doc, []); - {ok, #doc{body=Body}} -> - ok; - {ok, OldDoc} -> - {ok, _} = couch_db:update_doc(Db, OldDoc#doc{body=Body}, []) - end. - -delete_db_doc(DocId) -> - {ok, Db} = couch_db:open(<<"dbs">>, []), - try - delete_db_doc(Db, DocId) - catch conflict -> - ok - after - couch_db:close(Db) - end. - -delete_db_doc(Db, DocId) -> - case couch_db:open_doc(Db, DocId, []) of - {not_found, _} -> - ok; - {ok, OldDoc} -> - {ok, _} = couch_db:update_doc(Db, OldDoc#doc{deleted=true}, []) - end. - -build_shards(DbName, DocProps) -> - {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}), - lists:flatmap(fun({Node, Ranges}) -> - lists:map(fun(Range) -> - [B,E] = string:tokens(?b2l(Range), "-"), - Beg = httpd_util:hexlist_to_integer(B), - End = httpd_util:hexlist_to_integer(E), - name_shard(#shard{ - dbname = DbName, - node = to_atom(Node), - range = [Beg, End] - }) - end, Ranges) - end, ByNode). - -to_atom(Node) when is_binary(Node) -> - list_to_atom(binary_to_list(Node)); -to_atom(Node) when is_atom(Node) -> - Node. - -to_integer(N) when is_integer(N) -> - N; -to_integer(N) when is_binary(N) -> - list_to_integer(binary_to_list(N)); -to_integer(N) when is_list(N) -> - list_to_integer(N). - -n_val(undefined, NodeCount) -> - n_val(couch_config:get("cluster", "n", "3"), NodeCount); -n_val(N, NodeCount) when is_list(N) -> - n_val(list_to_integer(N), NodeCount); -n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount -> - ?LOG_ERROR("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]), - NodeCount; -n_val(N, _) when N < 1 -> - 1; -n_val(N, _) -> - N. - -load_shards_from_disk(DbName) when is_binary(DbName) -> - {ok, Db} = couch_db:open(<<"dbs">>, []), - try load_shards_from_db(Db, DbName) after couch_db:close(Db) end. - -load_shards_from_db(#db{} = ShardDb, DbName) -> - case couch_db:open_doc(ShardDb, DbName, []) of - {ok, #doc{body = {Props}}} -> - ?LOG_INFO("dbs cache miss for ~s", [DbName]), - build_shards(DbName, Props); - {not_found, _} -> - erlang:error(database_does_not_exist) - end. - -load_shards_from_disk(DbName, DocId)-> - Shards = load_shards_from_disk(DbName), - HashKey = hash(DocId), - [S || #shard{range = [B,E]} = S <- Shards, B < HashKey, HashKey =< E]. |