diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-08-11 15:22:33 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-08-11 17:39:37 -0400 |
commit | 81bdbed444df2cbcf3cdb32f7d4a74019de06454 (patch) | |
tree | eade7d0d9bb4cac01b55fd8642adfe0f7da35161 /apps/couch/src/couch_view.erl | |
parent | cc1910f73fbd20c5ffc94bd61e7701d7f5e4c92a (diff) |
reorganize couch .erl and driver code into rebar layout
Diffstat (limited to 'apps/couch/src/couch_view.erl')
-rw-r--r-- | apps/couch/src/couch_view.erl | 438 |
1 files changed, 438 insertions, 0 deletions
diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl new file mode 100644 index 00000000..38c0a783 --- /dev/null +++ b/apps/couch/src/couch_view.erl @@ -0,0 +1,438 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_view). +-behaviour(gen_server). + +-export([start_link/0,fold/4,less_json/2,less_json_ids/2,expand_dups/2, + detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2, + code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4, + get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/4, + extract_map_view/1,get_group_server/2,get_group_info/2,cleanup_index_files/1]). + +-include("couch_db.hrl"). + + +-record(server,{ + root_dir = []}). + +start_link() -> + gen_server:start_link({local, couch_view}, couch_view, [], []). + +get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) -> + % make temp group + % do we need to close this db? + {ok, _Db, Group} = + couch_view_group:open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc), + case gen_server:call(couch_view, {get_group_server, DbName, Group}) of + {ok, Pid} -> + Pid; + Error -> + throw(Error) + end. + +get_group_server(DbName, GroupId) -> + % get signature for group + case couch_view_group:open_db_group(DbName, GroupId) of + % do we need to close this db? + {ok, _Db, Group} -> + case gen_server:call(couch_view, {get_group_server, DbName, Group}) of + {ok, Pid} -> + Pid; + Error -> + throw(Error) + end; + Error -> + throw(Error) + end. + +get_group(Db, GroupId, Stale) -> + MinUpdateSeq = case Stale of + ok -> 0; + _Else -> couch_db:get_update_seq(Db) + end, + couch_view_group:request_group( + get_group_server(couch_db:name(Db), GroupId), + MinUpdateSeq). + +get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc) -> + couch_view_group:request_group( + get_temp_updater(couch_db:name(Db), Language, DesignOptions, MapSrc, RedSrc), + couch_db:get_update_seq(Db)). + +get_group_info(Db, GroupId) -> + couch_view_group:request_group_info( + get_group_server(couch_db:name(Db), GroupId)). + +cleanup_index_files(Db) -> + % load all ddocs + {ok, DesignDocs} = couch_db:get_design_docs(Db), + + % make unique list of group sigs + Sigs = lists:map(fun(#doc{id = GroupId}) -> + {ok, Info} = get_group_info(Db, GroupId), + ?b2l(couch_util:get_value(signature, Info)) + end, [DD||DD <- DesignDocs, DD#doc.deleted == false]), + + FileList = list_index_files(Db), + + % regex that matches all ddocs + RegExp = "("++ string:join(Sigs, "|") ++")", + + % filter out the ones in use + DeleteFiles = [FilePath + || FilePath <- FileList, + re:run(FilePath, RegExp, [{capture, none}]) =:= nomatch], + % delete unused files + ?LOG_DEBUG("deleting unused view index files: ~p",[DeleteFiles]), + RootDir = couch_config:get("couchdb", "view_index_dir"), + [couch_file:delete(RootDir,File,false)||File <- DeleteFiles], + ok. + +list_index_files(Db) -> + % call server to fetch the index files + RootDir = couch_config:get("couchdb", "view_index_dir"), + filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*"). + + +get_row_count(#view{btree=Bt}) -> + {ok, {Count, _Reds}} = couch_btree:full_reduce(Bt), + {ok, Count}. + +get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc) -> + {ok, #group{views=[View]}=Group} = + get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc), + {ok, {temp_reduce, View}, Group}. + + +get_reduce_view(Db, GroupId, Name, Update) -> + case get_group(Db, GroupId, Update) of + {ok, #group{views=Views,def_lang=Lang}=Group} -> + case get_reduce_view0(Name, Lang, Views) of + {ok, View} -> + {ok, View, Group}; + Else -> + Else + end; + Error -> + Error + end. + +get_reduce_view0(_Name, _Lang, []) -> + {not_found, missing_named_view}; +get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View|Rest]) -> + case get_key_pos(Name, RedFuns, 0) of + 0 -> get_reduce_view0(Name, Lang, Rest); + N -> {ok, {reduce, N, Lang, View}} + end. + +extract_map_view({reduce, _N, _Lang, View}) -> + View. + +detuple_kvs([], Acc) -> + lists:reverse(Acc); +detuple_kvs([KV | Rest], Acc) -> + {{Key,Id},Value} = KV, + NKV = [[Key, Id], Value], + detuple_kvs(Rest, [NKV | Acc]). + +expand_dups([], Acc) -> + lists:reverse(Acc); +expand_dups([{Key, {dups, Vals}} | Rest], Acc) -> + Expanded = [{Key, Val} || Val <- Vals], + expand_dups(Rest, Expanded ++ Acc); +expand_dups([KV | Rest], Acc) -> + expand_dups(Rest, [KV | Acc]). + +fold_reduce({temp_reduce, #view{btree=Bt}}, Fun, Acc, Options) -> + WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) -> + {_, [Red]} = couch_btree:final_reduce(Bt, PartialReds), + Fun(GroupedKey, Red, Acc0) + end, + couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options); + +fold_reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Fun, Acc, Options) -> + PreResultPadding = lists:duplicate(NthRed - 1, []), + PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []), + {_Name, FunSrc} = lists:nth(NthRed,RedFuns), + ReduceFun = + fun(reduce, KVs) -> + {ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], detuple_kvs(expand_dups(KVs, []),[])), + {0, PreResultPadding ++ Reduced ++ PostResultPadding}; + (rereduce, Reds) -> + UserReds = [[lists:nth(NthRed, UserRedsList)] || {_, UserRedsList} <- Reds], + {ok, Reduced} = couch_query_servers:rereduce(Lang, [FunSrc], UserReds), + {0, PreResultPadding ++ Reduced ++ PostResultPadding} + end, + WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) -> + {_, Reds} = couch_btree:final_reduce(ReduceFun, PartialReds), + Fun(GroupedKey, lists:nth(NthRed, Reds), Acc0) + end, + couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options). + +get_key_pos(_Key, [], _N) -> + 0; +get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 -> + N + 1; +get_key_pos(Key, [_|Rest], N) -> + get_key_pos(Key, Rest, N+1). + + +get_temp_map_view(Db, Language, DesignOptions, Src) -> + {ok, #group{views=[View]}=Group} = get_temp_group(Db, Language, DesignOptions, Src, []), + {ok, View, Group}. + +get_map_view(Db, GroupId, Name, Stale) -> + case get_group(Db, GroupId, Stale) of + {ok, #group{views=Views}=Group} -> + case get_map_view0(Name, Views) of + {ok, View} -> + {ok, View, Group}; + Else -> + Else + end; + Error -> + Error + end. + +get_map_view0(_Name, []) -> + {not_found, missing_named_view}; +get_map_view0(Name, [#view{map_names=MapNames}=View|Rest]) -> + case lists:member(Name, MapNames) of + true -> {ok, View}; + false -> get_map_view0(Name, Rest) + end. + +reduce_to_count(Reductions) -> + {Count, _} = + couch_btree:final_reduce( + fun(reduce, KVs) -> + Count = lists:sum( + [case V of {dups, Vals} -> length(Vals); _ -> 1 end + || {_,V} <- KVs]), + {Count, []}; + (rereduce, Reds) -> + {lists:sum([Count0 || {Count0, _} <- Reds]), []} + end, Reductions), + Count. + + + +fold_fun(_Fun, [], _, Acc) -> + {ok, Acc}; +fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) -> + case Fun(KV, {KVReds, Reds}, Acc) of + {ok, Acc2} -> + fold_fun(Fun, Rest, {[KV|KVReds], Reds}, Acc2); + {stop, Acc2} -> + {stop, Acc2} + end. + + +fold(#view{btree=Btree}, Fun, Acc, Options) -> + WrapperFun = + fun(KV, Reds, Acc2) -> + fold_fun(Fun, expand_dups([KV],[]), Reds, Acc2) + end, + {ok, _LastReduce, _AccResult} = couch_btree:fold(Btree, WrapperFun, Acc, Options). + + +init([]) -> + % read configuration settings and register for configuration changes + RootDir = couch_config:get("couchdb", "view_index_dir"), + Self = self(), + ok = couch_config:register( + fun("couchdb", "view_index_dir")-> + exit(Self, config_change) + end), + + couch_db_update_notifier:start_link( + fun({deleted, DbName}) -> + gen_server:cast(couch_view, {reset_indexes, DbName}); + ({created, DbName}) -> + gen_server:cast(couch_view, {reset_indexes, DbName}); + (_Else) -> + ok + end), + ets:new(couch_groups_by_db, [bag, private, named_table]), + ets:new(group_servers_by_sig, [set, protected, named_table]), + ets:new(couch_groups_by_updater, [set, private, named_table]), + process_flag(trap_exit, true), + ok = couch_file:init_delete_dir(RootDir), + {ok, #server{root_dir=RootDir}}. + + +terminate(_Reason, _Srv) -> + [couch_util:shutdown_sync(Pid) || {Pid, _} <- + ets:tab2list(couch_groups_by_updater)], + ok. + + +handle_call({get_group_server, DbName, + #group{name=GroupId,sig=Sig}=Group}, _From, #server{root_dir=Root}=Server) -> + case ets:lookup(group_servers_by_sig, {DbName, Sig}) of + [] -> + ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", + [GroupId, DbName]), + case (catch couch_view_group:start_link({Root, DbName, Group})) of + {ok, NewPid} -> + add_to_ets(NewPid, DbName, Sig), + {reply, {ok, NewPid}, Server}; + {error, invalid_view_seq} -> + do_reset_indexes(DbName, Root), + case (catch couch_view_group:start_link({Root, DbName, Group})) of + {ok, NewPid} -> + add_to_ets(NewPid, DbName, Sig), + {reply, {ok, NewPid}, Server}; + Error -> + {reply, Error, Server} + end; + Error -> + {reply, Error, Server} + end; + [{_, ExistingPid}] -> + {reply, {ok, ExistingPid}, Server} + end. + +handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> + do_reset_indexes(DbName, Root), + {noreply, Server}. + +do_reset_indexes(DbName, Root) -> + % shutdown all the updaters and clear the files, the db got changed + Names = ets:lookup(couch_groups_by_db, DbName), + lists:foreach( + fun({_DbName, Sig}) -> + ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [Sig, DbName]), + [{_, Pid}] = ets:lookup(group_servers_by_sig, {DbName, Sig}), + couch_util:shutdown_sync(Pid), + delete_from_ets(Pid, DbName, Sig) + end, Names), + delete_index_dir(Root, DbName), + RootDelDir = couch_config:get("couchdb", "view_index_dir"), + couch_file:delete(RootDelDir, Root ++ "/." ++ ?b2l(DbName) ++ "_temp"). + +handle_info({'EXIT', FromPid, Reason}, Server) -> + case ets:lookup(couch_groups_by_updater, FromPid) of + [] -> + if Reason /= normal -> + % non-updater linked process died, we propagate the error + ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]), + exit(Reason); + true -> ok + end; + [{_, {DbName, GroupId}}] -> + delete_from_ets(FromPid, DbName, GroupId) + end, + {noreply, Server}. + +add_to_ets(Pid, DbName, Sig) -> + true = ets:insert(couch_groups_by_updater, {Pid, {DbName, Sig}}), + true = ets:insert(group_servers_by_sig, {{DbName, Sig}, Pid}), + true = ets:insert(couch_groups_by_db, {DbName, Sig}). + +delete_from_ets(Pid, DbName, Sig) -> + true = ets:delete(couch_groups_by_updater, Pid), + true = ets:delete(group_servers_by_sig, {DbName, Sig}), + true = ets:delete_object(couch_groups_by_db, {DbName, Sig}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +delete_index_dir(RootDir, DbName) -> + nuke_dir(RootDir, RootDir ++ "/." ++ ?b2l(DbName) ++ "_design"). + +nuke_dir(RootDelDir, Dir) -> + case file:list_dir(Dir) of + {error, enoent} -> ok; % doesn't exist + {ok, Files} -> + lists:foreach( + fun(File)-> + Full = Dir ++ "/" ++ File, + case couch_file:delete(RootDelDir, Full, false) of + ok -> ok; + {error, eperm} -> + ok = nuke_dir(RootDelDir, Full) + end + end, + Files), + ok = file:del_dir(Dir) + end. + + +% keys come back in the language of btree - tuples. +less_json_ids({JsonA, IdA}, {JsonB, IdB}) -> + case less_json0(JsonA, JsonB) of + 0 -> + IdA < IdB; + Result -> + Result < 0 + end. + +less_json(A,B) -> + less_json0(A,B) < 0. + +less_json0(A,A) -> 0; + +less_json0(A,B) when is_atom(A), is_atom(B) -> atom_sort(A) - atom_sort(B); +less_json0(A,_) when is_atom(A) -> -1; +less_json0(_,B) when is_atom(B) -> 1; + +less_json0(A,B) when is_number(A), is_number(B) -> A - B; +less_json0(A,_) when is_number(A) -> -1; +less_json0(_,B) when is_number(B) -> 1; + +less_json0(A,B) when is_binary(A), is_binary(B) -> couch_util:collate(A,B); +less_json0(A,_) when is_binary(A) -> -1; +less_json0(_,B) when is_binary(B) -> 1; + +less_json0(A,B) when is_list(A), is_list(B) -> less_list(A,B); +less_json0(A,_) when is_list(A) -> -1; +less_json0(_,B) when is_list(B) -> 1; + +less_json0({A},{B}) when is_list(A), is_list(B) -> less_props(A,B); +less_json0({A},_) when is_list(A) -> -1; +less_json0(_,{B}) when is_list(B) -> 1. + +atom_sort(null) -> 1; +atom_sort(false) -> 2; +atom_sort(true) -> 3. + +less_props([], [_|_]) -> + -1; +less_props(_, []) -> + 1; +less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) -> + case couch_util:collate(AKey, BKey) of + 0 -> + case less_json0(AValue, BValue) of + 0 -> + less_props(RestA, RestB); + Result -> + Result + end; + Result -> + Result + end. + +less_list([], [_|_]) -> + -1; +less_list(_, []) -> + 1; +less_list([A|RestA], [B|RestB]) -> + case less_json0(A,B) of + 0 -> + less_list(RestA, RestB); + Result -> + Result + end. |