diff options
Diffstat (limited to 'src/couchdb/couch_view.erl')
-rw-r--r-- | src/couchdb/couch_view.erl | 160 |
1 files changed, 86 insertions, 74 deletions
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 301ea8b7..87feea12 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -17,7 +17,7 @@ detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2, code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4, get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/7, - extract_map_view/1,get_group_server/2]). + extract_map_view/1,get_group_server/2,get_group_info/2,cleanup_index_files/1]). -include("couch_db.hrl"). @@ -28,18 +28,32 @@ start_link() -> gen_server:start_link({local, couch_view}, couch_view, [], []). -get_temp_updater(DbName, Type, DesignOptions, MapSrc, RedSrc) -> - {ok, Pid} = gen_server:call(couch_view, - {start_temp_updater, DbName, Type, DesignOptions, MapSrc, RedSrc}), - Pid. - -get_group_server(DbName, GroupId) -> - case gen_server:call(couch_view, {start_group_server, DbName, GroupId}) of +get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) -> + % make temp group + % do we need to close this db? + {ok, _Db, Group} = + couch_view_group:open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc), + case gen_server:call(couch_view, {get_group_server, DbName, Group}) of {ok, Pid} -> Pid; Error -> throw(Error) end. + +get_group_server(DbName, GroupId) -> + % get signature for group + case couch_view_group:open_db_group(DbName, GroupId) of + % do we need to close this db? + {ok, _Db, Group} -> + case gen_server:call(couch_view, {get_group_server, DbName, Group}) of + {ok, Pid} -> + Pid; + Error -> + throw(Error) + end; + Error -> + throw(Error) + end. get_group(Db, GroupId, Stale) -> MinUpdateSeq = case Stale of @@ -50,18 +64,52 @@ get_group(Db, GroupId, Stale) -> get_group_server(couch_db:name(Db), GroupId), MinUpdateSeq). - -get_temp_group(Db, Type, DesignOptions, MapSrc, RedSrc) -> +get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc) -> couch_view_group:request_group( - get_temp_updater(couch_db:name(Db), Type, DesignOptions, MapSrc, RedSrc), - couch_db:get_update_seq(Db)). + get_temp_updater(couch_db:name(Db), Language, DesignOptions, MapSrc, RedSrc), + couch_db:get_update_seq(Db)). + +get_group_info(Db, GroupId) -> + couch_view_group:request_group_info( + get_group_server(couch_db:name(Db), GroupId)). + +cleanup_index_files(Db) -> + % load all ddocs + {ok, DesignDocs} = couch_db:get_design_docs(Db), + + % make unique list of group sigs + Sigs = lists:map(fun(#doc{id = GroupId} = DDoc) -> + {ok, Info} = get_group_info(Db, GroupId), + ?b2l(proplists:get_value(signature, Info)) + end, [DD||DD <- DesignDocs, DD#doc.deleted == false]), + + FileList = list_index_files(Db), + + % regex that matches all ddocs + RegExp = "("++ string:join(Sigs, "|") ++")", + + % filter out the ones in use + DeleteFiles = lists:filter(fun(FilePath) -> + regexp:first_match(FilePath, RegExp)==nomatch + end, FileList), + % delete unused files + ?LOG_DEBUG("deleting unused view index files: ~p",[DeleteFiles]), + [file:delete(File)||File <- DeleteFiles], + ok. + +list_index_files(Db) -> + % call server to fetch the index files + RootDir = couch_config:get("couchdb", "view_index_dir"), + Files = filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*"). + get_row_count(#view{btree=Bt}) -> {ok, {Count, _Reds}} = couch_btree:full_reduce(Bt), {ok, Count}. -get_temp_reduce_view(Db, Type, DesignOptions, MapSrc, RedSrc) -> - {ok, #group{views=[View]}=Group} = get_temp_group(Db, Type, DesignOptions, MapSrc, RedSrc), +get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc) -> + {ok, #group{views=[View]}=Group} = + get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc), {ok, {temp_reduce, View}, Group}. @@ -141,8 +189,8 @@ get_key_pos(Key, [_|Rest], N) -> get_key_pos(Key, Rest, N+1). -get_temp_map_view(Db, Type, DesignOptions, Src) -> - {ok, #group{views=[View]}=Group} = get_temp_group(Db, Type, DesignOptions, Src, []), +get_temp_map_view(Db, Language, DesignOptions, Src) -> + {ok, #group{views=[View]}=Group} = get_temp_group(Db, Language, DesignOptions, Src, []), {ok, View, Group}. get_map_view(Db, GroupId, Name, Stale) -> @@ -220,9 +268,8 @@ init([]) -> ok end), ets:new(couch_groups_by_db, [bag, private, named_table]), - ets:new(group_servers_by_name, [set, protected, named_table]), + ets:new(group_servers_by_sig, [set, protected, named_table]), ets:new(couch_groups_by_updater, [set, private, named_table]), - ets:new(couch_temp_group_fd_by_db, [set, protected, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir}}. @@ -232,37 +279,15 @@ terminate(Reason, _Srv) -> ok. -handle_call({start_temp_updater, DbName, Lang, DesignOptions, MapSrc, RedSrc}, - _From, #server{root_dir=Root}=Server) -> - <<SigInt:128/integer>> = erlang:md5(term_to_binary({Lang, DesignOptions, MapSrc, RedSrc})), - Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])), - Pid = - case ets:lookup(group_servers_by_name, {DbName, Name}) of +handle_call({get_group_server, DbName, + #group{name=GroupId,sig=Sig}=Group}, _From, #server{root_dir=Root}=Server) -> + case ets:lookup(group_servers_by_sig, {DbName, Sig}) of [] -> - case ets:lookup(couch_temp_group_fd_by_db, DbName) of - [] -> - FileName = Root ++ "/." ++ binary_to_list(DbName) ++ "_temp", - {ok, Fd} = couch_file:open(FileName, [create, overwrite]), - Count = 0; - [{_, Fd, Count}] -> - ok - end, - ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]), - {ok, NewPid} = couch_view_group:start_link({slow_view, DbName, Fd, Lang, DesignOptions, MapSrc, RedSrc}), - true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}), - add_to_ets(NewPid, DbName, Name), - NewPid; - [{_, ExistingPid0}] -> - ExistingPid0 - end, - {reply, {ok, Pid}, Server}; -handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) -> - case ets:lookup(group_servers_by_name, {DbName, GroupId}) of - [] -> - ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]), - case (catch couch_view_group:start_link({view, Root, DbName, GroupId})) of + ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", + [GroupId, DbName]), + case (catch couch_view_group:start_link({Root, DbName, Group})) of {ok, NewPid} -> - add_to_ets(NewPid, DbName, GroupId), + add_to_ets(NewPid, DbName, Sig), {reply, {ok, NewPid}, Server}; Error -> {reply, Error, Server} @@ -272,22 +297,22 @@ handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root} end. handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> - % shutdown all the updaters + % shutdown all the updaters and clear the files, the db got changed Names = ets:lookup(couch_groups_by_db, DbName), lists:foreach( - fun({_DbName, GroupId}) -> - ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]), - [{_, Pid}] = ets:lookup(group_servers_by_name, {DbName, GroupId}), + fun({_DbName, Sig}) -> + ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [Sig, DbName]), + [{_, Pid}] = ets:lookup(group_servers_by_sig, {DbName, Sig}), exit(Pid, kill), receive {'EXIT', Pid, _} -> - delete_from_ets(Pid, DbName, GroupId) + delete_from_ets(Pid, DbName, Sig) end end, Names), delete_index_dir(Root, DbName), file:delete(Root ++ "/." ++ binary_to_list(DbName) ++ "_temp"), {noreply, Server}. -handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> +handle_info({'EXIT', FromPid, Reason}, Server) -> case ets:lookup(couch_groups_by_updater, FromPid) of [] -> if Reason /= normal -> @@ -296,40 +321,27 @@ handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> exit(Reason); true -> ok end; - [{_, {DbName, "_temp_" ++ _ = GroupId}}] -> - delete_from_ets(FromPid, DbName, GroupId), - [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName), - case Count of - 1 -> % Last ref - couch_file:close(Fd), - file:delete(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_temp"), - true = ets:delete(couch_temp_group_fd_by_db, DbName); - _ -> - true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count - 1}) - end; [{_, {DbName, GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId) end, {noreply, Server}. -add_to_ets(Pid, DbName, GroupId) -> - true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}), - true = ets:insert(group_servers_by_name, {{DbName, GroupId}, Pid}), - true = ets:insert(couch_groups_by_db, {DbName, GroupId}). +add_to_ets(Pid, DbName, Sig) -> + true = ets:insert(couch_groups_by_updater, {Pid, {DbName, Sig}}), + true = ets:insert(group_servers_by_sig, {{DbName, Sig}, Pid}), + true = ets:insert(couch_groups_by_db, {DbName, Sig}). -delete_from_ets(Pid, DbName, GroupId) -> +delete_from_ets(Pid, DbName, Sig) -> true = ets:delete(couch_groups_by_updater, Pid), - true = ets:delete(group_servers_by_name, {DbName, GroupId}), - true = ets:delete_object(couch_groups_by_db, {DbName, GroupId}). + true = ets:delete(group_servers_by_sig, {DbName, Sig}), + true = ets:delete_object(couch_groups_by_db, {DbName, Sig}). code_change(_OldVsn, State, _Extra) -> {ok, State}. - - delete_index_dir(RootDir, DbName) -> - nuke_dir(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_design"). + nuke_dir(RootDir ++ "/." ++ ?b2l(DbName) ++ "_design"). nuke_dir(Dir) -> case file:list_dir(Dir) of |