diff options
author | Filipe David Borba Manana <fdmanana@apache.org> | 2011-06-23 09:45:39 +0000 |
---|---|---|
committer | Filipe David Borba Manana <fdmanana@apache.org> | 2011-06-23 09:45:39 +0000 |
commit | 91e2121c913a54a77482ed3883f7a8d2d00801a4 (patch) | |
tree | c2e8672d4b1ab6cd04fefcfb8df0ce52b2911ef9 /src/couchdb | |
parent | d8dc16093a26c53407d8bf702698848104ba01d6 (diff) |
Merged revision 1138796 from trunk
Simpler and safer db open/closing in view group servers
This makes the opening and closing of databases in the view
group server to be more friendly with the db reference counting
system, avoiding more potential db file leaking after compaction,
as we currently open a database in one process and use it on
another process (view compactor, view updater).
This significantly reduces the chances of failure when compacting
very large views as discussed in COUCHDB-994.
This relates to COUCHDB-926 and COUCHDB-994.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1138798 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_db.hrl | 1 | ||||
-rw-r--r-- | src/couchdb/couch_view_compactor.erl | 8 | ||||
-rw-r--r-- | src/couchdb/couch_view_group.erl | 72 | ||||
-rw-r--r-- | src/couchdb/couch_view_updater.erl | 14 |
4 files changed, 39 insertions, 56 deletions
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 003cb688..31318782 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -206,7 +206,6 @@ -record(group, { sig=nil, - db=nil, fd=nil, name, def_lang, diff --git a/src/couchdb/couch_view_compactor.erl b/src/couchdb/couch_view_compactor.erl index 8eda43e9..734605f0 100644 --- a/src/couchdb/couch_view_compactor.erl +++ b/src/couchdb/couch_view_compactor.erl @@ -20,14 +20,14 @@ %% @doc Compacts the views. GroupId must not include the _design/ prefix start_compact(DbName, GroupId) -> Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>), - gen_server:cast(Pid, {start_compact, fun compact_group/2}). + gen_server:cast(Pid, {start_compact, fun compact_group/3}). %%============================================================================= %% internal functions %%============================================================================= %% @spec compact_group(Group, NewGroup) -> ok -compact_group(Group, EmptyGroup) -> +compact_group(Group, EmptyGroup, DbName) -> #group{ current_seq = Seq, id_btree = IdBtree, @@ -36,15 +36,15 @@ compact_group(Group, EmptyGroup) -> } = Group, #group{ - db = Db, id_btree = EmptyIdBtree, views = EmptyViews } = EmptyGroup, + {ok, Db} = couch_db:open_int(DbName, []), {ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree), + couch_db:close(Db), <<"_design", ShortName/binary>> = GroupId, - DbName = couch_db:name(Db), TaskName = <<DbName/binary, ShortName/binary>>, couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>), diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 6ef1dcb4..448a7dcf 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -78,7 +78,7 @@ start_link(InitArgs) -> init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), try prepare_group(InitArgs, false) of - {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} -> + {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} -> case Seq > couch_db:get_update_seq(Db) of true -> ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, @@ -90,7 +90,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> {ok, #group_state{ db_name=DbName, init_args=InitArgs, - group=Group#group{db=nil}, + group=Group, ref_counter=RefCounter}} end; Error -> @@ -124,14 +124,11 @@ handle_call({request_group, RequestSeq}, From, updater_pid=nil, waiting_list=WaitList }=State) when RequestSeq > Seq -> - {ok, Db} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db}, Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{ updater_pid=Pid, - group=Group2, waiting_list=[{From,RequestSeq}|WaitList] }, infinity}; @@ -166,7 +163,8 @@ handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} {ok, Db} = couch_db:open_int(DbName, []), {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), NewGroup = reset_file(Db, Fd, DbName, Group), - Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), + couch_db:close(Db), + Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end), {noreply, State#group_state{compactor_pid = Pid}}; handle_cast({start_compact, _}, State) -> %% compact already running, this is a no-op @@ -176,7 +174,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, #group_state{group = #group{current_seq=OldSeq}} = State) when NewSeq >= OldSeq -> #group_state{ - group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group, + group = #group{name=GroupId, fd=OldFd, sig=GroupSig}, init_args = {RootDir, DbName, _}, updater_pid = UpdaterPid, compactor_pid = CompactorPid, @@ -195,7 +193,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, unlink(UpdaterPid), exit(UpdaterPid, view_compaction_complete), Owner = self(), - spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end); + spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end); true -> nil end, @@ -206,19 +204,10 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, unlink(OldFd), couch_ref_counter:drop(RefCounter), {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]), - case Group#group.db of - nil -> ok; - Else -> couch_db:close(Else) - end, - - case NewGroup#group.db of - nil -> ok; - _ -> couch_db:close(NewGroup#group.db) - end, self() ! delayed_commit, {noreply, State#group_state{ - group=NewGroup#group{db = nil}, + group=NewGroup, ref_counter=NewRefCounter, compactor_pid=nil, updater_pid=NewUpdaterPid @@ -230,18 +219,15 @@ handle_cast({compact_done, NewGroup}, State) -> } = State, ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), - couch_db:close(NewGroup#group.db), Pid = spawn_link(fun() -> - {ok, Db} = couch_db:open_int(DbName, []), {_,Ref} = erlang:spawn_monitor(fun() -> - couch_view_updater:update(nil, NewGroup#group{db = Db}) + couch_view_updater:update(nil, NewGroup, DbName) end), receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> - couch_db:close(Db), #group{name=GroupId} = NewGroup2, Pid2 = couch_view:get_group_server(DbName, GroupId), - gen_server:cast(Pid2, {compact_done, NewGroup2#group{db = nil}}) + gen_server:cast(Pid2, {compact_done, NewGroup2}) end end), {noreply, State#group_state{compactor_pid = Pid}}; @@ -283,13 +269,12 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {noreply, State#group_state{waiting_commit=true}} end; -handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, +handle_info({'EXIT', FromPid, {new_group, Group}}, #group_state{db_name=DbName, updater_pid=UpPid, ref_counter=RefCounter, waiting_list=WaitList, waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> - ok = couch_db:close(Db), if not WaitingCommit -> erlang:send_after(1000, self(), delayed_commit); true -> ok @@ -297,30 +282,27 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, case reply_with_group(Group, WaitList, [], RefCounter) of [] -> {noreply, State#group_state{waiting_commit=true, waiting_list=[], - group=Group#group{db=nil}, updater_pid=nil}}; + group=Group, updater_pid=nil}}; StillWaiting -> % we still have some waiters, reopen the database and reupdate the index - {ok, Db2} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db2}, Owner = self(), - Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{waiting_commit=true, - waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} + waiting_list=StillWaiting, updater_pid=Pid}} end; handle_info({'EXIT', _, {new_group, _}}, State) -> %% message from an old (probably pre-compaction) updater; ignore {noreply, State}; -handle_info({'EXIT', FromPid, reset}, - #group_state{ - init_args=InitArgs, - updater_pid=UpPid, - group=Group}=State) when UpPid == FromPid -> - ok = couch_db:close(Group#group.db), +handle_info({'EXIT', UpPid, reset}, + #group_state{init_args=InitArgs, updater_pid=UpPid} = State) -> case prepare_group(InitArgs, true) of - {ok, ResetGroup} -> + {ok, Db, ResetGroup} -> Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end), + couch_db:close(Db), + Pid = spawn_link(fun() -> + couch_view_updater:update(Owner, ResetGroup, Db#db.name) + end), {noreply, State#group_state{ updater_pid=Pid, group=ResetGroup}}; @@ -386,17 +368,17 @@ prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> {ok, Fd} -> if ForceReset -> % this can happen if we missed a purge - {ok, reset_file(Db, Fd, DbName, Group)}; + {ok, Db, reset_file(Db, Fd, DbName, Group)}; true -> % 09 UPGRADE CODE ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), case (catch couch_file:read_header(Fd)) of {ok, {Sig, HeaderInfo}} -> % sigs match! - {ok, init_group(Db, Fd, Group, HeaderInfo)}; + {ok, Db, init_group(Db, Fd, Group, HeaderInfo)}; _ -> % this happens on a new file - {ok, reset_file(Db, Fd, DbName, Group)} + {ok, Db, reset_file(Db, Fd, DbName, Group)} end end; Error -> @@ -582,7 +564,7 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> reset_group(#group{views=Views}=Group) -> Views2 = [View#view{btree=nil} || View <- Views], - Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, + Group#group{fd=nil,query_server=nil,current_seq=0, id_btree=nil,views=Views2}. reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> @@ -598,7 +580,7 @@ init_group(Db, Fd, #group{views=Views}=Group, nil) -> init_group(Db, Fd, Group, #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}= +init_group(_Db, Fd, #group{def_lang=Lang,views=Views}= Group, IndexHeader) -> #index_header{seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, @@ -638,5 +620,5 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}= View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq} end, ViewStates2, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, + Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, views=Views2}. diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 8e089fa9..2cc390df 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -12,30 +12,31 @@ -module(couch_view_updater). --export([update/2]). +-export([update/3]). -include("couch_db.hrl"). --spec update(_, #group{}) -> no_return(). +-spec update(_, #group{}, Dbname::binary()) -> no_return(). -update(Owner, Group) -> +update(Owner, Group, DbName) -> #group{ - db = #db{name=DbName} = Db, name = GroupName, current_seq = Seq, purge_seq = PurgeSeq } = Group, couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>), + {ok, Db} = couch_db:open_int(DbName, []), DbPurgeSeq = couch_db:get_purge_seq(Db), Group2 = if DbPurgeSeq == PurgeSeq -> Group; DbPurgeSeq == PurgeSeq + 1 -> couch_task_status:update(<<"Removing purged entries from view index.">>), - purge_index(Group); + purge_index(Group, Db); true -> couch_task_status:update(<<"Resetting view index due to lost purge entries.">>), + couch_db:close(Db), exit(reset) end, {ok, MapQueue} = couch_work_queue:new( @@ -73,13 +74,14 @@ update(Owner, Group) -> couch_task_status:set_update_frequency(0), couch_task_status:update("Finishing."), couch_work_queue:close(MapQueue), + couch_db:close(Db), receive {new_group, NewGroup} -> exit({new_group, NewGroup#group{current_seq=couch_db:get_update_seq(Db)}}) end. -purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> +purge_index(#group{views=Views, id_btree=IdBtree}=Group, Db) -> {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), |