diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-07-01 11:15:04 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-08-12 01:23:12 -0400 |
commit | 68a6934ae52b5876054a525411ef69523b6b9a03 (patch) | |
tree | 9e34f1143085ff28076445c3d5a02d2b62fba018 | |
parent | 217fcd205cc3c29ceac4d28763ba74e6adecf1f3 (diff) |
rewrite mem3_cache to use continuous _changes feed
-rw-r--r-- | src/mem3_cache.erl | 132 |
1 files changed, 71 insertions, 61 deletions
diff --git a/src/mem3_cache.erl b/src/mem3_cache.erl index 8f5c372a..532a023a 100644 --- a/src/mem3_cache.erl +++ b/src/mem3_cache.erl @@ -5,87 +5,97 @@ -export([start_link/0]). +-record(state, {changes_pid}). + -include("mem3.hrl"). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> - ets:new(partitions, [bag, protected, named_table, {keypos,#shard.dbname}]), - ets:new(memnodes, [bag, protected, named_table]), - cache_dbs(), - Self = self(), - couch_db_update_notifier:start_link(fun({updated, <<"dbs">>}) -> - Self ! rebuild_dbs_cache; - (_) -> ok end), - {ok, nil}. + ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]), + {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end), + {ok, #state{changes_pid = Pid}}. -handle_call(_Msg, _From, State) -> - {reply, ok, State}. +handle_call(_Call, _From, State) -> + {noreply, State}. handle_cast(_Msg, State) -> {noreply, State}. -handle_info(rebuild_dbs_cache, State) -> - receive rebuild_dbs_cache -> - handle_info(rebuild_dbs_cache, State) - after 0 -> ok end, - T0 = now(), - ?LOG_INFO("rebuilding dbs DB cache", []), - ets:delete_all_objects(partitions), - ets:delete_all_objects(memnodes), - cache_dbs(), - ?LOG_INFO("rebuild of dbs DB cache complete in ~p ms", - [round(timer:now_diff(now(),T0)/1000)]), +handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) -> + ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]), + Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end, + timer:send_after(5000, {start_listener, Seq}), + {noreply, State}; +handle_info({start_listener, Seq}, State) -> + {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), + {noreply, State#state{changes_pid=NewPid}}; +handle_info(_Msg, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, #state{changes_pid=Pid}) -> + exit(Pid, kill), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -cache_dbs() -> - try couch_db:open(<<"dbs">>, []) of +%% internal functions + +listen_for_changes(Since) -> + DbName = ?l2b(couch_config:get("mem3", "db", "dbs")), + {ok, Db} = ensure_exists(DbName), + Args = #changes_args{ + feed = "continuous", + since = Since, + heartbeat = true, + include_docs = true + }, + ChangesFun = couch_changes:handle_changes(Args, nil, Db), + ChangesFun(fun changes_callback/2). + +ensure_exists(DbName) -> + Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}], + case couch_db:open(DbName, Options) of {ok, Db} -> - Bt = Db#db.id_tree, - FoldFun = fun(#full_doc_info{id=Id, deleted=false} = FullDocInfo, _, _) -> - {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []), - {Props} = couch_doc:to_json_obj(Doc, []), - cache_map(Id, Props), - cache_nodes(Id, Props), - {ok, true}; - (_, _, _) -> - {ok, nil} - end, - couch_btree:foldl(Bt, FoldFun, nil), - couch_db:close(Db) - catch exit:{noproc,{gen_server,call,[couch_server|_]}} -> - timer:sleep(1000), - exit(couch_server_is_dead) + {ok, Db}; + _ -> + couch_server:create(DbName, Options) end. -cache_map(Id, Props) -> - Map = couch_util:get_value(<<"map">>, Props, []), - lists:foreach(fun({[{<<"node">>,Node},{<<"b">>,Beg},{<<"e">>,End}]}) -> - Part = #shard{ - name = partitions:shard_name(Beg, Id), - dbname = Id, - node = to_atom(Node), - range = [Beg,End] - }, - ets:insert(partitions, Part) - end, Map). - -cache_nodes(Id, Props) -> - Nodes = couch_util:get_value(<<"nodes">>, Props, []), - lists:foreach(fun({[{<<"order">>,Order},{<<"node">>, Node},{<<"options">>,Opts}]}) -> - ets:insert(memnodes, {Id, {Order, to_atom(Node), Opts}}) - end, Nodes). +changes_callback(start, _) -> + {ok, nil}; +changes_callback({stop, EndSeq}, _) -> + exit({seq, EndSeq}); +changes_callback({change, {Change}, _}, _) -> + DbName = couch_util:get_value(id, Change), + case couch_util:get_value(deleted, Change, false) of + true -> + ets:delete(partitions, DbName); + false -> + case couch_util:get_value(doc, Change) of + {error, Reason} -> + ?LOG_ERROR("missing partition table for ~s: ~p", [DbName, Reason]); + {Doc} -> + ets:delete(partitions, DbName), + cache_partition_table(DbName, Doc) + end + end, + {ok, couch_util:get_value(seq, Change)}; +changes_callback(timeout, _) -> + {ok, nil}. + +cache_partition_table(DbName, Doc) -> + ets:insert(partitions, lists:map(fun({Map}) -> + Begin = couch_util:get_value(<<"b">>, Map), + #shard{ + name = mem3_util:shard_name(Begin, DbName), + dbname = DbName, + node = to_atom(couch_util:get_value(<<"node">>, Map)), + range = [Begin, couch_util:get_value(<<"e">>, Map)] + } + end, couch_util:get_value(<<"map">>, Doc, {[]}))). to_atom(Node) when is_binary(Node) -> - list_to_atom(binary_to_list(Node)); -to_atom(Node) when is_atom(Node) -> - Node. - -%{ok, ets:insert(dbs_cache, {Id, Props})}; + list_to_atom(binary_to_list(Node)). |