From 56a3ee28e006aa42150482e1c3f91dc1906273f9 Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Fri, 12 Dec 2008 05:23:37 +0000 Subject: modifications to view server to keep the file descriptor open for the life of the view group. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@725909 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_view_group.erl | 325 ++++++++++++++++++++++++++++++--------- 1 file changed, 250 insertions(+), 75 deletions(-) (limited to 'src/couchdb/couch_view_group.erl') diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 84547095..cbbac92a 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -15,7 +15,7 @@ %% API -export([start_link/1, request_group/2]). -% -export([design_doc_to_view_group/1]). +-export([design_doc_to_view_group/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -24,10 +24,9 @@ -include("couch_db.hrl"). -record(group_state, { - spawn_fun, - target_seq=0, - group_seq=0, - group=nil, + db_name, + init_args, + group, updater_pid=nil, waiting_list=[] }). @@ -37,7 +36,6 @@ request_group(Pid, Seq) -> ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), case gen_server:call(Pid, {request_group, Seq}, infinity) of {ok, Group} -> - ?LOG_DEBUG("get_updated_group replied with group", []), {ok, Group}; Else -> ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]), @@ -47,15 +45,34 @@ request_group(Pid, Seq) -> % from template start_link(InitArgs) -> - gen_server:start_link(couch_view_group, InitArgs, []). + case gen_server:start_link(couch_view_group, + {InitArgs, self(), Ref = make_ref()}, []) of + {ok, Pid} -> {ok, Pid}; + ignore -> receive {Ref, Error} -> Error end; + Error -> Error + end. % init differentiates between temp and design_doc views. It creates a closure % which spawns the appropriate view_updater. (It might also spawn the first % view_updater run.) -init(InitArgs) -> - SpawnFun = fun() -> spawn_updater(InitArgs) end, +init({InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), - {ok, #group_state{spawn_fun=SpawnFun}}. + case prepare_group(InitArgs, false) of + {ok, #group{db=Db}=Group} -> + couch_db:monitor(Db), + Pid = spawn_link(fun()-> couch_view_updater:update(Group) end), + {ok, #group_state{ + db_name=couch_db:name(Db), + init_args=InitArgs, + updater_pid = Pid, + group=Group}}; + Error -> + ReturnPid ! {Ref, Error}, + ignore + end. + + + % There are two sources of messages: couch_view, which requests an up to date % view group, and the couch_view_updater, which when spawned, updates the @@ -73,87 +90,92 @@ init(InitArgs) -> handle_call({request_group, RequestSeq}, From, #group_state{ - target_seq=TargetSeq, - spawn_fun=SpawnFun, - updater_pid=Up, - waiting_list=WaitList - }=State) when RequestSeq > TargetSeq, Up == nil -> - UpdaterPid = SpawnFun(), - {noreply, State#group_state{ - target_seq=RequestSeq, - updater_pid=UpdaterPid, - waiting_list=[{From,RequestSeq}|WaitList] - }, infinity}; - -handle_call({request_group, RequestSeq}, From, - #group_state{ - target_seq=TargetSeq, + db_name=DbName, + group=#group{current_seq=Seq}=Group, + updater_pid=nil, waiting_list=WaitList - }=State) when RequestSeq > TargetSeq -> + }=State) when RequestSeq > Seq -> + {ok, Db} = couch_db:open(DbName, []), + Group2 = Group#group{db=Db}, + Pid = spawn_link(fun()-> couch_view_updater:update(Group2) end), + {noreply, State#group_state{ - target_seq=RequestSeq, + updater_pid=Pid, + group=Group2, waiting_list=[{From,RequestSeq}|WaitList] - }, infinity}; + }, infinity}; % If the request seqence is less than or equal to the seq_id of a known Group, % we respond with that Group. handle_call({request_group, RequestSeq}, _From, - State=#group_state{ - group_seq=GroupSeq, - group=Group - }) when RequestSeq =< GroupSeq -> + #group_state{group=#group{current_seq=GroupSeq}=Group}=State) + when RequestSeq =< GroupSeq -> {reply, {ok, Group}, State}; % Otherwise: TargetSeq => RequestSeq > GroupSeq % We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq handle_call({request_group, RequestSeq}, From, - #group_state{ - waiting_list=WaitList - }=State) -> + #group_state{waiting_list=WaitList}=State) -> {noreply, State#group_state{ waiting_list=[{From, RequestSeq}|WaitList] - }, infinity}. - - -% When the updater finishes, it will return a group with a seq_id, we should -% store that group and seq_id in our state. If our high_target is higher than -% the returned group, start a new updater. - -handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, - State=#group_state{ - target_seq=TargetSeq, - waiting_list=WaitList, - spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq -> - StillWaiting = reply_with_group(Group, WaitList, []), - UpdaterPid = SpawnFun(), - {noreply, State#group_state{ - updater_pid=UpdaterPid, - waiting_list=StillWaiting, - group_seq=NewGroupSeq, - group=Group}}; - -handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, - State=#group_state{waiting_list=WaitList}) -> - StillWaiting = reply_with_group(Group, WaitList, []), - {noreply, State#group_state{ - updater_pid=nil, - waiting_list=StillWaiting, - group_seq=NewGroupSeq, - group=Group}}. - + }, infinity}. + + +handle_cast(foo, State) -> + {ok, State}. + + +handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, + #group_state{db_name=DbName, + updater_pid=UpPid, + waiting_list=WaitList}=State) when UpPid == FromPid -> + ok = couch_db:close(Db), + case reply_with_group(Group, WaitList, []) of + [] -> + {noreply, State#group_state{waiting_list=[], + group=Group#group{db=nil}, + updater_pid=nil}}; + StillWaiting -> + % we still have some waiters, reopen the database and reupdate the index + {ok, Db2} = couch_db:open(DbName, []), + Group2 = Group#group{db=Db2}, + Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end), + {noreply, State#group_state{waiting_list=StillWaiting, + group=Group2, + updater_pid=Pid}} + end; + +handle_info({'EXIT', FromPid, reset}, + #group_state{ + init_args=InitArgs, + updater_pid=UpPid, + group=Group}=State) when UpPid == FromPid -> + ok = couch_db:close(Group#group.db), + case prepare_group(InitArgs, true) of + {ok, ResetGroup} -> + Pid = spawn_link(fun()-> couch_view_updater:update(ResetGroup) end), + {noreply, State#group_state{ + updater_pid=Pid, + group=ResetGroup}}; + Error -> + {stop, normal, reply_all(State, Error)} + end; + handle_info({'EXIT', _FromPid, normal}, State) -> {noreply, State}; handle_info({'EXIT', FromPid, Reason}, State) -> - ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]), + ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]), {stop, Reason, State}; -handle_info(_Info, State) -> - {noreply, State}. +handle_info({'DOWN',_,_,_,_}, State) -> + ?LOG_INFO("Shutting down view group server, monitored db is closing.", []), + {stop, normal, reply_all(State, shutdown)}. + -terminate(Reason, _State=#group_state{waiting_list=WaitList}) -> - lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList), +terminate(Reason, #group_state{group=#group{fd=Fd}}=State) -> + reply_all(State, Reason), ok. code_change(_OldVsn, State, _Extra) -> @@ -181,12 +203,165 @@ reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) -> reply_with_group(_Group, [], StillWaiting) -> StillWaiting. -spawn_updater({RootDir, DbName, GroupId}) -> - spawn_link(couch_view_updater, update, - [RootDir, DbName, GroupId, self()]); +reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> + [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], + State#group_state{waiting_list=[]}. + +prepare_group({view, RootDir, DbName, GroupId}, ForceReset)-> + case open_db_group(DbName, GroupId) of + {ok, Db, #group{sig=Sig}=Group0} -> + case open_index_file(RootDir, DbName, GroupId) of + {ok, Fd} -> + Group = Group0#group{ + commit_fun = fun(GroupIn) -> + Header = {Sig, get_index_header_data(GroupIn)}, + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Header) + end}, + if ForceReset -> + {ok, reset_file(Db, Fd, DbName, Group)}; + true -> + case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of + {ok, {Sig, HeaderInfo}} -> + % sigs match! + {ok, init_group(Db, Fd, Group, HeaderInfo)}; + _ -> + {ok, reset_file(Db, Fd, DbName, Group)} + end + end; + Error -> + catch delete_index_file(RootDir, DbName, GroupId), + Error + end; + Error -> + catch delete_index_file(RootDir, DbName, GroupId), + Error + end; +prepare_group({temp_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) -> + case couch_db:open(DbName, []) of + {ok, Db} -> + View = #view{map_names=["_temp"], + id_num=0, + btree=nil, + def=MapSrc, + reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, + {ok, init_group(Db, Fd, #group{name="_temp", db=Db, views=[View], + def_lang=Lang, commit_fun=fun(_G) -> ok end}, nil)}; + Error -> + Error + end. + + +get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree,views=Views}) -> + ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], + #index_header{seq=Seq, + purge_seq=PurgeSeq, + id_btree_state=couch_btree:get_state(IdBtree), + view_states=ViewStates}. + -spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) -> - spawn_link(couch_view_updater, temp_update, - [DbName, Fd, Lang, MapSrc, RedSrc, self()]). +open_index_file(RootDir, DbName, GroupId) -> + FileName = RootDir ++ "/." ++ ?b2l(DbName) ++ ?b2l(GroupId) ++".view", + case couch_file:open(FileName) of + {ok, Fd} -> {ok, Fd}; + {error, enoent} -> couch_file:open(FileName, [create]); + Error -> Error + end. +open_db_group(DbName, GroupId) -> + case couch_db:open(DbName, []) of + {ok, Db} -> + case couch_db:open_doc(Db, GroupId) of + {ok, Doc} -> + {ok, Db, design_doc_to_view_group(Doc)}; + Else -> + couch_db:close(Db), + Else + end; + Else -> + Else + end. + +% maybe move to another module +design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> + Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>), + {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}), + + % add the views to a dictionary object, with the map source as the key + DictBySrc = + lists:foldl( + fun({Name, {MRFuns}}, DictBySrcAcc) -> + MapSrc = proplists:get_value(<<"map">>, MRFuns), + RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null), + View = + case dict:find(MapSrc, DictBySrcAcc) of + {ok, View0} -> View0; + error -> #view{def=MapSrc} % create new view object + end, + View2 = + if RedSrc == null -> + View#view{map_names=[Name|View#view.map_names]}; + true -> + View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} + end, + dict:store(MapSrc, View2, DictBySrcAcc) + end, dict:new(), RawViews), + % number the views + {Views, _N} = lists:mapfoldl( + fun({_Src, View}, N) -> + {View#view{id_num=N},N+1} + end, 0, dict:to_list(DictBySrc)), + + Group = #group{name=Id, views=Views, def_lang=Language}, + Group#group{sig=erlang:md5(term_to_binary(Group))}. + +reset_group(#group{views=Views}=Group) -> + Views2 = [View#view{btree=nil} || View <- Views], + Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, + id_btree=nil,views=Views2}. + +reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> + ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), + ok = couch_file:truncate(Fd, 0), + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), + init_group(Db, Fd, reset_group(Group), nil). + +delete_index_file(RootDir, DbName, GroupId) -> + file:delete(RootDir ++ "/." ++ binary_to_list(DbName) + ++ binary_to_list(GroupId) ++ ".view"). + +init_group(Db, Fd, #group{views=Views}=Group, nil) -> + init_group(Db, Fd, Group, + #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), + id_btree_state=nil, view_states=[nil || _ <- Views]}); +init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> + #index_header{seq=Seq, purge_seq=PurgeSeq, + id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, + {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), + Views2 = lists:zipwith( + fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> + FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], + ReduceFun = + fun(reduce, KVs) -> + KVs2 = couch_view:expand_dups(KVs,[]), + KVs3 = couch_view:detuple_kvs(KVs2,[]), + {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, + KVs3), + {length(KVs3), Reduced}; + (rereduce, Reds) -> + Count = lists:sum([Count0 || {Count0, _} <- Reds]), + UserReds = [UserRedsList || {_, UserRedsList} <- Reds], + {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, + UserReds), + {Count, Reduced} + end, + {ok, Btree} = couch_btree:open(BtreeState, Fd, + [{less, fun couch_view:less_json_keys/2}, + {reduce, ReduceFun}]), + View#view{btree=Btree} + end, + ViewStates, Views), + Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree, views=Views2}. + -- cgit v1.2.3