summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/couch_db.erl6
-rw-r--r--src/couchdb/couch_db.hrl6
-rw-r--r--src/couchdb/couch_db_updater.erl16
-rw-r--r--src/couchdb/couch_view_group.erl58
-rw-r--r--src/couchdb/couch_view_updater.erl12
5 files changed, 56 insertions, 42 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 7c86eeae..c9546240 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -17,7 +17,7 @@
-export([open_ref_counted/2,num_refs/1,monitor/1]).
-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
--export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1]).
+-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
@@ -147,8 +147,10 @@ increment_update_seq(#db{update_pid=UpdatePid}) ->
purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
+get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) ->
+ Seq.
-get_update_seq(#db{header=#db_header{update_seq=Seq}})->
+get_update_seq(#db{update_seq=Seq})->
Seq.
get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 5dbbcee8..830f2f9a 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -154,7 +154,8 @@
}).
-record(group,
- {sig=nil,
+ {type=view, % can also be slow_view
+ sig=nil,
db=nil,
fd=nil,
name,
@@ -162,10 +163,9 @@
views,
id_btree=nil,
current_seq=0,
- committed_seq=0,
purge_seq=0,
query_server=nil,
- commit_fun
+ waiting_delayed_commit=nil
}).
-record(view,
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index d02464ca..049a354e 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -503,19 +503,21 @@ commit_data(Db) ->
commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
- Db2 = Db#db{header = Header#db_header{
+ Header2 = Header#db_header{
update_seq = Db#db.update_seq,
summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
admins_ptr = Db#db.admins_ptr
- }},
- if Delay and (Db#db.waiting_delayed_commit == nil) ->
- Db2#db{waiting_delayed_commit=
+ },
+ if Header == Header2 ->
+ Db;
+ Delay and (Db#db.waiting_delayed_commit == nil) ->
+ Db#db{waiting_delayed_commit=
erlang:send_after(1000, self(), delayed_commit)};
Delay ->
- Db2;
+ Db;
true ->
if Db#db.waiting_delayed_commit /= nil ->
case erlang:cancel_timer(Db#db.waiting_delayed_commit) of
@@ -524,8 +526,8 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
end;
true -> ok
end,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Db2#db.header),
- Db2#db{waiting_delayed_commit=nil}
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
+ Db#db{waiting_delayed_commit=nil,header=Header2}
end.
copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 69e90633..45e92dce 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -24,10 +24,12 @@
-include("couch_db.hrl").
-record(group_state, {
+ type,
db_name,
init_args,
group,
updater_pid=nil,
+ waiting_commit=false,
waiting_list=[]
}).
@@ -136,24 +138,47 @@ handle_cast(foo, State) ->
{ok, State}.
+handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ CommittedSeq = couch_db:get_committed_update_seq(Db),
+ couch_db:close(Db),
+ if CommittedSeq >= Group#group.current_seq ->
+ % save the header
+ Header = {Group#group.sig, get_index_header_data(Group)},
+ ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header),
+ {noreply, State#group_state{waiting_commit=false}};
+ true ->
+ % We can't commit the header because the database seq that's fully
+ % committed to disk is still behind us. It we committed now and the
+ % database lost those changes our view could be forever out of sync
+ % with the database. But a crash before we commit these changes, no big
+ % deal, we only lose incremental changes since last committal.
+ erlang:send_after(1000, self(), delayed_commit),
+ {noreply, State#group_state{waiting_commit=true}}
+ end;
+
handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
#group_state{db_name=DbName,
updater_pid=UpPid,
- waiting_list=WaitList}=State) when UpPid == FromPid ->
+ waiting_list=WaitList,
+ waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
ok = couch_db:close(Db),
+
+ if Group#group.type == view andalso not WaitingCommit ->
+ erlang:send_after(1000, self(), delayed_commit);
+ true -> ok
+ end,
case reply_with_group(Group, WaitList, []) of
[] ->
- {noreply, State#group_state{waiting_list=[],
- group=Group#group{db=nil},
- updater_pid=nil}};
+ {noreply, State#group_state{waiting_commit=true, waiting_list=[],
+ group=Group#group{db=nil}, updater_pid=nil}};
StillWaiting ->
% we still have some waiters, reopen the database and reupdate the index
{ok, Db2} = couch_db:open(DbName, []),
Group2 = Group#group{db=Db2},
Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end),
- {noreply, State#group_state{waiting_list=StillWaiting,
- group=Group2,
- updater_pid=Pid}}
+ {noreply, State#group_state{waiting_commit=true,
+ waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
end;
handle_info({'EXIT', FromPid, reset},
@@ -184,17 +209,13 @@ handle_info({'DOWN',_,_,_,_}, State) ->
{stop, normal, reply_all(State, shutdown)}.
-terminate(Reason, #group_state{group=#group{fd=Fd}}=State) ->
+terminate(Reason, State) ->
reply_all(State, Reason),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-% error handling? the updater could die on us, we can save ourselves here.
-% but we shouldn't, we could be dead for a reason, like the view got changed, or something.
-
-
%% Local Functions
% reply_with_group/3
@@ -219,14 +240,9 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->
prepare_group({view, RootDir, DbName, GroupId}, ForceReset)->
case open_db_group(DbName, GroupId) of
- {ok, Db, #group{sig=Sig}=Group0} ->
+ {ok, Db, #group{sig=Sig}=Group} ->
case open_index_file(RootDir, DbName, GroupId) of
- {ok, Fd} ->
- Group = Group0#group{
- commit_fun = fun(GroupIn) ->
- Header = {Sig, get_index_header_data(GroupIn)},
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Header)
- end},
+ {ok, Fd} ->
if ForceReset ->
{ok, reset_file(Db, Fd, DbName, Group)};
true ->
@@ -254,8 +270,8 @@ prepare_group({slow_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) ->
btree=nil,
def=MapSrc,
reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
- {ok, init_group(Db, Fd, #group{name="_temp", db=Db, views=[View],
- def_lang=Lang, commit_fun=fun(_G) -> ok end}, nil)};
+ {ok, init_group(Db, Fd, #group{type=slow_view, name="_temp", db=Db,
+ views=[View], def_lang=Lang}, nil)};
Error ->
Error
end.
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index 0532258c..940602dd 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -16,8 +16,7 @@
-include("couch_db.hrl").
-update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq,
- commit_fun=CommitFun}=Group) ->
+update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq}=Group) ->
?LOG_DEBUG("Starting index update.",[]),
DbPurgeSeq = couch_db:get_purge_seq(Db),
Group2 =
@@ -45,14 +44,9 @@ update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq,
UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
couch_query_servers:stop_doc_map(Group4#group.query_server),
NewSeq = couch_db:get_update_seq(Db),
- if Seq /= NewSeq ->
- {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2,
+ {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2,
NewSeq),
- ok = CommitFun(Group5),
- exit({new_group, Group5#group{query_server=nil}});
- true ->
- exit({new_group, Group4#group{query_server=nil}})
- end.
+ exit({new_group, Group5#group{query_server=nil}}).
purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->