diff options
author | Filipe David Borba Manana <fdmanana@apache.org> | 2011-06-23 09:45:39 +0000 |
---|---|---|
committer | Filipe David Borba Manana <fdmanana@apache.org> | 2011-06-23 09:45:39 +0000 |
commit | 91e2121c913a54a77482ed3883f7a8d2d00801a4 (patch) | |
tree | c2e8672d4b1ab6cd04fefcfb8df0ce52b2911ef9 /src/couchdb/couch_view_group.erl | |
parent | d8dc16093a26c53407d8bf702698848104ba01d6 (diff) |
Merged revision 1138796 from trunk
Simpler and safer db open/closing in view group servers
This makes the opening and closing of databases in the view
group server to be more friendly with the db reference counting
system, avoiding more potential db file leaking after compaction,
as we currently open a database in one process and use it on
another process (view compactor, view updater).
This significantly reduces the chances of failure when compacting
very large views as discussed in COUCHDB-994.
This relates to COUCHDB-926 and COUCHDB-994.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1138798 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_view_group.erl')
-rw-r--r-- | src/couchdb/couch_view_group.erl | 72 |
1 files changed, 27 insertions, 45 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 6ef1dcb4..448a7dcf 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -78,7 +78,7 @@ start_link(InitArgs) -> init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), try prepare_group(InitArgs, false) of - {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} -> + {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} -> case Seq > couch_db:get_update_seq(Db) of true -> ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, @@ -90,7 +90,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> {ok, #group_state{ db_name=DbName, init_args=InitArgs, - group=Group#group{db=nil}, + group=Group, ref_counter=RefCounter}} end; Error -> @@ -124,14 +124,11 @@ handle_call({request_group, RequestSeq}, From, updater_pid=nil, waiting_list=WaitList }=State) when RequestSeq > Seq -> - {ok, Db} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db}, Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{ updater_pid=Pid, - group=Group2, waiting_list=[{From,RequestSeq}|WaitList] }, infinity}; @@ -166,7 +163,8 @@ handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} {ok, Db} = couch_db:open_int(DbName, []), {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), NewGroup = reset_file(Db, Fd, DbName, Group), - Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), + couch_db:close(Db), + Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end), {noreply, State#group_state{compactor_pid = Pid}}; handle_cast({start_compact, _}, State) -> %% compact already running, this is a no-op @@ -176,7 +174,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, #group_state{group = #group{current_seq=OldSeq}} = State) when NewSeq >= OldSeq -> #group_state{ - group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group, + group = #group{name=GroupId, fd=OldFd, sig=GroupSig}, init_args = {RootDir, DbName, _}, updater_pid = UpdaterPid, compactor_pid = CompactorPid, @@ -195,7 +193,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, unlink(UpdaterPid), exit(UpdaterPid, view_compaction_complete), Owner = self(), - spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end); + spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end); true -> nil end, @@ -206,19 +204,10 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, unlink(OldFd), couch_ref_counter:drop(RefCounter), {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]), - case Group#group.db of - nil -> ok; - Else -> couch_db:close(Else) - end, - - case NewGroup#group.db of - nil -> ok; - _ -> couch_db:close(NewGroup#group.db) - end, self() ! delayed_commit, {noreply, State#group_state{ - group=NewGroup#group{db = nil}, + group=NewGroup, ref_counter=NewRefCounter, compactor_pid=nil, updater_pid=NewUpdaterPid @@ -230,18 +219,15 @@ handle_cast({compact_done, NewGroup}, State) -> } = State, ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), - couch_db:close(NewGroup#group.db), Pid = spawn_link(fun() -> - {ok, Db} = couch_db:open_int(DbName, []), {_,Ref} = erlang:spawn_monitor(fun() -> - couch_view_updater:update(nil, NewGroup#group{db = Db}) + couch_view_updater:update(nil, NewGroup, DbName) end), receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> - couch_db:close(Db), #group{name=GroupId} = NewGroup2, Pid2 = couch_view:get_group_server(DbName, GroupId), - gen_server:cast(Pid2, {compact_done, NewGroup2#group{db = nil}}) + gen_server:cast(Pid2, {compact_done, NewGroup2}) end end), {noreply, State#group_state{compactor_pid = Pid}}; @@ -283,13 +269,12 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {noreply, State#group_state{waiting_commit=true}} end; -handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, +handle_info({'EXIT', FromPid, {new_group, Group}}, #group_state{db_name=DbName, updater_pid=UpPid, ref_counter=RefCounter, waiting_list=WaitList, waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> - ok = couch_db:close(Db), if not WaitingCommit -> erlang:send_after(1000, self(), delayed_commit); true -> ok @@ -297,30 +282,27 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, case reply_with_group(Group, WaitList, [], RefCounter) of [] -> {noreply, State#group_state{waiting_commit=true, waiting_list=[], - group=Group#group{db=nil}, updater_pid=nil}}; + group=Group, updater_pid=nil}}; StillWaiting -> % we still have some waiters, reopen the database and reupdate the index - {ok, Db2} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db2}, Owner = self(), - Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{waiting_commit=true, - waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} + waiting_list=StillWaiting, updater_pid=Pid}} end; handle_info({'EXIT', _, {new_group, _}}, State) -> %% message from an old (probably pre-compaction) updater; ignore {noreply, State}; -handle_info({'EXIT', FromPid, reset}, - #group_state{ - init_args=InitArgs, - updater_pid=UpPid, - group=Group}=State) when UpPid == FromPid -> - ok = couch_db:close(Group#group.db), +handle_info({'EXIT', UpPid, reset}, + #group_state{init_args=InitArgs, updater_pid=UpPid} = State) -> case prepare_group(InitArgs, true) of - {ok, ResetGroup} -> + {ok, Db, ResetGroup} -> Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end), + couch_db:close(Db), + Pid = spawn_link(fun() -> + couch_view_updater:update(Owner, ResetGroup, Db#db.name) + end), {noreply, State#group_state{ updater_pid=Pid, group=ResetGroup}}; @@ -386,17 +368,17 @@ prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> {ok, Fd} -> if ForceReset -> % this can happen if we missed a purge - {ok, reset_file(Db, Fd, DbName, Group)}; + {ok, Db, reset_file(Db, Fd, DbName, Group)}; true -> % 09 UPGRADE CODE ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), case (catch couch_file:read_header(Fd)) of {ok, {Sig, HeaderInfo}} -> % sigs match! - {ok, init_group(Db, Fd, Group, HeaderInfo)}; + {ok, Db, init_group(Db, Fd, Group, HeaderInfo)}; _ -> % this happens on a new file - {ok, reset_file(Db, Fd, DbName, Group)} + {ok, Db, reset_file(Db, Fd, DbName, Group)} end end; Error -> @@ -582,7 +564,7 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> reset_group(#group{views=Views}=Group) -> Views2 = [View#view{btree=nil} || View <- Views], - Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, + Group#group{fd=nil,query_server=nil,current_seq=0, id_btree=nil,views=Views2}. reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> @@ -598,7 +580,7 @@ init_group(Db, Fd, #group{views=Views}=Group, nil) -> init_group(Db, Fd, Group, #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}= +init_group(_Db, Fd, #group{def_lang=Lang,views=Views}= Group, IndexHeader) -> #index_header{seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, @@ -638,5 +620,5 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}= View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq} end, ViewStates2, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, + Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, views=Views2}. |