diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2009-07-22 17:34:14 +0000 |
---|---|---|
committer | Adam Kocoloski <kocolosk@apache.org> | 2009-07-22 17:34:14 +0000 |
commit | 26555564ad89db5407f278dc7e3943c0b0ba9bbc (patch) | |
tree | 224df278379c58f17e15c9627bce6f1f24fda022 /src/couchdb | |
parent | 1ce371e890dcbb2ff5ee2465c51f66a2009246e7 (diff) |
various bugfixes and improvements for view compaction
View compaction was broken after the switch to signature-based index
files, but we don't have tests for it yet so we didn't notice. I also
committed a few changes that should make compaction faster and more
robust:
* commit the header immediately after compaction finishes. We used
to wait 1 seconds, but if the server restarted in that second the
index would be reset.
* unlink from old index file at the end. Prevents process crashes
that could couch_view (and with the delayed commit would sometimes
cause index resets).
* don't wait for running view updates to finish before replacing old
view index file. If an update is running, restart it and point it to
the new view group. This alleviates the situation where the view
compaction goes into a busy wait, printing "still behind main file"
1000s of times to the log, and generally makes compaction finish more
quickly.
* better logging
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@796805 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_view_group.erl | 84 | ||||
-rw-r--r-- | src/couchdb/couch_view_updater.erl | 2 |
2 files changed, 64 insertions, 22 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 57c3ad21..b74d1dec 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -152,11 +152,15 @@ handle_call(request_group_info, _From, #group_state{ GroupInfo = get_group_info(Group, CompactorPid), {reply, {ok, GroupInfo}, State}. -handle_cast({start_compact, CompactFun}, #group_state{ compactor_pid=nil, - group=Group, init_args={view, RootDir, DbName, GroupId} } = State) -> - ?LOG_INFO("Starting view group compaction", []), +handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} + = State) -> + #group_state{ + group = #group{name = GroupId, sig = GroupSig} = Group, + init_args = {RootDir, DbName, _} + } = State, + ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), {ok, Db} = couch_db:open(DbName, []), - {ok, Fd} = open_index_file(RootDir, DbName, <<GroupId/binary,".compact">>), + {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), NewGroup = reset_file(Db, Fd, DbName, Group), Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), {noreply, State#group_state{compactor_pid = Pid}}; @@ -164,36 +168,56 @@ handle_cast({start_compact, _}, State) -> %% compact already running, this is a no-op {noreply, State}; -handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, - #group_state{ - group = #group{current_seq=OldSeq, sig=GroupSig} = Group, - init_args = {view, RootDir, DbName, _GroupId}, - updater_pid = nil, - ref_counter = RefCounter - } = State) when NewSeq >= OldSeq -> - ?LOG_INFO("View Group compaction complete", []), +handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, + #group_state{group = #group{current_seq=OldSeq}} = State) + when NewSeq >= OldSeq -> + #group_state{ + group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group, + init_args = {RootDir, DbName, _}, + updater_pid = UpdaterPid, + ref_counter = RefCounter + } = State, + + ?LOG_INFO("View index compaction complete for ~s ~s", [DbName, GroupId]), FileName = index_file_name(RootDir, DbName, GroupSig), CompactName = index_file_name(compact, RootDir, DbName, GroupSig), file:delete(FileName), ok = file:rename(CompactName, FileName), + %% if an updater is running, kill it and start a new one + NewUpdaterPid = + if is_pid(UpdaterPid) -> + unlink(UpdaterPid), + exit(UpdaterPid, view_compaction_complete), + Owner = self(), + spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end); + true -> + nil + end, + %% cleanup old group + unlink(OldFd), couch_ref_counter:drop(RefCounter), - {ok, NewRefCounter} = couch_ref_counter:start([NewFd]), + {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]), case Group#group.db of nil -> ok; Else -> couch_db:close(Else) end, - erlang:send_after(1000, self(), delayed_commit), + self() ! delayed_commit, {noreply, State#group_state{ group=NewGroup, ref_counter=NewRefCounter, - compactor_pid=nil + compactor_pid=nil, + updater_pid=NewUpdaterPid }}; -handle_cast({compact_done, NewGroup}, #group_state{ - init_args={view, _RootDir, DbName, GroupId} } = State) -> - ?LOG_INFO("View index compaction still behind main file", []), +handle_cast({compact_done, NewGroup}, State) -> + #group_state{ + group = #group{name = GroupId, current_seq = CurrentSeq}, + init_args={_RootDir, DbName, _} + } = State, + ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ + "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), couch_db:close(NewGroup#group.db), {ok, Db} = couch_db:open(DbName, []), Pid = spawn_link(fun() -> @@ -209,7 +233,8 @@ handle_cast({compact_done, NewGroup}, #group_state{ end), {noreply, State#group_state{compactor_pid = Pid}}; -handle_cast({partial_update, NewGroup}, State) -> +handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid} + = State) -> #group_state{ db_name = DbName, waiting_commit = WaitingCommit @@ -221,7 +246,10 @@ handle_cast({partial_update, NewGroup}, State) -> erlang:send_after(1000, self(), delayed_commit); true -> ok end, - {noreply, State#group_state{group=NewGroup, waiting_commit=true}}. + {noreply, State#group_state{group=NewGroup, waiting_commit=true}}; +handle_cast({partial_update, _, _}, State) -> + %% message from an old (probably pre-compaction) updater; ignore + {noreply, State}. handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {ok, Db} = couch_db:open(DbName, []), @@ -266,6 +294,9 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, {noreply, State#group_state{waiting_commit=true, waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} end; +handle_info({'EXIT', _, {new_group, _}}, State) -> + %% message from an old (probably pre-compaction) updater; ignore + {noreply, State}; handle_info({'EXIT', FromPid, reset}, #group_state{ @@ -283,7 +314,10 @@ handle_info({'EXIT', FromPid, reset}, Error -> {stop, normal, reply_all(State, Error)} end; - +handle_info({'EXIT', _, reset}, State) -> + %% message from an old (probably pre-compaction) updater; ignore + {noreply, State}; + handle_info({'EXIT', _FromPid, normal}, State) -> {noreply, State}; @@ -389,6 +423,14 @@ open_index_file(RootDir, DbName, GroupSig) -> Error -> Error end. +open_index_file(compact, RootDir, DbName, GroupSig) -> + FileName = index_file_name(compact, RootDir, DbName, GroupSig), + case couch_file:open(FileName) of + {ok, Fd} -> {ok, Fd}; + {error, enoent} -> couch_file:open(FileName, [create]); + Error -> Error + end. + open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) -> case couch_db:open(DbName, []) of {ok, Db} -> diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 64c86185..1a928cb4 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -132,7 +132,7 @@ process_doc(Db, Owner, DocInfo, {Docs, Group, ViewKVs, DocIdViewIdKeys}) -> {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, DocInfo#doc_info.high_seq), if is_pid(Owner) -> - ok = gen_server:cast(Owner, {partial_update, Group2}); + ok = gen_server:cast(Owner, {partial_update, self(), Group2}); true -> ok end, garbage_collect(), ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], |