summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_view_group.erl
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2011-06-23 09:45:39 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2011-06-23 09:45:39 +0000
commit91e2121c913a54a77482ed3883f7a8d2d00801a4 (patch)
treec2e8672d4b1ab6cd04fefcfb8df0ce52b2911ef9 /src/couchdb/couch_view_group.erl
parentd8dc16093a26c53407d8bf702698848104ba01d6 (diff)
Merged revision 1138796 from trunk
Simpler and safer db open/closing in view group servers This makes the opening and closing of databases in the view group server to be more friendly with the db reference counting system, avoiding more potential db file leaking after compaction, as we currently open a database in one process and use it on another process (view compactor, view updater). This significantly reduces the chances of failure when compacting very large views as discussed in COUCHDB-994. This relates to COUCHDB-926 and COUCHDB-994. git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1138798 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_view_group.erl')
-rw-r--r--src/couchdb/couch_view_group.erl72
1 files changed, 27 insertions, 45 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 6ef1dcb4..448a7dcf 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -78,7 +78,7 @@ start_link(InitArgs) ->
init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
try prepare_group(InitArgs, false) of
- {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} ->
+ {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} ->
case Seq > couch_db:get_update_seq(Db) of
true ->
ReturnPid ! {Ref, self(), {error, invalid_view_seq}},
@@ -90,7 +90,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
{ok, #group_state{
db_name=DbName,
init_args=InitArgs,
- group=Group#group{db=nil},
+ group=Group,
ref_counter=RefCounter}}
end;
Error ->
@@ -124,14 +124,11 @@ handle_call({request_group, RequestSeq}, From,
updater_pid=nil,
waiting_list=WaitList
}=State) when RequestSeq > Seq ->
- {ok, Db} = couch_db:open_int(DbName, []),
- Group2 = Group#group{db=Db},
Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end),
{noreply, State#group_state{
updater_pid=Pid,
- group=Group2,
waiting_list=[{From,RequestSeq}|WaitList]
}, infinity};
@@ -166,7 +163,8 @@ handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
{ok, Db} = couch_db:open_int(DbName, []),
{ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
NewGroup = reset_file(Db, Fd, DbName, Group),
- Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
+ couch_db:close(Db),
+ Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end),
{noreply, State#group_state{compactor_pid = Pid}};
handle_cast({start_compact, _}, State) ->
%% compact already running, this is a no-op
@@ -176,7 +174,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
#group_state{group = #group{current_seq=OldSeq}} = State)
when NewSeq >= OldSeq ->
#group_state{
- group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
+ group = #group{name=GroupId, fd=OldFd, sig=GroupSig},
init_args = {RootDir, DbName, _},
updater_pid = UpdaterPid,
compactor_pid = CompactorPid,
@@ -195,7 +193,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
unlink(UpdaterPid),
exit(UpdaterPid, view_compaction_complete),
Owner = self(),
- spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end);
+ spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end);
true ->
nil
end,
@@ -206,19 +204,10 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
unlink(OldFd),
couch_ref_counter:drop(RefCounter),
{ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
- case Group#group.db of
- nil -> ok;
- Else -> couch_db:close(Else)
- end,
-
- case NewGroup#group.db of
- nil -> ok;
- _ -> couch_db:close(NewGroup#group.db)
- end,
self() ! delayed_commit,
{noreply, State#group_state{
- group=NewGroup#group{db = nil},
+ group=NewGroup,
ref_counter=NewRefCounter,
compactor_pid=nil,
updater_pid=NewUpdaterPid
@@ -230,18 +219,15 @@ handle_cast({compact_done, NewGroup}, State) ->
} = State,
?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
"compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
- couch_db:close(NewGroup#group.db),
Pid = spawn_link(fun() ->
- {ok, Db} = couch_db:open_int(DbName, []),
{_,Ref} = erlang:spawn_monitor(fun() ->
- couch_view_updater:update(nil, NewGroup#group{db = Db})
+ couch_view_updater:update(nil, NewGroup, DbName)
end),
receive
{'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
- couch_db:close(Db),
#group{name=GroupId} = NewGroup2,
Pid2 = couch_view:get_group_server(DbName, GroupId),
- gen_server:cast(Pid2, {compact_done, NewGroup2#group{db = nil}})
+ gen_server:cast(Pid2, {compact_done, NewGroup2})
end
end),
{noreply, State#group_state{compactor_pid = Pid}};
@@ -283,13 +269,12 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{noreply, State#group_state{waiting_commit=true}}
end;
-handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
+handle_info({'EXIT', FromPid, {new_group, Group}},
#group_state{db_name=DbName,
updater_pid=UpPid,
ref_counter=RefCounter,
waiting_list=WaitList,
waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
- ok = couch_db:close(Db),
if not WaitingCommit ->
erlang:send_after(1000, self(), delayed_commit);
true -> ok
@@ -297,30 +282,27 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
case reply_with_group(Group, WaitList, [], RefCounter) of
[] ->
{noreply, State#group_state{waiting_commit=true, waiting_list=[],
- group=Group#group{db=nil}, updater_pid=nil}};
+ group=Group, updater_pid=nil}};
StillWaiting ->
% we still have some waiters, reopen the database and reupdate the index
- {ok, Db2} = couch_db:open_int(DbName, []),
- Group2 = Group#group{db=Db2},
Owner = self(),
- Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end),
+ Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end),
{noreply, State#group_state{waiting_commit=true,
- waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
+ waiting_list=StillWaiting, updater_pid=Pid}}
end;
handle_info({'EXIT', _, {new_group, _}}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State};
-handle_info({'EXIT', FromPid, reset},
- #group_state{
- init_args=InitArgs,
- updater_pid=UpPid,
- group=Group}=State) when UpPid == FromPid ->
- ok = couch_db:close(Group#group.db),
+handle_info({'EXIT', UpPid, reset},
+ #group_state{init_args=InitArgs, updater_pid=UpPid} = State) ->
case prepare_group(InitArgs, true) of
- {ok, ResetGroup} ->
+ {ok, Db, ResetGroup} ->
Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end),
+ couch_db:close(Db),
+ Pid = spawn_link(fun() ->
+ couch_view_updater:update(Owner, ResetGroup, Db#db.name)
+ end),
{noreply, State#group_state{
updater_pid=Pid,
group=ResetGroup}};
@@ -386,17 +368,17 @@ prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)->
{ok, Fd} ->
if ForceReset ->
% this can happen if we missed a purge
- {ok, reset_file(Db, Fd, DbName, Group)};
+ {ok, Db, reset_file(Db, Fd, DbName, Group)};
true ->
% 09 UPGRADE CODE
ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
case (catch couch_file:read_header(Fd)) of
{ok, {Sig, HeaderInfo}} ->
% sigs match!
- {ok, init_group(Db, Fd, Group, HeaderInfo)};
+ {ok, Db, init_group(Db, Fd, Group, HeaderInfo)};
_ ->
% this happens on a new file
- {ok, reset_file(Db, Fd, DbName, Group)}
+ {ok, Db, reset_file(Db, Fd, DbName, Group)}
end
end;
Error ->
@@ -582,7 +564,7 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
reset_group(#group{views=Views}=Group) ->
Views2 = [View#view{btree=nil} || View <- Views],
- Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
+ Group#group{fd=nil,query_server=nil,current_seq=0,
id_btree=nil,views=Views2}.
reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
@@ -598,7 +580,7 @@ init_group(Db, Fd, #group{views=Views}=Group, nil) ->
init_group(Db, Fd, Group,
#index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
+init_group(_Db, Fd, #group{def_lang=Lang,views=Views}=
Group, IndexHeader) ->
#index_header{seq=Seq, purge_seq=PurgeSeq,
id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
@@ -638,5 +620,5 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq}
end,
ViewStates2, Views),
- Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
+ Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
id_btree=IdBtree, views=Views2}.