summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-01-05 17:03:02 +0000
committerDamien F. Katz <damien@apache.org>2009-01-05 17:03:02 +0000
commit75e24cb09c6222713224540a1d82b6539c71ac9a (patch)
treeac0ec8b7457c30a3501aca5711f2f78c205cf74d /src/couchdb
parentae1ad0ae6738783ce15918657fddddcbf176d940 (diff)
Fixed views to not commit index headers if they are ahead of what has been fully committed to the database. Also, the index headers are now committed async, improving response times updating views.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@731618 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/couch_db.erl6
-rw-r--r--src/couchdb/couch_db.hrl6
-rw-r--r--src/couchdb/couch_db_updater.erl16
-rw-r--r--src/couchdb/couch_view_group.erl58
-rw-r--r--src/couchdb/couch_view_updater.erl12
5 files changed, 56 insertions, 42 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 7c86eeae..c9546240 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -17,7 +17,7 @@
-export([open_ref_counted/2,num_refs/1,monitor/1]).
-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
--export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1]).
+-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
@@ -147,8 +147,10 @@ increment_update_seq(#db{update_pid=UpdatePid}) ->
purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
+get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) ->
+ Seq.
-get_update_seq(#db{header=#db_header{update_seq=Seq}})->
+get_update_seq(#db{update_seq=Seq})->
Seq.
get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 5dbbcee8..830f2f9a 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -154,7 +154,8 @@
}).
-record(group,
- {sig=nil,
+ {type=view, % can also be slow_view
+ sig=nil,
db=nil,
fd=nil,
name,
@@ -162,10 +163,9 @@
views,
id_btree=nil,
current_seq=0,
- committed_seq=0,
purge_seq=0,
query_server=nil,
- commit_fun
+ waiting_delayed_commit=nil
}).
-record(view,
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index d02464ca..049a354e 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -503,19 +503,21 @@ commit_data(Db) ->
commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
- Db2 = Db#db{header = Header#db_header{
+ Header2 = Header#db_header{
update_seq = Db#db.update_seq,
summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
admins_ptr = Db#db.admins_ptr
- }},
- if Delay and (Db#db.waiting_delayed_commit == nil) ->
- Db2#db{waiting_delayed_commit=
+ },
+ if Header == Header2 ->
+ Db;
+ Delay and (Db#db.waiting_delayed_commit == nil) ->
+ Db#db{waiting_delayed_commit=
erlang:send_after(1000, self(), delayed_commit)};
Delay ->
- Db2;
+ Db;
true ->
if Db#db.waiting_delayed_commit /= nil ->
case erlang:cancel_timer(Db#db.waiting_delayed_commit) of
@@ -524,8 +526,8 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
end;
true -> ok
end,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Db2#db.header),
- Db2#db{waiting_delayed_commit=nil}
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
+ Db#db{waiting_delayed_commit=nil,header=Header2}
end.
copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 69e90633..45e92dce 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -24,10 +24,12 @@
-include("couch_db.hrl").
-record(group_state, {
+ type,
db_name,
init_args,
group,
updater_pid=nil,
+ waiting_commit=false,
waiting_list=[]
}).
@@ -136,24 +138,47 @@ handle_cast(foo, State) ->
{ok, State}.
+handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ CommittedSeq = couch_db:get_committed_update_seq(Db),
+ couch_db:close(Db),
+ if CommittedSeq >= Group#group.current_seq ->
+ % save the header
+ Header = {Group#group.sig, get_index_header_data(Group)},
+ ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header),
+ {noreply, State#group_state{waiting_commit=false}};
+ true ->
+ % We can't commit the header because the database seq that's fully
+ % committed to disk is still behind us. It we committed now and the
+ % database lost those changes our view could be forever out of sync
+ % with the database. But a crash before we commit these changes, no big
+ % deal, we only lose incremental changes since last committal.
+ erlang:send_after(1000, self(), delayed_commit),
+ {noreply, State#group_state{waiting_commit=true}}
+ end;
+
handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
#group_state{db_name=DbName,
updater_pid=UpPid,
- waiting_list=WaitList}=State) when UpPid == FromPid ->
+ waiting_list=WaitList,
+ waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
ok = couch_db:close(Db),
+
+ if Group#group.type == view andalso not WaitingCommit ->
+ erlang:send_after(1000, self(), delayed_commit);
+ true -> ok
+ end,
case reply_with_group(Group, WaitList, []) of
[] ->
- {noreply, State#group_state{waiting_list=[],
- group=Group#group{db=nil},
- updater_pid=nil}};
+ {noreply, State#group_state{waiting_commit=true, waiting_list=[],
+ group=Group#group{db=nil}, updater_pid=nil}};
StillWaiting ->
% we still have some waiters, reopen the database and reupdate the index
{ok, Db2} = couch_db:open(DbName, []),
Group2 = Group#group{db=Db2},
Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end),
- {noreply, State#group_state{waiting_list=StillWaiting,
- group=Group2,
- updater_pid=Pid}}
+ {noreply, State#group_state{waiting_commit=true,
+ waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
end;
handle_info({'EXIT', FromPid, reset},
@@ -184,17 +209,13 @@ handle_info({'DOWN',_,_,_,_}, State) ->
{stop, normal, reply_all(State, shutdown)}.
-terminate(Reason, #group_state{group=#group{fd=Fd}}=State) ->
+terminate(Reason, State) ->
reply_all(State, Reason),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-% error handling? the updater could die on us, we can save ourselves here.
-% but we shouldn't, we could be dead for a reason, like the view got changed, or something.
-
-
%% Local Functions
% reply_with_group/3
@@ -219,14 +240,9 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->
prepare_group({view, RootDir, DbName, GroupId}, ForceReset)->
case open_db_group(DbName, GroupId) of
- {ok, Db, #group{sig=Sig}=Group0} ->
+ {ok, Db, #group{sig=Sig}=Group} ->
case open_index_file(RootDir, DbName, GroupId) of
- {ok, Fd} ->
- Group = Group0#group{
- commit_fun = fun(GroupIn) ->
- Header = {Sig, get_index_header_data(GroupIn)},
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Header)
- end},
+ {ok, Fd} ->
if ForceReset ->
{ok, reset_file(Db, Fd, DbName, Group)};
true ->
@@ -254,8 +270,8 @@ prepare_group({slow_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) ->
btree=nil,
def=MapSrc,
reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
- {ok, init_group(Db, Fd, #group{name="_temp", db=Db, views=[View],
- def_lang=Lang, commit_fun=fun(_G) -> ok end}, nil)};
+ {ok, init_group(Db, Fd, #group{type=slow_view, name="_temp", db=Db,
+ views=[View], def_lang=Lang}, nil)};
Error ->
Error
end.
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index 0532258c..940602dd 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -16,8 +16,7 @@
-include("couch_db.hrl").
-update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq,
- commit_fun=CommitFun}=Group) ->
+update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq}=Group) ->
?LOG_DEBUG("Starting index update.",[]),
DbPurgeSeq = couch_db:get_purge_seq(Db),
Group2 =
@@ -45,14 +44,9 @@ update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq,
UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
couch_query_servers:stop_doc_map(Group4#group.query_server),
NewSeq = couch_db:get_update_seq(Db),
- if Seq /= NewSeq ->
- {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2,
+ {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2,
NewSeq),
- ok = CommitFun(Group5),
- exit({new_group, Group5#group{query_server=nil}});
- true ->
- exit({new_group, Group4#group{query_server=nil}})
- end.
+ exit({new_group, Group5#group{query_server=nil}}).
purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->