From 7393d62b7b630bee50f609d0ae8125d33f7cda2b Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Wed, 18 Aug 2010 11:51:03 -0400 Subject: Grab bag of Cloudant patches to couch OTP application - Removal of couch_db and couch_ref_counter processes. Active DBs are accessible through a protected ets table owned by couch_server. - #full_doc_info{} in by_id and by_seq trees for faster compaction at the expense of more disk usage afterwards. Proposed as COUCHDB-738 but not accepted upstream. - Replication via distributed Erlang. - Better hot upgrade support (uses exported functions much more often). - Configurable btree chunk sizes allow for larger (but still bounded) reductions. - Shorter names for btree fields in #db{} and #db_header{}. - couch_view_group does not keep a reference to the #db{}. - Terms are stored compressed (again). --- apps/couch/src/couch_view_group.erl | 197 +++++++++++++++++------------------- 1 file changed, 93 insertions(+), 104 deletions(-) (limited to 'apps/couch/src/couch_view_group.erl') diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl index f01befdf..f11bb54d 100644 --- a/apps/couch/src/couch_view_group.erl +++ b/apps/couch/src/couch_view_group.erl @@ -39,8 +39,7 @@ request_group(Pid, Seq) -> ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), case gen_server:call(Pid, {request_group, Seq}, infinity) of - {ok, Group, RefCounter} -> - couch_ref_counter:add(RefCounter), + {ok, Group, _RefCounter} -> {ok, Group}; Error -> ?LOG_DEBUG("request_group Error ~p", [Error]), @@ -75,27 +74,26 @@ start_link(InitArgs) -> end. % init creates a closure which spawns the appropriate view_updater. -init({InitArgs, ReturnPid, Ref}) -> +init({{_, DbName, _}=InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), case prepare_group(InitArgs, false) of - {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} -> + {ok, #group{fd=Fd, current_seq=Seq}=Group} -> + {ok, Db} = couch_db:open(DbName, []), case Seq > couch_db:get_update_seq(Db) of true -> ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, + couch_db:close(Db), ignore; _ -> - couch_db:monitor(Db), + try couch_db:monitor(Db) after couch_db:close(Db) end, Owner = self(), - Pid = spawn_link( - fun()-> couch_view_updater:update(Owner, Group) end - ), - {ok, RefCounter} = couch_ref_counter:start([Fd]), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), {ok, #group_state{ - db_name=couch_db:name(Db), + db_name= DbName, init_args=InitArgs, updater_pid = Pid, - group=Group, - ref_counter=RefCounter}} + group=Group#group{dbname=DbName}, + ref_counter=erlang:monitor(process,Fd)}} end; Error -> ReturnPid ! {Ref, self(), Error}, @@ -120,19 +118,16 @@ init({InitArgs, ReturnPid, Ref}) -> handle_call({request_group, RequestSeq}, From, #group_state{ - db_name=DbName, group=#group{current_seq=Seq}=Group, updater_pid=nil, waiting_list=WaitList }=State) when RequestSeq > Seq -> - {ok, Db} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db}, Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), {noreply, State#group_state{ updater_pid=Pid, - group=Group2, + group=Group, waiting_list=[{From,RequestSeq}|WaitList] }, infinity}; @@ -153,6 +148,10 @@ handle_call({request_group, RequestSeq}, From, waiting_list=[{From, RequestSeq}|WaitList] }, infinity}; +handle_call({start_compact, CompactFun}, _From, State) -> + {noreply, NewState} = handle_cast({start_compact, CompactFun}, State), + {reply, {ok, NewState#group_state.compactor_pid}, NewState}; + handle_call(request_group_info, _From, State) -> GroupInfo = get_group_info(State), {reply, {ok, GroupInfo}, State}. @@ -160,24 +159,23 @@ handle_call(request_group_info, _From, State) -> handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} = State) -> #group_state{ - group = #group{name = GroupId, sig = GroupSig} = Group, - init_args = {RootDir, DbName, _} + group = #group{dbname = DbName, name = GroupId, sig = GroupSig} = Group, + init_args = {RootDir, _, _} } = State, ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), - {ok, Db} = couch_db:open_int(DbName, []), {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), - NewGroup = reset_file(Db, Fd, DbName, Group), + NewGroup = reset_file(Fd, DbName, Group), Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), {noreply, State#group_state{compactor_pid = Pid}}; handle_cast({start_compact, _}, State) -> %% compact already running, this is a no-op {noreply, State}; -handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, +handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, #group_state{group = #group{current_seq=OldSeq}} = State) when NewSeq >= OldSeq -> #group_state{ - group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group, + group = #group{name=GroupId, fd=OldFd, sig=GroupSig}, init_args = {RootDir, DbName, _}, updater_pid = UpdaterPid, ref_counter = RefCounter @@ -202,17 +200,12 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, %% cleanup old group unlink(OldFd), - couch_ref_counter:drop(RefCounter), - {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]), - case Group#group.db of - nil -> ok; - Else -> couch_db:close(Else) - end, + erlang:demonitor(RefCounter), self() ! delayed_commit, {noreply, State#group_state{ group=NewGroup, - ref_counter=NewRefCounter, + ref_counter=erlang:monitor(process,NewFd), compactor_pid=nil, updater_pid=NewUpdaterPid }}; @@ -223,17 +216,14 @@ handle_cast({compact_done, NewGroup}, State) -> } = State, ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), - couch_db:close(NewGroup#group.db), - {ok, Db} = couch_db:open_int(DbName, []), + GroupServer = self(), Pid = spawn_link(fun() -> + erlang:monitor(process, NewGroup#group.fd), {_,Ref} = erlang:spawn_monitor(fun() -> - couch_view_updater:update(nil, NewGroup#group{db = Db}) + couch_view_updater:update(nil, NewGroup) end), - receive - {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> - #group{name=GroupId} = NewGroup2, - Pid2 = couch_view:get_group_server(DbName, GroupId), - gen_server:cast(Pid2, {compact_done, NewGroup2}) + receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> + gen_server:cast(GroupServer, {compact_done, NewGroup2}) end end), {noreply, State#group_state{compactor_pid = Pid}}; @@ -245,7 +235,7 @@ handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid} waiting_commit = WaitingCommit } = State, NewSeq = NewGroup#group.current_seq, - ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq, + ?LOG_DEBUG("checkpointing view update at seq ~p for ~s ~s", [NewSeq, DbName, NewGroup#group.name]), if not WaitingCommit -> erlang:send_after(1000, self(), delayed_commit); @@ -275,13 +265,12 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {noreply, State#group_state{waiting_commit=true}} end; -handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, - #group_state{db_name=DbName, +handle_info({'EXIT', FromPid, {new_group, Group}}, + #group_state{ updater_pid=UpPid, ref_counter=RefCounter, waiting_list=WaitList, waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> - ok = couch_db:close(Db), if not WaitingCommit -> erlang:send_after(1000, self(), delayed_commit); true -> ok @@ -289,26 +278,20 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, case reply_with_group(Group, WaitList, [], RefCounter) of [] -> {noreply, State#group_state{waiting_commit=true, waiting_list=[], - group=Group#group{db=nil}, updater_pid=nil}}; + group=Group, updater_pid=nil}}; StillWaiting -> - % we still have some waiters, reopen the database and reupdate the index - {ok, Db2} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db2}, + % we still have some waiters, reupdate the index Owner = self(), - Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group) end), {noreply, State#group_state{waiting_commit=true, - waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} + waiting_list=StillWaiting, group=Group, updater_pid=Pid}} end; handle_info({'EXIT', _, {new_group, _}}, State) -> %% message from an old (probably pre-compaction) updater; ignore {noreply, State}; -handle_info({'EXIT', FromPid, reset}, - #group_state{ - init_args=InitArgs, - updater_pid=UpPid, - group=Group}=State) when UpPid == FromPid -> - ok = couch_db:close(Group#group.db), +handle_info({'EXIT', FromPid, reset}, #group_state{init_args=InitArgs, + updater_pid=FromPid}=State) -> case prepare_group(InitArgs, true) of {ok, ResetGroup} -> Owner = self(), @@ -334,8 +317,9 @@ handle_info({'EXIT', FromPid, Reason}, State) -> ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]), {stop, Reason, State}; -handle_info({'DOWN',_,_,_,_}, State) -> - ?LOG_INFO("Shutting down view group server, monitored db is closing.", []), +handle_info({'DOWN',_,_,Pid,Reason}, #group_state{group=G}=State) -> + ?LOG_INFO("Shutting down group server ~p, db ~p closing w/ reason~n~p", + [G#group.name, Pid, Reason]), {stop, normal, reply_all(State, shutdown)}. @@ -371,32 +355,29 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], State#group_state{waiting_list=[]}. +prepare_group({Root, DbName, #group{dbname=X}=G}, Reset) when X =/= DbName -> + prepare_group({Root, DbName, G#group{dbname=DbName}}, Reset); prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> - case couch_db:open_int(DbName, []) of - {ok, Db} -> - case open_index_file(RootDir, DbName, Sig) of - {ok, Fd} -> - if ForceReset -> - % this can happen if we missed a purge - {ok, reset_file(Db, Fd, DbName, Group)}; - true -> - % 09 UPGRADE CODE - ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), - case (catch couch_file:read_header(Fd)) of - {ok, {Sig, HeaderInfo}} -> - % sigs match! - {ok, init_group(Db, Fd, Group, HeaderInfo)}; - _ -> - % this happens on a new file - {ok, reset_file(Db, Fd, DbName, Group)} - end - end; - Error -> - catch delete_index_file(RootDir, DbName, Sig), - Error + case open_index_file(RootDir, DbName, Sig) of + {ok, Fd} -> + if ForceReset -> + % this can happen if we missed a purge + {ok, reset_file(Fd, DbName, Group)}; + true -> + % 09 UPGRADE CODE + ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), + case (catch couch_file:read_header(Fd)) of + {ok, {Sig, HeaderInfo}} -> + % sigs match! + {ok, init_group(Fd, Group, HeaderInfo)}; + _ -> + % this happens on a new file + {ok, reset_file(Fd, DbName, Group)} + end end; - Else -> - Else + Error -> + catch delete_index_file(RootDir, DbName, Sig), + Error end. get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, @@ -446,7 +427,7 @@ open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) -> reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end, options=DesignOptions}, - {ok, Db, set_view_sig(#group{name = <<"_temp">>, db=Db, views=[View], + {ok, Db, set_view_sig(#group{name = <<"_temp">>, views=[View], def_lang=Language, design_options=DesignOptions})}; Error -> Error @@ -531,28 +512,39 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> {View#view{id_num=N},N+1} end, 0, lists:sort(dict:to_list(DictBySrc))), - set_view_sig(#group{name=Id, views=Views, def_lang=Language, design_options=DesignOptions}). - -reset_group(#group{views=Views}=Group) -> - Views2 = [View#view{btree=nil} || View <- Views], - Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, - id_btree=nil,views=Views2}. - -reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> - ?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]), + #group{ + name = Id, + views = Views, + def_lang = Language, + design_options = DesignOptions, + sig = couch_util:md5(term_to_binary({Views, Language, DesignOptions})) + }. + +reset_group(DbName, #group{views=Views}=Group) -> + Group#group{ + fd = nil, + dbname = DbName, + query_server = nil, + current_seq = 0, + id_btree = nil, + views = [View#view{btree=nil} || View <- Views] + }. + +reset_file(Fd, DbName, #group{sig=Sig,name=Name} = Group) -> + ?LOG_INFO("Resetting group index \"~s\" in db ~s", [Name, DbName]), ok = couch_file:truncate(Fd, 0), ok = couch_file:write_header(Fd, {Sig, nil}), - init_group(Db, Fd, reset_group(Group), nil). + init_group(Fd, reset_group(DbName, Group), nil). delete_index_file(RootDir, DbName, GroupSig) -> couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)). -init_group(Db, Fd, #group{views=Views}=Group, nil) -> - init_group(Db, Fd, Group, - #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), - id_btree_state=nil, view_states=[nil || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}= - Group, IndexHeader) -> +init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) -> + {ok, Db} = couch_db:open(DbName, []), + PurgeSeq = try couch_db:get_purge_seq(Db) after couch_db:close(Db) end, + Header = #index_header{purge_seq=PurgeSeq, view_states=[nil || _ <- Views]}, + init_group(Fd, Group, Header); +init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> #index_header{seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), @@ -580,13 +572,10 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}= <<"raw">> -> Less = fun(A,B) -> A < B end end, - {ok, Btree} = couch_btree:open(BtreeState, Fd, - [{less, Less}, - {reduce, ReduceFun}]), + {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, Less}, + {reduce, ReduceFun}]), View#view{btree=Btree} end, ViewStates, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree, views=Views2}. - - + Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, + views=Views2}. -- cgit v1.2.3