summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_view.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_view.erl')
-rw-r--r--src/couchdb/couch_view.erl332
1 files changed, 205 insertions, 127 deletions
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index d8006eba..cf03cccb 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -15,8 +15,9 @@
-module(couch_view).
-behaviour(gen_server).
--export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/4]).
+-export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]).
+-export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, reduce/3]).
-include("couch_db.hrl").
@@ -26,16 +27,18 @@
name,
def_lang,
views,
- id_btree,
- current_seq,
+ reductions=[], % list of reduction names and id_num of view that contains it.
+ id_btree=nil,
+ current_seq=0,
query_server=nil
}).
-record(view,
{id_num,
- name,
- btree,
- def
+ map_names=[],
+ def,
+ btree=nil,
+ reduce_funs=[]
}).
-record(server,
@@ -47,8 +50,8 @@ start_link(RootDir) ->
-get_temp_updater(DbName, Type, Src) ->
- {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, Src}),
+get_temp_updater(DbName, Type, MapSrc, RedSrc) ->
+ {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}),
Pid.
get_updater(DbName, GroupId) ->
@@ -75,44 +78,135 @@ get_updated_group(Pid) ->
end
end.
-fold(ViewInfo, Dir, Fun, Acc) ->
- fold(ViewInfo, nil, Dir, Fun, Acc).
+get_row_count(#view{btree=Bt}) ->
+ {ok, Reds} = couch_btree:partial_reduce(Bt, nil, nil),
+ {ok, reduce_to_count(Reds)}.
+
+get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) ->
+ {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)),
+ {ok, {temp_reduce, View}};
+get_reduce_view({DbName, GroupId, Name}) ->
+ {ok, #group{views=Views,def_lang=Lang}} =
+ get_updated_group(get_updater(DbName, GroupId)),
+ get_reduce_view0(Name, Lang, Views).
+
+get_reduce_view0(_Name, _Lang, []) ->
+ {not_found, missing_named_view};
+get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View|Rest]) ->
+ case get_key_pos(Name, RedFuns, 0) of
+ 0 -> get_reduce_view0(Name, Lang, Rest);
+ N -> {ok, {reduce, N, Lang, View}}
+ end.
-fold({temp, DbName, Type, Src}, StartKey, Dir, Fun, Acc) ->
- {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src)),
- fold_view(View#view.btree, StartKey, Dir, Fun, Acc);
-fold({DbName, GroupId, ViewName}, StartKey, Dir, Fun, Acc) ->
+reduce({temp_reduce, #view{btree=Bt}}, Key1, Key2) ->
+ {ok, {_Count, [Reduction]}} = couch_btree:reduce(Bt, Key1, Key2),
+ {ok, Reduction};
+
+reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Key1, Key2) ->
+ {ok, PartialReductions} = couch_btree:partial_reduce(Bt, Key1, Key2),
+ PreResultPadding = lists:duplicate(NthRed - 1, []),
+ PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []),
+ {_Name, FunSrc} = lists:nth(NthRed,RedFuns),
+ ReduceFun =
+ fun(reduce, KVs) ->
+ {ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], KVs),
+ {0, PreResultPadding ++ Reduced ++ PostResultPadding};
+ (combine, Reds) ->
+ UserReds = [[lists:nth(NthRed, UserRedsList)] || {_, UserRedsList} <- Reds],
+ {ok, Reduced} = couch_query_servers:combine(Lang, [FunSrc], UserReds),
+ {0, PreResultPadding ++ Reduced ++ PostResultPadding}
+ end,
+ {0, [FinalReduction]} = couch_btree:final_reduce(ReduceFun, PartialReductions),
+ {ok, FinalReduction}.
+
+get_key_pos(_Key, [], _N) ->
+ 0;
+get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 ->
+ N + 1;
+get_key_pos(Key, [_|Rest], N) ->
+ get_key_pos(Key, Rest, N+1).
+
+get_map_view({temp, DbName, Type, Src}) ->
+ {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])),
+ {ok, View};
+get_map_view({DbName, GroupId, Name}) ->
{ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)),
- Btree = get_view_btree(Views, ViewName),
- fold_view(Btree, StartKey, Dir, Fun, Acc).
+ get_map_view0(Name, Views).
+
+get_map_view0(_Name, []) ->
+ {not_found, missing_named_view};
+get_map_view0(Name, [#view{map_names=MapNames}=View|Rest]) ->
+ case lists:member(Name, MapNames) of
+ true -> {ok, View};
+ false -> get_map_view0(Name, Rest)
+ end.
+
+reduce_to_count(Reductions) ->
+ {Count, _} =
+ couch_btree:final_reduce(
+ fun(reduce, KVs) ->
+ {length(KVs), []};
+ (combine, Reds) ->
+ {lists:sum([Count0 || {Count0, _} <- Reds]), []}
+ end, Reductions),
+ Count.
+
+
+design_doc_to_view_group(#doc{id=Id,body={obj, Fields}}) ->
+ Language = proplists:get_value("language", Fields, "text/javascript"),
+ {obj, RawViews} = proplists:get_value("views", Fields, {obj, []}),
-fold_view(Btree, StartKey, Dir, Fun, Acc) ->
- TotalRowCount = couch_btree:row_count(Btree),
- WrapperFun = fun({{Key, DocId}, Value}, Offset, WrapperAcc) ->
- Fun(DocId, Key, Value, Offset, TotalRowCount, WrapperAcc)
- end,
- {ok, AccResult} = couch_btree:fold(Btree, StartKey, Dir, WrapperFun, Acc),
- {ok, TotalRowCount, AccResult}.
+ % extract the map/reduce views from the json fields and into lists
+ MapViewsRaw = [{Name, Src, nil} || {Name, Src} <- RawViews, is_list(Src)],
+ MapReduceViewsRaw =
+ [{Name,
+ proplists:get_value("map", MRFuns),
+ proplists:get_value("reduce", MRFuns)}
+ || {Name, {obj, MRFuns}} <- RawViews],
+
+ % add the views to a dictionary object, with the map source as the key
+ DictBySrc =
+ lists:foldl(
+ fun({Name, MapSrc, RedSrc}, DictBySrcAcc) ->
+ View =
+ case dict:find(MapSrc, DictBySrcAcc) of
+ {ok, View0} -> View0;
+ error -> #view{def=MapSrc} % create new view object
+ end,
+ View2 =
+ if RedSrc == nil ->
+ View#view{map_names=[Name|View#view.map_names]};
+ true ->
+ View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
+ end,
+ dict:store(MapSrc, View2, DictBySrcAcc)
+ end, dict:new(), MapViewsRaw ++ MapReduceViewsRaw),
+ % number the views
+ {Views, _N} = lists:mapfoldl(
+ fun({_Src, View}, N) ->
+ {View#view{id_num=N},N+1}
+ end, 0, dict:to_list(DictBySrc)),
+
+ reset_group(#group{name=Id, views=Views, def_lang=Language}).
+
+
+fold(#view{btree=Btree}, Dir, Fun, Acc) ->
+ {ok, _AccResult} = couch_btree:fold(Btree, Dir, Fun, Acc).
-get_view_btree([], _ViewName) ->
- throw({not_found, missing_named_view});
-get_view_btree([View | _RestViews], ViewName) when View#view.name == ViewName ->
- View#view.btree;
-get_view_btree([_View | RestViews], ViewName) ->
- get_view_btree(RestViews, ViewName).
+fold(#view{btree=Btree}, StartKey, Dir, Fun, Acc) ->
+ {ok, _AccResult} = couch_btree:fold(Btree, StartKey, Dir, Fun, Acc).
init(RootDir) ->
- UpdateNotifierFun =
+ couch_db_update_notifier:start_link(
fun({deleted, DbName}) ->
gen_server:cast(couch_view, {reset_indexes, DbName});
({created, DbName}) ->
gen_server:cast(couch_view, {reset_indexes, DbName});
(_Else) ->
ok
- end,
- couch_db_update_notifier:start_link(UpdateNotifierFun),
+ end),
ets:new(couch_views_by_db, [bag, private, named_table]),
ets:new(couch_views_by_name, [set, protected, named_table]),
ets:new(couch_views_by_updater, [set, private, named_table]),
@@ -127,8 +221,8 @@ terminate(_Reason, _) ->
catch ets:delete(couch_views_temp_fd_by_db).
-handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=Root}=Server) ->
- <<SigInt:128/integer>> = erlang:md5(Lang ++ Query),
+handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{root_dir=Root}=Server) ->
+ <<SigInt:128/integer>> = erlang:md5(Lang ++ [0] ++ MapSrc ++ [0] ++ RedSrc),
Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])),
Pid =
case ets:lookup(couch_views_by_name, {DbName, Name}) of
@@ -142,7 +236,8 @@ handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=R
ok
end,
?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]),
- NewPid = spawn_link(couch_view, start_temp_update_loop, [DbName, Fd, Lang, Query]),
+ NewPid = spawn_link(couch_view, start_temp_update_loop,
+ [DbName, Fd, Lang, MapSrc, RedSrc]),
true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}),
add_to_ets(NewPid, DbName, Name),
NewPid;
@@ -219,18 +314,22 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-start_temp_update_loop(DbName, Fd, Lang, Query) ->
+start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->
NotifyPids = get_notify_pids(1000),
case couch_server:open(DbName) of
{ok, Db} ->
- View = #view{name="_temp", id_num=0, btree=nil, def=Query},
+ View = #view{map_names=["_temp"],
+ id_num=0,
+ btree=nil,
+ def=MapSrc,
+ reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
Group = #group{name="_temp",
db=Db,
views=[View],
current_seq=0,
def_lang=Lang,
id_btree=nil},
- Group2 = disk_group_to_mem(Fd, Group),
+ Group2 = disk_group_to_mem(Db, Fd, Group),
temp_update_loop(Group2, NotifyPids);
Else ->
exit(Else)
@@ -242,24 +341,24 @@ temp_update_loop(Group, NotifyPids) ->
garbage_collect(),
temp_update_loop(Group2, get_notify_pids(10000)).
+
+reset_group(#group{views=Views}=Group) ->
+ Views2 = [View#view{btree=nil} || View <- Views],
+ Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
+ id_btree=nil,views=Views2}.
+
start_update_loop(RootDir, DbName, GroupId) ->
% wait for a notify request before doing anything. This way, we can just
% exit and any exits will be noticed by the callers.
start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
- {Db, DefLang, Defs} =
- case couch_server:open(DbName) of
+ {Db, DbGroup} =
+ case (catch couch_server:open(DbName)) of
{ok, Db0} ->
- case couch_db:open_doc(Db0, GroupId) of
+ case (catch couch_db:open_doc(Db0, GroupId)) of
{ok, Doc} ->
- case couch_doc:get_view_functions(Doc) of
- none ->
- delete_index_file(RootDir, DbName, GroupId),
- exit({not_found, no_views_found});
- {DefLang0, Defs0} ->
- {Db0, DefLang0, Defs0}
- end;
+ {Db0, design_doc_to_view_group(Doc)};
Else ->
delete_index_file(RootDir, DbName, GroupId),
exit(Else)
@@ -268,26 +367,48 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
delete_index_file(RootDir, DbName, GroupId),
exit(Else)
end,
- Group = open_index_file(RootDir, DbName, GroupId, DefLang, Defs),
+ FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
+ Group =
+ case couch_file:open(FileName) of
+ {ok, Fd} ->
+ case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of
+ {ok, ExistingDiskGroup} ->
+ % validate all the view definitions in the index are correct.
+ case reset_group(ExistingDiskGroup) == reset_group(DbGroup) of
+ true -> disk_group_to_mem(Db, Fd, ExistingDiskGroup);
+ false -> reset_file(Db, Fd, DbName, DbGroup)
+ end;
+ _ ->
+ reset_file(Db, Fd, DbName, DbGroup)
+ end;
+ {error, enoent} ->
+ case couch_file:open(FileName, [create]) of
+ {ok, Fd} -> reset_file(Db, Fd, DbName, DbGroup);
+ Error -> throw(Error)
+ end
+ end,
- try update_loop(Group#group{db=Db}, NotifyPids) of
- _ -> ok
+ update_loop(RootDir, DbName, GroupId, Group, NotifyPids).
+
+reset_file(Db, Fd, DbName, #group{name=Name} = DiskReadyGroup) ->
+ ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
+ ok = couch_file:truncate(Fd, 0),
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, DiskReadyGroup),
+ disk_group_to_mem(Db, Fd, DiskReadyGroup).
+
+update_loop(RootDir, DbName, GroupId, #group{fd=Fd}=Group, NotifyPids) ->
+ try update_group(Group) of
+ {ok, Group2} ->
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)),
+ [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
+ garbage_collect(),
+ update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000))
catch
restart ->
couch_file:close(Group#group.fd),
start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids())
end.
-update_loop(#group{fd=Fd}=Group, NotifyPids) ->
- {ok, Group2} = update_group(Group),
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)),
- [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
- garbage_collect(),
- update_loop(Group2).
-
-update_loop(Group) ->
- update_loop(Group, get_notify_pids(100000)).
-
% wait for the first request to come in.
get_notify_pids(Wait) ->
receive
@@ -351,51 +472,29 @@ nuke_dir(Dir) ->
delete_index_file(RootDir, DbName, GroupId) ->
file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view").
-
-open_index_file(RootDir, DbName, GroupId, ViewLang, ViewDefs) ->
- FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
- case couch_file:open(FileName) of
- {ok, Fd} ->
- case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of
- {ok, #group{views=Views}=Group} ->
- % validate all the view definitions in the index are correct.
- case same_view_def(Views, ViewDefs) of
- true -> disk_group_to_mem(Fd, Group);
- false -> reset_header(GroupId, Fd, ViewLang, ViewDefs)
- end;
- _ ->
- reset_header(GroupId, Fd, ViewLang, ViewDefs)
- end;
- _ ->
- case couch_file:open(FileName, [create]) of
- {ok, Fd} ->
- reset_header(GroupId, Fd, ViewLang, ViewDefs);
- Error ->
- throw(Error)
- end
- end.
-
-same_view_def([], []) ->
- true;
-same_view_def(DiskViews, ViewDefs) when DiskViews == [] orelse ViewDefs == []->
- false;
-same_view_def([#view{name=DiskName,def=DiskDef}|RestViews], [{Name, Def}|RestDefs]) ->
- if DiskName == Name andalso DiskDef == Def ->
- same_view_def(RestViews, RestDefs);
- true ->
- false
- end.
% Given a disk ready group structure, return an initialized, in-memory version.
-disk_group_to_mem(Fd, #group{id_btree=IdState,views=Views}=Group) ->
+disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Group) ->
{ok, IdBtree} = couch_btree:open(IdState, Fd),
Views2 = lists:map(
- fun(#view{btree=BtreeState}=View) ->
- {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, fun less_json/2}]),
+ fun(#view{btree=BtreeState,reduce_funs=RedFuns}=View) ->
+ FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
+ ReduceFun =
+ fun(reduce, KVs) ->
+ {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs),
+ {length(KVs), Reduced};
+ (combine, Reds) ->
+ Count = lists:sum([Count0 || {Count0, _} <- Reds]),
+ UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
+ {ok, Reduced} = couch_query_servers:combine(Lang, FunSrcs, UserReds),
+ {Count, Reduced}
+ end,
+ {ok, Btree} = couch_btree:open(BtreeState, Fd,
+ [{less, fun less_json/2},{reduce, ReduceFun}]),
View#view{btree=Btree}
end,
Views),
- Group#group{fd=Fd, id_btree=IdBtree, views=Views2}.
+ Group#group{db=Db, fd=Fd, id_btree=IdBtree, views=Views2}.
% Given an initialized, in-memory group structure, return a disk ready version.
mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) ->
@@ -405,23 +504,7 @@ mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) ->
View#view{btree=State}
end,
Views),
- Group#group{fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}.
-
-reset_header(GroupId, Fd, DefLanguage, NamedViews) ->
- couch_file:truncate(Fd, 0),
- {Views, _N} = lists:mapfoldl(
- fun({Name, Definiton}, N) ->
- {#view{name=Name, id_num=N, btree=nil, def=Definiton}, N+1}
- end,
- 0, NamedViews),
- Group = #group{name=GroupId,
- fd=Fd,
- views=Views,
- current_seq=0,
- def_lang=DefLanguage,
- id_btree=nil},
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Group),
- disk_group_to_mem(Fd, Group).
+ Group#group{db=nil, fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}.
@@ -506,17 +589,12 @@ process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewId
% anything in the definition changed.
case couch_db:open_doc(Db, DocInfo) of
{ok, Doc} ->
- case couch_doc:get_view_functions(Doc) of
- none ->
- throw(restart);
- {DefLang, NewDefs} ->
- case Group#group.def_lang == DefLang andalso same_view_def(Group#group.views, NewDefs) of
- true ->
- % nothing changed, keeping on computing
- {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
- false ->
- throw(restart)
- end
+ case design_doc_to_view_group(Doc) == reset_group(Group) of
+ true ->
+ % nothing changed, keeping on computing
+ {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
+ false ->
+ throw(restart)
end;
{not_found, deleted} ->
throw(restart)