diff options
-rw-r--r-- | src/couchdb/couch_view_group.erl | 84 | ||||
-rw-r--r-- | src/couchdb/couch_view_updater.erl | 2 |
2 files changed, 64 insertions, 22 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 57c3ad21..b74d1dec 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -152,11 +152,15 @@ handle_call(request_group_info, _From, #group_state{ GroupInfo = get_group_info(Group, CompactorPid), {reply, {ok, GroupInfo}, State}. -handle_cast({start_compact, CompactFun}, #group_state{ compactor_pid=nil, - group=Group, init_args={view, RootDir, DbName, GroupId} } = State) -> - ?LOG_INFO("Starting view group compaction", []), +handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} + = State) -> + #group_state{ + group = #group{name = GroupId, sig = GroupSig} = Group, + init_args = {RootDir, DbName, _} + } = State, + ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), {ok, Db} = couch_db:open(DbName, []), - {ok, Fd} = open_index_file(RootDir, DbName, <<GroupId/binary,".compact">>), + {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), NewGroup = reset_file(Db, Fd, DbName, Group), Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), {noreply, State#group_state{compactor_pid = Pid}}; @@ -164,36 +168,56 @@ handle_cast({start_compact, _}, State) -> %% compact already running, this is a no-op {noreply, State}; -handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, - #group_state{ - group = #group{current_seq=OldSeq, sig=GroupSig} = Group, - init_args = {view, RootDir, DbName, _GroupId}, - updater_pid = nil, - ref_counter = RefCounter - } = State) when NewSeq >= OldSeq -> - ?LOG_INFO("View Group compaction complete", []), +handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, + #group_state{group = #group{current_seq=OldSeq}} = State) + when NewSeq >= OldSeq -> + #group_state{ + group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group, + init_args = {RootDir, DbName, _}, + updater_pid = UpdaterPid, + ref_counter = RefCounter + } = State, + + ?LOG_INFO("View index compaction complete for ~s ~s", [DbName, GroupId]), FileName = index_file_name(RootDir, DbName, GroupSig), CompactName = index_file_name(compact, RootDir, DbName, GroupSig), file:delete(FileName), ok = file:rename(CompactName, FileName), + %% if an updater is running, kill it and start a new one + NewUpdaterPid = + if is_pid(UpdaterPid) -> + unlink(UpdaterPid), + exit(UpdaterPid, view_compaction_complete), + Owner = self(), + spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end); + true -> + nil + end, + %% cleanup old group + unlink(OldFd), couch_ref_counter:drop(RefCounter), - {ok, NewRefCounter} = couch_ref_counter:start([NewFd]), + {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]), case Group#group.db of nil -> ok; Else -> couch_db:close(Else) end, - erlang:send_after(1000, self(), delayed_commit), + self() ! delayed_commit, {noreply, State#group_state{ group=NewGroup, ref_counter=NewRefCounter, - compactor_pid=nil + compactor_pid=nil, + updater_pid=NewUpdaterPid }}; -handle_cast({compact_done, NewGroup}, #group_state{ - init_args={view, _RootDir, DbName, GroupId} } = State) -> - ?LOG_INFO("View index compaction still behind main file", []), +handle_cast({compact_done, NewGroup}, State) -> + #group_state{ + group = #group{name = GroupId, current_seq = CurrentSeq}, + init_args={_RootDir, DbName, _} + } = State, + ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ + "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), couch_db:close(NewGroup#group.db), {ok, Db} = couch_db:open(DbName, []), Pid = spawn_link(fun() -> @@ -209,7 +233,8 @@ handle_cast({compact_done, NewGroup}, #group_state{ end), {noreply, State#group_state{compactor_pid = Pid}}; -handle_cast({partial_update, NewGroup}, State) -> +handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid} + = State) -> #group_state{ db_name = DbName, waiting_commit = WaitingCommit @@ -221,7 +246,10 @@ handle_cast({partial_update, NewGroup}, State) -> erlang:send_after(1000, self(), delayed_commit); true -> ok end, - {noreply, State#group_state{group=NewGroup, waiting_commit=true}}. + {noreply, State#group_state{group=NewGroup, waiting_commit=true}}; +handle_cast({partial_update, _, _}, State) -> + %% message from an old (probably pre-compaction) updater; ignore + {noreply, State}. handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {ok, Db} = couch_db:open(DbName, []), @@ -266,6 +294,9 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, {noreply, State#group_state{waiting_commit=true, waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} end; +handle_info({'EXIT', _, {new_group, _}}, State) -> + %% message from an old (probably pre-compaction) updater; ignore + {noreply, State}; handle_info({'EXIT', FromPid, reset}, #group_state{ @@ -283,7 +314,10 @@ handle_info({'EXIT', FromPid, reset}, Error -> {stop, normal, reply_all(State, Error)} end; - +handle_info({'EXIT', _, reset}, State) -> + %% message from an old (probably pre-compaction) updater; ignore + {noreply, State}; + handle_info({'EXIT', _FromPid, normal}, State) -> {noreply, State}; @@ -389,6 +423,14 @@ open_index_file(RootDir, DbName, GroupSig) -> Error -> Error end. +open_index_file(compact, RootDir, DbName, GroupSig) -> + FileName = index_file_name(compact, RootDir, DbName, GroupSig), + case couch_file:open(FileName) of + {ok, Fd} -> {ok, Fd}; + {error, enoent} -> couch_file:open(FileName, [create]); + Error -> Error + end. + open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) -> case couch_db:open(DbName, []) of {ok, Db} -> diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 64c86185..1a928cb4 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -132,7 +132,7 @@ process_doc(Db, Owner, DocInfo, {Docs, Group, ViewKVs, DocIdViewIdKeys}) -> {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, DocInfo#doc_info.high_seq), if is_pid(Owner) -> - ok = gen_server:cast(Owner, {partial_update, Group2}); + ok = gen_server:cast(Owner, {partial_update, self(), Group2}); true -> ok end, garbage_collect(), ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], |