summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_view_group.erl30
-rw-r--r--src/couchdb/couch_view_updater.erl21
2 files changed, 39 insertions, 12 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index fa4e832e..c4b495b0 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -89,7 +89,8 @@ init({InitArgs, ReturnPid, Ref}) ->
case prepare_group(InitArgs, false) of
{ok, #group{db=Db, fd=Fd}=Group} ->
couch_db:monitor(Db),
- Pid = spawn_link(fun()-> couch_view_updater:update(Group) end),
+ Owner = self(),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
{ok, RefCounter} = couch_ref_counter:start([Fd]),
{ok, #group_state{
db_name=couch_db:name(Db),
@@ -127,7 +128,8 @@ handle_call({request_group, RequestSeq}, From,
}=State) when RequestSeq > Seq ->
{ok, Db} = couch_db:open(DbName, []),
Group2 = Group#group{db=Db},
- Pid = spawn_link(fun()-> couch_view_updater:update(Group2) end),
+ Owner = self(),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
{noreply, State#group_state{
updater_pid=Pid,
@@ -205,7 +207,7 @@ handle_cast({compact_done, NewGroup}, #group_state{
{ok, Db} = couch_db:open(DbName, []),
Pid = spawn_link(fun() ->
{_,Ref} = erlang:spawn_monitor(fun() ->
- couch_view_updater:update(NewGroup#group{db = Db})
+ couch_view_updater:update(nil, NewGroup#group{db = Db})
end),
receive
{'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
@@ -214,7 +216,21 @@ handle_cast({compact_done, NewGroup}, #group_state{
gen_server:cast(Pid2, {compact_done, NewGroup2})
end
end),
- {noreply, State#group_state{compactor_pid = Pid}}.
+ {noreply, State#group_state{compactor_pid = Pid}};
+
+handle_cast({partial_update, NewGroup}, State) ->
+ #group_state{
+ db_name = DbName,
+ waiting_commit = WaitingCommit
+ } = State,
+ NewSeq = NewGroup#group.current_seq,
+ ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
+ DbName, NewGroup#group.name]),
+ if not WaitingCommit ->
+ erlang:send_after(1000, self(), delayed_commit);
+ true -> ok
+ end,
+ {noreply, State#group_state{group=NewGroup, waiting_commit=true}}.
handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{ok, Db} = couch_db:open(DbName, []),
@@ -254,7 +270,8 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
% we still have some waiters, reopen the database and reupdate the index
{ok, Db2} = couch_db:open(DbName, []),
Group2 = Group#group{db=Db2},
- Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end),
+ Owner = self(),
+ Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end),
{noreply, State#group_state{waiting_commit=true,
waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
end;
@@ -267,7 +284,8 @@ handle_info({'EXIT', FromPid, reset},
ok = couch_db:close(Group#group.db),
case prepare_group(InitArgs, true) of
{ok, ResetGroup} ->
- Pid = spawn_link(fun()-> couch_view_updater:update(ResetGroup) end),
+ Owner = self(),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end),
{noreply, State#group_state{
updater_pid=Pid,
group=ResetGroup}};
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index cfdc7ca8..64c86185 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -12,11 +12,17 @@
-module(couch_view_updater).
--export([update/1]).
+-export([update/2]).
-include("couch_db.hrl").
-update(#group{db=#db{name=DbName}=Db,name=GroupName,current_seq=Seq,purge_seq=PurgeSeq}=Group) ->
+update(Owner, Group) ->
+ #group{
+ db = #db{name=DbName} = Db,
+ name = GroupName,
+ current_seq = Seq,
+ purge_seq = PurgeSeq
+ } = Group,
couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>),
DbPurgeSeq = couch_db:get_purge_seq(Db),
@@ -43,7 +49,7 @@ update(#group{db=#db{name=DbName}=Db,name=GroupName,current_seq=Seq,purge_seq=Pu
fun(DocInfo, _, {ChangesProcessed, Acc}) ->
couch_task_status:update("Processed ~p of ~p changes (~p%)",
[ChangesProcessed, TotalChanges, (ChangesProcessed*100) div TotalChanges]),
- {ok, {ChangesProcessed+1, process_doc(Db, DocInfo, Acc)}}
+ {ok, {ChangesProcessed+1, process_doc(Db, Owner, DocInfo, Acc)}}
end,
{0, {[], Group2, ViewEmptyKVs, []}}
),
@@ -90,9 +96,9 @@ purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
views=Views2,
purge_seq=couch_db:get_purge_seq(Db)}.
-process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId,design_options=DesignOptions}=Group, ViewKVs,
- DocIdViewIdKeys}) ->
- % This fun computes once for each document
+% This fun computes once for each document
+process_doc(Db, Owner, DocInfo, {Docs, Group, ViewKVs, DocIdViewIdKeys}) ->
+ #group{ design_options = DesignOptions } = Group,
#doc_info{id=DocId, revs=[#rev_info{deleted=Deleted}|_]} = DocInfo,
IncludeDesign = proplists:get_value(<<"include_design">>,
@@ -125,6 +131,9 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId,design_options=Desig
Results, ViewKVs, DocIdViewIdKeys2),
{ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3,
DocInfo#doc_info.high_seq),
+ if is_pid(Owner) ->
+ ok = gen_server:cast(Owner, {partial_update, Group2});
+ true -> ok end,
garbage_collect(),
ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
{[], Group2, ViewEmptyKeyValues, []};