From 5a9321814a727e8c010bf83f05572a341d55f26a Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Wed, 10 Dec 2008 01:13:17 +0000 Subject: view group state gen_server. thanks damien and davisp. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@724946 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_view_group.erl | 192 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 src/couchdb/couch_view_group.erl (limited to 'src/couchdb/couch_view_group.erl') diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl new file mode 100644 index 00000000..84547095 --- /dev/null +++ b/src/couchdb/couch_view_group.erl @@ -0,0 +1,192 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_view_group). +-behaviour(gen_server). + +%% API +-export([start_link/1, request_group/2]). +% -export([design_doc_to_view_group/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("couch_db.hrl"). + +-record(group_state, { + spawn_fun, + target_seq=0, + group_seq=0, + group=nil, + updater_pid=nil, + waiting_list=[] +}). + +% api methods +request_group(Pid, Seq) -> + ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), + case gen_server:call(Pid, {request_group, Seq}, infinity) of + {ok, Group} -> + ?LOG_DEBUG("get_updated_group replied with group", []), + {ok, Group}; + Else -> + ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]), + Else + end. + + +% from template +start_link(InitArgs) -> + gen_server:start_link(couch_view_group, InitArgs, []). + +% init differentiates between temp and design_doc views. It creates a closure +% which spawns the appropriate view_updater. (It might also spawn the first +% view_updater run.) +init(InitArgs) -> + SpawnFun = fun() -> spawn_updater(InitArgs) end, + process_flag(trap_exit, true), + {ok, #group_state{spawn_fun=SpawnFun}}. + +% There are two sources of messages: couch_view, which requests an up to date +% view group, and the couch_view_updater, which when spawned, updates the +% group and sends it back here. We employ a caching mechanism, so that between +% database writes, we don't have to spawn a couch_view_updater with every view +% request. This should give us more control, and the ability to request view +% statuses eventually. + +% The caching mechanism: each request is submitted with a seq_id for the +% database at the time it was read. We guarantee to return a view from that +% sequence or newer. + +% If the request sequence is higher than our current high_target seq, we set +% that as the highest seqence. If the updater is not running, we launch it. + +handle_call({request_group, RequestSeq}, From, + #group_state{ + target_seq=TargetSeq, + spawn_fun=SpawnFun, + updater_pid=Up, + waiting_list=WaitList + }=State) when RequestSeq > TargetSeq, Up == nil -> + UpdaterPid = SpawnFun(), + {noreply, State#group_state{ + target_seq=RequestSeq, + updater_pid=UpdaterPid, + waiting_list=[{From,RequestSeq}|WaitList] + }, infinity}; + +handle_call({request_group, RequestSeq}, From, + #group_state{ + target_seq=TargetSeq, + waiting_list=WaitList + }=State) when RequestSeq > TargetSeq -> + {noreply, State#group_state{ + target_seq=RequestSeq, + waiting_list=[{From,RequestSeq}|WaitList] + }, infinity}; + + +% If the request seqence is less than or equal to the seq_id of a known Group, +% we respond with that Group. +handle_call({request_group, RequestSeq}, _From, + State=#group_state{ + group_seq=GroupSeq, + group=Group + }) when RequestSeq =< GroupSeq -> + {reply, {ok, Group}, State}; + +% Otherwise: TargetSeq => RequestSeq > GroupSeq +% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq +handle_call({request_group, RequestSeq}, From, + #group_state{ + waiting_list=WaitList + }=State) -> + {noreply, State#group_state{ + waiting_list=[{From, RequestSeq}|WaitList] + }, infinity}. + + +% When the updater finishes, it will return a group with a seq_id, we should +% store that group and seq_id in our state. If our high_target is higher than +% the returned group, start a new updater. + +handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, + State=#group_state{ + target_seq=TargetSeq, + waiting_list=WaitList, + spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq -> + StillWaiting = reply_with_group(Group, WaitList, []), + UpdaterPid = SpawnFun(), + {noreply, State#group_state{ + updater_pid=UpdaterPid, + waiting_list=StillWaiting, + group_seq=NewGroupSeq, + group=Group}}; + +handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, + State=#group_state{waiting_list=WaitList}) -> + StillWaiting = reply_with_group(Group, WaitList, []), + {noreply, State#group_state{ + updater_pid=nil, + waiting_list=StillWaiting, + group_seq=NewGroupSeq, + group=Group}}. + +handle_info({'EXIT', _FromPid, normal}, State) -> + {noreply, State}; + +handle_info({'EXIT', FromPid, Reason}, State) -> + ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]), + {stop, Reason, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(Reason, _State=#group_state{waiting_list=WaitList}) -> + lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +% error handling? the updater could die on us, we can save ourselves here. +% but we shouldn't, we could be dead for a reason, like the view got changed, or something. + + +%% Local Functions + +% reply_with_group/3 +% for each item in the WaitingList {Pid, Seq} +% if the Seq is =< GroupSeq, reply +reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq -> + gen_server:reply(Pid, {ok, Group}), + reply_with_group(Group, WaitList, StillWaiting); + +% else +% put it in the continuing waiting list +reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) -> + reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]); + +% return the still waiting list +reply_with_group(_Group, [], StillWaiting) -> + StillWaiting. + +spawn_updater({RootDir, DbName, GroupId}) -> + spawn_link(couch_view_updater, update, + [RootDir, DbName, GroupId, self()]); + +spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) -> + spawn_link(couch_view_updater, temp_update, + [DbName, Fd, Lang, MapSrc, RedSrc, self()]). + + -- cgit v1.2.3