From 56a3ee28e006aa42150482e1c3f91dc1906273f9 Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Fri, 12 Dec 2008 05:23:37 +0000 Subject: modifications to view server to keep the file descriptor open for the life of the view group. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@725909 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_view_updater.erl | 225 +++++-------------------------------- 1 file changed, 30 insertions(+), 195 deletions(-) (limited to 'src/couchdb/couch_view_updater.erl') diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 35bc5ffe..0532258c 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -12,83 +12,49 @@ -module(couch_view_updater). --export([update/4, temp_update/6]). +-export([update/1]). -include("couch_db.hrl"). - - -update(RootDir, DbName, GroupId, NotifyPid) -> - {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId), - {ok, Db} = couch_db:open(DbName, []), - Result = update_group(Group#group{db=Db}), - couch_db:close(Db), - case Result of - {same, Group2} -> - gen_server:cast(NotifyPid, {new_group, Group2}); - {updated, Group2} -> - HeaderData = {Sig, get_index_header_data(Group2)}, - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), - gen_server:cast(NotifyPid, {new_group, Group2}) +update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq, + commit_fun=CommitFun}=Group) -> + ?LOG_DEBUG("Starting index update.",[]), + DbPurgeSeq = couch_db:get_purge_seq(Db), + Group2 = + if DbPurgeSeq == PurgeSeq -> + Group; + DbPurgeSeq == PurgeSeq + 1 -> + ?LOG_DEBUG("Purging entries from view index.",[]), + purge_index(Group); + true -> + ?LOG_DEBUG("Resetting view index due to lost purge entries.",[]), + exit(reset) end, - garbage_collect(). -temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) -> - case couch_db:open(DbName, []) of - {ok, Db} -> - View = #view{map_names=["_temp"], - id_num=0, - btree=nil, - def=MapSrc, - reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, - Group = #group{name="_temp", - db=Db, - views=[View], - current_seq=0, - def_lang=Lang, - id_btree=nil}, - Group2 = init_group(Db, Fd, Group,nil), - couch_db:monitor(Db), - {_Updated, Group3} = update_group(Group2#group{db=Db}), - couch_db:close(Db), - gen_server:cast(NotifyPid, {new_group, Group3}), - garbage_collect(); - Else -> - exit(Else) - end. - - -update_group(#group{db=Db,current_seq=CurrentSeq}=Group) -> - ViewEmptyKVs = [{View, []} || View <- Group#group.views], + ViewEmptyKVs = [{View, []} || View <- Group2#group.views], % compute on all docs modified since we last computed. {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}} = couch_db:enum_docs_since( Db, - CurrentSeq, + Seq, fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, - {[], Group, ViewEmptyKVs, []} + {[], Group2, ViewEmptyKVs, []} ), {Group4, Results} = view_compute(Group3, UncomputedDocs), - {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), + {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results( + UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), couch_query_servers:stop_doc_map(Group4#group.query_server), NewSeq = couch_db:get_update_seq(Db), - if CurrentSeq /= NewSeq -> - {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), - {updated, Group5#group{query_server=nil}}; + if Seq /= NewSeq -> + {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, + NewSeq), + ok = CommitFun(Group5), + exit({new_group, Group5#group{query_server=nil}}); true -> - {same, Group4#group{query_server=nil}} + exit({new_group, Group4#group{query_server=nil}}) end. -get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree,views=Views}) -> - ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], - #index_header{seq=Seq, - purge_seq=PurgeSeq, - id_btree_state=couch_btree:get_state(IdBtree), - view_states=ViewStates}. - - purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], @@ -120,7 +86,8 @@ purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> views=Views2, purge_seq=couch_db:get_purge_seq(Db)}. -process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) -> +process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, + DocIdViewIdKeys}) -> % This fun computes once for each document #doc_info{id=DocId, deleted=Deleted} = DocInfo, case DocId of @@ -129,17 +96,15 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, Doc % anything in the definition changed. case couch_db:open_doc(Db, DocInfo) of {ok, Doc} -> - case design_doc_to_view_group(Doc) of + case couch_view_group:design_doc_to_view_group(Doc) of #group{sig=Sig} -> % The same md5 signature, keep on computing {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; _ -> - ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]), - throw(restart) + exit(reset) end; {not_found, deleted} -> - ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]), - throw(restart) + exit(reset) end; <> -> % we skip design docs {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; @@ -250,134 +215,4 @@ write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> ], Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}, {ok, Group2}. - -prepare_group(RootDir, DbName, GroupId) -> - {Db, Group} = case (catch couch_db:open(DbName, [])) of - {ok, Db0} -> - case (catch couch_db:open_doc(Db0, GroupId)) of - {ok, Doc} -> - {Db0, design_doc_to_view_group(Doc)}; - Else -> - delete_index_file(RootDir, DbName, GroupId), - ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]), - exit(Else) - end; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end, - FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ - binary_to_list(GroupId) ++".view", - Group2 = - case couch_file:open(FileName) of - {ok, Fd} -> - Sig = Group#group.sig, - case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of - {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} -> - % sigs match! - DbPurgeSeq = couch_db:get_purge_seq(Db), - % We can only use index with the same, or next purge seq as the db. - if DbPurgeSeq == PurgeSeq -> - init_group(Db, Fd, Group, HeaderInfo); - DbPurgeSeq == PurgeSeq + 1 -> - ?LOG_DEBUG("Purging entries from view index.",[]), - purge_index(init_group(Db, Fd, Group, HeaderInfo)); - true -> - ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]), - reset_file(Db, Fd, DbName, Group) - end; - _ -> - reset_file(Db, Fd, DbName, Group) - end; - {error, enoent} -> - case couch_file:open(FileName, [create]) of - {ok, Fd} -> reset_file(Db, Fd, DbName, Group); - Error -> throw(Error) - end - end, - - couch_db:monitor(Db), - couch_db:close(Db), - {ok, Group2}. - -% maybe move to another module -design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> - Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>), - {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}), - % add the views to a dictionary object, with the map source as the key - DictBySrc = - lists:foldl( - fun({Name, {MRFuns}}, DictBySrcAcc) -> - MapSrc = proplists:get_value(<<"map">>, MRFuns), - RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null), - View = - case dict:find(MapSrc, DictBySrcAcc) of - {ok, View0} -> View0; - error -> #view{def=MapSrc} % create new view object - end, - View2 = - if RedSrc == null -> - View#view{map_names=[Name|View#view.map_names]}; - true -> - View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} - end, - dict:store(MapSrc, View2, DictBySrcAcc) - end, dict:new(), RawViews), - % number the views - {Views, _N} = lists:mapfoldl( - fun({_Src, View}, N) -> - {View#view{id_num=N},N+1} - end, 0, dict:to_list(DictBySrc)), - - Group = #group{name=Id, views=Views, def_lang=Language}, - Group#group{sig=erlang:md5(term_to_binary(Group))}. - -reset_group(#group{views=Views}=Group) -> - Views2 = [View#view{btree=nil} || View <- Views], - Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, - id_btree=nil,views=Views2}. - -reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> - ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), - ok = couch_file:truncate(Fd, 0), - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), - init_group(Db, Fd, reset_group(Group), nil). - -delete_index_file(RootDir, DbName, GroupId) -> - file:delete(RootDir ++ "/." ++ binary_to_list(DbName) - ++ binary_to_list(GroupId) ++ ".view"). - -init_group(Db, Fd, #group{views=Views}=Group, nil) -> - init_group(Db, Fd, Group, - #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), - id_btree_state=nil, view_states=[nil || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> - #index_header{seq=Seq, purge_seq=PurgeSeq, - id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, - {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), - Views2 = lists:zipwith( - fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> - FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], - ReduceFun = - fun(reduce, KVs) -> - KVs2 = couch_view:expand_dups(KVs,[]), - KVs3 = couch_view:detuple_kvs(KVs2,[]), - {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, - KVs3), - {length(KVs3), Reduced}; - (rereduce, Reds) -> - Count = lists:sum([Count0 || {Count0, _} <- Reds]), - UserReds = [UserRedsList || {_, UserRedsList} <- Reds], - {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, - UserReds), - {Count, Reduced} - end, - {ok, Btree} = couch_btree:open(BtreeState, Fd, - [{less, fun couch_view:less_json_keys/2}, - {reduce, ReduceFun}]), - View#view{btree=Btree} - end, - ViewStates, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree, views=Views2}. \ No newline at end of file -- cgit v1.2.3