summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_view.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/couch/src/couch_view.erl')
-rw-r--r--apps/couch/src/couch_view.erl480
1 files changed, 480 insertions, 0 deletions
diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl
new file mode 100644
index 00000000..8d479d7e
--- /dev/null
+++ b/apps/couch/src/couch_view.erl
@@ -0,0 +1,480 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_view).
+-behaviour(gen_server).
+
+-export([start_link/0,fold/4,less_json/2,less_json_ids/2,expand_dups/2,
+ detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,
+ code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4,
+ get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/4,
+ extract_map_view/1,get_group_server/2,get_group_info/2,
+ cleanup_index_files/1,config_change/2, data_size/2]).
+
+-include("couch_db.hrl").
+
+
+-record(server,{
+ root_dir = []}).
+
+start_link() ->
+ gen_server:start_link({local, couch_view}, couch_view, [], []).
+
+get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
+ {ok, Group} =
+ couch_view_group:open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc),
+ case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of
+ {ok, Pid} ->
+ Pid;
+ Error ->
+ throw(Error)
+ end.
+
+get_group_server(DbName, GroupId) ->
+ case couch_view_group:open_db_group(DbName, GroupId) of
+ {ok, Group} ->
+ case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of
+ {ok, Pid} ->
+ Pid;
+ Error ->
+ throw(Error)
+ end;
+ Error ->
+ throw(Error)
+ end.
+
+get_group(Db, GroupId, Stale) ->
+ MinUpdateSeq = case Stale of
+ ok -> 0;
+ update_after -> 0;
+ _Else -> couch_db:get_update_seq(Db)
+ end,
+ GroupPid = get_group_server(couch_db:name(Db), GroupId),
+ Result = couch_view_group:request_group(GroupPid, MinUpdateSeq),
+ case Stale of
+ update_after ->
+ % best effort, process might die
+ spawn(fun() ->
+ LastSeq = couch_db:get_update_seq(Db),
+ couch_view_group:request_group(GroupPid, LastSeq)
+ end);
+ _ ->
+ ok
+ end,
+ Result.
+
+get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc) ->
+ couch_view_group:request_group(
+ get_temp_updater(couch_db:name(Db), Language, DesignOptions, MapSrc, RedSrc),
+ couch_db:get_update_seq(Db)).
+
+get_group_info(Db, GroupId) ->
+ couch_view_group:request_group_info(
+ get_group_server(couch_db:name(Db), GroupId)).
+
+cleanup_index_files(Db) ->
+ % load all ddocs
+ {ok, DesignDocs} = couch_db:get_design_docs(Db),
+
+ % make unique list of group sigs
+ Sigs = lists:map(fun(#doc{id = GroupId}) ->
+ {ok, Info} = get_group_info(Db, GroupId),
+ ?b2l(couch_util:get_value(signature, Info))
+ end, [DD||DD <- DesignDocs, DD#doc.deleted == false]),
+
+ FileList = list_index_files(Db),
+
+ DeleteFiles =
+ if length(Sigs) =:= 0 ->
+ FileList;
+ true ->
+ % regex that matches all ddocs
+ RegExp = "("++ string:join(Sigs, "|") ++")",
+
+ % filter out the ones in use
+ [FilePath || FilePath <- FileList,
+ re:run(FilePath, RegExp, [{capture, none}]) =:= nomatch]
+ end,
+
+ % delete unused files
+ ?LOG_DEBUG("deleting unused view index files: ~p",[DeleteFiles]),
+ RootDir = couch_config:get("couchdb", "view_index_dir"),
+ [couch_file:delete(RootDir,File,false)||File <- DeleteFiles],
+ ok.
+
+list_index_files(Db) ->
+ % call server to fetch the index files
+ RootDir = couch_config:get("couchdb", "view_index_dir"),
+ filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*").
+
+
+get_row_count(#view{btree=Bt}) ->
+ {ok, {Count, _, _}} = couch_btree:full_reduce(Bt),
+ {ok, Count}.
+
+get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc) ->
+ {ok, #group{views=[View]}=Group} =
+ get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc),
+ {ok, {temp_reduce, View}, Group}.
+
+
+get_reduce_view(Db, GroupId, Name, Update) ->
+ case get_group(Db, GroupId, Update) of
+ {ok, #group{views=Views,def_lang=Lang}=Group} ->
+ case get_reduce_view0(Name, Lang, Views) of
+ {ok, View} ->
+ {ok, View, Group};
+ Else ->
+ Else
+ end;
+ Error ->
+ Error
+ end.
+
+get_reduce_view0(_Name, _Lang, []) ->
+ {not_found, missing_named_view};
+get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View|Rest]) ->
+ case get_key_pos(Name, RedFuns, 0) of
+ 0 -> get_reduce_view0(Name, Lang, Rest);
+ N -> {ok, {reduce, N, Lang, View}}
+ end.
+
+extract_map_view({reduce, _N, _Lang, View}) ->
+ View.
+
+detuple_kvs([], Acc) ->
+ lists:reverse(Acc);
+detuple_kvs([KV | Rest], Acc) ->
+ {{Key,Id},Value} = KV,
+ NKV = [[Key, Id], Value],
+ detuple_kvs(Rest, [NKV | Acc]).
+
+expand_dups([], Acc) ->
+ lists:reverse(Acc);
+expand_dups([{Key, {dups, Vals}} | Rest], Acc) ->
+ Expanded = [{Key, Val} || Val <- Vals],
+ expand_dups(Rest, Expanded ++ Acc);
+expand_dups([KV | Rest], Acc) ->
+ expand_dups(Rest, [KV | Acc]).
+
+data_size(KVList, Reduction) ->
+ lists:foldl(fun([[Key, _], Value], Acc) ->
+ size(term_to_binary(Key)) +
+ size(term_to_binary(Value)) +
+ Acc
+ end,size(term_to_binary(Reduction)),KVList).
+
+fold_reduce({temp_reduce, #view{btree=Bt}}, Fun, Acc, Options) ->
+ WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) ->
+ {_, [Red]} = couch_btree:final_reduce(Bt, PartialReds),
+ Fun(GroupedKey, Red, Acc0)
+ end,
+ couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options);
+
+fold_reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Fun, Acc, Options) ->
+ PreResultPadding = lists:duplicate(NthRed - 1, []),
+ PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []),
+ {_Name, FunSrc} = lists:nth(NthRed,RedFuns),
+ ReduceFun =
+ fun(reduce, KVs) ->
+ {ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], detuple_kvs(expand_dups(KVs, []),[])),
+ {0, PreResultPadding ++ Reduced ++ PostResultPadding};
+ (rereduce, Reds) ->
+ UserReds = [[lists:nth(NthRed, element(2, R))] || R <- Reds],
+ {ok, Reduced} = couch_query_servers:rereduce(Lang, [FunSrc], UserReds),
+ {0, PreResultPadding ++ Reduced ++ PostResultPadding}
+ end,
+ WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) ->
+ {_, Reds} = couch_btree:final_reduce(ReduceFun, PartialReds),
+ Fun(GroupedKey, lists:nth(NthRed, Reds), Acc0)
+ end,
+ couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options).
+
+get_key_pos(_Key, [], _N) ->
+ 0;
+get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 ->
+ N + 1;
+get_key_pos(Key, [_|Rest], N) ->
+ get_key_pos(Key, Rest, N+1).
+
+
+get_temp_map_view(Db, Language, DesignOptions, Src) ->
+ {ok, #group{views=[View]}=Group} = get_temp_group(Db, Language, DesignOptions, Src, []),
+ {ok, View, Group}.
+
+get_map_view(Db, GroupId, Name, Stale) ->
+ case get_group(Db, GroupId, Stale) of
+ {ok, #group{views=Views}=Group} ->
+ case get_map_view0(Name, Views) of
+ {ok, View} ->
+ {ok, View, Group};
+ Else ->
+ Else
+ end;
+ Error ->
+ Error
+ end.
+
+get_map_view0(_Name, []) ->
+ {not_found, missing_named_view};
+get_map_view0(Name, [#view{map_names=MapNames}=View|Rest]) ->
+ case lists:member(Name, MapNames) of
+ true -> {ok, View};
+ false -> get_map_view0(Name, Rest)
+ end.
+
+reduce_to_count(Reductions) ->
+ {Count, _} =
+ couch_btree:final_reduce(
+ fun(reduce, KVs) ->
+ Count = lists:sum(
+ [case V of {dups, Vals} -> length(Vals); _ -> 1 end
+ || {_,V} <- KVs]),
+ {Count, []};
+ (rereduce, Reds) ->
+ {lists:sum([Count0 || {Count0, _} <- Reds]), []}
+ end, Reductions),
+ Count.
+
+
+
+fold_fun(_Fun, [], _, Acc) ->
+ {ok, Acc};
+fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) ->
+ case Fun(KV, {KVReds, Reds}, Acc) of
+ {ok, Acc2} ->
+ fold_fun(Fun, Rest, {[KV|KVReds], Reds}, Acc2);
+ {stop, Acc2} ->
+ {stop, Acc2}
+ end.
+
+
+fold(#view{btree=Btree}, Fun, Acc, Options) ->
+ WrapperFun =
+ fun(visit, KV, Reds, Acc2) ->
+ fold_fun(Fun, expand_dups([KV],[]), Reds, Acc2);
+ (traverse, LK, Red, Acc2)
+ when is_function(Fun, 4) ->
+ Fun(traverse, LK, Red, Acc2);
+ (traverse, _LK, Red, {_, Skip, _, _} = Acc2)
+ when Skip >= element(1, Red) ->
+ {skip, setelement(2, Acc2, Skip - element(1, Red))};
+ (traverse, _, _, Acc2) ->
+ {ok, Acc2}
+ end,
+ {ok, _LastReduce, _AccResult} = couch_btree:fold(Btree, WrapperFun, Acc, Options).
+
+
+init([]) ->
+ % read configuration settings and register for configuration changes
+ RootDir = couch_config:get("couchdb", "view_index_dir"),
+ ok = couch_config:register(fun ?MODULE:config_change/2),
+
+ couch_db_update_notifier:start_link(
+ fun({deleted, DbName}) ->
+ gen_server:cast(couch_view, {reset_indexes, DbName});
+ ({created, DbName}) ->
+ gen_server:cast(couch_view, {reset_indexes, DbName});
+ (_Else) ->
+ ok
+ end),
+ ets:new(couch_groups_by_db, [bag, private, named_table]),
+ ets:new(group_servers_by_sig, [set, protected, named_table]),
+ ets:new(couch_groups_by_updater, [set, private, named_table]),
+ process_flag(trap_exit, true),
+ ok = couch_file:init_delete_dir(RootDir),
+ {ok, #server{root_dir=RootDir}}.
+
+
+terminate(_Reason, _Srv) ->
+ [couch_util:shutdown_sync(Pid) || {Pid, _} <-
+ ets:tab2list(couch_groups_by_updater)],
+ ok.
+
+
+handle_call({get_group_server, DbName, #group{sig=Sig}=Group}, From,
+ #server{root_dir=Root}=Server) ->
+ case ets:lookup(group_servers_by_sig, {DbName, Sig}) of
+ [] ->
+ spawn_monitor(fun() -> new_group(Root, DbName, Group) end),
+ ets:insert(group_servers_by_sig, {{DbName, Sig}, [From]}),
+ {noreply, Server};
+ [{_, WaitList}] when is_list(WaitList) ->
+ ets:insert(group_servers_by_sig, {{DbName, Sig}, [From | WaitList]}),
+ {noreply, Server};
+ [{_, ExistingPid}] ->
+ {reply, {ok, ExistingPid}, Server}
+ end;
+
+handle_call({reset_indexes, DbName}, _From, #server{root_dir=Root}=Server) ->
+ do_reset_indexes(DbName, Root),
+ {reply, ok, Server}.
+
+handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
+ do_reset_indexes(DbName, Root),
+ {noreply, Server}.
+
+new_group(Root, DbName, #group{name=GroupId, sig=Sig} = Group) ->
+ ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.",
+ [GroupId, DbName]),
+ case (catch couch_view_group:start_link({Root, DbName, Group})) of
+ {ok, NewPid} ->
+ unlink(NewPid),
+ exit({DbName, Sig, {ok, NewPid}});
+ {error, invalid_view_seq} ->
+ ok = gen_server:call(couch_view, {reset_indexes, DbName}),
+ new_group(Root, DbName, Group);
+ Error ->
+ exit({DbName, Sig, Error})
+ end.
+
+do_reset_indexes(DbName, Root) ->
+ % shutdown all the updaters and clear the files, the db got changed
+ Names = ets:lookup(couch_groups_by_db, DbName),
+ lists:foreach(
+ fun({_DbName, Sig}) ->
+ ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [Sig, DbName]),
+ [{_, Pid}] = ets:lookup(group_servers_by_sig, {DbName, Sig}),
+ couch_util:shutdown_sync(Pid),
+ delete_from_ets(Pid, DbName, Sig)
+ end, Names),
+ delete_index_dir(Root, DbName),
+ RootDelDir = couch_config:get("couchdb", "view_index_dir"),
+ couch_file:delete(RootDelDir, Root ++ "/." ++ ?b2l(DbName) ++ "_temp").
+
+handle_info({'EXIT', FromPid, Reason}, Server) ->
+ case ets:lookup(couch_groups_by_updater, FromPid) of
+ [] ->
+ if Reason =/= normal, Reason =/= no_db_file ->
+ % non-updater linked process died, we propagate the error
+ ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
+ exit(Reason);
+ true -> ok
+ end;
+ [{_, {DbName, GroupId}}] ->
+ delete_from_ets(FromPid, DbName, GroupId)
+ end,
+ {noreply, Server};
+
+handle_info({'DOWN', _, _, _, {DbName, Sig, Reply}}, Server) ->
+ [{_, WaitList}] = ets:lookup(group_servers_by_sig, {DbName, Sig}),
+ [gen_server:reply(From, Reply) || From <- WaitList],
+ case Reply of {ok, NewPid} ->
+ link(NewPid),
+ add_to_ets(NewPid, DbName, Sig);
+ _ -> ok end,
+ {noreply, Server}.
+
+config_change("couchdb", "view_index_dir") ->
+ exit(whereis(couch_view), config_change).
+
+add_to_ets(Pid, DbName, Sig) ->
+ true = ets:insert(couch_groups_by_updater, {Pid, {DbName, Sig}}),
+ true = ets:insert(group_servers_by_sig, {{DbName, Sig}, Pid}),
+ true = ets:insert(couch_groups_by_db, {DbName, Sig}).
+
+delete_from_ets(Pid, DbName, Sig) ->
+ true = ets:delete(couch_groups_by_updater, Pid),
+ true = ets:delete(group_servers_by_sig, {DbName, Sig}),
+ true = ets:delete_object(couch_groups_by_db, {DbName, Sig}).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+delete_index_dir(RootDir, DbName) ->
+ nuke_dir(RootDir, RootDir ++ "/." ++ ?b2l(DbName) ++ "_design").
+
+nuke_dir(RootDelDir, Dir) ->
+ case file:list_dir(Dir) of
+ {error, enoent} -> ok; % doesn't exist
+ {ok, Files} ->
+ lists:foreach(
+ fun(File)->
+ Full = Dir ++ "/" ++ File,
+ case couch_file:delete(RootDelDir, Full, false) of
+ ok -> ok;
+ {error, eperm} ->
+ ok = nuke_dir(RootDelDir, Full)
+ end
+ end,
+ Files),
+ ok = file:del_dir(Dir)
+ end.
+
+
+% keys come back in the language of btree - tuples.
+less_json_ids({JsonA, IdA}, {JsonB, IdB}) ->
+ case less_json0(JsonA, JsonB) of
+ 0 ->
+ IdA < IdB;
+ Result ->
+ Result < 0
+ end.
+
+less_json(A,B) ->
+ less_json0(A,B) < 0.
+
+less_json0(A,A) -> 0;
+
+less_json0(A,B) when is_atom(A), is_atom(B) -> atom_sort(A) - atom_sort(B);
+less_json0(A,_) when is_atom(A) -> -1;
+less_json0(_,B) when is_atom(B) -> 1;
+
+less_json0(A,B) when is_number(A), is_number(B) -> A - B;
+less_json0(A,_) when is_number(A) -> -1;
+less_json0(_,B) when is_number(B) -> 1;
+
+less_json0(A,B) when is_binary(A), is_binary(B) -> couch_util:collate(A,B);
+less_json0(A,_) when is_binary(A) -> -1;
+less_json0(_,B) when is_binary(B) -> 1;
+
+less_json0(A,B) when is_list(A), is_list(B) -> less_list(A,B);
+less_json0(A,_) when is_list(A) -> -1;
+less_json0(_,B) when is_list(B) -> 1;
+
+less_json0({A},{B}) when is_list(A), is_list(B) -> less_props(A,B);
+less_json0({A},_) when is_list(A) -> -1;
+less_json0(_,{B}) when is_list(B) -> 1.
+
+atom_sort(null) -> 1;
+atom_sort(false) -> 2;
+atom_sort(true) -> 3.
+
+less_props([], [_|_]) ->
+ -1;
+less_props(_, []) ->
+ 1;
+less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) ->
+ case couch_util:collate(AKey, BKey) of
+ 0 ->
+ case less_json0(AValue, BValue) of
+ 0 ->
+ less_props(RestA, RestB);
+ Result ->
+ Result
+ end;
+ Result ->
+ Result
+ end.
+
+less_list([], [_|_]) ->
+ -1;
+less_list(_, []) ->
+ 1;
+less_list([A|RestA], [B|RestB]) ->
+ case less_json0(A,B) of
+ 0 ->
+ less_list(RestA, RestB);
+ Result ->
+ Result
+ end.