summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_view_group.erl
diff options
context:
space:
mode:
authorRobert Newson <robert.newson@cloudant.com>2011-09-28 11:18:06 +0100
committerRobert Newson <robert.newson@cloudant.com>2011-09-28 11:32:50 +0100
commit954ddf0fca558f17f39e68df8311ee9057beb390 (patch)
tree2542c112210363076be1a8bc7b24e13bfaf1c055 /apps/couch/src/couch_view_group.erl
parentc8d7b6d8c3cb881d525be80bc6b16bc08822df65 (diff)
parentbefbdfb11f45bd2a5ccffb6b0d5ac04435ac9e55 (diff)
Merge 1.1.x changes
Conflicts: apps/couch/include/couch_db.hrl apps/couch/src/couch_db.erl apps/couch/src/couch_os_process.erl apps/couch/src/couch_query_servers.erl apps/couch/src/couch_rep.erl apps/couch/src/couch_replication_manager.erl apps/couch/src/couch_view_compactor.erl apps/couch/src/couch_view_group.erl apps/couch/src/couch_view_updater.erl configure.ac couchjs/c_src/http.c couchjs/c_src/main.c couchjs/c_src/utf8.c etc/windows/couchdb.iss.tpl src/couchdb/priv/Makefile.am src/couchdb/priv/couch_js/main.c test/etap/160-vhosts.t test/etap/200-view-group-no-db-leaks.t test/etap/Makefile.am BugzID: 12645
Diffstat (limited to 'apps/couch/src/couch_view_group.erl')
-rw-r--r--apps/couch/src/couch_view_group.erl157
1 files changed, 69 insertions, 88 deletions
diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl
index 11cb4c60..87e4cd2e 100644
--- a/apps/couch/src/couch_view_group.erl
+++ b/apps/couch/src/couch_view_group.erl
@@ -17,6 +17,9 @@
-export([start_link/1, request_group/2, trigger_group_update/2, request_group_info/1]).
-export([open_db_group/2, open_temp_group/5, design_doc_to_view_group/1,design_root/2]).
+%% Exports for the compactor
+-export([get_index_header_data/1]).
+
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -80,8 +83,7 @@ start_link(InitArgs) ->
init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
case prepare_group(InitArgs, false) of
- {ok, #group{fd=Fd, current_seq=Seq}=Group} ->
- {ok, Db} = couch_db:open(DbName, []),
+ {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} ->
case Seq > couch_db:get_update_seq(Db) of
true ->
ReturnPid ! {Ref, self(), {error, invalid_view_seq}},
@@ -92,7 +94,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
{ok, #group_state{
db_name=DbName,
init_args=InitArgs,
- group=Group#group{dbname=DbName},
+ group=Group,
ref_counter=erlang:monitor(process,Fd)}}
end;
Error ->
@@ -118,16 +120,16 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
handle_call({request_group, RequestSeq}, From,
#group_state{
+ db_name=DbName,
group=#group{current_seq=Seq}=Group,
updater_pid=nil,
waiting_list=WaitList
}=State) when RequestSeq > Seq ->
Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end),
{noreply, State#group_state{
updater_pid=Pid,
- group=Group,
waiting_list=[{From,RequestSeq}|WaitList]
}, infinity};
@@ -148,40 +150,28 @@ handle_call({request_group, RequestSeq}, From,
waiting_list=[{From, RequestSeq}|WaitList]
}, infinity};
-handle_call({start_compact, CompactFun}, _From, State) ->
- {noreply, NewState} = handle_cast({start_compact, CompactFun}, State),
- {reply, {ok, NewState#group_state.compactor_pid}, NewState};
-
handle_call(request_group_info, _From, State) ->
GroupInfo = get_group_info(State),
- {reply, {ok, GroupInfo}, State}.
+ {reply, {ok, GroupInfo}, State};
-handle_cast({update_group, RequestSeq},
- #group_state{
- group=#group{current_seq=Seq}=Group,
- updater_pid=nil}=State) when RequestSeq > Seq ->
- Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
- {noreply, State#group_state{updater_pid=Pid}};
-handle_cast({update_group, _RequestSeq}, State) ->
- {noreply, State};
-
-handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
+handle_call({start_compact, CompactFun}, _From, #group_state{compactor_pid=nil}
= State) ->
#group_state{
- group = #group{dbname = DbName, name = GroupId, sig = GroupSig} = Group,
- init_args = {RootDir, _, _}
+ group = #group{name = GroupId, sig = GroupSig} = Group,
+ init_args = {RootDir, DbName, _}
} = State,
?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]),
+ {ok, Db} = couch_db:open_int(DbName, []),
{ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
- NewGroup = reset_file(Fd, DbName, Group),
- Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
- {noreply, State#group_state{compactor_pid = Pid}};
-handle_cast({start_compact, _}, State) ->
+ NewGroup = reset_file(Db, Fd, DbName, Group),
+ couch_db:close(Db),
+ Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end),
+ {reply, {ok, Pid}, State#group_state{compactor_pid = Pid}};
+handle_call({start_compact, _}, _From, #group_state{compactor_pid=Pid} = State) ->
%% compact already running, this is a no-op
- {noreply, State};
+ {reply, {ok, Pid}, State};
-handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
+handle_call({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, _From,
#group_state{group = #group{current_seq=OldSeq}} = State)
when NewSeq >= OldSeq ->
#group_state{
@@ -204,7 +194,7 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
unlink(UpdaterPid),
exit(UpdaterPid, view_compaction_complete),
Owner = self(),
- spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end);
+ spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end);
true ->
nil
end,
@@ -216,30 +206,20 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
erlang:demonitor(RefCounter),
self() ! delayed_commit,
- {noreply, State#group_state{
+ {reply, ok, State#group_state{
group=NewGroup,
ref_counter=erlang:monitor(process,NewFd),
compactor_pid=nil,
updater_pid=NewUpdaterPid
}};
-handle_cast({compact_done, NewGroup}, State) ->
+handle_call({compact_done, NewGroup}, _From, State) ->
#group_state{
group = #group{name = GroupId, current_seq = CurrentSeq},
init_args={_RootDir, DbName, _}
} = State,
?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
"compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
- GroupServer = self(),
- Pid = spawn_link(fun() ->
- erlang:monitor(process, NewGroup#group.fd),
- {_,Ref} = erlang:spawn_monitor(fun() ->
- couch_view_updater:update(nil, NewGroup)
- end),
- receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
- gen_server:cast(GroupServer, {compact_done, NewGroup2})
- end
- end),
- {noreply, State#group_state{compactor_pid = Pid}};
+ {reply, update, State}.
handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
= State) ->
@@ -279,7 +259,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
end;
handle_info({'EXIT', FromPid, {new_group, Group}},
- #group_state{
+ #group_state{db_name=DbName,
updater_pid=UpPid,
ref_counter=RefCounter,
waiting_list=WaitList,
@@ -293,22 +273,25 @@ handle_info({'EXIT', FromPid, {new_group, Group}},
{noreply, State#group_state{waiting_commit=true, waiting_list=[],
group=Group, updater_pid=nil}};
StillWaiting ->
- % we still have some waiters, reupdate the index
+ % we still have some waiters, reopen the database and reupdate the index
Owner = self(),
- Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group) end),
+ Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end),
{noreply, State#group_state{waiting_commit=true,
- waiting_list=StillWaiting, group=Group, updater_pid=Pid}}
+ waiting_list=StillWaiting, updater_pid=Pid}}
end;
handle_info({'EXIT', _, {new_group, _}}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State};
-handle_info({'EXIT', FromPid, reset}, #group_state{init_args=InitArgs,
- updater_pid=FromPid}=State) ->
+handle_info({'EXIT', UpPid, reset},
+ #group_state{init_args=InitArgs, updater_pid=UpPid} = State) ->
case prepare_group(InitArgs, true) of
- {ok, ResetGroup} ->
+ {ok, Db, ResetGroup} ->
Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end),
+ couch_db:close(Db),
+ Pid = spawn_link(fun() ->
+ couch_view_updater:update(Owner, ResetGroup, Db#db.name)
+ end),
{noreply, State#group_state{
updater_pid=Pid,
group=ResetGroup}};
@@ -368,29 +351,32 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->
[catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList],
State#group_state{waiting_list=[]}.
-prepare_group({Root, DbName, #group{dbname=X}=G}, Reset) when X =/= DbName ->
- prepare_group({Root, DbName, G#group{dbname=DbName}}, Reset);
prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)->
- case open_index_file(RootDir, DbName, Sig) of
- {ok, Fd} ->
- if ForceReset ->
- % this can happen if we missed a purge
- {ok, reset_file(Fd, DbName, Group)};
- true ->
- % 09 UPGRADE CODE
- ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
- case (catch couch_file:read_header(Fd)) of
- {ok, {Sig, HeaderInfo}} ->
- % sigs match!
- {ok, init_group(Fd, Group, HeaderInfo)};
- _ ->
- % this happens on a new file
- {ok, reset_file(Fd, DbName, Group)}
- end
+ case couch_db:open_int(DbName, []) of
+ {ok, Db} ->
+ case open_index_file(RootDir, DbName, Sig) of
+ {ok, Fd} ->
+ if ForceReset ->
+ % this can happen if we missed a purge
+ {ok, Db, reset_file(Db, Fd, DbName, Group)};
+ true ->
+ % 09 UPGRADE CODE
+ ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
+ case (catch couch_file:read_header(Fd)) of
+ {ok, {Sig, HeaderInfo}} ->
+ % sigs match!
+ {ok, Db, init_group(Db, Fd, Group, HeaderInfo)};
+ _ ->
+ % this happens on a new file
+ {ok, Db, reset_file(Db, Fd, DbName, Group)}
+ end
+ end;
+ Error ->
+ catch delete_index_file(RootDir, DbName, Sig),
+ Error
end;
- Error ->
- catch delete_index_file(RootDir, DbName, Sig),
- Error
+ Else ->
+ Else
end.
get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
@@ -497,7 +483,7 @@ open_db_group(DbName, GroupId) ->
end)
end),
receive {'DOWN', Ref, process, Pid, {ok, Doc}} ->
- {ok, design_doc_to_view_group(Doc)};
+ {ok, design_doc_to_view_group(Doc)};
{'DOWN', Ref, process, Pid, Error} ->
Error
end.
@@ -575,31 +561,26 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
end, 0, lists:sort(dict:to_list(DictBySrc))),
set_view_sig(#group{name=Id, lib=Lib, views=Views, def_lang=Language, design_options=DesignOptions}).
-reset_group(DbName, #group{views=Views}=Group) ->
+reset_group(#group{views=Views}=Group) ->
Views2 = [View#view{btree=nil} || View <- Views],
- Group#group{dbname=DbName,fd=nil,query_server=nil,current_seq=0,
+ Group#group{fd=nil,query_server=nil,current_seq=0,
id_btree=nil,views=Views2}.
-reset_file(Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
+reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]),
ok = couch_file:truncate(Fd, 0),
ok = couch_file:write_header(Fd, {Sig, nil}),
- init_group(Fd, reset_group(DbName, Group), nil).
+ init_group(Db, Fd, reset_group(Group), nil).
delete_index_file(RootDir, DbName, GroupSig) ->
couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)).
-init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) ->
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- PurgeSeq = try couch_db:get_purge_seq(Db) after couch_db:close(Db) end,
- Header = #index_header{purge_seq=PurgeSeq, view_states=[{nil, 0, 0} || _ <- Views]},
- init_group(Fd, Group, Header);
- {not_found, no_db_file} ->
- ?LOG_ERROR("~p no_db_file ~p", [?MODULE, DbName]),
- exit(no_db_file)
- end;
-init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
+init_group(Db, Fd, #group{views=Views}=Group, nil) ->
+ init_group(Db, Fd, Group,
+ #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
+ id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]});
+init_group(_Db, Fd, #group{def_lang=Lang,views=Views}=
+ Group, IndexHeader) ->
#index_header{seq=Seq, purge_seq=PurgeSeq,
id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
StateUpdate = fun