summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2011-06-23 09:45:39 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2011-06-23 09:45:39 +0000
commit91e2121c913a54a77482ed3883f7a8d2d00801a4 (patch)
treec2e8672d4b1ab6cd04fefcfb8df0ce52b2911ef9 /src
parentd8dc16093a26c53407d8bf702698848104ba01d6 (diff)
Merged revision 1138796 from trunk
Simpler and safer db open/closing in view group servers This makes the opening and closing of databases in the view group server to be more friendly with the db reference counting system, avoiding more potential db file leaking after compaction, as we currently open a database in one process and use it on another process (view compactor, view updater). This significantly reduces the chances of failure when compacting very large views as discussed in COUCHDB-994. This relates to COUCHDB-926 and COUCHDB-994. git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1138798 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_db.hrl1
-rw-r--r--src/couchdb/couch_view_compactor.erl8
-rw-r--r--src/couchdb/couch_view_group.erl72
-rw-r--r--src/couchdb/couch_view_updater.erl14
4 files changed, 39 insertions, 56 deletions
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 003cb688..31318782 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -206,7 +206,6 @@
-record(group, {
sig=nil,
- db=nil,
fd=nil,
name,
def_lang,
diff --git a/src/couchdb/couch_view_compactor.erl b/src/couchdb/couch_view_compactor.erl
index 8eda43e9..734605f0 100644
--- a/src/couchdb/couch_view_compactor.erl
+++ b/src/couchdb/couch_view_compactor.erl
@@ -20,14 +20,14 @@
%% @doc Compacts the views. GroupId must not include the _design/ prefix
start_compact(DbName, GroupId) ->
Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>),
- gen_server:cast(Pid, {start_compact, fun compact_group/2}).
+ gen_server:cast(Pid, {start_compact, fun compact_group/3}).
%%=============================================================================
%% internal functions
%%=============================================================================
%% @spec compact_group(Group, NewGroup) -> ok
-compact_group(Group, EmptyGroup) ->
+compact_group(Group, EmptyGroup, DbName) ->
#group{
current_seq = Seq,
id_btree = IdBtree,
@@ -36,15 +36,15 @@ compact_group(Group, EmptyGroup) ->
} = Group,
#group{
- db = Db,
id_btree = EmptyIdBtree,
views = EmptyViews
} = EmptyGroup,
+ {ok, Db} = couch_db:open_int(DbName, []),
{ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
+ couch_db:close(Db),
<<"_design", ShortName/binary>> = GroupId,
- DbName = couch_db:name(Db),
TaskName = <<DbName/binary, ShortName/binary>>,
couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 6ef1dcb4..448a7dcf 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -78,7 +78,7 @@ start_link(InitArgs) ->
init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
try prepare_group(InitArgs, false) of
- {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} ->
+ {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} ->
case Seq > couch_db:get_update_seq(Db) of
true ->
ReturnPid ! {Ref, self(), {error, invalid_view_seq}},
@@ -90,7 +90,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
{ok, #group_state{
db_name=DbName,
init_args=InitArgs,
- group=Group#group{db=nil},
+ group=Group,
ref_counter=RefCounter}}
end;
Error ->
@@ -124,14 +124,11 @@ handle_call({request_group, RequestSeq}, From,
updater_pid=nil,
waiting_list=WaitList
}=State) when RequestSeq > Seq ->
- {ok, Db} = couch_db:open_int(DbName, []),
- Group2 = Group#group{db=Db},
Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end),
{noreply, State#group_state{
updater_pid=Pid,
- group=Group2,
waiting_list=[{From,RequestSeq}|WaitList]
}, infinity};
@@ -166,7 +163,8 @@ handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
{ok, Db} = couch_db:open_int(DbName, []),
{ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
NewGroup = reset_file(Db, Fd, DbName, Group),
- Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
+ couch_db:close(Db),
+ Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end),
{noreply, State#group_state{compactor_pid = Pid}};
handle_cast({start_compact, _}, State) ->
%% compact already running, this is a no-op
@@ -176,7 +174,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
#group_state{group = #group{current_seq=OldSeq}} = State)
when NewSeq >= OldSeq ->
#group_state{
- group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
+ group = #group{name=GroupId, fd=OldFd, sig=GroupSig},
init_args = {RootDir, DbName, _},
updater_pid = UpdaterPid,
compactor_pid = CompactorPid,
@@ -195,7 +193,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
unlink(UpdaterPid),
exit(UpdaterPid, view_compaction_complete),
Owner = self(),
- spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end);
+ spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end);
true ->
nil
end,
@@ -206,19 +204,10 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
unlink(OldFd),
couch_ref_counter:drop(RefCounter),
{ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
- case Group#group.db of
- nil -> ok;
- Else -> couch_db:close(Else)
- end,
-
- case NewGroup#group.db of
- nil -> ok;
- _ -> couch_db:close(NewGroup#group.db)
- end,
self() ! delayed_commit,
{noreply, State#group_state{
- group=NewGroup#group{db = nil},
+ group=NewGroup,
ref_counter=NewRefCounter,
compactor_pid=nil,
updater_pid=NewUpdaterPid
@@ -230,18 +219,15 @@ handle_cast({compact_done, NewGroup}, State) ->
} = State,
?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
"compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
- couch_db:close(NewGroup#group.db),
Pid = spawn_link(fun() ->
- {ok, Db} = couch_db:open_int(DbName, []),
{_,Ref} = erlang:spawn_monitor(fun() ->
- couch_view_updater:update(nil, NewGroup#group{db = Db})
+ couch_view_updater:update(nil, NewGroup, DbName)
end),
receive
{'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
- couch_db:close(Db),
#group{name=GroupId} = NewGroup2,
Pid2 = couch_view:get_group_server(DbName, GroupId),
- gen_server:cast(Pid2, {compact_done, NewGroup2#group{db = nil}})
+ gen_server:cast(Pid2, {compact_done, NewGroup2})
end
end),
{noreply, State#group_state{compactor_pid = Pid}};
@@ -283,13 +269,12 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{noreply, State#group_state{waiting_commit=true}}
end;
-handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
+handle_info({'EXIT', FromPid, {new_group, Group}},
#group_state{db_name=DbName,
updater_pid=UpPid,
ref_counter=RefCounter,
waiting_list=WaitList,
waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
- ok = couch_db:close(Db),
if not WaitingCommit ->
erlang:send_after(1000, self(), delayed_commit);
true -> ok
@@ -297,30 +282,27 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
case reply_with_group(Group, WaitList, [], RefCounter) of
[] ->
{noreply, State#group_state{waiting_commit=true, waiting_list=[],
- group=Group#group{db=nil}, updater_pid=nil}};
+ group=Group, updater_pid=nil}};
StillWaiting ->
% we still have some waiters, reopen the database and reupdate the index
- {ok, Db2} = couch_db:open_int(DbName, []),
- Group2 = Group#group{db=Db2},
Owner = self(),
- Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end),
+ Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end),
{noreply, State#group_state{waiting_commit=true,
- waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
+ waiting_list=StillWaiting, updater_pid=Pid}}
end;
handle_info({'EXIT', _, {new_group, _}}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State};
-handle_info({'EXIT', FromPid, reset},
- #group_state{
- init_args=InitArgs,
- updater_pid=UpPid,
- group=Group}=State) when UpPid == FromPid ->
- ok = couch_db:close(Group#group.db),
+handle_info({'EXIT', UpPid, reset},
+ #group_state{init_args=InitArgs, updater_pid=UpPid} = State) ->
case prepare_group(InitArgs, true) of
- {ok, ResetGroup} ->
+ {ok, Db, ResetGroup} ->
Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end),
+ couch_db:close(Db),
+ Pid = spawn_link(fun() ->
+ couch_view_updater:update(Owner, ResetGroup, Db#db.name)
+ end),
{noreply, State#group_state{
updater_pid=Pid,
group=ResetGroup}};
@@ -386,17 +368,17 @@ prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)->
{ok, Fd} ->
if ForceReset ->
% this can happen if we missed a purge
- {ok, reset_file(Db, Fd, DbName, Group)};
+ {ok, Db, reset_file(Db, Fd, DbName, Group)};
true ->
% 09 UPGRADE CODE
ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
case (catch couch_file:read_header(Fd)) of
{ok, {Sig, HeaderInfo}} ->
% sigs match!
- {ok, init_group(Db, Fd, Group, HeaderInfo)};
+ {ok, Db, init_group(Db, Fd, Group, HeaderInfo)};
_ ->
% this happens on a new file
- {ok, reset_file(Db, Fd, DbName, Group)}
+ {ok, Db, reset_file(Db, Fd, DbName, Group)}
end
end;
Error ->
@@ -582,7 +564,7 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
reset_group(#group{views=Views}=Group) ->
Views2 = [View#view{btree=nil} || View <- Views],
- Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
+ Group#group{fd=nil,query_server=nil,current_seq=0,
id_btree=nil,views=Views2}.
reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
@@ -598,7 +580,7 @@ init_group(Db, Fd, #group{views=Views}=Group, nil) ->
init_group(Db, Fd, Group,
#index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
+init_group(_Db, Fd, #group{def_lang=Lang,views=Views}=
Group, IndexHeader) ->
#index_header{seq=Seq, purge_seq=PurgeSeq,
id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
@@ -638,5 +620,5 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq}
end,
ViewStates2, Views),
- Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
+ Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
id_btree=IdBtree, views=Views2}.
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index 8e089fa9..2cc390df 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -12,30 +12,31 @@
-module(couch_view_updater).
--export([update/2]).
+-export([update/3]).
-include("couch_db.hrl").
--spec update(_, #group{}) -> no_return().
+-spec update(_, #group{}, Dbname::binary()) -> no_return().
-update(Owner, Group) ->
+update(Owner, Group, DbName) ->
#group{
- db = #db{name=DbName} = Db,
name = GroupName,
current_seq = Seq,
purge_seq = PurgeSeq
} = Group,
couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>),
+ {ok, Db} = couch_db:open_int(DbName, []),
DbPurgeSeq = couch_db:get_purge_seq(Db),
Group2 =
if DbPurgeSeq == PurgeSeq ->
Group;
DbPurgeSeq == PurgeSeq + 1 ->
couch_task_status:update(<<"Removing purged entries from view index.">>),
- purge_index(Group);
+ purge_index(Group, Db);
true ->
couch_task_status:update(<<"Resetting view index due to lost purge entries.">>),
+ couch_db:close(Db),
exit(reset)
end,
{ok, MapQueue} = couch_work_queue:new(
@@ -73,13 +74,14 @@ update(Owner, Group) ->
couch_task_status:set_update_frequency(0),
couch_task_status:update("Finishing."),
couch_work_queue:close(MapQueue),
+ couch_db:close(Db),
receive {new_group, NewGroup} ->
exit({new_group,
NewGroup#group{current_seq=couch_db:get_update_seq(Db)}})
end.
-purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
+purge_index(#group{views=Views, id_btree=IdBtree}=Group, Db) ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
{ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),