diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_view_group.erl | 30 | ||||
-rw-r--r-- | src/couchdb/couch_view_updater.erl | 21 |
2 files changed, 39 insertions, 12 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index fa4e832e..c4b495b0 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -89,7 +89,8 @@ init({InitArgs, ReturnPid, Ref}) -> case prepare_group(InitArgs, false) of {ok, #group{db=Db, fd=Fd}=Group} -> couch_db:monitor(Db), - Pid = spawn_link(fun()-> couch_view_updater:update(Group) end), + Owner = self(), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), {ok, RefCounter} = couch_ref_counter:start([Fd]), {ok, #group_state{ db_name=couch_db:name(Db), @@ -127,7 +128,8 @@ handle_call({request_group, RequestSeq}, From, }=State) when RequestSeq > Seq -> {ok, Db} = couch_db:open(DbName, []), Group2 = Group#group{db=Db}, - Pid = spawn_link(fun()-> couch_view_updater:update(Group2) end), + Owner = self(), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end), {noreply, State#group_state{ updater_pid=Pid, @@ -205,7 +207,7 @@ handle_cast({compact_done, NewGroup}, #group_state{ {ok, Db} = couch_db:open(DbName, []), Pid = spawn_link(fun() -> {_,Ref} = erlang:spawn_monitor(fun() -> - couch_view_updater:update(NewGroup#group{db = Db}) + couch_view_updater:update(nil, NewGroup#group{db = Db}) end), receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> @@ -214,7 +216,21 @@ handle_cast({compact_done, NewGroup}, #group_state{ gen_server:cast(Pid2, {compact_done, NewGroup2}) end end), - {noreply, State#group_state{compactor_pid = Pid}}. + {noreply, State#group_state{compactor_pid = Pid}}; + +handle_cast({partial_update, NewGroup}, State) -> + #group_state{ + db_name = DbName, + waiting_commit = WaitingCommit + } = State, + NewSeq = NewGroup#group.current_seq, + ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq, + DbName, NewGroup#group.name]), + if not WaitingCommit -> + erlang:send_after(1000, self(), delayed_commit); + true -> ok + end, + {noreply, State#group_state{group=NewGroup, waiting_commit=true}}. handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {ok, Db} = couch_db:open(DbName, []), @@ -254,7 +270,8 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, % we still have some waiters, reopen the database and reupdate the index {ok, Db2} = couch_db:open(DbName, []), Group2 = Group#group{db=Db2}, - Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end), + Owner = self(), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end), {noreply, State#group_state{waiting_commit=true, waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} end; @@ -267,7 +284,8 @@ handle_info({'EXIT', FromPid, reset}, ok = couch_db:close(Group#group.db), case prepare_group(InitArgs, true) of {ok, ResetGroup} -> - Pid = spawn_link(fun()-> couch_view_updater:update(ResetGroup) end), + Owner = self(), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end), {noreply, State#group_state{ updater_pid=Pid, group=ResetGroup}}; diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index cfdc7ca8..64c86185 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -12,11 +12,17 @@ -module(couch_view_updater). --export([update/1]). +-export([update/2]). -include("couch_db.hrl"). -update(#group{db=#db{name=DbName}=Db,name=GroupName,current_seq=Seq,purge_seq=PurgeSeq}=Group) -> +update(Owner, Group) -> + #group{ + db = #db{name=DbName} = Db, + name = GroupName, + current_seq = Seq, + purge_seq = PurgeSeq + } = Group, couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>), DbPurgeSeq = couch_db:get_purge_seq(Db), @@ -43,7 +49,7 @@ update(#group{db=#db{name=DbName}=Db,name=GroupName,current_seq=Seq,purge_seq=Pu fun(DocInfo, _, {ChangesProcessed, Acc}) -> couch_task_status:update("Processed ~p of ~p changes (~p%)", [ChangesProcessed, TotalChanges, (ChangesProcessed*100) div TotalChanges]), - {ok, {ChangesProcessed+1, process_doc(Db, DocInfo, Acc)}} + {ok, {ChangesProcessed+1, process_doc(Db, Owner, DocInfo, Acc)}} end, {0, {[], Group2, ViewEmptyKVs, []}} ), @@ -90,9 +96,9 @@ purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> views=Views2, purge_seq=couch_db:get_purge_seq(Db)}. -process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId,design_options=DesignOptions}=Group, ViewKVs, - DocIdViewIdKeys}) -> - % This fun computes once for each document +% This fun computes once for each document +process_doc(Db, Owner, DocInfo, {Docs, Group, ViewKVs, DocIdViewIdKeys}) -> + #group{ design_options = DesignOptions } = Group, #doc_info{id=DocId, revs=[#rev_info{deleted=Deleted}|_]} = DocInfo, IncludeDesign = proplists:get_value(<<"include_design">>, @@ -125,6 +131,9 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId,design_options=Desig Results, ViewKVs, DocIdViewIdKeys2), {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, DocInfo#doc_info.high_seq), + if is_pid(Owner) -> + ok = gen_server:cast(Owner, {partial_update, Group2}); + true -> ok end, garbage_collect(), ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], {[], Group2, ViewEmptyKeyValues, []}; |