diff options
author | Damien F. Katz <damien@apache.org> | 2009-01-05 17:03:02 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2009-01-05 17:03:02 +0000 |
commit | 75e24cb09c6222713224540a1d82b6539c71ac9a (patch) | |
tree | ac0ec8b7457c30a3501aca5711f2f78c205cf74d /src | |
parent | ae1ad0ae6738783ce15918657fddddcbf176d940 (diff) |
Fixed views to not commit index headers if they are ahead of what has been fully committed to the database. Also, the index headers are now committed async, improving response times updating views.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@731618 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_db.erl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 16 | ||||
-rw-r--r-- | src/couchdb/couch_view_group.erl | 58 | ||||
-rw-r--r-- | src/couchdb/couch_view_updater.erl | 12 |
5 files changed, 56 insertions, 42 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 7c86eeae..c9546240 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -17,7 +17,7 @@ -export([open_ref_counted/2,num_refs/1,monitor/1]). -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). --export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1]). +-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]). -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). @@ -147,8 +147,10 @@ increment_update_seq(#db{update_pid=UpdatePid}) -> purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> gen_server:call(UpdatePid, {purge_docs, IdsRevs}). +get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) -> + Seq. -get_update_seq(#db{header=#db_header{update_seq=Seq}})-> +get_update_seq(#db{update_seq=Seq})-> Seq. get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})-> diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 5dbbcee8..830f2f9a 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -154,7 +154,8 @@ }). -record(group, - {sig=nil, + {type=view, % can also be slow_view + sig=nil, db=nil, fd=nil, name, @@ -162,10 +163,9 @@ views, id_btree=nil, current_seq=0, - committed_seq=0, purge_seq=0, query_server=nil, - commit_fun + waiting_delayed_commit=nil }). -record(view, diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index d02464ca..049a354e 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -503,19 +503,21 @@ commit_data(Db) -> commit_data(#db{fd=Fd, header=Header} = Db, Delay) -> - Db2 = Db#db{header = Header#db_header{ + Header2 = Header#db_header{ update_seq = Db#db.update_seq, summary_stream_state = couch_stream:get_state(Db#db.summary_stream), docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), admins_ptr = Db#db.admins_ptr - }}, - if Delay and (Db#db.waiting_delayed_commit == nil) -> - Db2#db{waiting_delayed_commit= + }, + if Header == Header2 -> + Db; + Delay and (Db#db.waiting_delayed_commit == nil) -> + Db#db{waiting_delayed_commit= erlang:send_after(1000, self(), delayed_commit)}; Delay -> - Db2; + Db; true -> if Db#db.waiting_delayed_commit /= nil -> case erlang:cancel_timer(Db#db.waiting_delayed_commit) of @@ -524,8 +526,8 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) -> end; true -> ok end, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Db2#db.header), - Db2#db{waiting_delayed_commit=nil} + ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2), + Db#db{waiting_delayed_commit=nil,header=Header2} end. copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 69e90633..45e92dce 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -24,10 +24,12 @@ -include("couch_db.hrl"). -record(group_state, { + type, db_name, init_args, group, updater_pid=nil, + waiting_commit=false, waiting_list=[] }). @@ -136,24 +138,47 @@ handle_cast(foo, State) -> {ok, State}. +handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> + {ok, Db} = couch_db:open(DbName, []), + CommittedSeq = couch_db:get_committed_update_seq(Db), + couch_db:close(Db), + if CommittedSeq >= Group#group.current_seq -> + % save the header + Header = {Group#group.sig, get_index_header_data(Group)}, + ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header), + {noreply, State#group_state{waiting_commit=false}}; + true -> + % We can't commit the header because the database seq that's fully + % committed to disk is still behind us. It we committed now and the + % database lost those changes our view could be forever out of sync + % with the database. But a crash before we commit these changes, no big + % deal, we only lose incremental changes since last committal. + erlang:send_after(1000, self(), delayed_commit), + {noreply, State#group_state{waiting_commit=true}} + end; + handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, #group_state{db_name=DbName, updater_pid=UpPid, - waiting_list=WaitList}=State) when UpPid == FromPid -> + waiting_list=WaitList, + waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> ok = couch_db:close(Db), + + if Group#group.type == view andalso not WaitingCommit -> + erlang:send_after(1000, self(), delayed_commit); + true -> ok + end, case reply_with_group(Group, WaitList, []) of [] -> - {noreply, State#group_state{waiting_list=[], - group=Group#group{db=nil}, - updater_pid=nil}}; + {noreply, State#group_state{waiting_commit=true, waiting_list=[], + group=Group#group{db=nil}, updater_pid=nil}}; StillWaiting -> % we still have some waiters, reopen the database and reupdate the index {ok, Db2} = couch_db:open(DbName, []), Group2 = Group#group{db=Db2}, Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end), - {noreply, State#group_state{waiting_list=StillWaiting, - group=Group2, - updater_pid=Pid}} + {noreply, State#group_state{waiting_commit=true, + waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} end; handle_info({'EXIT', FromPid, reset}, @@ -184,17 +209,13 @@ handle_info({'DOWN',_,_,_,_}, State) -> {stop, normal, reply_all(State, shutdown)}. -terminate(Reason, #group_state{group=#group{fd=Fd}}=State) -> +terminate(Reason, State) -> reply_all(State, Reason), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -% error handling? the updater could die on us, we can save ourselves here. -% but we shouldn't, we could be dead for a reason, like the view got changed, or something. - - %% Local Functions % reply_with_group/3 @@ -219,14 +240,9 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> prepare_group({view, RootDir, DbName, GroupId}, ForceReset)-> case open_db_group(DbName, GroupId) of - {ok, Db, #group{sig=Sig}=Group0} -> + {ok, Db, #group{sig=Sig}=Group} -> case open_index_file(RootDir, DbName, GroupId) of - {ok, Fd} -> - Group = Group0#group{ - commit_fun = fun(GroupIn) -> - Header = {Sig, get_index_header_data(GroupIn)}, - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Header) - end}, + {ok, Fd} -> if ForceReset -> {ok, reset_file(Db, Fd, DbName, Group)}; true -> @@ -254,8 +270,8 @@ prepare_group({slow_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) -> btree=nil, def=MapSrc, reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, - {ok, init_group(Db, Fd, #group{name="_temp", db=Db, views=[View], - def_lang=Lang, commit_fun=fun(_G) -> ok end}, nil)}; + {ok, init_group(Db, Fd, #group{type=slow_view, name="_temp", db=Db, + views=[View], def_lang=Lang}, nil)}; Error -> Error end. diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 0532258c..940602dd 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -16,8 +16,7 @@ -include("couch_db.hrl"). -update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq, - commit_fun=CommitFun}=Group) -> +update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq}=Group) -> ?LOG_DEBUG("Starting index update.",[]), DbPurgeSeq = couch_db:get_purge_seq(Db), Group2 = @@ -45,14 +44,9 @@ update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq, UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), couch_query_servers:stop_doc_map(Group4#group.query_server), NewSeq = couch_db:get_update_seq(Db), - if Seq /= NewSeq -> - {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, + {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), - ok = CommitFun(Group5), - exit({new_group, Group5#group{query_server=nil}}); - true -> - exit({new_group, Group4#group{query_server=nil}}) - end. + exit({new_group, Group5#group{query_server=nil}}). purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> |