path: root/src/couchdb/couch_view_group.erl
diff options
authorJohn Christopher Anderson <>2008-12-10 01:13:17 +0000
committerJohn Christopher Anderson <>2008-12-10 01:13:17 +0000
commit5a9321814a727e8c010bf83f05572a341d55f26a (patch)
tree91b6233a3d81f9b29a34d9653fffbde284cdfa4b /src/couchdb/couch_view_group.erl
parent6bacde0d941d209f41ad3ca8921e3a596a056c06 (diff)
view group state gen_server. thanks damien and davisp.
git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_view_group.erl')
1 files changed, 192 insertions, 0 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
new file mode 100644
index 00000000..84547095
--- /dev/null
+++ b/src/couchdb/couch_view_group.erl
@@ -0,0 +1,192 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+%% API
+-export([start_link/1, request_group/2]).
+% -export([design_doc_to_view_group/1]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-record(group_state, {
+ spawn_fun,
+ target_seq=0,
+ group_seq=0,
+ group=nil,
+ updater_pid=nil,
+ waiting_list=[]
+% api methods
+request_group(Pid, Seq) ->
+ ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
+ case gen_server:call(Pid, {request_group, Seq}, infinity) of
+ {ok, Group} ->
+ ?LOG_DEBUG("get_updated_group replied with group", []),
+ {ok, Group};
+ Else ->
+ ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]),
+ Else
+ end.
+% from template
+start_link(InitArgs) ->
+ gen_server:start_link(couch_view_group, InitArgs, []).
+% init differentiates between temp and design_doc views. It creates a closure
+% which spawns the appropriate view_updater. (It might also spawn the first
+% view_updater run.)
+init(InitArgs) ->
+ SpawnFun = fun() -> spawn_updater(InitArgs) end,
+ process_flag(trap_exit, true),
+ {ok, #group_state{spawn_fun=SpawnFun}}.
+% There are two sources of messages: couch_view, which requests an up to date
+% view group, and the couch_view_updater, which when spawned, updates the
+% group and sends it back here. We employ a caching mechanism, so that between
+% database writes, we don't have to spawn a couch_view_updater with every view
+% request. This should give us more control, and the ability to request view
+% statuses eventually.
+% The caching mechanism: each request is submitted with a seq_id for the
+% database at the time it was read. We guarantee to return a view from that
+% sequence or newer.
+% If the request sequence is higher than our current high_target seq, we set
+% that as the highest seqence. If the updater is not running, we launch it.
+handle_call({request_group, RequestSeq}, From,
+ #group_state{
+ target_seq=TargetSeq,
+ spawn_fun=SpawnFun,
+ updater_pid=Up,
+ waiting_list=WaitList
+ }=State) when RequestSeq > TargetSeq, Up == nil ->
+ UpdaterPid = SpawnFun(),
+ {noreply, State#group_state{
+ target_seq=RequestSeq,
+ updater_pid=UpdaterPid,
+ waiting_list=[{From,RequestSeq}|WaitList]
+ }, infinity};
+handle_call({request_group, RequestSeq}, From,
+ #group_state{
+ target_seq=TargetSeq,
+ waiting_list=WaitList
+ }=State) when RequestSeq > TargetSeq ->
+ {noreply, State#group_state{
+ target_seq=RequestSeq,
+ waiting_list=[{From,RequestSeq}|WaitList]
+ }, infinity};
+% If the request seqence is less than or equal to the seq_id of a known Group,
+% we respond with that Group.
+handle_call({request_group, RequestSeq}, _From,
+ State=#group_state{
+ group_seq=GroupSeq,
+ group=Group
+ }) when RequestSeq =< GroupSeq ->
+ {reply, {ok, Group}, State};
+% Otherwise: TargetSeq => RequestSeq > GroupSeq
+% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq
+handle_call({request_group, RequestSeq}, From,
+ #group_state{
+ waiting_list=WaitList
+ }=State) ->
+ {noreply, State#group_state{
+ waiting_list=[{From, RequestSeq}|WaitList]
+ }, infinity}.
+% When the updater finishes, it will return a group with a seq_id, we should
+% store that group and seq_id in our state. If our high_target is higher than
+% the returned group, start a new updater.
+handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},
+ State=#group_state{
+ target_seq=TargetSeq,
+ waiting_list=WaitList,
+ spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq ->
+ StillWaiting = reply_with_group(Group, WaitList, []),
+ UpdaterPid = SpawnFun(),
+ {noreply, State#group_state{
+ updater_pid=UpdaterPid,
+ waiting_list=StillWaiting,
+ group_seq=NewGroupSeq,
+ group=Group}};
+handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},
+ State=#group_state{waiting_list=WaitList}) ->
+ StillWaiting = reply_with_group(Group, WaitList, []),
+ {noreply, State#group_state{
+ updater_pid=nil,
+ waiting_list=StillWaiting,
+ group_seq=NewGroupSeq,
+ group=Group}}.
+handle_info({'EXIT', _FromPid, normal}, State) ->
+ {noreply, State};
+handle_info({'EXIT', FromPid, Reason}, State) ->
+ ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]),
+ {stop, Reason, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+terminate(Reason, _State=#group_state{waiting_list=WaitList}) ->
+ lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList),
+ ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+% error handling? the updater could die on us, we can save ourselves here.
+% but we shouldn't, we could be dead for a reason, like the view got changed, or something.
+%% Local Functions
+% reply_with_group/3
+% for each item in the WaitingList {Pid, Seq}
+% if the Seq is =< GroupSeq, reply
+reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq ->
+ gen_server:reply(Pid, {ok, Group}),
+ reply_with_group(Group, WaitList, StillWaiting);
+% else
+% put it in the continuing waiting list
+reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) ->
+ reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]);
+% return the still waiting list
+reply_with_group(_Group, [], StillWaiting) ->
+ StillWaiting.
+spawn_updater({RootDir, DbName, GroupId}) ->
+ spawn_link(couch_view_updater, update,
+ [RootDir, DbName, GroupId, self()]);
+spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) ->
+ spawn_link(couch_view_updater, temp_update,
+ [DbName, Fd, Lang, MapSrc, RedSrc, self()]).