summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_view_group.erl84
-rw-r--r--src/couchdb/couch_view_updater.erl2
2 files changed, 64 insertions, 22 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 57c3ad21..b74d1dec 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -152,11 +152,15 @@ handle_call(request_group_info, _From, #group_state{
GroupInfo = get_group_info(Group, CompactorPid),
{reply, {ok, GroupInfo}, State}.
-handle_cast({start_compact, CompactFun}, #group_state{ compactor_pid=nil,
- group=Group, init_args={view, RootDir, DbName, GroupId} } = State) ->
- ?LOG_INFO("Starting view group compaction", []),
+handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
+ = State) ->
+ #group_state{
+ group = #group{name = GroupId, sig = GroupSig} = Group,
+ init_args = {RootDir, DbName, _}
+ } = State,
+ ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]),
{ok, Db} = couch_db:open(DbName, []),
- {ok, Fd} = open_index_file(RootDir, DbName, <<GroupId/binary,".compact">>),
+ {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
NewGroup = reset_file(Db, Fd, DbName, Group),
Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
{noreply, State#group_state{compactor_pid = Pid}};
@@ -164,36 +168,56 @@ handle_cast({start_compact, _}, State) ->
%% compact already running, this is a no-op
{noreply, State};
-handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
- #group_state{
- group = #group{current_seq=OldSeq, sig=GroupSig} = Group,
- init_args = {view, RootDir, DbName, _GroupId},
- updater_pid = nil,
- ref_counter = RefCounter
- } = State) when NewSeq >= OldSeq ->
- ?LOG_INFO("View Group compaction complete", []),
+handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
+ #group_state{group = #group{current_seq=OldSeq}} = State)
+ when NewSeq >= OldSeq ->
+ #group_state{
+ group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
+ init_args = {RootDir, DbName, _},
+ updater_pid = UpdaterPid,
+ ref_counter = RefCounter
+ } = State,
+
+ ?LOG_INFO("View index compaction complete for ~s ~s", [DbName, GroupId]),
FileName = index_file_name(RootDir, DbName, GroupSig),
CompactName = index_file_name(compact, RootDir, DbName, GroupSig),
file:delete(FileName),
ok = file:rename(CompactName, FileName),
+ %% if an updater is running, kill it and start a new one
+ NewUpdaterPid =
+ if is_pid(UpdaterPid) ->
+ unlink(UpdaterPid),
+ exit(UpdaterPid, view_compaction_complete),
+ Owner = self(),
+ spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end);
+ true ->
+ nil
+ end,
+
%% cleanup old group
+ unlink(OldFd),
couch_ref_counter:drop(RefCounter),
- {ok, NewRefCounter} = couch_ref_counter:start([NewFd]),
+ {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
case Group#group.db of
nil -> ok;
Else -> couch_db:close(Else)
end,
- erlang:send_after(1000, self(), delayed_commit),
+ self() ! delayed_commit,
{noreply, State#group_state{
group=NewGroup,
ref_counter=NewRefCounter,
- compactor_pid=nil
+ compactor_pid=nil,
+ updater_pid=NewUpdaterPid
}};
-handle_cast({compact_done, NewGroup}, #group_state{
- init_args={view, _RootDir, DbName, GroupId} } = State) ->
- ?LOG_INFO("View index compaction still behind main file", []),
+handle_cast({compact_done, NewGroup}, State) ->
+ #group_state{
+ group = #group{name = GroupId, current_seq = CurrentSeq},
+ init_args={_RootDir, DbName, _}
+ } = State,
+ ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
+ "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
couch_db:close(NewGroup#group.db),
{ok, Db} = couch_db:open(DbName, []),
Pid = spawn_link(fun() ->
@@ -209,7 +233,8 @@ handle_cast({compact_done, NewGroup}, #group_state{
end),
{noreply, State#group_state{compactor_pid = Pid}};
-handle_cast({partial_update, NewGroup}, State) ->
+handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
+ = State) ->
#group_state{
db_name = DbName,
waiting_commit = WaitingCommit
@@ -221,7 +246,10 @@ handle_cast({partial_update, NewGroup}, State) ->
erlang:send_after(1000, self(), delayed_commit);
true -> ok
end,
- {noreply, State#group_state{group=NewGroup, waiting_commit=true}}.
+ {noreply, State#group_state{group=NewGroup, waiting_commit=true}};
+handle_cast({partial_update, _, _}, State) ->
+ %% message from an old (probably pre-compaction) updater; ignore
+ {noreply, State}.
handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{ok, Db} = couch_db:open(DbName, []),
@@ -266,6 +294,9 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
{noreply, State#group_state{waiting_commit=true,
waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
end;
+handle_info({'EXIT', _, {new_group, _}}, State) ->
+ %% message from an old (probably pre-compaction) updater; ignore
+ {noreply, State};
handle_info({'EXIT', FromPid, reset},
#group_state{
@@ -283,7 +314,10 @@ handle_info({'EXIT', FromPid, reset},
Error ->
{stop, normal, reply_all(State, Error)}
end;
-
+handle_info({'EXIT', _, reset}, State) ->
+ %% message from an old (probably pre-compaction) updater; ignore
+ {noreply, State};
+
handle_info({'EXIT', _FromPid, normal}, State) ->
{noreply, State};
@@ -389,6 +423,14 @@ open_index_file(RootDir, DbName, GroupSig) ->
Error -> Error
end.
+open_index_file(compact, RootDir, DbName, GroupSig) ->
+ FileName = index_file_name(compact, RootDir, DbName, GroupSig),
+ case couch_file:open(FileName) of
+ {ok, Fd} -> {ok, Fd};
+ {error, enoent} -> couch_file:open(FileName, [create]);
+ Error -> Error
+ end.
+
open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
case couch_db:open(DbName, []) of
{ok, Db} ->
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index 64c86185..1a928cb4 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -132,7 +132,7 @@ process_doc(Db, Owner, DocInfo, {Docs, Group, ViewKVs, DocIdViewIdKeys}) ->
{ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3,
DocInfo#doc_info.high_seq),
if is_pid(Owner) ->
- ok = gen_server:cast(Owner, {partial_update, Group2});
+ ok = gen_server:cast(Owner, {partial_update, self(), Group2});
true -> ok end,
garbage_collect(),
ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],