From 5a9321814a727e8c010bf83f05572a341d55f26a Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Wed, 10 Dec 2008 01:13:17 +0000 Subject: view group state gen_server. thanks damien and davisp. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@724946 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_view.erl | 551 +++++---------------------------------------- 1 file changed, 58 insertions(+), 493 deletions(-) (limited to 'src/couchdb/couch_view.erl') diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 4ebbb136..7c7730a7 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -13,45 +13,12 @@ -module(couch_view). -behaviour(gen_server). --export([start_link/0,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]). +-export([start_link/0,fold/4,fold/5,less_json/2,less_json_keys/2,expand_dups/2,detuple_kvs/2]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]). -export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, fold_reduce/7]). -include("couch_db.hrl"). - --record(group, - {sig=nil, - db=nil, - fd=nil, - name, - def_lang, - views, - id_btree=nil, - current_seq=0, - purge_seq=0, - query_server=nil - }). - --record(view, - {id_num, - map_names=[], - def, - btree=nil, - reduce_funs=[] - }). - --record(server, - {root_dir - }). - --record(index_header, - {seq=0, - purge_seq=0, - id_btree_state=nil, - view_states=nil - }). - start_link() -> gen_server:start_link({local, couch_view}, couch_view, [], []). @@ -59,41 +26,30 @@ get_temp_updater(DbName, Type, MapSrc, RedSrc) -> {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}), Pid. -get_updater(DbName, GroupId) -> - {ok, Pid} = gen_server:call(couch_view, {start_updater, DbName, GroupId}), +get_group_server(DbName, GroupId) -> + {ok, Pid} = gen_server:call(couch_view, {start_group_server, DbName, GroupId}), Pid. -get_updated_group(Pid) -> - Mref = erlang:monitor(process, Pid), - receive - {'DOWN', Mref, _, _, Reason} -> - throw(Reason) - after 0 -> - Pid ! {self(), get_updated}, - receive - {Pid, Response} -> - erlang:demonitor(Mref), - receive - {'DOWN', Mref, _, _, _} -> ok - after 0 -> ok - end, - Response; - {'DOWN', Mref, _, _, Reason} -> - throw(Reason) - end - end. +get_updated_group(DbName, GroupId, Update) -> + couch_view_group:request_group(get_group_server(DbName, GroupId), seq_for_update(DbName, Update)). + +get_updated_group(temp, DbName, Type, MapSrc, RedSrc, Update) -> + couch_view_group:request_group(get_temp_updater(DbName, Type, MapSrc, RedSrc), seq_for_update(DbName, Update)). get_row_count(#view{btree=Bt}) -> {ok, {Count, _Reds}} = couch_btree:full_reduce(Bt), {ok, Count}. get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) -> - {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)), + {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, MapSrc, RedSrc, true), {ok, {temp_reduce, View}}; get_reduce_view({DbName, GroupId, Name}) -> - {ok, #group{views=Views,def_lang=Lang}} = - get_updated_group(get_updater(DbName, GroupId)), - get_reduce_view0(Name, Lang, Views). + case get_updated_group(DbName, GroupId, true) of + {error, Reason} -> + Reason; + {ok, #group{views=Views,def_lang=Lang}} -> + get_reduce_view0(Name, Lang, Views) + end. 
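The calls above hand the wait-for-index protocol to the new couch_view_group module, which is not part of this hunk. Below is a minimal sketch of the shape its request_group/2 entry point presumably takes, assuming it is a gen_server call that blocks until the group has caught up to the requested sequence number; only the request_group(Pid, Seq) signature appears in this patch, the message tuple and the infinity timeout are assumptions.

    %% Sketch only: couch_view_group is introduced in a separate file and is not
    %% shown in this patch. Its shape is inferred from the calls made here.
    -module(couch_view_group_sketch).
    -export([request_group/2]).

    request_group(GroupServerPid, RequestSeq) ->
        %% RequestSeq is the db's update_seq when the caller wants the index
        %% brought up to date first (see seq_for_update/2 below), or 0 to read
        %% whatever has already been indexed.
        %% infinity: indexing a large database can exceed the default timeout.
        gen_server:call(GroupServerPid, {request_group, RequestSeq}, infinity).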
get_reduce_view0(_Name, _Lang, []) -> {not_found, missing_named_view}; @@ -153,13 +109,26 @@ get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 -> N + 1; get_key_pos(Key, [_|Rest], N) -> get_key_pos(Key, Rest, N+1). + +seq_for_update(DbName, Update) -> + case Update of + true -> + {ok, #db{update_seq=CurrentSeq}} = couch_db:open(DbName, []), + CurrentSeq; + _Else -> + 0 + end. get_map_view({temp, DbName, Type, Src}) -> - {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])), + {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, Src, [], true), {ok, View}; -get_map_view({DbName, GroupId, Name}) -> - {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)), - get_map_view0(Name, Views). +get_map_view({DbName, GroupId, Name, Update}) -> + case get_updated_group(DbName, GroupId, Update) of + {error, Reason} -> + Reason; + {ok, #group{views=Views}} -> + get_map_view0(Name, Views) + end. get_map_view0(_Name, []) -> {not_found, missing_named_view}; @@ -183,37 +152,6 @@ reduce_to_count(Reductions) -> Count. -design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> - Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>), - {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}), - - % add the views to a dictionary object, with the map source as the key - DictBySrc = - lists:foldl( - fun({Name, {MRFuns}}, DictBySrcAcc) -> - MapSrc = proplists:get_value(<<"map">>, MRFuns), - RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null), - View = - case dict:find(MapSrc, DictBySrcAcc) of - {ok, View0} -> View0; - error -> #view{def=MapSrc} % create new view object - end, - View2 = - if RedSrc == null -> - View#view{map_names=[Name|View#view.map_names]}; - true -> - View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} - end, - dict:store(MapSrc, View2, DictBySrcAcc) - end, dict:new(), RawViews), - % number the views - {Views, _N} = lists:mapfoldl( - fun({_Src, View}, N) -> - {View#view{id_num=N},N+1} - end, 0, dict:to_list(DictBySrc)), - - Group = #group{name=Id, views=Views, def_lang=Language}, - Group#group{sig=erlang:md5(term_to_binary(Group))}. fold_fun(_Fun, [], _, Acc) -> {ok, Acc}; @@ -253,10 +191,10 @@ init([]) -> (_Else) -> ok end), - ets:new(couch_views_by_db, [bag, private, named_table]), - ets:new(couch_views_by_name, [set, protected, named_table]), - ets:new(couch_views_by_updater, [set, private, named_table]), - ets:new(couch_views_temp_fd_by_db, [set, protected, named_table]), + ets:new(couch_groups_by_db, [bag, private, named_table]), + ets:new(group_servers_by_name, [set, protected, named_table]), + ets:new(couch_groups_by_updater, [set, private, named_table]), + ets:new(couch_temp_group_fd_by_db, [set, protected, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir}}. @@ -268,9 +206,9 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r <<SigInt:128/integer>> = erlang:md5(term_to_binary({Lang, MapSrc, RedSrc})), Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])), Pid = - case ets:lookup(couch_views_by_name, {DbName, Name}) of + case ets:lookup(group_servers_by_name, {DbName, Name}) of [] -> - case ets:lookup(couch_views_temp_fd_by_db, DbName) of + case ets:lookup(couch_temp_group_fd_by_db, DbName) of [] -> FileName = Root ++ "/."
++ binary_to_list(DbName) ++ "_temp", {ok, Fd} = couch_file:open(FileName, [create, overwrite]), @@ -279,21 +217,20 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r ok end, ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]), - NewPid = spawn_link(couch_view, start_temp_update_loop, - [DbName, Fd, Lang, MapSrc, RedSrc]), - true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}), + {ok, NewPid} = couch_view_group:start_link({DbName, Fd, Lang, MapSrc, RedSrc}), + true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}), add_to_ets(NewPid, DbName, Name), NewPid; [{_, ExistingPid0}] -> ExistingPid0 end, {reply, {ok, Pid}, Server}; -handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Server) -> +handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) -> Pid = - case ets:lookup(couch_views_by_name, {DbName, GroupId}) of + case ets:lookup(group_servers_by_name, {DbName, GroupId}) of [] -> - ?LOG_DEBUG("Spawning new update process for view group ~s in database ~s.", [GroupId, DbName]), - NewPid = spawn_link(couch_view, start_update_loop, [Root, DbName, GroupId]), + ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]), + {ok, NewPid} = couch_view_group:start_link({Root, DbName, GroupId}), add_to_ets(NewPid, DbName, GroupId), NewPid; [{_, ExistingPid0}] -> @@ -303,11 +240,11 @@ handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Serv handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> % shutdown all the updaters - Names = ets:lookup(couch_views_by_db, DbName), + Names = ets:lookup(couch_groups_by_db, DbName), lists:foreach( fun({_DbName, GroupId}) -> ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]), - [{_, Pid}] = ets:lookup(couch_views_by_name, {DbName, GroupId}), + [{_, Pid}] = ets:lookup(group_servers_by_name, {DbName, GroupId}), exit(Pid, kill), receive {'EXIT', Pid, _} -> delete_from_ets(Pid, DbName, GroupId) @@ -318,22 +255,23 @@ handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> {noreply, Server}. handle_info({'EXIT', _FromPid, normal}, Server) -> - {noreply, Server}; + {noreply, Server}; handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> - case ets:lookup(couch_views_by_updater, FromPid) of + ?LOG_DEBUG("Exit from process: ~p", [{FromPid, Reason}]), + case ets:lookup(couch_groups_by_updater, FromPid) of [] -> % non-updater linked process must have died, we propagate the error ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]), exit(Reason); [{_, {DbName, "_temp_" ++ _ = GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId), - [{_, Fd, Count}] = ets:lookup(couch_views_temp_fd_by_db, DbName), + [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName), case Count of 1 -> % Last ref couch_file:close(Fd), file:delete(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_temp"), - true = ets:delete(couch_views_temp_fd_by_db, DbName); + true = ets:delete(couch_temp_group_fd_by_db, DbName); _ -> - true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count - 1}) + true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count - 1}) end; [{_, {DbName, GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId) @@ -344,225 +282,21 @@ handle_info(Msg, _Server) -> exit({error, Msg}). 
add_to_ets(Pid, DbName, GroupId) -> - true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}), - true = ets:insert(couch_views_by_name, {{DbName, GroupId}, Pid}), - true = ets:insert(couch_views_by_db, {DbName, GroupId}). + true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}), + true = ets:insert(group_servers_by_name, {{DbName, GroupId}, Pid}), + true = ets:insert(couch_groups_by_db, {DbName, GroupId}). delete_from_ets(Pid, DbName, GroupId) -> - true = ets:delete(couch_views_by_updater, Pid), - true = ets:delete(couch_views_by_name, {DbName, GroupId}), - true = ets:delete_object(couch_views_by_db, {DbName, GroupId}). + true = ets:delete(couch_groups_by_updater, Pid), + true = ets:delete(group_servers_by_name, {DbName, GroupId}), + true = ets:delete_object(couch_groups_by_db, {DbName, GroupId}). code_change(_OldVsn, State, _Extra) -> {ok, State}. -start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> - NotifyPids = get_notify_pids(1000), - case couch_db:open(DbName, []) of - {ok, Db} -> - View = #view{map_names=["_temp"], - id_num=0, - btree=nil, - def=MapSrc, - reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, - Group = #group{name="_temp", - db=Db, - views=[View], - current_seq=0, - def_lang=Lang, - id_btree=nil}, - Group2 = init_group(Db, Fd, Group,nil), - couch_db:monitor(Db), - couch_db:close(Db), - temp_update_loop(DbName, Group2, NotifyPids); - Else -> - exit(Else) - end. -temp_update_loop(DbName, Group, NotifyPids) -> - {ok, Db} = couch_db:open(DbName, []), - {_Updated, Group2} = update_group(Group#group{db=Db}), - couch_db:close(Db), - [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], - garbage_collect(), - temp_update_loop(DbName, Group2, get_notify_pids(10000)). - - -reset_group(#group{views=Views}=Group) -> - Views2 = [View#view{btree=nil} || View <- Views], - Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, - id_btree=nil,views=Views2}. - -start_update_loop(RootDir, DbName, GroupId) -> - % wait for a notify request before doing anything. This way, we can just - % exit and any exits will be noticed by the callers. - start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)). - -start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> - {Db, Group} = - case (catch couch_db:open(DbName, [])) of - {ok, Db0} -> - case (catch couch_db:open_doc(Db0, GroupId)) of - {ok, Doc} -> - {Db0, design_doc_to_view_group(Doc)}; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end, - FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ - binary_to_list(GroupId) ++".view", - Group2 = - case couch_file:open(FileName) of - {ok, Fd} -> - Sig = Group#group.sig, - case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of - {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} -> - % sigs match! - DbPurgeSeq = couch_db:get_purge_seq(Db), - case (PurgeSeq == DbPurgeSeq) or ((PurgeSeq + 1) == DbPurgeSeq) of - true -> - % We can only use index with the same, or next purge seq as the - % db. - init_group(Db, Fd, Group, HeaderInfo); - false -> - reset_file(Db, Fd, DbName, Group) - end; - _ -> - reset_file(Db, Fd, DbName, Group) - end; - {error, enoent} -> - case couch_file:open(FileName, [create]) of - {ok, Fd} -> reset_file(Db, Fd, DbName, Group); - Error -> throw(Error) - end - end, - - couch_db:monitor(Db), - couch_db:close(Db), - update_loop(RootDir, DbName, GroupId, Group2, NotifyPids). 
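add_to_ets/3 and delete_from_ets/3 above keep three tables in step: couch_groups_by_updater maps a group server pid to {DbName, GroupId}, group_servers_by_name maps {DbName, GroupId} back to the pid, and the bag couch_groups_by_db lists every group id per database. Here is a standalone sketch of those access paths; the module name and sample values are illustrative only.

    -module(group_registry_sketch).
    -export([demo/0]).

    demo() ->
        ByDb = ets:new(couch_groups_by_db, [bag]),
        ByName = ets:new(group_servers_by_name, [set]),
        ByUpdater = ets:new(couch_groups_by_updater, [set]),
        Pid = self(),                      % stands in for a group server pid
        DbName = <<"db">>,
        GroupId = <<"_design/foo">>,
        %% add_to_ets/3
        true = ets:insert(ByUpdater, {Pid, {DbName, GroupId}}),
        true = ets:insert(ByName, {{DbName, GroupId}, Pid}),
        true = ets:insert(ByDb, {DbName, GroupId}),
        %% the three access paths used in the server:
        [{_, Pid}] = ets:lookup(ByName, {DbName, GroupId}),    % start_group_server
        [{_, {DbName, GroupId}}] = ets:lookup(ByUpdater, Pid), % 'EXIT' handling
        [{DbName, GroupId}] = ets:lookup(ByDb, DbName),        % reset_indexes
        %% delete_from_ets/3
        true = ets:delete(ByUpdater, Pid),
        true = ets:delete(ByName, {DbName, GroupId}),
        true = ets:delete_object(ByDb, {DbName, GroupId}),
        ok.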
- -reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> - ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), - ok = couch_file:truncate(Fd, 0), - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), - init_group(Db, Fd, reset_group(Group), nil). - -update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) -> - {ok, Db}= couch_db:open(DbName, []), - Result = - try - update_group(Group#group{db=Db}) - catch - throw: restart -> restart - after - couch_db:close(Db) - end, - case Result of - {same, Group2} -> - [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], - update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)); - {updated, Group2} -> - HeaderData = {Sig, get_index_header_data(Group2)}, - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), - [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], - garbage_collect(), - update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)); - restart -> - couch_file:close(Group#group.fd), - start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids()) - end. -% wait for the first request to come in. -get_notify_pids(Wait) -> - receive - {Pid, get_updated} -> - [Pid | get_notify_pids()]; - {'DOWN', _MonitorRef, _Type, _Pid, _Info} -> - ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []), - exit(db_shutdown); - Else -> - ?LOG_ERROR("Unexpected message in view updater: ~p", [Else]), - exit({error, Else}) - after Wait -> - exit(wait_timeout) - end. -% then keep getting all available and return. -get_notify_pids() -> - receive - {Pid, get_updated} -> - [Pid | get_notify_pids()] - after 0 -> - [] - end. - -purge(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> - {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), - Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], - {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), - - % now populate the dictionary with all the keys to delete - ViewKeysToRemoveDict = lists:foldl( - fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) -> - lists:foldl( - fun({ViewNum, RowKey}, ViewDictAcc2) -> - dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2) - end, ViewDictAcc, ViewNumRowKeys); - ({not_found, _}, ViewDictAcc) -> - ViewDictAcc - end, dict:new(), Lookups), - - % Now remove the values from the btrees - Views2 = lists:map( - fun(#view{id_num=Num,btree=Btree}=View) -> - case dict:find(Num, ViewKeysToRemoveDict) of - {ok, RemoveKeys} -> - {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys), - View#view{btree=Btree2}; - error -> % no keys to remove in this view - View - end - end, Views), - Group#group{id_btree=IdBtree2, - views=Views2, - purge_seq=couch_db:get_purge_seq(Db)}. - - -update_group(#group{db=Db,current_seq=CurrentSeq, - purge_seq=GroupPurgeSeq}=Group) -> - ViewEmptyKVs = [{View, []} || View <- Group#group.views], - % compute on all docs modified since we last computed. 
- DbPurgeSeq = couch_db:get_purge_seq(Db), - Group2 = - case DbPurgeSeq of - GroupPurgeSeq -> - Group; - DbPurgeSeq when GroupPurgeSeq + 1 == DbPurgeSeq -> - purge(Group); - _ -> - throw(restart) - end, - {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}} - = couch_db:enum_docs_since( - Db, - CurrentSeq, - fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, - {[], Group2, ViewEmptyKVs, [], CurrentSeq} - ), - {Group4, Results} = view_compute(Group3, UncomputedDocs), - {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), - couch_query_servers:stop_doc_map(Group4#group.query_server), - if CurrentSeq /= NewSeq -> - {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), - {updated, Group5#group{query_server=nil}}; - true -> - {same, Group4#group{query_server=nil}} - end. - delete_index_dir(RootDir, DbName) -> nuke_dir(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_design"). @@ -583,50 +317,6 @@ nuke_dir(Dir) -> ok = file:del_dir(Dir) end. -delete_index_file(RootDir, DbName, GroupId) -> - file:delete(RootDir ++ "/." ++ binary_to_list(DbName) - ++ binary_to_list(GroupId) ++ ".view"). - -init_group(Db, Fd, #group{views=Views}=Group, nil) -> - init_group(Db, Fd, Group, - #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), - id_btree_state=nil, view_states=[nil || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> - #index_header{seq=Seq, purge_seq=PurgeSeq, - id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, - {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), - Views2 = lists:zipwith( - fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> - FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], - ReduceFun = - fun(reduce, KVs) -> - KVs2 = expand_dups(KVs,[]), - KVs3 = detuple_kvs(KVs2,[]), - {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs3), - {length(KVs3), Reduced}; - (rereduce, Reds) -> - Count = lists:sum([Count0 || {Count0, _} <- Reds]), - UserReds = [UserRedsList || {_, UserRedsList} <- Reds], - {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, UserReds), - {Count, Reduced} - end, - {ok, Btree} = couch_btree:open(BtreeState, Fd, - [{less, fun less_json_keys/2},{reduce, ReduceFun}]), - View#view{btree=Btree} - end, - ViewStates, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree, views=Views2}. - - -get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree,views=Views}) -> - ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], - #index_header{seq=Seq, - purge_seq=PurgeSeq, - id_btree_state=couch_btree:get_state(IdBtree), - view_states=ViewStates}. - % keys come back in the language of btree - tuples. less_json_keys(A, B) -> less_json(tuple_to_list(A), tuple_to_list(B)). @@ -703,129 +393,4 @@ less_list([A|RestA], [B|RestB]) -> end end. -process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) -> - % This fun computes once for each document - #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo, - case DocId of - GroupId -> - % uh oh. this is the design doc with our definitions. See if - % anything in the definition changed. 
- case couch_db:open_doc(Db, DocInfo) of - {ok, Doc} -> - case design_doc_to_view_group(Doc) of - #group{sig=Sig} -> - % The same md5 signature, keep on computing - {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; - _ -> - throw(restart) - end; - {not_found, deleted} -> - throw(restart) - end; - <<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs - {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; - _ -> - {Docs2, DocIdViewIdKeys2} = - if Deleted -> - {Docs, [{DocId, []} | DocIdViewIdKeys]}; - true -> - {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]), - {[Doc | Docs], DocIdViewIdKeys} - end, - case couch_util:should_flush() of - true -> - {Group1, Results} = view_compute(Group, Docs2), - {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2), - {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq), - garbage_collect(), - ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], - {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}}; - false -> - {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}} - end - end. -view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> - {ViewKVs, DocIdViewIdKeysAcc}; -view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> - {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []), - NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc], - view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). - - -view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> - {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; -view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> - % Take any identical keys and combine the values - ResultKVs2 = lists:foldl( - fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) -> - case Key == PrevKey of - true -> - case PrevVal of - {dups, Dups} -> - [{PrevKey, {dups, [Value|Dups]}} | AccRest]; - _ -> - [{PrevKey, {dups, [Value,PrevVal]}} | AccRest] - end; - false -> - [{Key,Value},{PrevKey,PrevVal}|AccRest] - end; - (KV, []) -> - [KV] - end, [], lists:sort(ResultKVs)), - NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2], - NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc], - NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2], - NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, - view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). - -view_compute(Group, []) -> - {Group, []}; -view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) -> - {ok, QueryServer} = - case QueryServerIn of - nil -> % doc map not started - Definitions = [View#view.def || View <- Group#group.views], - couch_query_servers:start_doc_map(DefLang, Definitions); - _ -> - {ok, QueryServerIn} - end, - {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs), - {Group#group{query_server=QueryServer}, Results}.
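The fold removed from view_insert_doc_query_results above collapses repeated keys emitted by a map function for a single document into one {Key, {dups, [...]}} entry. The following is a condensed, behaviour-equivalent version with a worked example; the module name and sample data are illustrative.

    -module(dups_sketch).
    -export([demo/0]).

    %% Same behaviour as the fold above, written with a guard instead of the
    %% inner case on Key == PrevKey.
    combine_dups(ResultKVs) ->
        lists:foldl(
            fun({Key, Value}, [{PrevKey, PrevVal} | AccRest]) when Key == PrevKey ->
                case PrevVal of
                {dups, Dups} ->
                    [{PrevKey, {dups, [Value | Dups]}} | AccRest];
                _ ->
                    [{PrevKey, {dups, [Value, PrevVal]}} | AccRest]
                end;
            (KV, Acc) ->
                [KV | Acc]
            end, [], lists:sort(ResultKVs)).

    demo() ->
        %% one document emitted key <<"a">> twice and <<"b">> once
        [{<<"b">>, 3}, {<<"a">>, {dups, [2, 1]}}] =
            combine_dups([{<<"a">>, 1}, {<<"a">>, 2}, {<<"b">>, 3}]),
        ok.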
- - - -write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> - #group{id_btree=IdBtree} = Group, - - AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []], - RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []], - LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys], - {ok, LookupResults, IdBtree2} - = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds), - KeysToRemoveByView = lists:foldl( - fun(LookupResult, KeysToRemoveByViewAcc) -> - case LookupResult of - {ok, {DocId, ViewIdKeys}} -> - lists:foldl( - fun({ViewId, Key}, KeysToRemoveByViewAcc2) -> - dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2) - end, - KeysToRemoveByViewAcc, ViewIdKeys); - {not_found, _} -> - KeysToRemoveByViewAcc - end - end, - dict:new(), LookupResults), - - Views2 = [ - begin - KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []), - {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove), - View#view{btree = ViewBtree2} - end - || - {View, AddKeyValues} <- ViewKeyValuesToAdd - ], - Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}, - {ok, Group2}. -- cgit v1.2.3
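write_changes/4, removed above, first splits the per-document view-key lists into id_btree additions and removals, then turns the old entries returned by couch_btree:query_modify/4 into per-view keys to delete. Below is a self-contained sketch of just that bookkeeping, on plain data and without any btree; the module name and sample values are illustrative, and the {not_found, _} shape is taken from the code above.

    -module(write_changes_sketch).
    -export([demo/0]).

    demo() ->
        %% doc1 now emits one key in view 0, doc2 no longer emits anything
        DocIdViewIdKeys = [{<<"doc1">>, [{0, <<"k1">>}]}, {<<"doc2">>, []}],
        AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []],
        RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []],
        LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys],
        [{<<"doc1">>, [{0, <<"k1">>}]}] = AddDocIdViewIdKeys,
        [<<"doc2">>] = RemoveDocIds,
        [<<"doc1">>, <<"doc2">>] = LookupDocIds,
        %% pretend couch_btree:query_modify/4 reported doc2's previous entry in view 0
        LookupResults = [{ok, {<<"doc2">>, [{0, <<"old_key">>}]}}, {not_found, <<"doc1">>}],
        KeysToRemoveByView = lists:foldl(
            fun({ok, {OldDocId, OldViewIdKeys}}, Acc) ->
                lists:foldl(fun({ViewId, Key}, Acc2) ->
                    dict:append(ViewId, {Key, OldDocId}, Acc2)
                end, Acc, OldViewIdKeys);
            ({not_found, _}, Acc) ->
                Acc
            end, dict:new(), LookupResults),
        {ok, [{<<"old_key">>, <<"doc2">>}]} = dict:find(0, KeysToRemoveByView),
        ok.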