rewrite mem3_cache to use continuous _changes feed
Diffstat (limited to 'src/mem3_cache.erl')
1 files changed, 71 insertions, 61 deletions
diff --git a/src/mem3_cache.erl b/src/mem3_cache.erl
index 8f5c372a..532a023a 100644
--- a/src/mem3_cache.erl
+++ b/src/mem3_cache.erl
@@ -5,87 +5,97 @@
+-record(state, {changes_pid}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
- ets:new(partitions, [bag, protected, named_table, {keypos,#shard.dbname}]),
- ets:new(memnodes, [bag, protected, named_table]),
- cache_dbs(),
- Self = self(),
- couch_db_update_notifier:start_link(fun({updated, <<"dbs">>}) ->
- Self ! rebuild_dbs_cache;
- (_) -> ok end),
- {ok, nil}.
+ ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end),
+ {ok, #state{changes_pid = Pid}}.
-handle_call(_Msg, _From, State) ->
- {reply, ok, State}.
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info(rebuild_dbs_cache, State) ->
- receive rebuild_dbs_cache ->
- handle_info(rebuild_dbs_cache, State)
- after 0 -> ok end,
- T0 = now(),
- ?LOG_INFO("rebuilding dbs DB cache", []),
- ets:delete_all_objects(partitions),
- ets:delete_all_objects(memnodes),
- cache_dbs(),
- ?LOG_INFO("rebuild of dbs DB cache complete in ~p ms",
- [round(timer:now_diff(now(),T0)/1000)]),
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]),
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end,
+ timer:send_after(5000, {start_listener, Seq}),
+ {noreply, State};
+handle_info({start_listener, Seq}, State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Msg, State) ->
{noreply, State}.
-terminate(_Reason, _State) ->
+terminate(_Reason, #state{changes_pid=Pid}) ->
+ exit(Pid, kill),
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-cache_dbs() ->
- try couch_db:open(<<"dbs">>, []) of
+%% internal functions
+listen_for_changes(Since) ->
+ DbName = ?l2b(couch_config:get("mem3", "db", "dbs")),
+ {ok, Db} = ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+ensure_exists(DbName) ->
+ Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+ case couch_db:open(DbName, Options) of
{ok, Db} ->
- Bt = Db#db.id_tree,
- FoldFun = fun(#full_doc_info{id=Id, deleted=false} = FullDocInfo, _, _) ->
- {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []),
- {Props} = couch_doc:to_json_obj(Doc, []),
- cache_map(Id, Props),
- cache_nodes(Id, Props),
- {ok, true};
- (_, _, _) ->
- {ok, nil}
- end,
- couch_btree:foldl(Bt, FoldFun, nil),
- couch_db:close(Db)
- catch exit:{noproc,{gen_server,call,[couch_server|_]}} ->
- timer:sleep(1000),
- exit(couch_server_is_dead)
+ {ok, Db};
+ _ ->
+ couch_server:create(DbName, Options)
-cache_map(Id, Props) ->
- Map = couch_util:get_value(<<"map">>, Props, []),
- lists:foreach(fun({[{<<"node">>,Node},{<<"b">>,Beg},{<<"e">>,End}]}) ->
- Part = #shard{
- name = partitions:shard_name(Beg, Id),
- dbname = Id,
- node = to_atom(Node),
- range = [Beg,End]
- },
- ets:insert(partitions, Part)
- end, Map).
-cache_nodes(Id, Props) ->
- Nodes = couch_util:get_value(<<"nodes">>, Props, []),
- lists:foreach(fun({[{<<"order">>,Order},{<<"node">>, Node},{<<"options">>,Opts}]}) ->
- ets:insert(memnodes, {Id, {Order, to_atom(Node), Opts}})
- end, Nodes).
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(id, Change),
+ case couch_util:get_value(deleted, Change, false) of
+ true ->
+ ets:delete(partitions, DbName);
+ false ->
+ case couch_util:get_value(doc, Change) of
+ {error, Reason} ->
+ ?LOG_ERROR("missing partition table for ~s: ~p", [DbName, Reason]);
+ {Doc} ->
+ ets:delete(partitions, DbName),
+ cache_partition_table(DbName, Doc)
+ end
+ end,
+ {ok, couch_util:get_value(seq, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
+cache_partition_table(DbName, Doc) ->
+ ets:insert(partitions, lists:map(fun({Map}) ->
+ Begin = couch_util:get_value(<<"b">>, Map),
+ #shard{
+ name = mem3_util:shard_name(Begin, DbName),
+ dbname = DbName,
+ node = to_atom(couch_util:get_value(<<"node">>, Map)),
+ range = [Begin, couch_util:get_value(<<"e">>, Map)]
+ }
+ end, couch_util:get_value(<<"map">>, Doc, {[]}))).
to_atom(Node) when is_binary(Node) ->
- list_to_atom(binary_to_list(Node));
-to_atom(Node) when is_atom(Node) ->
- Node.
-%{ok, ets:insert(dbs_cache, {Id, Props})};
+ list_to_atom(binary_to_list(Node)).