summary | refs | log | tree | commit | diff
path: root/apps/couch/src/couch_view.erl
diff options
context:
space:
mode:
author: Adam Kocoloski <adam@cloudant.com> 2010-08-11 15:22:33 -0400
committer: Adam Kocoloski <adam@cloudant.com> 2010-08-11 17:39:37 -0400
commit: 81bdbed444df2cbcf3cdb32f7d4a74019de06454 (patch)
tree: eade7d0d9bb4cac01b55fd8642adfe0f7da35161 /apps/couch/src/couch_view.erl
parent: cc1910f73fbd20c5ffc94bd61e7701d7f5e4c92a (diff)
reorganize couch .erl and driver code into rebar layout
Diffstat (limited to 'apps/couch/src/couch_view.erl')
-rw-r--r-- apps/couch/src/couch_view.erl 438
1 files changed, 438 insertions, 0 deletions
diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl
new file mode 100644
index 00000000..38c0a783
--- /dev/null
+++ b/apps/couch/src/couch_view.erl
@@ -0,0 +1,438 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_view).
+-behaviour(gen_server).
+
+-export([start_link/0,fold/4,less_json/2,less_json_ids/2,expand_dups/2,
+ detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,
+ code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4,
+ get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/4,
+ extract_map_view/1,get_group_server/2,get_group_info/2,cleanup_index_files/1]).
+
+-include("couch_db.hrl").
+
+
+% State record of the couch_view gen_server.
+% root_dir is filled in by init/1 with the value of the
+% "couchdb" / "view_index_dir" configuration setting.
+-record(server,{
+ root_dir = []}).
+
+% Starts the view server and registers it locally under the name couch_view.
+% Returns whatever gen_server:start_link/4 returns.
+start_link() ->
+    ServerName = {local, couch_view},
+    gen_server:start_link(ServerName, couch_view, [], []).
+
+% Opens a temporary view group built from the given language, design options
+% and map/reduce sources, then asks the couch_view server for the group
+% server process handling it.
+% Returns the pid; any reply other than {ok, Pid} is thrown.
+get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
+    % do we need to close this db?
+    {ok, _Db, TempGroup} = couch_view_group:open_temp_group(
+        DbName, Language, DesignOptions, MapSrc, RedSrc),
+    Request = {get_group_server, DbName, TempGroup},
+    case gen_server:call(couch_view, Request) of
+        {ok, GroupPid} -> GroupPid;
+        Failure -> throw(Failure)
+    end.
+
+% Returns the pid of the group server for design document GroupId in DbName.
+% The group is opened first (which yields its signature), then the couch_view
+% server is asked for the matching process.
+% Throws the offending term if the group cannot be opened or the server does
+% not answer {ok, Pid}.
+get_group_server(DbName, GroupId) ->
+    case couch_view_group:open_db_group(DbName, GroupId) of
+        {ok, _Db, Group} ->
+            % do we need to close this db?
+            Request = {get_group_server, DbName, Group},
+            case gen_server:call(couch_view, Request) of
+                {ok, GroupPid} -> GroupPid;
+                CallFailure -> throw(CallFailure)
+            end;
+        OpenFailure ->
+            throw(OpenFailure)
+    end.
+
+% Requests the view group GroupId of Db from its group server.
+% When Stale is the atom ok the minimum update sequence passed along is 0;
+% for any other value it is the database's current update sequence.
+get_group(Db, GroupId, Stale) ->
+    MinUpdateSeq =
+        if
+            Stale =:= ok -> 0;
+            true -> couch_db:get_update_seq(Db)
+        end,
+    GroupServer = get_group_server(couch_db:name(Db), GroupId),
+    couch_view_group:request_group(GroupServer, MinUpdateSeq).
+
+% Requests a temporary view group for Db, always passing the database's
+% current update sequence as the minimum sequence.
+get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc) ->
+    Updater = get_temp_updater(
+        couch_db:name(Db), Language, DesignOptions, MapSrc, RedSrc),
+    CurrentSeq = couch_db:get_update_seq(Db),
+    couch_view_group:request_group(Updater, CurrentSeq).
+
+% Asks the group server of design document GroupId in Db for its group info
+% and returns the result of couch_view_group:request_group_info/1.
+get_group_info(Db, GroupId) ->
+    GroupServer = get_group_server(couch_db:name(Db), GroupId),
+    couch_view_group:request_group_info(GroupServer).
+
+% Deletes the view index files of Db whose path does not contain the
+% signature of any non-deleted design document.
+%
+% Db - database handle accepted by couch_db:get_design_docs/1.
+% Returns ok. Crashes (badmatch) if the design documents or the group info of
+% one of them cannot be fetched.
+cleanup_index_files(Db) ->
+    % load all ddocs
+    {ok, DesignDocs} = couch_db:get_design_docs(Db),
+
+    % collect the signature of every non-deleted design document
+    Sigs = lists:map(fun(#doc{id = GroupId}) ->
+        {ok, Info} = get_group_info(Db, GroupId),
+        ?b2l(couch_util:get_value(signature, Info))
+    end, [DD || DD <- DesignDocs, DD#doc.deleted == false]),
+
+    FileList = list_index_files(Db),
+
+    DeleteFiles = case Sigs of
+        [] ->
+            % With no signatures the pattern would be "()", which matches
+            % every path and would therefore keep every stale file. No
+            % design document is live, so no index file is in use.
+            FileList;
+        [_ | _] ->
+            % regex that matches the signature of any live ddoc
+            RegExp = "(" ++ string:join(Sigs, "|") ++ ")",
+            % filter out the ones in use
+            [FilePath || FilePath <- FileList,
+                re:run(FilePath, RegExp, [{capture, none}]) =:= nomatch]
+    end,
+
+    % delete unused files
+    ?LOG_DEBUG("deleting unused view index files: ~p",[DeleteFiles]),
+    RootDir = couch_config:get("couchdb", "view_index_dir"),
+    lists:foreach(fun(File) ->
+        couch_file:delete(RootDir, File, false)
+    end, DeleteFiles),
+    ok.
+
+% Lists the entries directly inside the ".<DbName>_design" directory below
+% the configured view index directory, using filelib:wildcard/1.
+list_index_files(Db) ->
+    RootDir = couch_config:get("couchdb", "view_index_dir"),
+    DbNameStr = ?b2l(couch_db:name(Db)),
+    Pattern = RootDir ++ "/." ++ DbNameStr ++ "_design" ++ "/*",
+    filelib:wildcard(Pattern).
+
+
+% Returns {ok, Count} where Count is the first element of the view btree's
+% full reduction.
+get_row_count(#view{btree=Btree}) ->
+    {ok, {RowCount, _UserReds}} = couch_btree:full_reduce(Btree),
+    {ok, RowCount}.
+
+% Builds a temporary group from the given sources and returns its single
+% view tagged as {temp_reduce, View}, together with the group.
+% Crashes (badmatch) unless the group holds exactly one view.
+get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc) ->
+    {ok, #group{views=[OnlyView]}=TempGroup} =
+        get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc),
+    {ok, {temp_reduce, OnlyView}, TempGroup}.
+
+
+% Looks up the reduce view called Name in design document GroupId.
+% Returns {ok, ReduceView, Group} on success, {not_found, missing_named_view}
+% when no view defines that reduce function, or whatever get_group/3
+% returned when the group itself could not be obtained.
+get_reduce_view(Db, GroupId, Name, Update) ->
+    case get_group(Db, GroupId, Update) of
+        {ok, #group{views=Views, def_lang=Lang}=Group} ->
+            case get_reduce_view0(Name, Lang, Views) of
+                {ok, ReduceView} -> {ok, ReduceView, Group};
+                NotFound -> NotFound
+            end;
+        GroupError ->
+            GroupError
+    end.
+
+% Scans the views in order for one whose reduce_funs contain Name.
+% Returns {ok, {reduce, N, Lang, View}} with N the 1-based position of the
+% reduce function inside that view, or {not_found, missing_named_view}.
+get_reduce_view0(_Name, _Lang, []) ->
+    {not_found, missing_named_view};
+get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View | Rest]) ->
+    Pos = get_key_pos(Name, RedFuns, 0),
+    if
+        Pos =:= 0 -> get_reduce_view0(Name, Lang, Rest);
+        true -> {ok, {reduce, Pos, Lang, View}}
+    end.
+
+% Unwraps the underlying view from a {reduce, N, Lang, View} tuple.
+extract_map_view({reduce, _Nth, _Language, MapView}) ->
+    MapView.
+
+% Converts btree rows of the form {{Key, Id}, Value} into nested lists
+% [[Key, Id], Value], preserving their order.
+% Acc holds already converted rows in reverse order; they end up, reversed
+% back, in front of the newly converted ones.
+detuple_kvs(KVs, Acc) ->
+    Convert = fun(KV, Out) ->
+        {{Key, Id}, Value} = KV,
+        [[[Key, Id], Value] | Out]
+    end,
+    lists:reverse(lists:foldl(Convert, Acc, KVs)).
+
+% Expands rows whose value is {dups, Vals} into one {Key, Val} row per value;
+% all other rows are passed through unchanged.
+% Acc holds already processed rows in reverse order. Note that the values of
+% a single dups entry come out in the reverse of their order in Vals.
+expand_dups(KVs, Acc) ->
+    Expand = fun
+        ({Key, {dups, Vals}}, Out) ->
+            [{Key, Val} || Val <- Vals] ++ Out;
+        (KV, Out) ->
+            [KV | Out]
+    end,
+    lists:reverse(lists:foldl(Expand, Acc, KVs)).
+
+% Folds over the grouped reductions of a reduce view.
+%
+% First argument is either {temp_reduce, View} or {reduce, NthRed, Lang, View}
+% (the shapes produced by get_temp_reduce_view/5 and get_reduce_view0/3).
+% Fun is called as Fun(GroupedKey, Reduction, Acc) for every group reported
+% by couch_btree:fold_reduce/4; Acc and Options are handed to that function
+% unchanged, and its result is returned.
+fold_reduce({temp_reduce, #view{btree=Bt}}, Fun, Acc, Options) ->
+ WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) ->
+ % a temp view is expected to carry exactly one reduction (badmatch otherwise)
+ {_, [Red]} = couch_btree:final_reduce(Bt, PartialReds),
+ Fun(GroupedKey, Red, Acc0)
+ end,
+ couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options);
+
+fold_reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Fun, Acc, Options) ->
+ % Only the NthRed-th reduce function is evaluated; empty lists are placed
+ % before and after its result so that it stays at position NthRed in a list
+ % as long as RedFuns.
+ PreResultPadding = lists:duplicate(NthRed - 1, []),
+ PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []),
+ {_Name, FunSrc} = lists:nth(NthRed,RedFuns),
+ ReduceFun =
+ fun(reduce, KVs) ->
+ % rows are expanded and converted to [[Key, Id], Value] before being sent to the query server
+ {ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], detuple_kvs(expand_dups(KVs, []),[])),
+ % the first tuple element is always 0 here
+ {0, PreResultPadding ++ Reduced ++ PostResultPadding};
+ (rereduce, Reds) ->
+ % pick the NthRed-th user reduction out of every partial reduction
+ UserReds = [[lists:nth(NthRed, UserRedsList)] || {_, UserRedsList} <- Reds],
+ {ok, Reduced} = couch_query_servers:rereduce(Lang, [FunSrc], UserReds),
+ {0, PreResultPadding ++ Reduced ++ PostResultPadding}
+ end,
+ WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) ->
+ {_, Reds} = couch_btree:final_reduce(ReduceFun, PartialReds),
+ % hand only the requested reduction to the caller's fun
+ Fun(GroupedKey, lists:nth(NthRed, Reds), Acc0)
+ end,
+ couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options).
+
+% Returns the position of the first {Key, _} pair in the list, counted from
+% N + 1 for the first element, or 0 when no pair has that key.
+% Keys are compared with ==.
+get_key_pos(_Key, [], _N) ->
+    0;
+get_key_pos(Key, [{Candidate, _Value} | Rest], N) ->
+    case Key == Candidate of
+        true -> N + 1;
+        false -> get_key_pos(Key, Rest, N + 1)
+    end;
+get_key_pos(Key, [_Other | Rest], N) ->
+    get_key_pos(Key, Rest, N + 1).
+
+
+% Builds a temporary group from map source Src (with an empty list as the
+% reduce source) and returns {ok, View, Group} for its single view.
+% Crashes (badmatch) unless the group holds exactly one view.
+get_temp_map_view(Db, Language, DesignOptions, Src) ->
+    {ok, #group{views=[OnlyView]}=TempGroup} =
+        get_temp_group(Db, Language, DesignOptions, Src, []),
+    {ok, OnlyView, TempGroup}.
+
+% Looks up the map view called Name in design document GroupId.
+% Returns {ok, View, Group} on success, {not_found, missing_named_view} when
+% no view carries that name, or whatever get_group/3 returned when the group
+% itself could not be obtained.
+get_map_view(Db, GroupId, Name, Stale) ->
+    case get_group(Db, GroupId, Stale) of
+        {ok, #group{views=Views}=Group} ->
+            case get_map_view0(Name, Views) of
+                {ok, MapView} -> {ok, MapView, Group};
+                NotFound -> NotFound
+            end;
+        GroupError ->
+            GroupError
+    end.
+
+% Returns {ok, View} for the first view whose map_names contain Name, or
+% {not_found, missing_named_view} when the list is exhausted.
+get_map_view0(_Name, []) ->
+    {not_found, missing_named_view};
+get_map_view0(Name, [#view{map_names=MapNames}=View | Rest]) ->
+    IsNamed = lists:member(Name, MapNames),
+    if
+        IsNamed -> {ok, View};
+        true -> get_map_view0(Name, Rest)
+    end.
+
+% Computes a row count from a set of partial reductions by running
+% couch_btree:final_reduce/2 with a counting reducer:
+%   reduce   - every row counts 1, a {dups, Vals} value counts length(Vals);
+%   rereduce - the counts of the partial reductions are summed.
+% Returns the resulting count.
+reduce_to_count(Reductions) ->
+    RowsIn = fun
+        ({dups, Vals}) -> length(Vals);
+        (_Single) -> 1
+    end,
+    Counter = fun
+        (reduce, KVs) ->
+            {lists:sum([RowsIn(V) || {_, V} <- KVs]), []};
+        (rereduce, Reds) ->
+            {lists:sum([Partial || {Partial, _} <- Reds]), []}
+    end,
+    {Total, _} = couch_btree:final_reduce(Counter, Reductions),
+    Total.
+
+
+
+% Applies Fun(KV, {KVReds, Reds}, Acc) to each row in turn.
+% After a row has been handled it is pushed onto KVReds for the next call.
+% Returns {ok, Acc} when all rows were processed, or {stop, Acc} as soon as
+% Fun returns {stop, _}; remaining rows are then skipped.
+fold_fun(_Fun, [], _, Acc) ->
+    {ok, Acc};
+fold_fun(Fun, [KV | Rest], {KVReds, Reds}, Acc) ->
+    Outcome = Fun(KV, {KVReds, Reds}, Acc),
+    case Outcome of
+        {stop, _FinalAcc} ->
+            Outcome;
+        {ok, NextAcc} ->
+            fold_fun(Fun, Rest, {[KV | KVReds], Reds}, NextAcc)
+    end.
+
+
+% Folds Fun over the rows of a map view's btree.
+% Each btree entry is first expanded (a {dups, Vals} value becomes one row
+% per value) and Fun is then applied to every resulting row via fold_fun/4.
+% Returns the {ok, LastReduce, AccResult} tuple from couch_btree:fold/4 and
+% crashes (badmatch) on any other result.
+fold(#view{btree=Btree}, Fun, Acc, Options) ->
+    PerEntry = fun(KV, Reds, InnerAcc) ->
+        Rows = expand_dups([KV], []),
+        fold_fun(Fun, Rows, Reds, InnerAcc)
+    end,
+    {ok, _LastReduce, _AccResult} =
+        couch_btree:fold(Btree, PerEntry, Acc, Options).
+
+
+% gen_server callback: sets up the view server.
+% Reads the view index directory from the configuration, registers a
+% configuration callback and a database update notifier, creates the three
+% ets tables used to track group servers, and starts trapping exits.
+% Returns {ok, #server{}} with root_dir set to the configured directory.
+init([]) ->
+ % read configuration settings and register for configuration changes
+ RootDir = couch_config:get("couchdb", "view_index_dir"),
+ Self = self(),
+ % the callback only has a clause for ("couchdb", "view_index_dir"); when
+ % invoked with these arguments it sends this process an exit signal with
+ % reason config_change
+ ok = couch_config:register(
+ fun("couchdb", "view_index_dir")->
+ exit(Self, config_change)
+ end),
+
+ % database deletion and creation both trigger an index reset for that
+ % database; all other notifications are ignored
+ couch_db_update_notifier:start_link(
+ fun({deleted, DbName}) ->
+ gen_server:cast(couch_view, {reset_indexes, DbName});
+ ({created, DbName}) ->
+ gen_server:cast(couch_view, {reset_indexes, DbName});
+ (_Else) ->
+ ok
+ end),
+ % couch_groups_by_db:      {DbName, Sig}          (bag, several per db)
+ % group_servers_by_sig:    {{DbName, Sig}, Pid}
+ % couch_groups_by_updater: {Pid, {DbName, Sig}}
+ ets:new(couch_groups_by_db, [bag, private, named_table]),
+ ets:new(group_servers_by_sig, [set, protected, named_table]),
+ ets:new(couch_groups_by_updater, [set, private, named_table]),
+ % exits of linked processes arrive as {'EXIT', Pid, Reason} in handle_info/2
+ process_flag(trap_exit, true),
+ % NOTE(review): presumably prepares the directory couch_file:delete/2,3
+ % moves files into - confirm against couch_file
+ ok = couch_file:init_delete_dir(RootDir),
+ {ok, #server{root_dir=RootDir}}.
+
+
+% gen_server callback: synchronously shuts down every process recorded in
+% the couch_groups_by_updater table, then returns ok.
+terminate(_Reason, _Srv) ->
+    Updaters = ets:tab2list(couch_groups_by_updater),
+    lists:foreach(fun couch_util:shutdown_sync/1,
+        [Pid || {Pid, _} <- Updaters]),
+    ok.
+
+
+% gen_server callback for {get_group_server, DbName, Group}.
+% Replies {ok, Pid} with the group server registered for {DbName, Sig} in
+% group_servers_by_sig, starting (and registering) a new couch_view_group
+% process when none exists. If the first start attempt yields
+% {error, invalid_view_seq} the database's indexes are reset and the start is
+% retried once. Any other start result is sent back to the caller unchanged.
+% The server state is never modified.
+handle_call({get_group_server, DbName,
+ #group{name=GroupId,sig=Sig}=Group}, _From, #server{root_dir=Root}=Server) ->
+ case ets:lookup(group_servers_by_sig, {DbName, Sig}) of
+ [] ->
+ ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.",
+ [GroupId, DbName]),
+ % catch turns an exception raised while starting into a plain term, which
+ % then falls through to the Error branches below
+ case (catch couch_view_group:start_link({Root, DbName, Group})) of
+ {ok, NewPid} ->
+ add_to_ets(NewPid, DbName, Sig),
+ {reply, {ok, NewPid}, Server};
+ {error, invalid_view_seq} ->
+ % wipe this database's indexes and try exactly once more
+ do_reset_indexes(DbName, Root),
+ case (catch couch_view_group:start_link({Root, DbName, Group})) of
+ {ok, NewPid} ->
+ add_to_ets(NewPid, DbName, Sig),
+ {reply, {ok, NewPid}, Server};
+ Error ->
+ {reply, Error, Server}
+ end;
+ Error ->
+ {reply, Error, Server}
+ end;
+ [{_, ExistingPid}] ->
+ {reply, {ok, ExistingPid}, Server}
+ end.
+
+% gen_server callback for {reset_indexes, DbName}: resets all view indexes
+% of that database below the server's root directory. State is unchanged.
+handle_cast({reset_indexes, DbName}, #server{}=Server) ->
+    do_reset_indexes(DbName, Server#server.root_dir),
+    {noreply, Server}.
+
+% Shuts down every group server registered for DbName, removes their ets
+% entries, deletes the database's "_design" index directory under Root and
+% finally deletes its "_temp" index path.
+% Returns the result of the final couch_file:delete/2 call.
+do_reset_indexes(DbName, Root) ->
+    % shutdown all the updaters and clear the files, the db got changed
+    StopGroup = fun({_DbName, Sig}) ->
+        ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [Sig, DbName]),
+        [{_, Pid}] = ets:lookup(group_servers_by_sig, {DbName, Sig}),
+        couch_util:shutdown_sync(Pid),
+        delete_from_ets(Pid, DbName, Sig)
+    end,
+    lists:foreach(StopGroup, ets:lookup(couch_groups_by_db, DbName)),
+    delete_index_dir(Root, DbName),
+    RootDelDir = couch_config:get("couchdb", "view_index_dir"),
+    TempIndexPath = Root ++ "/." ++ ?b2l(DbName) ++ "_temp",
+    couch_file:delete(RootDelDir, TempIndexPath).
+
+% gen_server callback for exit messages of linked processes.
+% A known group server is removed from the ets tables. An unknown process
+% exiting with a reason other than normal makes this server log the reason
+% and exit with it. A normal exit of an unknown process is ignored.
+handle_info({'EXIT', FromPid, Reason}, Server) ->
+    case ets:lookup(couch_groups_by_updater, FromPid) of
+        [{_, {DbName, GroupId}}] ->
+            delete_from_ets(FromPid, DbName, GroupId);
+        [] when Reason /= normal ->
+            % non-updater linked process died, we propagate the error
+            ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
+            exit(Reason);
+        [] ->
+            ok
+    end,
+    {noreply, Server}.
+
+% Records a group server in all three lookup tables:
+%   couch_groups_by_updater: Pid -> {DbName, Sig}
+%   group_servers_by_sig:    {DbName, Sig} -> Pid
+%   couch_groups_by_db:      DbName -> Sig
+% Returns true.
+add_to_ets(Pid, DbName, Sig) ->
+    GroupKey = {DbName, Sig},
+    Rows = [
+        {couch_groups_by_updater, {Pid, GroupKey}},
+        {group_servers_by_sig, {GroupKey, Pid}},
+        {couch_groups_by_db, GroupKey}
+    ],
+    lists:foreach(fun({Table, Row}) -> true = ets:insert(Table, Row) end, Rows),
+    true.
+
+% Removes a group server from all three lookup tables. Only the exact
+% {DbName, Sig} object is removed from couch_groups_by_db, so other groups
+% of the same database stay registered. Returns true.
+delete_from_ets(Pid, DbName, Sig) ->
+    GroupKey = {DbName, Sig},
+    true = ets:delete(couch_groups_by_updater, Pid),
+    true = ets:delete(group_servers_by_sig, GroupKey),
+    true = ets:delete_object(couch_groups_by_db, GroupKey).
+
+% gen_server callback: no state migration is needed on code upgrade, the
+% state is returned as is.
+code_change(_FromVsn, ServerState, _ExtraArgs) ->
+    {ok, ServerState}.
+
+
+% Removes the ".<DbName>_design" directory below RootDir together with its
+% contents.
+delete_index_dir(RootDir, DbName) ->
+    DesignDir = RootDir ++ "/." ++ ?b2l(DbName) ++ "_design",
+    nuke_dir(RootDir, DesignDir).
+
+% Recursively removes directory Dir.
+% Every entry is deleted through couch_file:delete/3; an entry for which
+% that call returns {error, eperm} is handled by recursing into it
+% (presumably because it is a directory). The emptied directory is then
+% removed with file:del_dir/1.
+% Returns ok, also when Dir does not exist; any other failure crashes.
+nuke_dir(RootDelDir, Dir) ->
+    case file:list_dir(Dir) of
+        {ok, Entries} ->
+            RemoveEntry = fun(Entry) ->
+                Path = Dir ++ "/" ++ Entry,
+                case couch_file:delete(RootDelDir, Path, false) of
+                    ok -> ok;
+                    {error, eperm} -> ok = nuke_dir(RootDelDir, Path)
+                end
+            end,
+            lists:foreach(RemoveEntry, Entries),
+            ok = file:del_dir(Dir);
+        {error, enoent} ->
+            % doesn't exist
+            ok
+    end.
+
+
+% Ordering predicate for {JsonKey, DocId} pairs (keys come back in the
+% language of btree - tuples).
+% The JSON keys are compared first; the ids are compared with < only when
+% less_json0/2 returns exactly 0 for the keys.
+less_json_ids({JsonA, IdA}, {JsonB, IdB}) ->
+    KeyOrder = less_json0(JsonA, JsonB),
+    if
+        KeyOrder =:= 0 -> IdA < IdB;
+        true -> KeyOrder < 0
+    end.
+
+% Returns true when JSON term A sorts strictly before JSON term B, i.e. when
+% less_json0/2 yields a negative number.
+less_json(A, B) ->
+    Order = less_json0(A, B),
+    Order < 0.
+
+% Three-way comparison of two JSON terms.
+% Returns a negative integer when A sorts before B, the integer 0 when both
+% compare equal and a positive integer when A sorts after B.
+%
+% Terms of different kinds are ordered by kind, as fixed by the clause order:
+%   atoms (null < false < true) < numbers < binaries < lists < {Props}
+% Binaries are compared with couch_util:collate/2, lists element by element
+% and {Props} objects member by member (key first, then value).
+%
+% The result is always an integer so that callers can match on 0: numbers
+% are compared with < and > instead of subtraction, which would give 0.0 for
+% 1 vs 1.0 (not matching 0) and fail with badarith for a bignum vs a float.
+less_json0(A,A) -> 0;
+
+less_json0(A,B) when is_atom(A), is_atom(B) -> atom_sort(A) - atom_sort(B);
+less_json0(A,_) when is_atom(A) -> -1;
+less_json0(_,B) when is_atom(B) -> 1;
+
+less_json0(A,B) when is_number(A), is_number(B) -> compare_numbers(A, B);
+less_json0(A,_) when is_number(A) -> -1;
+less_json0(_,B) when is_number(B) -> 1;
+
+less_json0(A,B) when is_binary(A), is_binary(B) -> couch_util:collate(A,B);
+less_json0(A,_) when is_binary(A) -> -1;
+less_json0(_,B) when is_binary(B) -> 1;
+
+less_json0(A,B) when is_list(A), is_list(B) -> less_list(A,B);
+less_json0(A,_) when is_list(A) -> -1;
+less_json0(_,B) when is_list(B) -> 1;
+
+less_json0({A},{B}) when is_list(A), is_list(B) -> less_props(A,B);
+less_json0({A},_) when is_list(A) -> -1;
+less_json0(_,{B}) when is_list(B) -> 1.
+
+% Numeric three-way comparison returning exactly -1, 0 or 1. An integer and
+% a float of the same value (1 and 1.0) compare as equal.
+compare_numbers(A, B) when A < B -> -1;
+compare_numbers(A, B) when A > B -> 1;
+compare_numbers(_, _) -> 0.
+
+% Rank of the three JSON atoms: null < false < true.
+atom_sort(null) -> 1;
+atom_sort(false) -> 2;
+atom_sort(true) -> 3.
+
+% Compares two object member lists pairwise. Keys are collated first; on
+% equal keys the values decide. A list that runs out first sorts first, and
+% two lists that run out together are equal.
+less_props([], []) ->
+    0;
+less_props([], [_|_]) ->
+    -1;
+less_props(_, []) ->
+    1;
+less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) ->
+    case couch_util:collate(AKey, BKey) of
+        0 ->
+            case less_json0(AValue, BValue) of
+                0 ->
+                    less_props(RestA, RestB);
+                Result ->
+                    Result
+            end;
+        Result ->
+            Result
+    end.
+
+% Compares two arrays element by element. A list that runs out first sorts
+% first, and two lists that run out together are equal (this happens when
+% elements are equal but not identical, e.g. [1] and [1.0]).
+less_list([], []) ->
+    0;
+less_list([], [_|_]) ->
+    -1;
+less_list(_, []) ->
+    1;
+less_list([A|RestA], [B|RestB]) ->
+    case less_json0(A,B) of
+        0 ->
+            less_list(RestA, RestB);
+        Result ->
+            Result
+    end.