summaryrefslogtreecommitdiff
path: root/apps/mem3/src
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-10-25 15:46:05 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-10-25 21:45:32 -0400
commitebac05f686b56791511cb9b599dfb5a742dcfc96 (patch)
tree00a789cd058f98fa014d927f094f9e6e9f91f6f2 /apps/mem3/src
parent952a85381ff4b5b34426000b1dee73c9e74becdd (diff)
use get-deps to pull down individual cloudant projects
Diffstat (limited to 'apps/mem3/src')
-rw-r--r--apps/mem3/src/mem3.app.src13
-rw-r--r--apps/mem3/src/mem3.erl121
-rw-r--r--apps/mem3/src/mem3_app.erl23
-rw-r--r--apps/mem3/src/mem3_cache.erl106
-rw-r--r--apps/mem3/src/mem3_httpd.erl53
-rw-r--r--apps/mem3/src/mem3_nodes.erl134
-rw-r--r--apps/mem3/src/mem3_rep.erl106
-rw-r--r--apps/mem3/src/mem3_sup.erl35
-rw-r--r--apps/mem3/src/mem3_sync.erl229
-rw-r--r--apps/mem3/src/mem3_sync_event.erl58
-rw-r--r--apps/mem3/src/mem3_util.erl153
11 files changed, 0 insertions, 1031 deletions
diff --git a/apps/mem3/src/mem3.app.src b/apps/mem3/src/mem3.app.src
deleted file mode 100644
index 38f85c68..00000000
--- a/apps/mem3/src/mem3.app.src
+++ /dev/null
@@ -1,13 +0,0 @@
-{application, mem3, [
- {description, "CouchDB Cluster Membership"},
- {mod, {mem3_app, []}},
- {vsn, "1.0.1"},
- {registered, [
- mem3_cache,
- mem3_events,
- mem3_nodes,
- mem3_sync,
- mem3_sup
- ]},
- {applications, [kernel, stdlib, sasl, crypto, mochiweb, couch]}
-]}.
diff --git a/apps/mem3/src/mem3.erl b/apps/mem3/src/mem3.erl
deleted file mode 100644
index 1021ee5b..00000000
--- a/apps/mem3/src/mem3.erl
+++ /dev/null
@@ -1,121 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3).
-
--export([start/0, stop/0, restart/0, nodes/0, shards/1, shards/2,
- choose_shards/2, n/1]).
--export([compare_nodelists/0, compare_shards/1]).
-
--include("mem3.hrl").
-
-start() ->
- application:start(mem3).
-
-stop() ->
- application:stop(mem3).
-
-restart() ->
- stop(),
- start().
-
-%% @doc Detailed report of cluster-wide membership state. Queries the state
-%% on all member nodes and builds a dictionary with unique states as the
-%% key and the nodes holding that state as the value. Also reports member
-%% nodes which fail to respond and nodes which are connected but are not
-%% cluster members. Useful for debugging.
--spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes
- | non_member_nodes, [node()]}].
-compare_nodelists() ->
- Nodes = mem3:nodes(),
- AllNodes = erlang:nodes([this, visible]),
- {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist),
- Dict = lists:foldl(fun({Node, Nodelist}, D) ->
- orddict:append({cluster_nodes, Nodelist}, Node, D)
- end, orddict:new(), Replies),
- [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
-
--spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}].
-compare_shards(DbName) when is_list(DbName) ->
- compare_shards(list_to_binary(DbName));
-compare_shards(DbName) ->
- Nodes = mem3:nodes(),
- {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]),
- GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)],
- Dict = lists:foldl(fun({Shards, Node}, D) ->
- orddict:append(Shards, Node, D)
- end, orddict:new(), lists:zip(Replies, GoodNodes)),
- [{bad_nodes, BadNodes} | Dict].
-
--spec n(DbName::iodata()) -> integer().
-n(DbName) ->
- length(mem3:shards(DbName, <<"foo">>)).
-
--spec nodes() -> [node()].
-nodes() ->
- mem3_nodes:get_nodelist().
-
--spec shards(DbName::iodata()) -> [#shard{}].
-shards(DbName) when is_list(DbName) ->
- shards(list_to_binary(DbName));
-shards(DbName) ->
- try ets:lookup(partitions, DbName) of
- [] ->
- mem3_util:load_shards_from_disk(DbName);
- Else ->
- Else
- catch error:badarg ->
- mem3_util:load_shards_from_disk(DbName)
- end.
-
--spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
-shards(DbName, DocId) when is_list(DbName) ->
- shards(list_to_binary(DbName), DocId);
-shards(DbName, DocId) when is_list(DocId) ->
- shards(DbName, list_to_binary(DocId));
-shards(DbName, DocId) ->
- HashKey = mem3_util:hash(DocId),
- Head = #shard{
- name = '_',
- node = '_',
- dbname = DbName,
- range = ['$1','$2'],
- ref = '_'
- },
- Conditions = [{'<', '$1', HashKey}, {'=<', HashKey, '$2'}],
- try ets:select(partitions, [{Head, Conditions, ['$_']}]) of
- [] ->
- mem3_util:load_shards_from_disk(DbName, DocId);
- Shards ->
- Shards
- catch error:badarg ->
- mem3_util:load_shards_from_disk(DbName, DocId)
- end.
-
--spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}].
-choose_shards(DbName, Options) when is_list(DbName) ->
- choose_shards(list_to_binary(DbName), Options);
-choose_shards(DbName, Options) ->
- try shards(DbName)
- catch error:E when E==database_does_not_exist; E==badarg ->
- Nodes = mem3:nodes(),
- NodeCount = length(Nodes),
- N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
- Q = mem3_util:to_integer(couch_util:get_value(q, Options,
- couch_config:get("cluster", "q", "8"))),
- % rotate to a random entry in the nodelist for even distribution
- {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes),
- RotatedNodes = B ++ A,
- mem3_util:create_partition_map(DbName, N, Q, RotatedNodes)
- end.
diff --git a/apps/mem3/src/mem3_app.erl b/apps/mem3/src/mem3_app.erl
deleted file mode 100644
index 682c6aff..00000000
--- a/apps/mem3/src/mem3_app.erl
+++ /dev/null
@@ -1,23 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_app).
--behaviour(application).
--export([start/2, stop/1]).
-
-start(_Type, []) ->
- mem3_sup:start_link().
-
-stop([]) ->
- ok.
diff --git a/apps/mem3/src/mem3_cache.erl b/apps/mem3/src/mem3_cache.erl
deleted file mode 100644
index 6614c29e..00000000
--- a/apps/mem3/src/mem3_cache.erl
+++ /dev/null
@@ -1,106 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_cache).
--behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--export([start_link/0]).
-
--record(state, {changes_pid}).
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-init([]) ->
- ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]),
- {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end),
- {ok, #state{changes_pid = Pid}}.
-
-handle_call(_Call, _From, State) ->
- {noreply, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}},
- #state{changes_pid=Pid} = State) ->
- % fatal error, somebody deleted our ets table
- {stop, ets_table_error, State};
-handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
- ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]),
- Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end,
- erlang:send_after(5000, self(), {start_listener, Seq}),
- {noreply, State};
-handle_info({start_listener, Seq}, State) ->
- {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
- {noreply, State#state{changes_pid=NewPid}};
-handle_info(_Msg, State) ->
- {noreply, State}.
-
-terminate(_Reason, #state{changes_pid=Pid}) ->
- exit(Pid, kill),
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%% internal functions
-
-listen_for_changes(Since) ->
- DbName = ?l2b(couch_config:get("mem3", "db", "dbs")),
- {ok, Db} = ensure_exists(DbName),
- Args = #changes_args{
- feed = "continuous",
- since = Since,
- heartbeat = true,
- include_docs = true
- },
- ChangesFun = couch_changes:handle_changes(Args, nil, Db),
- ChangesFun(fun changes_callback/2).
-
-ensure_exists(DbName) ->
- Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
- case couch_db:open(DbName, Options) of
- {ok, Db} ->
- {ok, Db};
- _ ->
- couch_server:create(DbName, Options)
- end.
-
-changes_callback(start, _) ->
- {ok, nil};
-changes_callback({stop, EndSeq}, _) ->
- exit({seq, EndSeq});
-changes_callback({change, {Change}, _}, _) ->
- DbName = couch_util:get_value(<<"id">>, Change),
- case couch_util:get_value(deleted, Change, false) of
- true ->
- ets:delete(partitions, DbName);
- false ->
- case couch_util:get_value(doc, Change) of
- {error, Reason} ->
- ?LOG_ERROR("missing partition table for ~s: ~p", [DbName, Reason]);
- {Doc} ->
- ets:delete(partitions, DbName),
- ets:insert(partitions, mem3_util:build_shards(DbName, Doc))
- end
- end,
- {ok, couch_util:get_value(<<"seq">>, Change)};
-changes_callback(timeout, _) ->
- {ok, nil}.
diff --git a/apps/mem3/src/mem3_httpd.erl b/apps/mem3/src/mem3_httpd.erl
deleted file mode 100644
index 6810562e..00000000
--- a/apps/mem3/src/mem3_httpd.erl
+++ /dev/null
@@ -1,53 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_httpd).
-
--export([handle_membership_req/1]).
-
-%% includes
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-
-handle_membership_req(#httpd{method='GET',
- path_parts=[<<"_membership">>]} = Req) ->
- ClusterNodes = try mem3:nodes()
- catch _:_ -> {ok,[]} end,
- couch_httpd:send_json(Req, {[
- {all_nodes, lists:sort([node()|nodes()])},
- {cluster_nodes, lists:sort(ClusterNodes)}
- ]});
-handle_membership_req(#httpd{method='GET',
- path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) ->
- ClusterNodes = try mem3:nodes()
- catch _:_ -> {ok,[]} end,
- Shards = mem3:shards(DbName),
- JsonShards = json_shards(Shards, dict:new()),
- couch_httpd:send_json(Req, {[
- {all_nodes, lists:sort([node()|nodes()])},
- {cluster_nodes, lists:sort(ClusterNodes)},
- {partitions, JsonShards}
- ]}).
-
-%%
-%% internal
-%%
-
-json_shards([], AccIn) ->
- List = dict:to_list(AccIn),
- {lists:sort(List)};
-json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) ->
- HexBeg = couch_util:to_hex(<<B:32/integer>>),
- json_shards(Rest, dict:append(HexBeg, Node, AccIn)).
diff --git a/apps/mem3/src/mem3_nodes.erl b/apps/mem3/src/mem3_nodes.erl
deleted file mode 100644
index f9320598..00000000
--- a/apps/mem3/src/mem3_nodes.erl
+++ /dev/null
@@ -1,134 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_nodes).
--behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--export([start_link/0, get_nodelist/0]).
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--record(state, {changes_pid, update_seq, nodes}).
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-get_nodelist() ->
- gen_server:call(?MODULE, get_nodelist).
-
-init([]) ->
- {Nodes, UpdateSeq} = initialize_nodelist(),
- {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end),
- {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}.
-
-handle_call(get_nodelist, _From, State) ->
- {reply, State#state.nodes, State};
-handle_call({add_node, Node}, _From, #state{nodes=Nodes} = State) ->
- gen_event:notify(mem3_events, {add_node, Node}),
- {reply, ok, State#state{nodes = lists:umerge([Node], Nodes)}};
-handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) ->
- gen_event:notify(mem3_events, {remove_node, Node}),
- {reply, ok, State#state{nodes = lists:delete(Node, Nodes)}};
-handle_call(_Call, _From, State) ->
- {noreply, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
- ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]),
- StartSeq = State#state.update_seq,
- Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end,
- erlang:send_after(5000, self(), start_listener),
- {noreply, State#state{update_seq = Seq}};
-handle_info(start_listener, #state{update_seq = Seq} = State) ->
- {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
- {noreply, State#state{changes_pid=NewPid}};
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%% internal functions
-
-initialize_nodelist() ->
- DbName = couch_config:get("mem3", "nodedb", "nodes"),
- {ok, Db} = ensure_exists(DbName),
- {ok, _, Nodes0} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, [], []),
- % add self if not already present
- case lists:member(node(), Nodes0) of
- true ->
- Nodes = Nodes0;
- false ->
- Doc = #doc{id = couch_util:to_binary(node())},
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- Nodes = [node() | Nodes0]
- end,
- couch_db:close(Db),
- {lists:sort(Nodes), Db#db.update_seq}.
-
-first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
- {ok, Acc};
-first_fold(#full_doc_info{deleted=true}, _, Acc) ->
- {ok, Acc};
-first_fold(#full_doc_info{id=Id}, _, Acc) ->
- {ok, [mem3_util:to_atom(Id) | Acc]}.
-
-listen_for_changes(Since) ->
- DbName = ?l2b(couch_config:get("mem3", "nodedb", "nodes")),
- {ok, Db} = ensure_exists(DbName),
- Args = #changes_args{
- feed = "continuous",
- since = Since,
- heartbeat = true,
- include_docs = true
- },
- ChangesFun = couch_changes:handle_changes(Args, nil, Db),
- ChangesFun(fun changes_callback/2).
-
-ensure_exists(DbName) when is_list(DbName) ->
- ensure_exists(list_to_binary(DbName));
-ensure_exists(DbName) ->
- Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
- case couch_db:open(DbName, Options) of
- {ok, Db} ->
- {ok, Db};
- _ ->
- couch_server:create(DbName, Options)
- end.
-
-changes_callback(start, _) ->
- {ok, nil};
-changes_callback({stop, EndSeq}, _) ->
- exit({seq, EndSeq});
-changes_callback({change, {Change}, _}, _) ->
- Node = couch_util:get_value(<<"id">>, Change),
- case Node of <<"_design/", _/binary>> -> ok; _ ->
- case couch_util:get_value(deleted, Change, false) of
- false ->
- gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node)});
- true ->
- gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
- end
- end,
- {ok, couch_util:get_value(<<"seq">>, Change)};
-changes_callback(timeout, _) ->
- {ok, nil}.
diff --git a/apps/mem3/src/mem3_rep.erl b/apps/mem3/src/mem3_rep.erl
deleted file mode 100644
index f80eeb3d..00000000
--- a/apps/mem3/src/mem3_rep.erl
+++ /dev/null
@@ -1,106 +0,0 @@
--module(mem3_rep).
-
--export([go/2, changes_enumerator/2, make_local_id/2]).
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--define(CTX, #user_ctx{roles = [<<"_admin">>]}).
-
--record(acc, {revcount = 0, infos = [], seq, localid, source, target}).
-
-go(#shard{} = Source, #shard{} = Target) ->
- LocalId = make_local_id(Source, Target),
- {ok, Db} = couch_db:open(Source#shard.name, [{user_ctx,?CTX}]),
- try go(Db, Target, LocalId) after couch_db:close(Db) end.
-
-go(#db{} = Db, #shard{} = Target, LocalId) ->
- Seq = calculate_start_seq(Db, Target, LocalId),
- Acc0 = #acc{source=Db, target=Target, seq=Seq, localid=LocalId},
- Fun = fun ?MODULE:changes_enumerator/2,
- {ok, AccOut} = couch_db:changes_since(Db, all_docs, Seq, Fun, [], Acc0),
- {ok, _} = replicate_batch(AccOut).
-
-make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}) ->
- S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
- T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
- <<"_local/shard-sync-", S/binary, "-", T/binary>>.
-
-changes_enumerator(DocInfo, #acc{revcount = C} = Acc) when C >= 99 ->
- Seq = DocInfo#doc_info.high_seq,
- replicate_batch(Acc#acc{seq = Seq, infos = [DocInfo | Acc#acc.infos]});
-
-changes_enumerator(DocInfo, #acc{revcount = C, infos = Infos} = Acc) ->
- RevCount = C + length(DocInfo#doc_info.revs),
- Seq = DocInfo#doc_info.high_seq,
- {ok, Acc#acc{seq = Seq, revcount = RevCount, infos = [DocInfo | Infos]}}.
-
-replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
- case find_missing_revs(Acc) of
- [] ->
- ok;
- Missing ->
- ok = save_on_target(Node, Name, open_docs(Acc#acc.source, Missing))
- end,
- update_locals(Acc),
- {ok, Acc#acc{revcount=0, infos=[]}}.
-
-find_missing_revs(Acc) ->
- #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
- IdsRevs = lists:map(fun(#doc_info{id=Id, revs=RevInfos}) ->
- {Id, [R || #rev_info{rev=R} <- RevInfos]}
- end, Infos),
- rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}).
-
-open_docs(Db, Missing) ->
- lists:flatmap(fun({Id, Revs, _}) ->
- {ok, Docs} = couch_db:open_doc_revs(Db, Id, Revs, [latest]),
- [Doc || {ok, Doc} <- Docs]
- end, Missing).
-
-save_on_target(Node, Name, Docs) ->
- Options = [replicated_changes, full_commit, {user_ctx, ?CTX}],
- rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
- ok.
-
-update_locals(Acc) ->
- #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
- #shard{name=Name, node=Node} = Target,
- Doc = #doc{id = Id, body = {[
- {<<"seq">>, Seq},
- {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
- ]}},
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- Options = [{user_ctx, ?CTX}],
- rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
-
-rexi_call(Node, MFA) ->
- Ref = rexi:cast(Node, MFA),
- receive {Ref, {ok, Reply}} ->
- Reply;
- {Ref, Error} ->
- erlang:error(Error)
- after 60000 ->
- erlang:error(timeout)
- end.
-
-calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
- case couch_db:open_doc(Db, LocalId, []) of
- {ok, #doc{body = {SProps}}} ->
- try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, []]}) of
- #doc{body = {TProps}} ->
- SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
- TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
- erlang:min(SourceSeq, TargetSeq)
- catch error:_ ->
- 0
- end;
- _ ->
- 0
- end.
-
-iso8601_timestamp() ->
- {_,_,Micro} = Now = os:timestamp(),
- {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
- Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
- io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
diff --git a/apps/mem3/src/mem3_sup.erl b/apps/mem3/src/mem3_sup.erl
deleted file mode 100644
index 9f93dd39..00000000
--- a/apps/mem3/src/mem3_sup.erl
+++ /dev/null
@@ -1,35 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_sup).
--behaviour(supervisor).
--export([start_link/0, init/1]).
-
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init(_Args) ->
- Children = [
- child(mem3_events),
- child(mem3_sync),
- child(mem3_cache),
- child(mem3_nodes)
- ],
- {ok, {{one_for_one,10,1}, Children}}.
-
-child(mem3_events) ->
- MFA = {gen_event, start_link, [{local, mem3_events}]},
- {mem3_events, MFA, permanent, 1000, worker, dynamic};
-child(Child) ->
- {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.
diff --git a/apps/mem3/src/mem3_sync.erl b/apps/mem3/src/mem3_sync.erl
deleted file mode 100644
index a1ba4f8b..00000000
--- a/apps/mem3/src/mem3_sync.erl
+++ /dev/null
@@ -1,229 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_sync).
--behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]).
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--record(state, {
- active = [],
- count = 0,
- limit,
- dict = dict:new(),
- waiting = [],
- update_notifier
-}).
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-get_active() ->
- gen_server:call(?MODULE, get_active).
-
-get_queue() ->
- gen_server:call(?MODULE, get_queue).
-
-push(Db, Node) ->
- gen_server:cast(?MODULE, {push, Db, Node}).
-
-remove_node(Node) ->
- gen_server:cast(?MODULE, {remove_node, Node}).
-
-init([]) ->
- process_flag(trap_exit, true),
- Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
- gen_event:add_handler(mem3_events, mem3_sync_event, []),
- {ok, Pid} = start_update_notifier(),
- spawn(fun initial_sync/0),
- {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
-
-handle_call(get_active, _From, State) ->
- {reply, State#state.active, State};
-
-handle_call(get_queue, _From, State) ->
- {reply, State#state.waiting, State}.
-
-handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State)
- when Count >= Limit ->
- {noreply, add_to_queue(State, DbName, Node)};
-
-handle_cast({push, DbName, Node}, State) ->
- #state{active = L, count = C} = State,
- case is_running(DbName, Node, L) of
- true ->
- {noreply, add_to_queue(State, DbName, Node)};
- false ->
- Pid = start_push_replication(DbName, Node),
- {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}}
- end;
-
-handle_cast({remove_node, Node}, State) ->
- Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node],
- Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end,
- State#state.dict, [S || {S,N} <- Waiting, N =:= Node]),
- {noreply, State#state{dict = Dict, waiting = Waiting}}.
-
-handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
- {ok, NewPid} = start_update_notifier(),
- {noreply, State#state{update_notifier=NewPid}};
-
-handle_info({'EXIT', Active, normal}, State) ->
- handle_replication_exit(State, Active);
-
-handle_info({'EXIT', Active, Reason}, State) ->
- case lists:keyfind(Active, 3, State#state.active) of
- {OldDbName, OldNode, _} ->
- ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName,
- OldNode, Reason]),
- timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]);
- false -> ok end,
- handle_replication_exit(State, Active);
-
-handle_info(Msg, State) ->
- ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]),
- {noreply, State}.
-
-terminate(_Reason, State) ->
- [exit(Pid, shutdown) || {_,_,Pid} <- State#state.active],
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-handle_replication_exit(#state{waiting=[]} = State, Pid) ->
- NewActive = lists:keydelete(Pid, 3, State#state.active),
- {noreply, State#state{active=NewActive, count=length(NewActive)}};
-handle_replication_exit(State, Pid) ->
- #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
- Active1 = lists:keydelete(Pid, 3, Active),
- Count = length(Active1),
- NewState = if Count < Limit ->
- case next_replication(Active1, Waiting) of
- nil -> % all waiting replications are also active
- State#state{active = Active1, count = Count};
- {DbName, Node, StillWaiting} ->
- NewPid = start_push_replication(DbName, Node),
- State#state{
- active = [{DbName, Node, NewPid} | Active1],
- count = Count+1,
- dict = dict:erase({DbName,Node}, D),
- waiting = StillWaiting
- }
- end;
- true ->
- State#state{active = Active1, count=Count}
- end,
- {noreply, NewState}.
-
-start_push_replication(DbName, Node) ->
- PostBody = {[
- {<<"source">>, DbName},
- {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}},
- {<<"continuous">>, false},
- {<<"async">>, true}
- ]},
- ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]),
- UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]},
- case (catch couch_rep:replicate(PostBody, UserCtx)) of
- Pid when is_pid(Pid) ->
- link(Pid),
- Pid;
- {db_not_found, _Msg} ->
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- % source exists, let's (re)create the target
- couch_db:close(Db),
- case rpc:call(Node, couch_api, create_db, [DbName, []]) of
- {ok, Target} ->
- ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName,
- Node]),
- couch_db:close(Target),
- start_push_replication(DbName, Node);
- file_exists ->
- start_push_replication(DbName, Node);
- Error ->
- ?LOG_ERROR("~p couldn't create ~s on ~p because ~p",
- [?MODULE, DbName, Node, Error]),
- exit(shutdown)
- end;
- {not_found, no_db_file} ->
- % source is gone, so this is a hack to skip it
- ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted",
- [?MODULE, DbName, Node]),
- spawn_link(fun() -> ok end)
- end;
- {node_not_connected, _} ->
- % we'll get this one when the node rejoins
- ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]),
- spawn_link(fun() -> ok end);
- CatchAll ->
- ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]),
- case lists:member(Node, nodes()) of
- true ->
- timer:apply_after(5000, ?MODULE, push, [DbName, Node]);
- false ->
- ok
- end,
- spawn_link(fun() -> ok end)
- end.
-
-add_to_queue(State, DbName, Node) ->
- #state{dict=D, waiting=Waiting} = State,
- case dict:is_key({DbName, Node}, D) of
- true ->
- State;
- false ->
- ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]),
- State#state{
- dict = dict:store({DbName,Node}, ok, D),
- waiting = Waiting ++ [{DbName,Node}]
- }
- end.
-
-initial_sync() ->
- Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
- Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
- Nodes = mem3:nodes(),
- Live = nodes(),
- [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)].
-
-start_update_notifier() ->
- Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
- Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
- couch_db_update_notifier:start_link(fun
- ({updated, Db}) when Db == Db1; Db == Db2 ->
- Nodes = mem3:nodes(),
- Live = nodes(),
- [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
- (_) -> ok end).
-
-%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
-%% which does not correspond to an already running replication
--spec next_replication(list(), list()) -> {binary(),node(),list()} | nil.
-next_replication(Active, Waiting) ->
- case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of
- {_, []} ->
- nil;
- {Running, [{DbName,Node}|Rest]} ->
- {DbName, Node, Running ++ Rest}
- end.
-
-is_running(DbName, Node, ActiveList) ->
- [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node].
diff --git a/apps/mem3/src/mem3_sync_event.erl b/apps/mem3/src/mem3_sync_event.erl
deleted file mode 100644
index 5ee93b7a..00000000
--- a/apps/mem3/src/mem3_sync_event.erl
+++ /dev/null
@@ -1,58 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_sync_event).
--behaviour(gen_event).
-
--export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
- code_change/3]).
-
-init(_) ->
- {ok, nil}.
-
-handle_event({add_node, Node}, State) ->
- Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
- Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
- [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]],
- {ok, State};
-
-handle_event({nodeup, Node}, State) ->
- case lists:member(Node, mem3:nodes()) of
- true ->
- Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
- Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
- [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]];
- false ->
- ok
- end,
- {ok, State};
-
-handle_event({Down, Node}, State) when Down == nodedown; Down == remove_node ->
- mem3_sync:remove_node(Node),
- {ok, State};
-
-handle_event(_Event, State) ->
- {ok, State}.
-
-handle_call(_Request, State) ->
- {ok, ok, State}.
-
-handle_info(_Info, State) ->
- {ok, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
diff --git a/apps/mem3/src/mem3_util.erl b/apps/mem3/src/mem3_util.erl
deleted file mode 100644
index 5e866ccf..00000000
--- a/apps/mem3/src/mem3_util.erl
+++ /dev/null
@@ -1,153 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_util).
-
--export([hash/1, name_shard/1, create_partition_map/4, build_shards/2,
- n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
- load_shards_from_disk/1, load_shards_from_disk/2]).
-
--define(RINGTOP, 2 bsl 31). % CRC32 space
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-hash(Item) when is_binary(Item) ->
- erlang:crc32(Item);
-hash(Item) ->
- erlang:crc32(term_to_binary(Item)).
-
-name_shard(#shard{dbname = DbName, range=[B,E]} = Shard) ->
- Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
- couch_util:to_hex(<<E:32/integer>>), "/", DbName],
- Shard#shard{name = ?l2b(Name)}.
-
-create_partition_map(DbName, N, Q, Nodes) ->
- UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
- Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
- Shards1 = attach_nodes(Shards0, [], Nodes, []),
- [name_shard(S#shard{dbname=DbName}) || S <- Shards1].
-
-make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
- Acc;
-make_key_ranges(Increment, Start, Acc) ->
- case Start + 2*Increment of
- X when X > ?RINGTOP ->
- End = ?RINGTOP - 1;
- _ ->
- End = Start + Increment - 1
- end,
- make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
-
-attach_nodes([], Acc, _, _) ->
- lists:reverse(Acc);
-attach_nodes(Shards, Acc, [], UsedNodes) ->
- attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
-attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
- attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
-
-write_db_doc(Doc) ->
- {ok, Db} = couch_db:open(<<"dbs">>, []),
- try
- update_db_doc(Db, Doc)
- catch conflict ->
- ok % assume this is a race with another shard on this node
- after
- couch_db:close(Db)
- end.
-
-update_db_doc(Db, #doc{id=Id, body=Body} = Doc) ->
- case couch_db:open_doc(Db, Id, []) of
- {not_found, _} ->
- {ok, _} = couch_db:update_doc(Db, Doc, []);
- {ok, #doc{body=Body}} ->
- ok;
- {ok, OldDoc} ->
- {ok, _} = couch_db:update_doc(Db, OldDoc#doc{body=Body}, [])
- end.
-
-delete_db_doc(DocId) ->
- {ok, Db} = couch_db:open(<<"dbs">>, []),
- try
- delete_db_doc(Db, DocId)
- catch conflict ->
- ok
- after
- couch_db:close(Db)
- end.
-
-delete_db_doc(Db, DocId) ->
- case couch_db:open_doc(Db, DocId, []) of
- {not_found, _} ->
- ok;
- {ok, OldDoc} ->
- {ok, _} = couch_db:update_doc(Db, OldDoc#doc{deleted=true}, [])
- end.
-
-build_shards(DbName, DocProps) ->
- {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
- lists:flatmap(fun({Node, Ranges}) ->
- lists:map(fun(Range) ->
- [B,E] = string:tokens(?b2l(Range), "-"),
- Beg = httpd_util:hexlist_to_integer(B),
- End = httpd_util:hexlist_to_integer(E),
- name_shard(#shard{
- dbname = DbName,
- node = to_atom(Node),
- range = [Beg, End]
- })
- end, Ranges)
- end, ByNode).
-
-to_atom(Node) when is_binary(Node) ->
- list_to_atom(binary_to_list(Node));
-to_atom(Node) when is_atom(Node) ->
- Node.
-
-to_integer(N) when is_integer(N) ->
- N;
-to_integer(N) when is_binary(N) ->
- list_to_integer(binary_to_list(N));
-to_integer(N) when is_list(N) ->
- list_to_integer(N).
-
-n_val(undefined, NodeCount) ->
- n_val(couch_config:get("cluster", "n", "3"), NodeCount);
-n_val(N, NodeCount) when is_list(N) ->
- n_val(list_to_integer(N), NodeCount);
-n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount ->
- ?LOG_ERROR("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
- NodeCount;
-n_val(N, _) when N < 1 ->
- 1;
-n_val(N, _) ->
- N.
-
-load_shards_from_disk(DbName) when is_binary(DbName) ->
- {ok, Db} = couch_db:open(<<"dbs">>, []),
- try load_shards_from_db(Db, DbName) after couch_db:close(Db) end.
-
-load_shards_from_db(#db{} = ShardDb, DbName) ->
- case couch_db:open_doc(ShardDb, DbName, []) of
- {ok, #doc{body = {Props}}} ->
- ?LOG_INFO("dbs cache miss for ~s", [DbName]),
- build_shards(DbName, Props);
- {not_found, _} ->
- erlang:error(database_does_not_exist)
- end.
-
-load_shards_from_disk(DbName, DocId)->
- Shards = load_shards_from_disk(DbName),
- HashKey = hash(DocId),
- [S || #shard{range = [B,E]} = S <- Shards, B < HashKey, HashKey =< E].