From a684f95cbcee7f2568a2ce04e7dc2bbb605a27b3 Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Thu, 15 May 2008 21:51:22 +0000 Subject: Incremental reduce first checkin. Warning! Disk format change. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@656861 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_view.erl | 332 ++++++++++++++++++++++++++++----------------- 1 file changed, 205 insertions(+), 127 deletions(-) (limited to 'src/couchdb/couch_view.erl') diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index d8006eba..cf03cccb 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -15,8 +15,9 @@ -module(couch_view). -behaviour(gen_server). --export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/4]). +-export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]). +-export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, reduce/3]). -include("couch_db.hrl"). @@ -26,16 +27,18 @@ name, def_lang, views, - id_btree, - current_seq, + reductions=[], % list of reduction names and id_num of view that contains it. + id_btree=nil, + current_seq=0, query_server=nil }). -record(view, {id_num, - name, - btree, - def + map_names=[], + def, + btree=nil, + reduce_funs=[] }). -record(server, @@ -47,8 +50,8 @@ start_link(RootDir) -> -get_temp_updater(DbName, Type, Src) -> - {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, Src}), +get_temp_updater(DbName, Type, MapSrc, RedSrc) -> + {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}), Pid. get_updater(DbName, GroupId) -> @@ -75,44 +78,135 @@ get_updated_group(Pid) -> end end. -fold(ViewInfo, Dir, Fun, Acc) -> - fold(ViewInfo, nil, Dir, Fun, Acc). +get_row_count(#view{btree=Bt}) -> + {ok, Reds} = couch_btree:partial_reduce(Bt, nil, nil), + {ok, reduce_to_count(Reds)}. + +get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) -> + {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)), + {ok, {temp_reduce, View}}; +get_reduce_view({DbName, GroupId, Name}) -> + {ok, #group{views=Views,def_lang=Lang}} = + get_updated_group(get_updater(DbName, GroupId)), + get_reduce_view0(Name, Lang, Views). + +get_reduce_view0(_Name, _Lang, []) -> + {not_found, missing_named_view}; +get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View|Rest]) -> + case get_key_pos(Name, RedFuns, 0) of + 0 -> get_reduce_view0(Name, Lang, Rest); + N -> {ok, {reduce, N, Lang, View}} + end. -fold({temp, DbName, Type, Src}, StartKey, Dir, Fun, Acc) -> - {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src)), - fold_view(View#view.btree, StartKey, Dir, Fun, Acc); -fold({DbName, GroupId, ViewName}, StartKey, Dir, Fun, Acc) -> +reduce({temp_reduce, #view{btree=Bt}}, Key1, Key2) -> + {ok, {_Count, [Reduction]}} = couch_btree:reduce(Bt, Key1, Key2), + {ok, Reduction}; + +reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Key1, Key2) -> + {ok, PartialReductions} = couch_btree:partial_reduce(Bt, Key1, Key2), + PreResultPadding = lists:duplicate(NthRed - 1, []), + PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []), + {_Name, FunSrc} = lists:nth(NthRed,RedFuns), + ReduceFun = + fun(reduce, KVs) -> + {ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], KVs), + {0, PreResultPadding ++ Reduced ++ PostResultPadding}; + (combine, Reds) -> + UserReds = [[lists:nth(NthRed, UserRedsList)] || {_, UserRedsList} <- Reds], + {ok, Reduced} = couch_query_servers:combine(Lang, [FunSrc], UserReds), + {0, PreResultPadding ++ Reduced ++ PostResultPadding} + end, + {0, [FinalReduction]} = couch_btree:final_reduce(ReduceFun, PartialReductions), + {ok, FinalReduction}. + +get_key_pos(_Key, [], _N) -> + 0; +get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 -> + N + 1; +get_key_pos(Key, [_|Rest], N) -> + get_key_pos(Key, Rest, N+1). + +get_map_view({temp, DbName, Type, Src}) -> + {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])), + {ok, View}; +get_map_view({DbName, GroupId, Name}) -> {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)), - Btree = get_view_btree(Views, ViewName), - fold_view(Btree, StartKey, Dir, Fun, Acc). + get_map_view0(Name, Views). + +get_map_view0(_Name, []) -> + {not_found, missing_named_view}; +get_map_view0(Name, [#view{map_names=MapNames}=View|Rest]) -> + case lists:member(Name, MapNames) of + true -> {ok, View}; + false -> get_map_view0(Name, Rest) + end. + +reduce_to_count(Reductions) -> + {Count, _} = + couch_btree:final_reduce( + fun(reduce, KVs) -> + {length(KVs), []}; + (combine, Reds) -> + {lists:sum([Count0 || {Count0, _} <- Reds]), []} + end, Reductions), + Count. + + +design_doc_to_view_group(#doc{id=Id,body={obj, Fields}}) -> + Language = proplists:get_value("language", Fields, "text/javascript"), + {obj, RawViews} = proplists:get_value("views", Fields, {obj, []}), -fold_view(Btree, StartKey, Dir, Fun, Acc) -> - TotalRowCount = couch_btree:row_count(Btree), - WrapperFun = fun({{Key, DocId}, Value}, Offset, WrapperAcc) -> - Fun(DocId, Key, Value, Offset, TotalRowCount, WrapperAcc) - end, - {ok, AccResult} = couch_btree:fold(Btree, StartKey, Dir, WrapperFun, Acc), - {ok, TotalRowCount, AccResult}. + % extract the map/reduce views from the json fields and into lists + MapViewsRaw = [{Name, Src, nil} || {Name, Src} <- RawViews, is_list(Src)], + MapReduceViewsRaw = + [{Name, + proplists:get_value("map", MRFuns), + proplists:get_value("reduce", MRFuns)} + || {Name, {obj, MRFuns}} <- RawViews], + + % add the views to a dictionary object, with the map source as the key + DictBySrc = + lists:foldl( + fun({Name, MapSrc, RedSrc}, DictBySrcAcc) -> + View = + case dict:find(MapSrc, DictBySrcAcc) of + {ok, View0} -> View0; + error -> #view{def=MapSrc} % create new view object + end, + View2 = + if RedSrc == nil -> + View#view{map_names=[Name|View#view.map_names]}; + true -> + View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} + end, + dict:store(MapSrc, View2, DictBySrcAcc) + end, dict:new(), MapViewsRaw ++ MapReduceViewsRaw), + % number the views + {Views, _N} = lists:mapfoldl( + fun({_Src, View}, N) -> + {View#view{id_num=N},N+1} + end, 0, dict:to_list(DictBySrc)), + + reset_group(#group{name=Id, views=Views, def_lang=Language}). + + +fold(#view{btree=Btree}, Dir, Fun, Acc) -> + {ok, _AccResult} = couch_btree:fold(Btree, Dir, Fun, Acc). -get_view_btree([], _ViewName) -> - throw({not_found, missing_named_view}); -get_view_btree([View | _RestViews], ViewName) when View#view.name == ViewName -> - View#view.btree; -get_view_btree([_View | RestViews], ViewName) -> - get_view_btree(RestViews, ViewName). +fold(#view{btree=Btree}, StartKey, Dir, Fun, Acc) -> + {ok, _AccResult} = couch_btree:fold(Btree, StartKey, Dir, Fun, Acc). init(RootDir) -> - UpdateNotifierFun = + couch_db_update_notifier:start_link( fun({deleted, DbName}) -> gen_server:cast(couch_view, {reset_indexes, DbName}); ({created, DbName}) -> gen_server:cast(couch_view, {reset_indexes, DbName}); (_Else) -> ok - end, - couch_db_update_notifier:start_link(UpdateNotifierFun), + end), ets:new(couch_views_by_db, [bag, private, named_table]), ets:new(couch_views_by_name, [set, protected, named_table]), ets:new(couch_views_by_updater, [set, private, named_table]), @@ -127,8 +221,8 @@ terminate(_Reason, _) -> catch ets:delete(couch_views_temp_fd_by_db). -handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=Root}=Server) -> - <> = erlang:md5(Lang ++ Query), +handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{root_dir=Root}=Server) -> + <> = erlang:md5(Lang ++ [0] ++ MapSrc ++ [0] ++ RedSrc), Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])), Pid = case ets:lookup(couch_views_by_name, {DbName, Name}) of @@ -142,7 +236,8 @@ handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=R ok end, ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]), - NewPid = spawn_link(couch_view, start_temp_update_loop, [DbName, Fd, Lang, Query]), + NewPid = spawn_link(couch_view, start_temp_update_loop, + [DbName, Fd, Lang, MapSrc, RedSrc]), true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}), add_to_ets(NewPid, DbName, Name), NewPid; @@ -219,18 +314,22 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -start_temp_update_loop(DbName, Fd, Lang, Query) -> +start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> NotifyPids = get_notify_pids(1000), case couch_server:open(DbName) of {ok, Db} -> - View = #view{name="_temp", id_num=0, btree=nil, def=Query}, + View = #view{map_names=["_temp"], + id_num=0, + btree=nil, + def=MapSrc, + reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, Group = #group{name="_temp", db=Db, views=[View], current_seq=0, def_lang=Lang, id_btree=nil}, - Group2 = disk_group_to_mem(Fd, Group), + Group2 = disk_group_to_mem(Db, Fd, Group), temp_update_loop(Group2, NotifyPids); Else -> exit(Else) @@ -242,24 +341,24 @@ temp_update_loop(Group, NotifyPids) -> garbage_collect(), temp_update_loop(Group2, get_notify_pids(10000)). + +reset_group(#group{views=Views}=Group) -> + Views2 = [View#view{btree=nil} || View <- Views], + Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, + id_btree=nil,views=Views2}. + start_update_loop(RootDir, DbName, GroupId) -> % wait for a notify request before doing anything. This way, we can just % exit and any exits will be noticed by the callers. start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)). start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> - {Db, DefLang, Defs} = - case couch_server:open(DbName) of + {Db, DbGroup} = + case (catch couch_server:open(DbName)) of {ok, Db0} -> - case couch_db:open_doc(Db0, GroupId) of + case (catch couch_db:open_doc(Db0, GroupId)) of {ok, Doc} -> - case couch_doc:get_view_functions(Doc) of - none -> - delete_index_file(RootDir, DbName, GroupId), - exit({not_found, no_views_found}); - {DefLang0, Defs0} -> - {Db0, DefLang0, Defs0} - end; + {Db0, design_doc_to_view_group(Doc)}; Else -> delete_index_file(RootDir, DbName, GroupId), exit(Else) @@ -268,26 +367,48 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> delete_index_file(RootDir, DbName, GroupId), exit(Else) end, - Group = open_index_file(RootDir, DbName, GroupId, DefLang, Defs), + FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", + Group = + case couch_file:open(FileName) of + {ok, Fd} -> + case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of + {ok, ExistingDiskGroup} -> + % validate all the view definitions in the index are correct. + case reset_group(ExistingDiskGroup) == reset_group(DbGroup) of + true -> disk_group_to_mem(Db, Fd, ExistingDiskGroup); + false -> reset_file(Db, Fd, DbName, DbGroup) + end; + _ -> + reset_file(Db, Fd, DbName, DbGroup) + end; + {error, enoent} -> + case couch_file:open(FileName, [create]) of + {ok, Fd} -> reset_file(Db, Fd, DbName, DbGroup); + Error -> throw(Error) + end + end, - try update_loop(Group#group{db=Db}, NotifyPids) of - _ -> ok + update_loop(RootDir, DbName, GroupId, Group, NotifyPids). + +reset_file(Db, Fd, DbName, #group{name=Name} = DiskReadyGroup) -> + ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), + ok = couch_file:truncate(Fd, 0), + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, DiskReadyGroup), + disk_group_to_mem(Db, Fd, DiskReadyGroup). + +update_loop(RootDir, DbName, GroupId, #group{fd=Fd}=Group, NotifyPids) -> + try update_group(Group) of + {ok, Group2} -> + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)), + [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], + garbage_collect(), + update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)) catch restart -> couch_file:close(Group#group.fd), start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids()) end. -update_loop(#group{fd=Fd}=Group, NotifyPids) -> - {ok, Group2} = update_group(Group), - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)), - [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], - garbage_collect(), - update_loop(Group2). - -update_loop(Group) -> - update_loop(Group, get_notify_pids(100000)). - % wait for the first request to come in. get_notify_pids(Wait) -> receive @@ -351,51 +472,29 @@ nuke_dir(Dir) -> delete_index_file(RootDir, DbName, GroupId) -> file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view"). - -open_index_file(RootDir, DbName, GroupId, ViewLang, ViewDefs) -> - FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", - case couch_file:open(FileName) of - {ok, Fd} -> - case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of - {ok, #group{views=Views}=Group} -> - % validate all the view definitions in the index are correct. - case same_view_def(Views, ViewDefs) of - true -> disk_group_to_mem(Fd, Group); - false -> reset_header(GroupId, Fd, ViewLang, ViewDefs) - end; - _ -> - reset_header(GroupId, Fd, ViewLang, ViewDefs) - end; - _ -> - case couch_file:open(FileName, [create]) of - {ok, Fd} -> - reset_header(GroupId, Fd, ViewLang, ViewDefs); - Error -> - throw(Error) - end - end. - -same_view_def([], []) -> - true; -same_view_def(DiskViews, ViewDefs) when DiskViews == [] orelse ViewDefs == []-> - false; -same_view_def([#view{name=DiskName,def=DiskDef}|RestViews], [{Name, Def}|RestDefs]) -> - if DiskName == Name andalso DiskDef == Def -> - same_view_def(RestViews, RestDefs); - true -> - false - end. % Given a disk ready group structure, return an initialized, in-memory version. -disk_group_to_mem(Fd, #group{id_btree=IdState,views=Views}=Group) -> +disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Group) -> {ok, IdBtree} = couch_btree:open(IdState, Fd), Views2 = lists:map( - fun(#view{btree=BtreeState}=View) -> - {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, fun less_json/2}]), + fun(#view{btree=BtreeState,reduce_funs=RedFuns}=View) -> + FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], + ReduceFun = + fun(reduce, KVs) -> + {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs), + {length(KVs), Reduced}; + (combine, Reds) -> + Count = lists:sum([Count0 || {Count0, _} <- Reds]), + UserReds = [UserRedsList || {_, UserRedsList} <- Reds], + {ok, Reduced} = couch_query_servers:combine(Lang, FunSrcs, UserReds), + {Count, Reduced} + end, + {ok, Btree} = couch_btree:open(BtreeState, Fd, + [{less, fun less_json/2},{reduce, ReduceFun}]), View#view{btree=Btree} end, Views), - Group#group{fd=Fd, id_btree=IdBtree, views=Views2}. + Group#group{db=Db, fd=Fd, id_btree=IdBtree, views=Views2}. % Given an initialized, in-memory group structure, return a disk ready version. mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) -> @@ -405,23 +504,7 @@ mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) -> View#view{btree=State} end, Views), - Group#group{fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}. - -reset_header(GroupId, Fd, DefLanguage, NamedViews) -> - couch_file:truncate(Fd, 0), - {Views, _N} = lists:mapfoldl( - fun({Name, Definiton}, N) -> - {#view{name=Name, id_num=N, btree=nil, def=Definiton}, N+1} - end, - 0, NamedViews), - Group = #group{name=GroupId, - fd=Fd, - views=Views, - current_seq=0, - def_lang=DefLanguage, - id_btree=nil}, - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Group), - disk_group_to_mem(Fd, Group). + Group#group{db=nil, fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}. @@ -506,17 +589,12 @@ process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewId % anything in the definition changed. case couch_db:open_doc(Db, DocInfo) of {ok, Doc} -> - case couch_doc:get_view_functions(Doc) of - none -> - throw(restart); - {DefLang, NewDefs} -> - case Group#group.def_lang == DefLang andalso same_view_def(Group#group.views, NewDefs) of - true -> - % nothing changed, keeping on computing - {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; - false -> - throw(restart) - end + case design_doc_to_view_group(Doc) == reset_group(Group) of + true -> + % nothing changed, keeping on computing + {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; + false -> + throw(restart) end; {not_found, deleted} -> throw(restart) -- cgit v1.2.3