summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_view.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_view.erl')
-rw-r--r--src/couchdb/couch_view.erl160
1 files changed, 86 insertions, 74 deletions
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index 301ea8b7..87feea12 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -17,7 +17,7 @@
detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,
code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4,
get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/7,
- extract_map_view/1,get_group_server/2]).
+ extract_map_view/1,get_group_server/2,get_group_info/2,cleanup_index_files/1]).
-include("couch_db.hrl").
@@ -28,18 +28,32 @@
start_link() ->
gen_server:start_link({local, couch_view}, couch_view, [], []).
-get_temp_updater(DbName, Type, DesignOptions, MapSrc, RedSrc) ->
- {ok, Pid} = gen_server:call(couch_view,
- {start_temp_updater, DbName, Type, DesignOptions, MapSrc, RedSrc}),
- Pid.
-
-get_group_server(DbName, GroupId) ->
- case gen_server:call(couch_view, {start_group_server, DbName, GroupId}) of
+get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
+ % make temp group
+ % do we need to close this db?
+ {ok, _Db, Group} =
+ couch_view_group:open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc),
+ case gen_server:call(couch_view, {get_group_server, DbName, Group}) of
{ok, Pid} ->
Pid;
Error ->
throw(Error)
end.
+
+get_group_server(DbName, GroupId) ->
+ % get signature for group
+ case couch_view_group:open_db_group(DbName, GroupId) of
+ % do we need to close this db?
+ {ok, _Db, Group} ->
+ case gen_server:call(couch_view, {get_group_server, DbName, Group}) of
+ {ok, Pid} ->
+ Pid;
+ Error ->
+ throw(Error)
+ end;
+ Error ->
+ throw(Error)
+ end.
get_group(Db, GroupId, Stale) ->
MinUpdateSeq = case Stale of
@@ -50,18 +64,52 @@ get_group(Db, GroupId, Stale) ->
get_group_server(couch_db:name(Db), GroupId),
MinUpdateSeq).
-
-get_temp_group(Db, Type, DesignOptions, MapSrc, RedSrc) ->
+get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc) ->
couch_view_group:request_group(
- get_temp_updater(couch_db:name(Db), Type, DesignOptions, MapSrc, RedSrc),
- couch_db:get_update_seq(Db)).
+ get_temp_updater(couch_db:name(Db), Language, DesignOptions, MapSrc, RedSrc),
+ couch_db:get_update_seq(Db)).
+
+get_group_info(Db, GroupId) ->
+ couch_view_group:request_group_info(
+ get_group_server(couch_db:name(Db), GroupId)).
+
+cleanup_index_files(Db) ->
+ % load all ddocs
+ {ok, DesignDocs} = couch_db:get_design_docs(Db),
+
+ % make unique list of group sigs
+ Sigs = lists:map(fun(#doc{id = GroupId} = DDoc) ->
+ {ok, Info} = get_group_info(Db, GroupId),
+ ?b2l(proplists:get_value(signature, Info))
+ end, [DD||DD <- DesignDocs, DD#doc.deleted == false]),
+
+ FileList = list_index_files(Db),
+
+ % regex that matches all ddocs
+ RegExp = "("++ string:join(Sigs, "|") ++")",
+
+ % filter out the ones in use
+ DeleteFiles = lists:filter(fun(FilePath) ->
+ regexp:first_match(FilePath, RegExp)==nomatch
+ end, FileList),
+ % delete unused files
+ ?LOG_DEBUG("deleting unused view index files: ~p",[DeleteFiles]),
+ [file:delete(File)||File <- DeleteFiles],
+ ok.
+
+list_index_files(Db) ->
+ % call server to fetch the index files
+ RootDir = couch_config:get("couchdb", "view_index_dir"),
+ Files = filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*").
+
get_row_count(#view{btree=Bt}) ->
{ok, {Count, _Reds}} = couch_btree:full_reduce(Bt),
{ok, Count}.
-get_temp_reduce_view(Db, Type, DesignOptions, MapSrc, RedSrc) ->
- {ok, #group{views=[View]}=Group} = get_temp_group(Db, Type, DesignOptions, MapSrc, RedSrc),
+get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc) ->
+ {ok, #group{views=[View]}=Group} =
+ get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc),
{ok, {temp_reduce, View}, Group}.
@@ -141,8 +189,8 @@ get_key_pos(Key, [_|Rest], N) ->
get_key_pos(Key, Rest, N+1).
-get_temp_map_view(Db, Type, DesignOptions, Src) ->
- {ok, #group{views=[View]}=Group} = get_temp_group(Db, Type, DesignOptions, Src, []),
+get_temp_map_view(Db, Language, DesignOptions, Src) ->
+ {ok, #group{views=[View]}=Group} = get_temp_group(Db, Language, DesignOptions, Src, []),
{ok, View, Group}.
get_map_view(Db, GroupId, Name, Stale) ->
@@ -220,9 +268,8 @@ init([]) ->
ok
end),
ets:new(couch_groups_by_db, [bag, private, named_table]),
- ets:new(group_servers_by_name, [set, protected, named_table]),
+ ets:new(group_servers_by_sig, [set, protected, named_table]),
ets:new(couch_groups_by_updater, [set, private, named_table]),
- ets:new(couch_temp_group_fd_by_db, [set, protected, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir}}.
@@ -232,37 +279,15 @@ terminate(Reason, _Srv) ->
ok.
-handle_call({start_temp_updater, DbName, Lang, DesignOptions, MapSrc, RedSrc},
- _From, #server{root_dir=Root}=Server) ->
- <<SigInt:128/integer>> = erlang:md5(term_to_binary({Lang, DesignOptions, MapSrc, RedSrc})),
- Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])),
- Pid =
- case ets:lookup(group_servers_by_name, {DbName, Name}) of
+handle_call({get_group_server, DbName,
+ #group{name=GroupId,sig=Sig}=Group}, _From, #server{root_dir=Root}=Server) ->
+ case ets:lookup(group_servers_by_sig, {DbName, Sig}) of
[] ->
- case ets:lookup(couch_temp_group_fd_by_db, DbName) of
- [] ->
- FileName = Root ++ "/." ++ binary_to_list(DbName) ++ "_temp",
- {ok, Fd} = couch_file:open(FileName, [create, overwrite]),
- Count = 0;
- [{_, Fd, Count}] ->
- ok
- end,
- ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]),
- {ok, NewPid} = couch_view_group:start_link({slow_view, DbName, Fd, Lang, DesignOptions, MapSrc, RedSrc}),
- true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}),
- add_to_ets(NewPid, DbName, Name),
- NewPid;
- [{_, ExistingPid0}] ->
- ExistingPid0
- end,
- {reply, {ok, Pid}, Server};
-handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->
- case ets:lookup(group_servers_by_name, {DbName, GroupId}) of
- [] ->
- ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]),
- case (catch couch_view_group:start_link({view, Root, DbName, GroupId})) of
+ ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.",
+ [GroupId, DbName]),
+ case (catch couch_view_group:start_link({Root, DbName, Group})) of
{ok, NewPid} ->
- add_to_ets(NewPid, DbName, GroupId),
+ add_to_ets(NewPid, DbName, Sig),
{reply, {ok, NewPid}, Server};
Error ->
{reply, Error, Server}
@@ -272,22 +297,22 @@ handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}
end.
handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
- % shutdown all the updaters
+ % shutdown all the updaters and clear the files, the db got changed
Names = ets:lookup(couch_groups_by_db, DbName),
lists:foreach(
- fun({_DbName, GroupId}) ->
- ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]),
- [{_, Pid}] = ets:lookup(group_servers_by_name, {DbName, GroupId}),
+ fun({_DbName, Sig}) ->
+ ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [Sig, DbName]),
+ [{_, Pid}] = ets:lookup(group_servers_by_sig, {DbName, Sig}),
exit(Pid, kill),
receive {'EXIT', Pid, _} ->
- delete_from_ets(Pid, DbName, GroupId)
+ delete_from_ets(Pid, DbName, Sig)
end
end, Names),
delete_index_dir(Root, DbName),
file:delete(Root ++ "/." ++ binary_to_list(DbName) ++ "_temp"),
{noreply, Server}.
-handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
+handle_info({'EXIT', FromPid, Reason}, Server) ->
case ets:lookup(couch_groups_by_updater, FromPid) of
[] ->
if Reason /= normal ->
@@ -296,40 +321,27 @@ handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
exit(Reason);
true -> ok
end;
- [{_, {DbName, "_temp_" ++ _ = GroupId}}] ->
- delete_from_ets(FromPid, DbName, GroupId),
- [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName),
- case Count of
- 1 -> % Last ref
- couch_file:close(Fd),
- file:delete(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_temp"),
- true = ets:delete(couch_temp_group_fd_by_db, DbName);
- _ ->
- true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count - 1})
- end;
[{_, {DbName, GroupId}}] ->
delete_from_ets(FromPid, DbName, GroupId)
end,
{noreply, Server}.
-add_to_ets(Pid, DbName, GroupId) ->
- true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}),
- true = ets:insert(group_servers_by_name, {{DbName, GroupId}, Pid}),
- true = ets:insert(couch_groups_by_db, {DbName, GroupId}).
+add_to_ets(Pid, DbName, Sig) ->
+ true = ets:insert(couch_groups_by_updater, {Pid, {DbName, Sig}}),
+ true = ets:insert(group_servers_by_sig, {{DbName, Sig}, Pid}),
+ true = ets:insert(couch_groups_by_db, {DbName, Sig}).
-delete_from_ets(Pid, DbName, GroupId) ->
+delete_from_ets(Pid, DbName, Sig) ->
true = ets:delete(couch_groups_by_updater, Pid),
- true = ets:delete(group_servers_by_name, {DbName, GroupId}),
- true = ets:delete_object(couch_groups_by_db, {DbName, GroupId}).
+ true = ets:delete(group_servers_by_sig, {DbName, Sig}),
+ true = ets:delete_object(couch_groups_by_db, {DbName, Sig}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-
delete_index_dir(RootDir, DbName) ->
- nuke_dir(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_design").
+ nuke_dir(RootDir ++ "/." ++ ?b2l(DbName) ++ "_design").
nuke_dir(Dir) ->
case file:list_dir(Dir) of