From 75e24cb09c6222713224540a1d82b6539c71ac9a Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Mon, 5 Jan 2009 17:03:02 +0000 Subject: Fixed views to not commit index headers if they are ahead of what has been fully committed to the database. Also, the index headers are now committed async, improving response times updating views. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@731618 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_view_group.erl | 58 +++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 21 deletions(-) (limited to 'src/couchdb/couch_view_group.erl') diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 69e90633..45e92dce 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -24,10 +24,12 @@ -include("couch_db.hrl"). -record(group_state, { + type, db_name, init_args, group, updater_pid=nil, + waiting_commit=false, waiting_list=[] }). @@ -136,24 +138,47 @@ handle_cast(foo, State) -> {ok, State}. +handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> + {ok, Db} = couch_db:open(DbName, []), + CommittedSeq = couch_db:get_committed_update_seq(Db), + couch_db:close(Db), + if CommittedSeq >= Group#group.current_seq -> + % save the header + Header = {Group#group.sig, get_index_header_data(Group)}, + ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header), + {noreply, State#group_state{waiting_commit=false}}; + true -> + % We can't commit the header because the database seq that's fully + % committed to disk is still behind us. It we committed now and the + % database lost those changes our view could be forever out of sync + % with the database. But a crash before we commit these changes, no big + % deal, we only lose incremental changes since last committal. + erlang:send_after(1000, self(), delayed_commit), + {noreply, State#group_state{waiting_commit=true}} + end; + handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, #group_state{db_name=DbName, updater_pid=UpPid, - waiting_list=WaitList}=State) when UpPid == FromPid -> + waiting_list=WaitList, + waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> ok = couch_db:close(Db), + + if Group#group.type == view andalso not WaitingCommit -> + erlang:send_after(1000, self(), delayed_commit); + true -> ok + end, case reply_with_group(Group, WaitList, []) of [] -> - {noreply, State#group_state{waiting_list=[], - group=Group#group{db=nil}, - updater_pid=nil}}; + {noreply, State#group_state{waiting_commit=true, waiting_list=[], + group=Group#group{db=nil}, updater_pid=nil}}; StillWaiting -> % we still have some waiters, reopen the database and reupdate the index {ok, Db2} = couch_db:open(DbName, []), Group2 = Group#group{db=Db2}, Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end), - {noreply, State#group_state{waiting_list=StillWaiting, - group=Group2, - updater_pid=Pid}} + {noreply, State#group_state{waiting_commit=true, + waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} end; handle_info({'EXIT', FromPid, reset}, @@ -184,17 +209,13 @@ handle_info({'DOWN',_,_,_,_}, State) -> {stop, normal, reply_all(State, shutdown)}. -terminate(Reason, #group_state{group=#group{fd=Fd}}=State) -> +terminate(Reason, State) -> reply_all(State, Reason), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -% error handling? the updater could die on us, we can save ourselves here. -% but we shouldn't, we could be dead for a reason, like the view got changed, or something. - - %% Local Functions % reply_with_group/3 @@ -219,14 +240,9 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> prepare_group({view, RootDir, DbName, GroupId}, ForceReset)-> case open_db_group(DbName, GroupId) of - {ok, Db, #group{sig=Sig}=Group0} -> + {ok, Db, #group{sig=Sig}=Group} -> case open_index_file(RootDir, DbName, GroupId) of - {ok, Fd} -> - Group = Group0#group{ - commit_fun = fun(GroupIn) -> - Header = {Sig, get_index_header_data(GroupIn)}, - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Header) - end}, + {ok, Fd} -> if ForceReset -> {ok, reset_file(Db, Fd, DbName, Group)}; true -> @@ -254,8 +270,8 @@ prepare_group({slow_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) -> btree=nil, def=MapSrc, reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, - {ok, init_group(Db, Fd, #group{name="_temp", db=Db, views=[View], - def_lang=Lang, commit_fun=fun(_G) -> ok end}, nil)}; + {ok, init_group(Db, Fd, #group{type=slow_view, name="_temp", db=Db, + views=[View], def_lang=Lang}, nil)}; Error -> Error end. -- cgit v1.2.3