diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_db.erl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 16 | ||||
-rw-r--r-- | src/couchdb/couch_view_group.erl | 58 | ||||
-rw-r--r-- | src/couchdb/couch_view_updater.erl | 12 |
5 files changed, 56 insertions, 42 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 7c86eeae..c9546240 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -17,7 +17,7 @@ -export([open_ref_counted/2,num_refs/1,monitor/1]). -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). --export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1]). +-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]). -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). @@ -147,8 +147,10 @@ increment_update_seq(#db{update_pid=UpdatePid}) -> purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> gen_server:call(UpdatePid, {purge_docs, IdsRevs}). +get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) -> + Seq. -get_update_seq(#db{header=#db_header{update_seq=Seq}})-> +get_update_seq(#db{update_seq=Seq})-> Seq. get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})-> diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 5dbbcee8..830f2f9a 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -154,7 +154,8 @@ }). -record(group, - {sig=nil, + {type=view, % can also be slow_view + sig=nil, db=nil, fd=nil, name, @@ -162,10 +163,9 @@ views, id_btree=nil, current_seq=0, - committed_seq=0, purge_seq=0, query_server=nil, - commit_fun + waiting_delayed_commit=nil }). -record(view, diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index d02464ca..049a354e 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -503,19 +503,21 @@ commit_data(Db) -> commit_data(#db{fd=Fd, header=Header} = Db, Delay) -> - Db2 = Db#db{header = Header#db_header{ + Header2 = Header#db_header{ update_seq = Db#db.update_seq, summary_stream_state = couch_stream:get_state(Db#db.summary_stream), docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), admins_ptr = Db#db.admins_ptr - }}, - if Delay and (Db#db.waiting_delayed_commit == nil) -> - Db2#db{waiting_delayed_commit= + }, + if Header == Header2 -> + Db; + Delay and (Db#db.waiting_delayed_commit == nil) -> + Db#db{waiting_delayed_commit= erlang:send_after(1000, self(), delayed_commit)}; Delay -> - Db2; + Db; true -> if Db#db.waiting_delayed_commit /= nil -> case erlang:cancel_timer(Db#db.waiting_delayed_commit) of @@ -524,8 +526,8 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) -> end; true -> ok end, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Db2#db.header), - Db2#db{waiting_delayed_commit=nil} + ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2), + Db#db{waiting_delayed_commit=nil,header=Header2} end. copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 69e90633..45e92dce 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -24,10 +24,12 @@ -include("couch_db.hrl"). -record(group_state, { + type, db_name, init_args, group, updater_pid=nil, + waiting_commit=false, waiting_list=[] }). @@ -136,24 +138,47 @@ handle_cast(foo, State) -> {ok, State}. +handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> + {ok, Db} = couch_db:open(DbName, []), + CommittedSeq = couch_db:get_committed_update_seq(Db), + couch_db:close(Db), + if CommittedSeq >= Group#group.current_seq -> + % save the header + Header = {Group#group.sig, get_index_header_data(Group)}, + ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header), + {noreply, State#group_state{waiting_commit=false}}; + true -> + % We can't commit the header because the database seq that's fully + % committed to disk is still behind us. It we committed now and the + % database lost those changes our view could be forever out of sync + % with the database. But a crash before we commit these changes, no big + % deal, we only lose incremental changes since last committal. + erlang:send_after(1000, self(), delayed_commit), + {noreply, State#group_state{waiting_commit=true}} + end; + handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, #group_state{db_name=DbName, updater_pid=UpPid, - waiting_list=WaitList}=State) when UpPid == FromPid -> + waiting_list=WaitList, + waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> ok = couch_db:close(Db), + + if Group#group.type == view andalso not WaitingCommit -> + erlang:send_after(1000, self(), delayed_commit); + true -> ok + end, case reply_with_group(Group, WaitList, []) of [] -> - {noreply, State#group_state{waiting_list=[], - group=Group#group{db=nil}, - updater_pid=nil}}; + {noreply, State#group_state{waiting_commit=true, waiting_list=[], + group=Group#group{db=nil}, updater_pid=nil}}; StillWaiting -> % we still have some waiters, reopen the database and reupdate the index {ok, Db2} = couch_db:open(DbName, []), Group2 = Group#group{db=Db2}, Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end), - {noreply, State#group_state{waiting_list=StillWaiting, - group=Group2, - updater_pid=Pid}} + {noreply, State#group_state{waiting_commit=true, + waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} end; handle_info({'EXIT', FromPid, reset}, @@ -184,17 +209,13 @@ handle_info({'DOWN',_,_,_,_}, State) -> {stop, normal, reply_all(State, shutdown)}. -terminate(Reason, #group_state{group=#group{fd=Fd}}=State) -> +terminate(Reason, State) -> reply_all(State, Reason), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -% error handling? the updater could die on us, we can save ourselves here. -% but we shouldn't, we could be dead for a reason, like the view got changed, or something. - - %% Local Functions % reply_with_group/3 @@ -219,14 +240,9 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> prepare_group({view, RootDir, DbName, GroupId}, ForceReset)-> case open_db_group(DbName, GroupId) of - {ok, Db, #group{sig=Sig}=Group0} -> + {ok, Db, #group{sig=Sig}=Group} -> case open_index_file(RootDir, DbName, GroupId) of - {ok, Fd} -> - Group = Group0#group{ - commit_fun = fun(GroupIn) -> - Header = {Sig, get_index_header_data(GroupIn)}, - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Header) - end}, + {ok, Fd} -> if ForceReset -> {ok, reset_file(Db, Fd, DbName, Group)}; true -> @@ -254,8 +270,8 @@ prepare_group({slow_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) -> btree=nil, def=MapSrc, reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, - {ok, init_group(Db, Fd, #group{name="_temp", db=Db, views=[View], - def_lang=Lang, commit_fun=fun(_G) -> ok end}, nil)}; + {ok, init_group(Db, Fd, #group{type=slow_view, name="_temp", db=Db, + views=[View], def_lang=Lang}, nil)}; Error -> Error end. diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 0532258c..940602dd 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -16,8 +16,7 @@ -include("couch_db.hrl"). -update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq, - commit_fun=CommitFun}=Group) -> +update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq}=Group) -> ?LOG_DEBUG("Starting index update.",[]), DbPurgeSeq = couch_db:get_purge_seq(Db), Group2 = @@ -45,14 +44,9 @@ update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq, UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), couch_query_servers:stop_doc_map(Group4#group.query_server), NewSeq = couch_db:get_update_seq(Db), - if Seq /= NewSeq -> - {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, + {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), - ok = CommitFun(Group5), - exit({new_group, Group5#group{query_server=nil}}); - true -> - exit({new_group, Group4#group{query_server=nil}}) - end. + exit({new_group, Group5#group{query_server=nil}}). purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> |