diff options
Diffstat (limited to 'apps/couch/src/couch_view_group.erl')
-rw-r--r-- | apps/couch/src/couch_view_group.erl | 157 |
1 files changed, 69 insertions, 88 deletions
diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl index 11cb4c60..87e4cd2e 100644 --- a/apps/couch/src/couch_view_group.erl +++ b/apps/couch/src/couch_view_group.erl @@ -17,6 +17,9 @@ -export([start_link/1, request_group/2, trigger_group_update/2, request_group_info/1]). -export([open_db_group/2, open_temp_group/5, design_doc_to_view_group/1,design_root/2]). +%% Exports for the compactor +-export([get_index_header_data/1]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -80,8 +83,7 @@ start_link(InitArgs) -> init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), case prepare_group(InitArgs, false) of - {ok, #group{fd=Fd, current_seq=Seq}=Group} -> - {ok, Db} = couch_db:open(DbName, []), + {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} -> case Seq > couch_db:get_update_seq(Db) of true -> ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, @@ -92,7 +94,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> {ok, #group_state{ db_name=DbName, init_args=InitArgs, - group=Group#group{dbname=DbName}, + group=Group, ref_counter=erlang:monitor(process,Fd)}} end; Error -> @@ -118,16 +120,16 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> handle_call({request_group, RequestSeq}, From, #group_state{ + db_name=DbName, group=#group{current_seq=Seq}=Group, updater_pid=nil, waiting_list=WaitList }=State) when RequestSeq > Seq -> Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{ updater_pid=Pid, - group=Group, waiting_list=[{From,RequestSeq}|WaitList] }, infinity}; @@ -148,40 +150,28 @@ handle_call({request_group, RequestSeq}, From, waiting_list=[{From, RequestSeq}|WaitList] }, infinity}; -handle_call({start_compact, CompactFun}, _From, State) -> - {noreply, NewState} = handle_cast({start_compact, CompactFun}, State), - {reply, {ok, NewState#group_state.compactor_pid}, NewState}; - handle_call(request_group_info, _From, State) -> GroupInfo = get_group_info(State), - {reply, {ok, GroupInfo}, State}. + {reply, {ok, GroupInfo}, State}; -handle_cast({update_group, RequestSeq}, - #group_state{ - group=#group{current_seq=Seq}=Group, - updater_pid=nil}=State) when RequestSeq > Seq -> - Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), - {noreply, State#group_state{updater_pid=Pid}}; -handle_cast({update_group, _RequestSeq}, State) -> - {noreply, State}; - -handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} +handle_call({start_compact, CompactFun}, _From, #group_state{compactor_pid=nil} = State) -> #group_state{ - group = #group{dbname = DbName, name = GroupId, sig = GroupSig} = Group, - init_args = {RootDir, _, _} + group = #group{name = GroupId, sig = GroupSig} = Group, + init_args = {RootDir, DbName, _} } = State, ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), + {ok, Db} = couch_db:open_int(DbName, []), {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), - NewGroup = reset_file(Fd, DbName, Group), - Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), - {noreply, State#group_state{compactor_pid = Pid}}; -handle_cast({start_compact, _}, State) -> + NewGroup = reset_file(Db, Fd, DbName, Group), + couch_db:close(Db), + Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end), + {reply, {ok, Pid}, State#group_state{compactor_pid = Pid}}; +handle_call({start_compact, _}, _From, #group_state{compactor_pid=Pid} = State) -> %% compact already running, this is a no-op - {noreply, State}; + {reply, {ok, Pid}, State}; -handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, +handle_call({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, _From, #group_state{group = #group{current_seq=OldSeq}} = State) when NewSeq >= OldSeq -> #group_state{ @@ -204,7 +194,7 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, unlink(UpdaterPid), exit(UpdaterPid, view_compaction_complete), Owner = self(), - spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end); + spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end); true -> nil end, @@ -216,30 +206,20 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, erlang:demonitor(RefCounter), self() ! delayed_commit, - {noreply, State#group_state{ + {reply, ok, State#group_state{ group=NewGroup, ref_counter=erlang:monitor(process,NewFd), compactor_pid=nil, updater_pid=NewUpdaterPid }}; -handle_cast({compact_done, NewGroup}, State) -> +handle_call({compact_done, NewGroup}, _From, State) -> #group_state{ group = #group{name = GroupId, current_seq = CurrentSeq}, init_args={_RootDir, DbName, _} } = State, ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), - GroupServer = self(), - Pid = spawn_link(fun() -> - erlang:monitor(process, NewGroup#group.fd), - {_,Ref} = erlang:spawn_monitor(fun() -> - couch_view_updater:update(nil, NewGroup) - end), - receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> - gen_server:cast(GroupServer, {compact_done, NewGroup2}) - end - end), - {noreply, State#group_state{compactor_pid = Pid}}; + {reply, update, State}. handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid} = State) -> @@ -279,7 +259,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> end; handle_info({'EXIT', FromPid, {new_group, Group}}, - #group_state{ + #group_state{db_name=DbName, updater_pid=UpPid, ref_counter=RefCounter, waiting_list=WaitList, @@ -293,22 +273,25 @@ handle_info({'EXIT', FromPid, {new_group, Group}}, {noreply, State#group_state{waiting_commit=true, waiting_list=[], group=Group, updater_pid=nil}}; StillWaiting -> - % we still have some waiters, reupdate the index + % we still have some waiters, reopen the database and reupdate the index Owner = self(), - Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group) end), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{waiting_commit=true, - waiting_list=StillWaiting, group=Group, updater_pid=Pid}} + waiting_list=StillWaiting, updater_pid=Pid}} end; handle_info({'EXIT', _, {new_group, _}}, State) -> %% message from an old (probably pre-compaction) updater; ignore {noreply, State}; -handle_info({'EXIT', FromPid, reset}, #group_state{init_args=InitArgs, - updater_pid=FromPid}=State) -> +handle_info({'EXIT', UpPid, reset}, + #group_state{init_args=InitArgs, updater_pid=UpPid} = State) -> case prepare_group(InitArgs, true) of - {ok, ResetGroup} -> + {ok, Db, ResetGroup} -> Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end), + couch_db:close(Db), + Pid = spawn_link(fun() -> + couch_view_updater:update(Owner, ResetGroup, Db#db.name) + end), {noreply, State#group_state{ updater_pid=Pid, group=ResetGroup}}; @@ -368,29 +351,32 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], State#group_state{waiting_list=[]}. -prepare_group({Root, DbName, #group{dbname=X}=G}, Reset) when X =/= DbName -> - prepare_group({Root, DbName, G#group{dbname=DbName}}, Reset); prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> - case open_index_file(RootDir, DbName, Sig) of - {ok, Fd} -> - if ForceReset -> - % this can happen if we missed a purge - {ok, reset_file(Fd, DbName, Group)}; - true -> - % 09 UPGRADE CODE - ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), - case (catch couch_file:read_header(Fd)) of - {ok, {Sig, HeaderInfo}} -> - % sigs match! - {ok, init_group(Fd, Group, HeaderInfo)}; - _ -> - % this happens on a new file - {ok, reset_file(Fd, DbName, Group)} - end + case couch_db:open_int(DbName, []) of + {ok, Db} -> + case open_index_file(RootDir, DbName, Sig) of + {ok, Fd} -> + if ForceReset -> + % this can happen if we missed a purge + {ok, Db, reset_file(Db, Fd, DbName, Group)}; + true -> + % 09 UPGRADE CODE + ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), + case (catch couch_file:read_header(Fd)) of + {ok, {Sig, HeaderInfo}} -> + % sigs match! + {ok, Db, init_group(Db, Fd, Group, HeaderInfo)}; + _ -> + % this happens on a new file + {ok, Db, reset_file(Db, Fd, DbName, Group)} + end + end; + Error -> + catch delete_index_file(RootDir, DbName, Sig), + Error end; - Error -> - catch delete_index_file(RootDir, DbName, Sig), - Error + Else -> + Else end. get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, @@ -497,7 +483,7 @@ open_db_group(DbName, GroupId) -> end) end), receive {'DOWN', Ref, process, Pid, {ok, Doc}} -> - {ok, design_doc_to_view_group(Doc)}; + {ok, design_doc_to_view_group(Doc)}; {'DOWN', Ref, process, Pid, Error} -> Error end. @@ -575,31 +561,26 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> end, 0, lists:sort(dict:to_list(DictBySrc))), set_view_sig(#group{name=Id, lib=Lib, views=Views, def_lang=Language, design_options=DesignOptions}). -reset_group(DbName, #group{views=Views}=Group) -> +reset_group(#group{views=Views}=Group) -> Views2 = [View#view{btree=nil} || View <- Views], - Group#group{dbname=DbName,fd=nil,query_server=nil,current_seq=0, + Group#group{fd=nil,query_server=nil,current_seq=0, id_btree=nil,views=Views2}. -reset_file(Fd, DbName, #group{sig=Sig,name=Name} = Group) -> +reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> ?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]), ok = couch_file:truncate(Fd, 0), ok = couch_file:write_header(Fd, {Sig, nil}), - init_group(Fd, reset_group(DbName, Group), nil). + init_group(Db, Fd, reset_group(Group), nil). delete_index_file(RootDir, DbName, GroupSig) -> couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)). -init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) -> - case couch_db:open(DbName, []) of - {ok, Db} -> - PurgeSeq = try couch_db:get_purge_seq(Db) after couch_db:close(Db) end, - Header = #index_header{purge_seq=PurgeSeq, view_states=[{nil, 0, 0} || _ <- Views]}, - init_group(Fd, Group, Header); - {not_found, no_db_file} -> - ?LOG_ERROR("~p no_db_file ~p", [?MODULE, DbName]), - exit(no_db_file) - end; -init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> +init_group(Db, Fd, #group{views=Views}=Group, nil) -> + init_group(Db, Fd, Group, + #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), + id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]}); +init_group(_Db, Fd, #group{def_lang=Lang,views=Views}= + Group, IndexHeader) -> #index_header{seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, StateUpdate = fun |