From 0d608a8ea9658780bdcfe36ae700accaf006690b Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Sun, 5 Apr 2009 20:34:35 +0000 Subject: added compaction for view indexes. See COUCHDB-92 No tests or Futon interface for this feature yet. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@762153 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_view_group.erl | 96 +++++++++++++++++++++++++++++++++------- 1 file changed, 79 insertions(+), 17 deletions(-) (limited to 'src/couchdb/couch_view_group.erl') diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 4e7d7767..e44d637c 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -29,15 +29,18 @@ init_args, group, updater_pid=nil, + compactor_pid=nil, waiting_commit=false, - waiting_list=[] + waiting_list=[], + ref_counter=nil }). % api methods request_group(Pid, Seq) -> ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), case gen_server:call(Pid, {request_group, Seq}, infinity) of - {ok, Group} -> + {ok, Group, RefCounter} -> + couch_ref_counter:add(RefCounter), {ok, Group}; Else -> ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]), @@ -70,14 +73,16 @@ start_link(InitArgs) -> init({InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), case prepare_group(InitArgs, false) of - {ok, #group{db=Db}=Group} -> + {ok, #group{db=Db, fd=Fd}=Group} -> couch_db:monitor(Db), Pid = spawn_link(fun()-> couch_view_updater:update(Group) end), + {ok, RefCounter} = couch_ref_counter:start([Fd]), {ok, #group_state{ db_name=couch_db:name(Db), init_args=InitArgs, updater_pid = Pid, - group=Group}}; + group=Group, + ref_counter=RefCounter}}; Error -> ReturnPid ! {Ref, self(), Error}, ignore @@ -120,10 +125,11 @@ handle_call({request_group, RequestSeq}, From, % If the request seqence is less than or equal to the seq_id of a known Group, % we respond with that Group. -handle_call({request_group, RequestSeq}, _From, - #group_state{group=#group{current_seq=GroupSeq}=Group}=State) - when RequestSeq =< GroupSeq -> - {reply, {ok, Group}, State}; +handle_call({request_group, RequestSeq}, _From, #group_state{ + group = #group{current_seq=GroupSeq} = Group, + ref_counter = RefCounter + } = State) when RequestSeq =< GroupSeq -> + {reply, {ok, Group, RefCounter}, State}; % Otherwise: TargetSeq => RequestSeq > GroupSeq % We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq @@ -134,9 +140,63 @@ handle_call({request_group, RequestSeq}, From, }, infinity}. -handle_cast(foo, State) -> - {ok, State}. +handle_cast({start_compact, CompactFun}, #group_state{ compactor_pid=nil, + group=Group, init_args={view, RootDir, DbName, GroupId} } = State) -> + ?LOG_INFO("Starting view group compaction", []), + {ok, Db} = couch_db:open(DbName, []), + {ok, Fd} = open_index_file(RootDir, DbName, <>), + NewGroup = reset_file(Db, Fd, DbName, Group), + Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), + {noreply, State#group_state{compactor_pid = Pid}}; +handle_cast({start_compact, _}, State) -> + %% compact already running, this is a no-op + {noreply, State}; +handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, + #group_state{ + group = #group{current_seq=OldSeq} = Group, + init_args = {view, RootDir, DbName, GroupId}, + updater_pid = nil, + ref_counter = RefCounter + } = State) when NewSeq >= OldSeq -> + ?LOG_INFO("View Group compaction complete", []), + BaseName = RootDir ++ "/." ++ ?b2l(DbName) ++ ?b2l(GroupId), + FileName = BaseName ++ ".view", + CompactName = BaseName ++".compact.view", + file:delete(FileName), + ok = file:rename(CompactName, FileName), + + %% cleanup old group + couch_ref_counter:drop(RefCounter), + {ok, NewRefCounter} = couch_ref_counter:start([NewFd]), + case Group#group.db of + nil -> ok; + Else -> couch_db:close(Else) + end, + + erlang:send_after(1000, self(), delayed_commit), + {noreply, State#group_state{ + group=NewGroup, + ref_counter=NewRefCounter, + compactor_pid=nil + }}; +handle_cast({compact_done, NewGroup}, #group_state{ + init_args={view, _RootDir, DbName, GroupId} } = State) -> + ?LOG_INFO("View index compaction still behind main file", []), + couch_db:close(NewGroup#group.db), + {ok, Db} = couch_db:open(DbName, []), + Pid = spawn_link(fun() -> + {_,Ref} = erlang:spawn_monitor(fun() -> + couch_view_updater:update(NewGroup#group{db = Db}) + end), + receive + {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> + #group{name=GroupId} = NewGroup2, + Pid2 = couch_view:get_group_server(DbName, GroupId), + gen_server:cast(Pid2, {compact_done, NewGroup2}) + end + end), + {noreply, State#group_state{compactor_pid = Pid}}. handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {ok, Db} = couch_db:open(DbName, []), @@ -160,6 +220,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, #group_state{db_name=DbName, updater_pid=UpPid, + ref_counter=RefCounter, waiting_list=WaitList, waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> ok = couch_db:close(Db), @@ -168,7 +229,7 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, erlang:send_after(1000, self(), delayed_commit); true -> ok end, - case reply_with_group(Group, WaitList, []) of + case reply_with_group(Group, WaitList, [], RefCounter) of [] -> {noreply, State#group_state{waiting_commit=true, waiting_list=[], group=Group#group{db=nil}, updater_pid=nil}}; @@ -221,17 +282,18 @@ code_change(_OldVsn, State, _Extra) -> % reply_with_group/3 % for each item in the WaitingList {Pid, Seq} % if the Seq is =< GroupSeq, reply -reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq -> - gen_server:reply(Pid, {ok, Group}), - reply_with_group(Group, WaitList, StillWaiting); +reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], + StillWaiting, RefCounter) when Seq =< GroupSeq -> + gen_server:reply(Pid, {ok, Group, RefCounter}), + reply_with_group(Group, WaitList, StillWaiting, RefCounter); % else % put it in the continuing waiting list -reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) -> - reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]); +reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting, RefCounter) -> + reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting], RefCounter); % return the still waiting list -reply_with_group(_Group, [], StillWaiting) -> +reply_with_group(_Group, [], StillWaiting, _RefCounter) -> StillWaiting. reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> -- cgit v1.2.3