summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_view_group.erl
diff options
context:
space:
mode:
author Adam Kocoloski <adam@cloudant.com> 2010-08-18 11:51:03 -0400
committer Adam Kocoloski <adam@cloudant.com> 2010-08-18 14:24:57 -0400
commit 7393d62b7b630bee50f609d0ae8125d33f7cda2b (patch)
tree 754e9ab17a586319c562de488e60056feff60bb8 /apps/couch/src/couch_view_group.erl
parent c0cb2625f25a2b51485c164bea1d8822f449ce14 (diff)
Grab bag of Cloudant patches to couch OTP application
- Removal of couch_db and couch_ref_counter processes. Active DBs are accessible through a protected ets table owned by couch_server.
- #full_doc_info{} in by_id and by_seq trees for faster compaction at the expense of more disk usage afterwards. Proposed as COUCHDB-738 but not accepted upstream.
- Replication via distributed Erlang.
- Better hot upgrade support (uses exported functions much more often).
- Configurable btree chunk sizes allow for larger (but still bounded) reductions.
- Shorter names for btree fields in #db{} and #db_header{}.
- couch_view_group does not keep a reference to the #db{}.
- Terms are stored compressed (again).
Diffstat (limited to 'apps/couch/src/couch_view_group.erl')
-rw-r--r-- apps/couch/src/couch_view_group.erl 197
1 files changed, 93 insertions, 104 deletions
diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl
index f01befdf..f11bb54d 100644
--- a/apps/couch/src/couch_view_group.erl
+++ b/apps/couch/src/couch_view_group.erl
@@ -39,8 +39,7 @@
request_group(Pid, Seq) ->
?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
case gen_server:call(Pid, {request_group, Seq}, infinity) of
- {ok, Group, RefCounter} ->
- couch_ref_counter:add(RefCounter),
+ {ok, Group, _RefCounter} ->
{ok, Group};
Error ->
?LOG_DEBUG("request_group Error ~p", [Error]),
@@ -75,27 +74,26 @@ start_link(InitArgs) ->
end.
% init creates a closure which spawns the appropriate view_updater.
-init({InitArgs, ReturnPid, Ref}) ->
+init({{_, DbName, _}=InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
case prepare_group(InitArgs, false) of
- {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} ->
+ {ok, #group{fd=Fd, current_seq=Seq}=Group} ->
+ {ok, Db} = couch_db:open(DbName, []),
case Seq > couch_db:get_update_seq(Db) of
true ->
ReturnPid ! {Ref, self(), {error, invalid_view_seq}},
+ couch_db:close(Db),
ignore;
_ ->
- couch_db:monitor(Db),
+ try couch_db:monitor(Db) after couch_db:close(Db) end,
Owner = self(),
- Pid = spawn_link(
- fun()-> couch_view_updater:update(Owner, Group) end
- ),
- {ok, RefCounter} = couch_ref_counter:start([Fd]),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
{ok, #group_state{
- db_name=couch_db:name(Db),
+ db_name= DbName,
init_args=InitArgs,
updater_pid = Pid,
- group=Group,
- ref_counter=RefCounter}}
+ group=Group#group{dbname=DbName},
+ ref_counter=erlang:monitor(process,Fd)}}
end;
Error ->
ReturnPid ! {Ref, self(), Error},
@@ -120,19 +118,16 @@ init({InitArgs, ReturnPid, Ref}) ->
handle_call({request_group, RequestSeq}, From,
#group_state{
- db_name=DbName,
group=#group{current_seq=Seq}=Group,
updater_pid=nil,
waiting_list=WaitList
}=State) when RequestSeq > Seq ->
- {ok, Db} = couch_db:open_int(DbName, []),
- Group2 = Group#group{db=Db},
Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
{noreply, State#group_state{
updater_pid=Pid,
- group=Group2,
+ group=Group,
waiting_list=[{From,RequestSeq}|WaitList]
}, infinity};
@@ -153,6 +148,10 @@ handle_call({request_group, RequestSeq}, From,
waiting_list=[{From, RequestSeq}|WaitList]
}, infinity};
+handle_call({start_compact, CompactFun}, _From, State) ->
+ {noreply, NewState} = handle_cast({start_compact, CompactFun}, State),
+ {reply, {ok, NewState#group_state.compactor_pid}, NewState};
+
handle_call(request_group_info, _From, State) ->
GroupInfo = get_group_info(State),
{reply, {ok, GroupInfo}, State}.
@@ -160,24 +159,23 @@ handle_call(request_group_info, _From, State) ->
handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
= State) ->
#group_state{
- group = #group{name = GroupId, sig = GroupSig} = Group,
- init_args = {RootDir, DbName, _}
+ group = #group{dbname = DbName, name = GroupId, sig = GroupSig} = Group,
+ init_args = {RootDir, _, _}
} = State,
?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]),
- {ok, Db} = couch_db:open_int(DbName, []),
{ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
- NewGroup = reset_file(Db, Fd, DbName, Group),
+ NewGroup = reset_file(Fd, DbName, Group),
Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
{noreply, State#group_state{compactor_pid = Pid}};
handle_cast({start_compact, _}, State) ->
%% compact already running, this is a no-op
{noreply, State};
-handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
+handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
#group_state{group = #group{current_seq=OldSeq}} = State)
when NewSeq >= OldSeq ->
#group_state{
- group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
+ group = #group{name=GroupId, fd=OldFd, sig=GroupSig},
init_args = {RootDir, DbName, _},
updater_pid = UpdaterPid,
ref_counter = RefCounter
@@ -202,17 +200,12 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
%% cleanup old group
unlink(OldFd),
- couch_ref_counter:drop(RefCounter),
- {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
- case Group#group.db of
- nil -> ok;
- Else -> couch_db:close(Else)
- end,
+ erlang:demonitor(RefCounter),
self() ! delayed_commit,
{noreply, State#group_state{
group=NewGroup,
- ref_counter=NewRefCounter,
+ ref_counter=erlang:monitor(process,NewFd),
compactor_pid=nil,
updater_pid=NewUpdaterPid
}};
@@ -223,17 +216,14 @@ handle_cast({compact_done, NewGroup}, State) ->
} = State,
?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
"compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
- couch_db:close(NewGroup#group.db),
- {ok, Db} = couch_db:open_int(DbName, []),
+ GroupServer = self(),
Pid = spawn_link(fun() ->
+ erlang:monitor(process, NewGroup#group.fd),
{_,Ref} = erlang:spawn_monitor(fun() ->
- couch_view_updater:update(nil, NewGroup#group{db = Db})
+ couch_view_updater:update(nil, NewGroup)
end),
- receive
- {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
- #group{name=GroupId} = NewGroup2,
- Pid2 = couch_view:get_group_server(DbName, GroupId),
- gen_server:cast(Pid2, {compact_done, NewGroup2})
+ receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
+ gen_server:cast(GroupServer, {compact_done, NewGroup2})
end
end),
{noreply, State#group_state{compactor_pid = Pid}};
@@ -245,7 +235,7 @@ handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
waiting_commit = WaitingCommit
} = State,
NewSeq = NewGroup#group.current_seq,
- ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
+ ?LOG_DEBUG("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
DbName, NewGroup#group.name]),
if not WaitingCommit ->
erlang:send_after(1000, self(), delayed_commit);
@@ -275,13 +265,12 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{noreply, State#group_state{waiting_commit=true}}
end;
-handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
- #group_state{db_name=DbName,
+handle_info({'EXIT', FromPid, {new_group, Group}},
+ #group_state{
updater_pid=UpPid,
ref_counter=RefCounter,
waiting_list=WaitList,
waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
- ok = couch_db:close(Db),
if not WaitingCommit ->
erlang:send_after(1000, self(), delayed_commit);
true -> ok
@@ -289,26 +278,20 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
case reply_with_group(Group, WaitList, [], RefCounter) of
[] ->
{noreply, State#group_state{waiting_commit=true, waiting_list=[],
- group=Group#group{db=nil}, updater_pid=nil}};
+ group=Group, updater_pid=nil}};
StillWaiting ->
- % we still have some waiters, reopen the database and reupdate the index
- {ok, Db2} = couch_db:open_int(DbName, []),
- Group2 = Group#group{db=Db2},
+ % we still have some waiters, reupdate the index
Owner = self(),
- Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end),
+ Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group) end),
{noreply, State#group_state{waiting_commit=true,
- waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
+ waiting_list=StillWaiting, group=Group, updater_pid=Pid}}
end;
handle_info({'EXIT', _, {new_group, _}}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State};
-handle_info({'EXIT', FromPid, reset},
- #group_state{
- init_args=InitArgs,
- updater_pid=UpPid,
- group=Group}=State) when UpPid == FromPid ->
- ok = couch_db:close(Group#group.db),
+handle_info({'EXIT', FromPid, reset}, #group_state{init_args=InitArgs,
+ updater_pid=FromPid}=State) ->
case prepare_group(InitArgs, true) of
{ok, ResetGroup} ->
Owner = self(),
@@ -334,8 +317,9 @@ handle_info({'EXIT', FromPid, Reason}, State) ->
?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]),
{stop, Reason, State};
-handle_info({'DOWN',_,_,_,_}, State) ->
- ?LOG_INFO("Shutting down view group server, monitored db is closing.", []),
+handle_info({'DOWN',_,_,Pid,Reason}, #group_state{group=G}=State) ->
+ ?LOG_INFO("Shutting down group server ~p, db ~p closing w/ reason~n~p",
+ [G#group.name, Pid, Reason]),
{stop, normal, reply_all(State, shutdown)}.
@@ -371,32 +355,29 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->
[catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList],
State#group_state{waiting_list=[]}.
+prepare_group({Root, DbName, #group{dbname=X}=G}, Reset) when X =/= DbName ->
+ prepare_group({Root, DbName, G#group{dbname=DbName}}, Reset);
prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)->
- case couch_db:open_int(DbName, []) of
- {ok, Db} ->
- case open_index_file(RootDir, DbName, Sig) of
- {ok, Fd} ->
- if ForceReset ->
- % this can happen if we missed a purge
- {ok, reset_file(Db, Fd, DbName, Group)};
- true ->
- % 09 UPGRADE CODE
- ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
- case (catch couch_file:read_header(Fd)) of
- {ok, {Sig, HeaderInfo}} ->
- % sigs match!
- {ok, init_group(Db, Fd, Group, HeaderInfo)};
- _ ->
- % this happens on a new file
- {ok, reset_file(Db, Fd, DbName, Group)}
- end
- end;
- Error ->
- catch delete_index_file(RootDir, DbName, Sig),
- Error
+ case open_index_file(RootDir, DbName, Sig) of
+ {ok, Fd} ->
+ if ForceReset ->
+ % this can happen if we missed a purge
+ {ok, reset_file(Fd, DbName, Group)};
+ true ->
+ % 09 UPGRADE CODE
+ ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
+ case (catch couch_file:read_header(Fd)) of
+ {ok, {Sig, HeaderInfo}} ->
+ % sigs match!
+ {ok, init_group(Fd, Group, HeaderInfo)};
+ _ ->
+ % this happens on a new file
+ {ok, reset_file(Fd, DbName, Group)}
+ end
end;
- Else ->
- Else
+ Error ->
+ catch delete_index_file(RootDir, DbName, Sig),
+ Error
end.
get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
@@ -446,7 +427,7 @@ open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end,
options=DesignOptions},
- {ok, Db, set_view_sig(#group{name = <<"_temp">>, db=Db, views=[View],
+ {ok, Db, set_view_sig(#group{name = <<"_temp">>, views=[View],
def_lang=Language, design_options=DesignOptions})};
Error ->
Error
@@ -531,28 +512,39 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
{View#view{id_num=N},N+1}
end, 0, lists:sort(dict:to_list(DictBySrc))),
- set_view_sig(#group{name=Id, views=Views, def_lang=Language, design_options=DesignOptions}).
-
-reset_group(#group{views=Views}=Group) ->
- Views2 = [View#view{btree=nil} || View <- Views],
- Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
- id_btree=nil,views=Views2}.
-
-reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
- ?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]),
+ #group{
+ name = Id,
+ views = Views,
+ def_lang = Language,
+ design_options = DesignOptions,
+ sig = couch_util:md5(term_to_binary({Views, Language, DesignOptions}))
+ }.
+
+reset_group(DbName, #group{views=Views}=Group) ->
+ Group#group{
+ fd = nil,
+ dbname = DbName,
+ query_server = nil,
+ current_seq = 0,
+ id_btree = nil,
+ views = [View#view{btree=nil} || View <- Views]
+ }.
+
+reset_file(Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
+ ?LOG_INFO("Resetting group index \"~s\" in db ~s", [Name, DbName]),
ok = couch_file:truncate(Fd, 0),
ok = couch_file:write_header(Fd, {Sig, nil}),
- init_group(Db, Fd, reset_group(Group), nil).
+ init_group(Fd, reset_group(DbName, Group), nil).
delete_index_file(RootDir, DbName, GroupSig) ->
couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)).
-init_group(Db, Fd, #group{views=Views}=Group, nil) ->
- init_group(Db, Fd, Group,
- #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
- id_btree_state=nil, view_states=[nil || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
- Group, IndexHeader) ->
+init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ PurgeSeq = try couch_db:get_purge_seq(Db) after couch_db:close(Db) end,
+ Header = #index_header{purge_seq=PurgeSeq, view_states=[nil || _ <- Views]},
+ init_group(Fd, Group, Header);
+init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
#index_header{seq=Seq, purge_seq=PurgeSeq,
id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
{ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
@@ -580,13 +572,10 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
<<"raw">> ->
Less = fun(A,B) -> A < B end
end,
- {ok, Btree} = couch_btree:open(BtreeState, Fd,
- [{less, Less},
- {reduce, ReduceFun}]),
+ {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, Less},
+ {reduce, ReduceFun}]),
View#view{btree=Btree}
end,
ViewStates, Views),
- Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
- id_btree=IdBtree, views=Views2}.
-
-
+ Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree,
+ views=Views2}.