diff options
Diffstat (limited to 'apps/couch/src/couch_view.erl')
-rw-r--r-- | apps/couch/src/couch_view.erl | 480 |
1 files changed, 480 insertions, 0 deletions
diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl new file mode 100644 index 00000000..8d479d7e --- /dev/null +++ b/apps/couch/src/couch_view.erl @@ -0,0 +1,480 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_view). +-behaviour(gen_server). + +-export([start_link/0,fold/4,less_json/2,less_json_ids/2,expand_dups/2, + detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2, + code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4, + get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/4, + extract_map_view/1,get_group_server/2,get_group_info/2, + cleanup_index_files/1,config_change/2, data_size/2]). + +-include("couch_db.hrl"). + + +-record(server,{ + root_dir = []}). + +start_link() -> + gen_server:start_link({local, couch_view}, couch_view, [], []). + +get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) -> + {ok, Group} = + couch_view_group:open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc), + case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of + {ok, Pid} -> + Pid; + Error -> + throw(Error) + end. + +get_group_server(DbName, GroupId) -> + case couch_view_group:open_db_group(DbName, GroupId) of + {ok, Group} -> + case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of + {ok, Pid} -> + Pid; + Error -> + throw(Error) + end; + Error -> + throw(Error) + end. + +get_group(Db, GroupId, Stale) -> + MinUpdateSeq = case Stale of + ok -> 0; + update_after -> 0; + _Else -> couch_db:get_update_seq(Db) + end, + GroupPid = get_group_server(couch_db:name(Db), GroupId), + Result = couch_view_group:request_group(GroupPid, MinUpdateSeq), + case Stale of + update_after -> + % best effort, process might die + spawn(fun() -> + LastSeq = couch_db:get_update_seq(Db), + couch_view_group:request_group(GroupPid, LastSeq) + end); + _ -> + ok + end, + Result. + +get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc) -> + couch_view_group:request_group( + get_temp_updater(couch_db:name(Db), Language, DesignOptions, MapSrc, RedSrc), + couch_db:get_update_seq(Db)). + +get_group_info(Db, GroupId) -> + couch_view_group:request_group_info( + get_group_server(couch_db:name(Db), GroupId)). + +cleanup_index_files(Db) -> + % load all ddocs + {ok, DesignDocs} = couch_db:get_design_docs(Db), + + % make unique list of group sigs + Sigs = lists:map(fun(#doc{id = GroupId}) -> + {ok, Info} = get_group_info(Db, GroupId), + ?b2l(couch_util:get_value(signature, Info)) + end, [DD||DD <- DesignDocs, DD#doc.deleted == false]), + + FileList = list_index_files(Db), + + DeleteFiles = + if length(Sigs) =:= 0 -> + FileList; + true -> + % regex that matches all ddocs + RegExp = "("++ string:join(Sigs, "|") ++")", + + % filter out the ones in use + [FilePath || FilePath <- FileList, + re:run(FilePath, RegExp, [{capture, none}]) =:= nomatch] + end, + + % delete unused files + ?LOG_DEBUG("deleting unused view index files: ~p",[DeleteFiles]), + RootDir = couch_config:get("couchdb", "view_index_dir"), + [couch_file:delete(RootDir,File,false)||File <- DeleteFiles], + ok. + +list_index_files(Db) -> + % call server to fetch the index files + RootDir = couch_config:get("couchdb", "view_index_dir"), + filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*"). + + +get_row_count(#view{btree=Bt}) -> + {ok, {Count, _, _}} = couch_btree:full_reduce(Bt), + {ok, Count}. + +get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc) -> + {ok, #group{views=[View]}=Group} = + get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc), + {ok, {temp_reduce, View}, Group}. + + +get_reduce_view(Db, GroupId, Name, Update) -> + case get_group(Db, GroupId, Update) of + {ok, #group{views=Views,def_lang=Lang}=Group} -> + case get_reduce_view0(Name, Lang, Views) of + {ok, View} -> + {ok, View, Group}; + Else -> + Else + end; + Error -> + Error + end. + +get_reduce_view0(_Name, _Lang, []) -> + {not_found, missing_named_view}; +get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View|Rest]) -> + case get_key_pos(Name, RedFuns, 0) of + 0 -> get_reduce_view0(Name, Lang, Rest); + N -> {ok, {reduce, N, Lang, View}} + end. + +extract_map_view({reduce, _N, _Lang, View}) -> + View. + +detuple_kvs([], Acc) -> + lists:reverse(Acc); +detuple_kvs([KV | Rest], Acc) -> + {{Key,Id},Value} = KV, + NKV = [[Key, Id], Value], + detuple_kvs(Rest, [NKV | Acc]). + +expand_dups([], Acc) -> + lists:reverse(Acc); +expand_dups([{Key, {dups, Vals}} | Rest], Acc) -> + Expanded = [{Key, Val} || Val <- Vals], + expand_dups(Rest, Expanded ++ Acc); +expand_dups([KV | Rest], Acc) -> + expand_dups(Rest, [KV | Acc]). + +data_size(KVList, Reduction) -> + lists:foldl(fun([[Key, _], Value], Acc) -> + size(term_to_binary(Key)) + + size(term_to_binary(Value)) + + Acc + end,size(term_to_binary(Reduction)),KVList). + +fold_reduce({temp_reduce, #view{btree=Bt}}, Fun, Acc, Options) -> + WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) -> + {_, [Red]} = couch_btree:final_reduce(Bt, PartialReds), + Fun(GroupedKey, Red, Acc0) + end, + couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options); + +fold_reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Fun, Acc, Options) -> + PreResultPadding = lists:duplicate(NthRed - 1, []), + PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []), + {_Name, FunSrc} = lists:nth(NthRed,RedFuns), + ReduceFun = + fun(reduce, KVs) -> + {ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], detuple_kvs(expand_dups(KVs, []),[])), + {0, PreResultPadding ++ Reduced ++ PostResultPadding}; + (rereduce, Reds) -> + UserReds = [[lists:nth(NthRed, element(2, R))] || R <- Reds], + {ok, Reduced} = couch_query_servers:rereduce(Lang, [FunSrc], UserReds), + {0, PreResultPadding ++ Reduced ++ PostResultPadding} + end, + WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) -> + {_, Reds} = couch_btree:final_reduce(ReduceFun, PartialReds), + Fun(GroupedKey, lists:nth(NthRed, Reds), Acc0) + end, + couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options). + +get_key_pos(_Key, [], _N) -> + 0; +get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 -> + N + 1; +get_key_pos(Key, [_|Rest], N) -> + get_key_pos(Key, Rest, N+1). + + +get_temp_map_view(Db, Language, DesignOptions, Src) -> + {ok, #group{views=[View]}=Group} = get_temp_group(Db, Language, DesignOptions, Src, []), + {ok, View, Group}. + +get_map_view(Db, GroupId, Name, Stale) -> + case get_group(Db, GroupId, Stale) of + {ok, #group{views=Views}=Group} -> + case get_map_view0(Name, Views) of + {ok, View} -> + {ok, View, Group}; + Else -> + Else + end; + Error -> + Error + end. + +get_map_view0(_Name, []) -> + {not_found, missing_named_view}; +get_map_view0(Name, [#view{map_names=MapNames}=View|Rest]) -> + case lists:member(Name, MapNames) of + true -> {ok, View}; + false -> get_map_view0(Name, Rest) + end. + +reduce_to_count(Reductions) -> + {Count, _} = + couch_btree:final_reduce( + fun(reduce, KVs) -> + Count = lists:sum( + [case V of {dups, Vals} -> length(Vals); _ -> 1 end + || {_,V} <- KVs]), + {Count, []}; + (rereduce, Reds) -> + {lists:sum([Count0 || {Count0, _} <- Reds]), []} + end, Reductions), + Count. + + + +fold_fun(_Fun, [], _, Acc) -> + {ok, Acc}; +fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) -> + case Fun(KV, {KVReds, Reds}, Acc) of + {ok, Acc2} -> + fold_fun(Fun, Rest, {[KV|KVReds], Reds}, Acc2); + {stop, Acc2} -> + {stop, Acc2} + end. + + +fold(#view{btree=Btree}, Fun, Acc, Options) -> + WrapperFun = + fun(visit, KV, Reds, Acc2) -> + fold_fun(Fun, expand_dups([KV],[]), Reds, Acc2); + (traverse, LK, Red, Acc2) + when is_function(Fun, 4) -> + Fun(traverse, LK, Red, Acc2); + (traverse, _LK, Red, {_, Skip, _, _} = Acc2) + when Skip >= element(1, Red) -> + {skip, setelement(2, Acc2, Skip - element(1, Red))}; + (traverse, _, _, Acc2) -> + {ok, Acc2} + end, + {ok, _LastReduce, _AccResult} = couch_btree:fold(Btree, WrapperFun, Acc, Options). + + +init([]) -> + % read configuration settings and register for configuration changes + RootDir = couch_config:get("couchdb", "view_index_dir"), + ok = couch_config:register(fun ?MODULE:config_change/2), + + couch_db_update_notifier:start_link( + fun({deleted, DbName}) -> + gen_server:cast(couch_view, {reset_indexes, DbName}); + ({created, DbName}) -> + gen_server:cast(couch_view, {reset_indexes, DbName}); + (_Else) -> + ok + end), + ets:new(couch_groups_by_db, [bag, private, named_table]), + ets:new(group_servers_by_sig, [set, protected, named_table]), + ets:new(couch_groups_by_updater, [set, private, named_table]), + process_flag(trap_exit, true), + ok = couch_file:init_delete_dir(RootDir), + {ok, #server{root_dir=RootDir}}. + + +terminate(_Reason, _Srv) -> + [couch_util:shutdown_sync(Pid) || {Pid, _} <- + ets:tab2list(couch_groups_by_updater)], + ok. + + +handle_call({get_group_server, DbName, #group{sig=Sig}=Group}, From, + #server{root_dir=Root}=Server) -> + case ets:lookup(group_servers_by_sig, {DbName, Sig}) of + [] -> + spawn_monitor(fun() -> new_group(Root, DbName, Group) end), + ets:insert(group_servers_by_sig, {{DbName, Sig}, [From]}), + {noreply, Server}; + [{_, WaitList}] when is_list(WaitList) -> + ets:insert(group_servers_by_sig, {{DbName, Sig}, [From | WaitList]}), + {noreply, Server}; + [{_, ExistingPid}] -> + {reply, {ok, ExistingPid}, Server} + end; + +handle_call({reset_indexes, DbName}, _From, #server{root_dir=Root}=Server) -> + do_reset_indexes(DbName, Root), + {reply, ok, Server}. + +handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> + do_reset_indexes(DbName, Root), + {noreply, Server}. + +new_group(Root, DbName, #group{name=GroupId, sig=Sig} = Group) -> + ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", + [GroupId, DbName]), + case (catch couch_view_group:start_link({Root, DbName, Group})) of + {ok, NewPid} -> + unlink(NewPid), + exit({DbName, Sig, {ok, NewPid}}); + {error, invalid_view_seq} -> + ok = gen_server:call(couch_view, {reset_indexes, DbName}), + new_group(Root, DbName, Group); + Error -> + exit({DbName, Sig, Error}) + end. + +do_reset_indexes(DbName, Root) -> + % shutdown all the updaters and clear the files, the db got changed + Names = ets:lookup(couch_groups_by_db, DbName), + lists:foreach( + fun({_DbName, Sig}) -> + ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [Sig, DbName]), + [{_, Pid}] = ets:lookup(group_servers_by_sig, {DbName, Sig}), + couch_util:shutdown_sync(Pid), + delete_from_ets(Pid, DbName, Sig) + end, Names), + delete_index_dir(Root, DbName), + RootDelDir = couch_config:get("couchdb", "view_index_dir"), + couch_file:delete(RootDelDir, Root ++ "/." ++ ?b2l(DbName) ++ "_temp"). + +handle_info({'EXIT', FromPid, Reason}, Server) -> + case ets:lookup(couch_groups_by_updater, FromPid) of + [] -> + if Reason =/= normal, Reason =/= no_db_file -> + % non-updater linked process died, we propagate the error + ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]), + exit(Reason); + true -> ok + end; + [{_, {DbName, GroupId}}] -> + delete_from_ets(FromPid, DbName, GroupId) + end, + {noreply, Server}; + +handle_info({'DOWN', _, _, _, {DbName, Sig, Reply}}, Server) -> + [{_, WaitList}] = ets:lookup(group_servers_by_sig, {DbName, Sig}), + [gen_server:reply(From, Reply) || From <- WaitList], + case Reply of {ok, NewPid} -> + link(NewPid), + add_to_ets(NewPid, DbName, Sig); + _ -> ok end, + {noreply, Server}. + +config_change("couchdb", "view_index_dir") -> + exit(whereis(couch_view), config_change). + +add_to_ets(Pid, DbName, Sig) -> + true = ets:insert(couch_groups_by_updater, {Pid, {DbName, Sig}}), + true = ets:insert(group_servers_by_sig, {{DbName, Sig}, Pid}), + true = ets:insert(couch_groups_by_db, {DbName, Sig}). + +delete_from_ets(Pid, DbName, Sig) -> + true = ets:delete(couch_groups_by_updater, Pid), + true = ets:delete(group_servers_by_sig, {DbName, Sig}), + true = ets:delete_object(couch_groups_by_db, {DbName, Sig}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +delete_index_dir(RootDir, DbName) -> + nuke_dir(RootDir, RootDir ++ "/." ++ ?b2l(DbName) ++ "_design"). + +nuke_dir(RootDelDir, Dir) -> + case file:list_dir(Dir) of + {error, enoent} -> ok; % doesn't exist + {ok, Files} -> + lists:foreach( + fun(File)-> + Full = Dir ++ "/" ++ File, + case couch_file:delete(RootDelDir, Full, false) of + ok -> ok; + {error, eperm} -> + ok = nuke_dir(RootDelDir, Full) + end + end, + Files), + ok = file:del_dir(Dir) + end. + + +% keys come back in the language of btree - tuples. +less_json_ids({JsonA, IdA}, {JsonB, IdB}) -> + case less_json0(JsonA, JsonB) of + 0 -> + IdA < IdB; + Result -> + Result < 0 + end. + +less_json(A,B) -> + less_json0(A,B) < 0. + +less_json0(A,A) -> 0; + +less_json0(A,B) when is_atom(A), is_atom(B) -> atom_sort(A) - atom_sort(B); +less_json0(A,_) when is_atom(A) -> -1; +less_json0(_,B) when is_atom(B) -> 1; + +less_json0(A,B) when is_number(A), is_number(B) -> A - B; +less_json0(A,_) when is_number(A) -> -1; +less_json0(_,B) when is_number(B) -> 1; + +less_json0(A,B) when is_binary(A), is_binary(B) -> couch_util:collate(A,B); +less_json0(A,_) when is_binary(A) -> -1; +less_json0(_,B) when is_binary(B) -> 1; + +less_json0(A,B) when is_list(A), is_list(B) -> less_list(A,B); +less_json0(A,_) when is_list(A) -> -1; +less_json0(_,B) when is_list(B) -> 1; + +less_json0({A},{B}) when is_list(A), is_list(B) -> less_props(A,B); +less_json0({A},_) when is_list(A) -> -1; +less_json0(_,{B}) when is_list(B) -> 1. + +atom_sort(null) -> 1; +atom_sort(false) -> 2; +atom_sort(true) -> 3. + +less_props([], [_|_]) -> + -1; +less_props(_, []) -> + 1; +less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) -> + case couch_util:collate(AKey, BKey) of + 0 -> + case less_json0(AValue, BValue) of + 0 -> + less_props(RestA, RestB); + Result -> + Result + end; + Result -> + Result + end. + +less_list([], [_|_]) -> + -1; +less_list(_, []) -> + 1; +less_list([A|RestA], [B|RestB]) -> + case less_json0(A,B) of + 0 -> + less_list(RestA, RestB); + Result -> + Result + end. |