summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_view_group.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-04-05 20:34:35 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-04-05 20:34:35 +0000
commit0d608a8ea9658780bdcfe36ae700accaf006690b (patch)
tree27151118c9a7a4fc4c40341c34996f332c8c854c /src/couchdb/couch_view_group.erl
parent05838650cfd06fa27f07d0a13b80617879b381e3 (diff)
added compaction for view indexes. See COUCHDB-92
No tests or Futon interface for this feature yet. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@762153 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_view_group.erl')
-rw-r--r--src/couchdb/couch_view_group.erl96
1 files changed, 79 insertions, 17 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 4e7d7767..e44d637c 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -29,15 +29,18 @@
init_args,
group,
updater_pid=nil,
+ compactor_pid=nil,
waiting_commit=false,
- waiting_list=[]
+ waiting_list=[],
+ ref_counter=nil
}).
% api methods
request_group(Pid, Seq) ->
?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
case gen_server:call(Pid, {request_group, Seq}, infinity) of
- {ok, Group} ->
+ {ok, Group, RefCounter} ->
+ couch_ref_counter:add(RefCounter),
{ok, Group};
Else ->
?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]),
@@ -70,14 +73,16 @@ start_link(InitArgs) ->
init({InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
case prepare_group(InitArgs, false) of
- {ok, #group{db=Db}=Group} ->
+ {ok, #group{db=Db, fd=Fd}=Group} ->
couch_db:monitor(Db),
Pid = spawn_link(fun()-> couch_view_updater:update(Group) end),
+ {ok, RefCounter} = couch_ref_counter:start([Fd]),
{ok, #group_state{
db_name=couch_db:name(Db),
init_args=InitArgs,
updater_pid = Pid,
- group=Group}};
+ group=Group,
+ ref_counter=RefCounter}};
Error ->
ReturnPid ! {Ref, self(), Error},
ignore
@@ -120,10 +125,11 @@ handle_call({request_group, RequestSeq}, From,
% If the request seqence is less than or equal to the seq_id of a known Group,
% we respond with that Group.
-handle_call({request_group, RequestSeq}, _From,
- #group_state{group=#group{current_seq=GroupSeq}=Group}=State)
- when RequestSeq =< GroupSeq ->
- {reply, {ok, Group}, State};
+handle_call({request_group, RequestSeq}, _From, #group_state{
+ group = #group{current_seq=GroupSeq} = Group,
+ ref_counter = RefCounter
+ } = State) when RequestSeq =< GroupSeq ->
+ {reply, {ok, Group, RefCounter}, State};
% Otherwise: TargetSeq => RequestSeq > GroupSeq
% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq
@@ -134,9 +140,63 @@ handle_call({request_group, RequestSeq}, From,
}, infinity}.
-handle_cast(foo, State) ->
- {ok, State}.
+handle_cast({start_compact, CompactFun}, #group_state{ compactor_pid=nil,
+ group=Group, init_args={view, RootDir, DbName, GroupId} } = State) ->
+ ?LOG_INFO("Starting view group compaction", []),
+ {ok, Db} = couch_db:open(DbName, []),
+ {ok, Fd} = open_index_file(RootDir, DbName, <<GroupId/binary,".compact">>),
+ NewGroup = reset_file(Db, Fd, DbName, Group),
+ Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
+ {noreply, State#group_state{compactor_pid = Pid}};
+handle_cast({start_compact, _}, State) ->
+ %% compact already running, this is a no-op
+ {noreply, State};
+handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
+ #group_state{
+ group = #group{current_seq=OldSeq} = Group,
+ init_args = {view, RootDir, DbName, GroupId},
+ updater_pid = nil,
+ ref_counter = RefCounter
+ } = State) when NewSeq >= OldSeq ->
+ ?LOG_INFO("View Group compaction complete", []),
+ BaseName = RootDir ++ "/." ++ ?b2l(DbName) ++ ?b2l(GroupId),
+ FileName = BaseName ++ ".view",
+ CompactName = BaseName ++".compact.view",
+ file:delete(FileName),
+ ok = file:rename(CompactName, FileName),
+
+ %% cleanup old group
+ couch_ref_counter:drop(RefCounter),
+ {ok, NewRefCounter} = couch_ref_counter:start([NewFd]),
+ case Group#group.db of
+ nil -> ok;
+ Else -> couch_db:close(Else)
+ end,
+
+ erlang:send_after(1000, self(), delayed_commit),
+ {noreply, State#group_state{
+ group=NewGroup,
+ ref_counter=NewRefCounter,
+ compactor_pid=nil
+ }};
+handle_cast({compact_done, NewGroup}, #group_state{
+ init_args={view, _RootDir, DbName, GroupId} } = State) ->
+ ?LOG_INFO("View index compaction still behind main file", []),
+ couch_db:close(NewGroup#group.db),
+ {ok, Db} = couch_db:open(DbName, []),
+ Pid = spawn_link(fun() ->
+ {_,Ref} = erlang:spawn_monitor(fun() ->
+ couch_view_updater:update(NewGroup#group{db = Db})
+ end),
+ receive
+ {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
+ #group{name=GroupId} = NewGroup2,
+ Pid2 = couch_view:get_group_server(DbName, GroupId),
+ gen_server:cast(Pid2, {compact_done, NewGroup2})
+ end
+ end),
+ {noreply, State#group_state{compactor_pid = Pid}}.
handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{ok, Db} = couch_db:open(DbName, []),
@@ -160,6 +220,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
#group_state{db_name=DbName,
updater_pid=UpPid,
+ ref_counter=RefCounter,
waiting_list=WaitList,
waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
ok = couch_db:close(Db),
@@ -168,7 +229,7 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
erlang:send_after(1000, self(), delayed_commit);
true -> ok
end,
- case reply_with_group(Group, WaitList, []) of
+ case reply_with_group(Group, WaitList, [], RefCounter) of
[] ->
{noreply, State#group_state{waiting_commit=true, waiting_list=[],
group=Group#group{db=nil}, updater_pid=nil}};
@@ -221,17 +282,18 @@ code_change(_OldVsn, State, _Extra) ->
% reply_with_group/3
% for each item in the WaitingList {Pid, Seq}
% if the Seq is =< GroupSeq, reply
-reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq ->
- gen_server:reply(Pid, {ok, Group}),
- reply_with_group(Group, WaitList, StillWaiting);
+reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList],
+ StillWaiting, RefCounter) when Seq =< GroupSeq ->
+ gen_server:reply(Pid, {ok, Group, RefCounter}),
+ reply_with_group(Group, WaitList, StillWaiting, RefCounter);
% else
% put it in the continuing waiting list
-reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) ->
- reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]);
+reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting, RefCounter) ->
+ reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting], RefCounter);
% return the still waiting list
-reply_with_group(_Group, [], StillWaiting) ->
+reply_with_group(_Group, [], StillWaiting, _RefCounter) ->
StillWaiting.
reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->