From 91e2121c913a54a77482ed3883f7a8d2d00801a4 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Thu, 23 Jun 2011 09:45:39 +0000 Subject: Merged revision 1138796 from trunk Simpler and safer db open/closing in view group servers This makes the opening and closing of databases in the view group server to be more friendly with the db reference counting system, avoiding more potential db file leaking after compaction, as we currently open a database in one process and use it on another process (view compactor, view updater). This significantly reduces the chances of failure when compacting very large views as discussed in COUCHDB-994. This relates to COUCHDB-926 and COUCHDB-994. git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1138798 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db.hrl | 1 - src/couchdb/couch_view_compactor.erl | 8 ++-- src/couchdb/couch_view_group.erl | 72 ++++++++++++++---------------------- src/couchdb/couch_view_updater.erl | 14 ++++--- 4 files changed, 39 insertions(+), 56 deletions(-) (limited to 'src/couchdb') diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 003cb688..31318782 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -206,7 +206,6 @@ -record(group, { sig=nil, - db=nil, fd=nil, name, def_lang, diff --git a/src/couchdb/couch_view_compactor.erl b/src/couchdb/couch_view_compactor.erl index 8eda43e9..734605f0 100644 --- a/src/couchdb/couch_view_compactor.erl +++ b/src/couchdb/couch_view_compactor.erl @@ -20,14 +20,14 @@ %% @doc Compacts the views. GroupId must not include the _design/ prefix start_compact(DbName, GroupId) -> Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>), - gen_server:cast(Pid, {start_compact, fun compact_group/2}). + gen_server:cast(Pid, {start_compact, fun compact_group/3}). %%============================================================================= %% internal functions %%============================================================================= %% @spec compact_group(Group, NewGroup) -> ok -compact_group(Group, EmptyGroup) -> +compact_group(Group, EmptyGroup, DbName) -> #group{ current_seq = Seq, id_btree = IdBtree, @@ -36,15 +36,15 @@ compact_group(Group, EmptyGroup) -> } = Group, #group{ - db = Db, id_btree = EmptyIdBtree, views = EmptyViews } = EmptyGroup, + {ok, Db} = couch_db:open_int(DbName, []), {ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree), + couch_db:close(Db), <<"_design", ShortName/binary>> = GroupId, - DbName = couch_db:name(Db), TaskName = <>, couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>), diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 6ef1dcb4..448a7dcf 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -78,7 +78,7 @@ start_link(InitArgs) -> init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), try prepare_group(InitArgs, false) of - {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} -> + {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} -> case Seq > couch_db:get_update_seq(Db) of true -> ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, @@ -90,7 +90,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> {ok, #group_state{ db_name=DbName, init_args=InitArgs, - group=Group#group{db=nil}, + group=Group, ref_counter=RefCounter}} end; Error -> @@ -124,14 +124,11 @@ handle_call({request_group, RequestSeq}, From, updater_pid=nil, waiting_list=WaitList }=State) when RequestSeq > Seq -> - {ok, Db} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db}, Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{ updater_pid=Pid, - group=Group2, waiting_list=[{From,RequestSeq}|WaitList] }, infinity}; @@ -166,7 +163,8 @@ handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} {ok, Db} = couch_db:open_int(DbName, []), {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), NewGroup = reset_file(Db, Fd, DbName, Group), - Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), + couch_db:close(Db), + Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end), {noreply, State#group_state{compactor_pid = Pid}}; handle_cast({start_compact, _}, State) -> %% compact already running, this is a no-op @@ -176,7 +174,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, #group_state{group = #group{current_seq=OldSeq}} = State) when NewSeq >= OldSeq -> #group_state{ - group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group, + group = #group{name=GroupId, fd=OldFd, sig=GroupSig}, init_args = {RootDir, DbName, _}, updater_pid = UpdaterPid, compactor_pid = CompactorPid, @@ -195,7 +193,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, unlink(UpdaterPid), exit(UpdaterPid, view_compaction_complete), Owner = self(), - spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end); + spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end); true -> nil end, @@ -206,19 +204,10 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, unlink(OldFd), couch_ref_counter:drop(RefCounter), {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]), - case Group#group.db of - nil -> ok; - Else -> couch_db:close(Else) - end, - - case NewGroup#group.db of - nil -> ok; - _ -> couch_db:close(NewGroup#group.db) - end, self() ! delayed_commit, {noreply, State#group_state{ - group=NewGroup#group{db = nil}, + group=NewGroup, ref_counter=NewRefCounter, compactor_pid=nil, updater_pid=NewUpdaterPid @@ -230,18 +219,15 @@ handle_cast({compact_done, NewGroup}, State) -> } = State, ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), - couch_db:close(NewGroup#group.db), Pid = spawn_link(fun() -> - {ok, Db} = couch_db:open_int(DbName, []), {_,Ref} = erlang:spawn_monitor(fun() -> - couch_view_updater:update(nil, NewGroup#group{db = Db}) + couch_view_updater:update(nil, NewGroup, DbName) end), receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> - couch_db:close(Db), #group{name=GroupId} = NewGroup2, Pid2 = couch_view:get_group_server(DbName, GroupId), - gen_server:cast(Pid2, {compact_done, NewGroup2#group{db = nil}}) + gen_server:cast(Pid2, {compact_done, NewGroup2}) end end), {noreply, State#group_state{compactor_pid = Pid}}; @@ -283,13 +269,12 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {noreply, State#group_state{waiting_commit=true}} end; -handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, +handle_info({'EXIT', FromPid, {new_group, Group}}, #group_state{db_name=DbName, updater_pid=UpPid, ref_counter=RefCounter, waiting_list=WaitList, waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> - ok = couch_db:close(Db), if not WaitingCommit -> erlang:send_after(1000, self(), delayed_commit); true -> ok @@ -297,30 +282,27 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, case reply_with_group(Group, WaitList, [], RefCounter) of [] -> {noreply, State#group_state{waiting_commit=true, waiting_list=[], - group=Group#group{db=nil}, updater_pid=nil}}; + group=Group, updater_pid=nil}}; StillWaiting -> % we still have some waiters, reopen the database and reupdate the index - {ok, Db2} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db2}, Owner = self(), - Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{waiting_commit=true, - waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} + waiting_list=StillWaiting, updater_pid=Pid}} end; handle_info({'EXIT', _, {new_group, _}}, State) -> %% message from an old (probably pre-compaction) updater; ignore {noreply, State}; -handle_info({'EXIT', FromPid, reset}, - #group_state{ - init_args=InitArgs, - updater_pid=UpPid, - group=Group}=State) when UpPid == FromPid -> - ok = couch_db:close(Group#group.db), +handle_info({'EXIT', UpPid, reset}, + #group_state{init_args=InitArgs, updater_pid=UpPid} = State) -> case prepare_group(InitArgs, true) of - {ok, ResetGroup} -> + {ok, Db, ResetGroup} -> Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end), + couch_db:close(Db), + Pid = spawn_link(fun() -> + couch_view_updater:update(Owner, ResetGroup, Db#db.name) + end), {noreply, State#group_state{ updater_pid=Pid, group=ResetGroup}}; @@ -386,17 +368,17 @@ prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> {ok, Fd} -> if ForceReset -> % this can happen if we missed a purge - {ok, reset_file(Db, Fd, DbName, Group)}; + {ok, Db, reset_file(Db, Fd, DbName, Group)}; true -> % 09 UPGRADE CODE ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), case (catch couch_file:read_header(Fd)) of {ok, {Sig, HeaderInfo}} -> % sigs match! - {ok, init_group(Db, Fd, Group, HeaderInfo)}; + {ok, Db, init_group(Db, Fd, Group, HeaderInfo)}; _ -> % this happens on a new file - {ok, reset_file(Db, Fd, DbName, Group)} + {ok, Db, reset_file(Db, Fd, DbName, Group)} end end; Error -> @@ -582,7 +564,7 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> reset_group(#group{views=Views}=Group) -> Views2 = [View#view{btree=nil} || View <- Views], - Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, + Group#group{fd=nil,query_server=nil,current_seq=0, id_btree=nil,views=Views2}. reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> @@ -598,7 +580,7 @@ init_group(Db, Fd, #group{views=Views}=Group, nil) -> init_group(Db, Fd, Group, #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}= +init_group(_Db, Fd, #group{def_lang=Lang,views=Views}= Group, IndexHeader) -> #index_header{seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, @@ -638,5 +620,5 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}= View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq} end, ViewStates2, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, + Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, views=Views2}. diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 8e089fa9..2cc390df 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -12,30 +12,31 @@ -module(couch_view_updater). --export([update/2]). +-export([update/3]). -include("couch_db.hrl"). --spec update(_, #group{}) -> no_return(). +-spec update(_, #group{}, Dbname::binary()) -> no_return(). -update(Owner, Group) -> +update(Owner, Group, DbName) -> #group{ - db = #db{name=DbName} = Db, name = GroupName, current_seq = Seq, purge_seq = PurgeSeq } = Group, couch_task_status:add_task(<<"View Group Indexer">>, <>, <<"Starting index update">>), + {ok, Db} = couch_db:open_int(DbName, []), DbPurgeSeq = couch_db:get_purge_seq(Db), Group2 = if DbPurgeSeq == PurgeSeq -> Group; DbPurgeSeq == PurgeSeq + 1 -> couch_task_status:update(<<"Removing purged entries from view index.">>), - purge_index(Group); + purge_index(Group, Db); true -> couch_task_status:update(<<"Resetting view index due to lost purge entries.">>), + couch_db:close(Db), exit(reset) end, {ok, MapQueue} = couch_work_queue:new( @@ -73,13 +74,14 @@ update(Owner, Group) -> couch_task_status:set_update_frequency(0), couch_task_status:update("Finishing."), couch_work_queue:close(MapQueue), + couch_db:close(Db), receive {new_group, NewGroup} -> exit({new_group, NewGroup#group{current_seq=couch_db:get_update_seq(Db)}}) end. -purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> +purge_index(#group{views=Views, id_btree=IdBtree}=Group, Db) -> {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), -- cgit v1.2.3