From 88ec14c220592c8c0db7869c9961423e9ee97e7c Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Tue, 5 Aug 2008 01:43:40 +0000 Subject: Added concurrent open db limit and a LRU cache for closing old databases when limit reached (configurable via MaxDbsOpen var in couch.ini). Refactored db update code in couch_db.erl into couch_db_updater.erl. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@682560 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_view.erl | 99 ++++++++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 43 deletions(-) (limited to 'src/couchdb/couch_view.erl') diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 53cc4cda..a29a809c 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -57,22 +57,22 @@ get_updater(DbName, GroupId) -> Pid. get_updated_group(Pid) -> - Mref = erlang:monitor(process, Pid), + Mref = erlang:monitor(process, Pid), receive - {'DOWN', Mref, _, _, Reason} -> - throw(Reason) + {'DOWN', Mref, _, _, Reason} -> + throw(Reason) after 0 -> - Pid ! {self(), get_updated}, - receive - {Pid, Response} -> - erlang:demonitor(Mref), - receive - {'DOWN', Mref, _, _, _} -> ok - after 0 -> ok - end, - Response; - {'DOWN', Mref, _, _, Reason} -> - throw(Reason) + Pid ! {self(), get_updated}, + receive + {Pid, Response} -> + erlang:demonitor(Mref), + receive + {'DOWN', Mref, _, _, _} -> ok + after 0 -> ok + end, + Response; + {'DOWN', Mref, _, _, Reason} -> + throw(Reason) end end. @@ -216,10 +216,7 @@ init(RootDir) -> {ok, #server{root_dir=RootDir}}. terminate(_Reason, _) -> - catch ets:delete(couch_views_by_name), - catch ets:delete(couch_views_by_updater), - catch ets:delete(couch_views_by_db), - catch ets:delete(couch_views_temp_fd_by_db). + ok. handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{root_dir=Root}=Server) -> @@ -317,7 +314,7 @@ code_change(_OldVsn, State, _Extra) -> start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> NotifyPids = get_notify_pids(1000), - case couch_server:open(DbName) of + case couch_db:open(DbName, []) of {ok, Db} -> View = #view{map_names=["_temp"], id_num=0, @@ -331,16 +328,20 @@ start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> def_lang=Lang, id_btree=nil}, Group2 = init_group(Db, Fd, Group,nil), - temp_update_loop(Group2, NotifyPids); + couch_db:monitor(Db), + couch_db:close(Db), + temp_update_loop(DbName, Group2, NotifyPids); Else -> exit(Else) end. -temp_update_loop(Group, NotifyPids) -> - {ok, Group2} = update_group(Group), +temp_update_loop(DbName, Group, NotifyPids) -> + {ok, Db} = couch_db:open(DbName, []), + {ok, Group2} = update_group(Group#group{db=Db}), + couch_db:close(Db), [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], garbage_collect(), - temp_update_loop(Group2, get_notify_pids(10000)). + temp_update_loop(DbName, Group2, get_notify_pids(10000)). reset_group(#group{views=Views}=Group) -> @@ -355,21 +356,21 @@ start_update_loop(RootDir, DbName, GroupId) -> start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> {Db, Group} = - case (catch couch_server:open(DbName)) of + case (catch couch_db:open(DbName, [])) of {ok, Db0} -> case (catch couch_db:open_doc(Db0, GroupId)) of {ok, Doc} -> {Db0, design_doc_to_view_group(Doc)}; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end, - FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", - Group2 = + Else -> + delete_index_file(RootDir, DbName, GroupId), + exit(Else) + end; + Else -> + delete_index_file(RootDir, DbName, GroupId), + exit(Else) + end, + FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", + Group2 = case couch_file:open(FileName) of {ok, Fd} -> Sig = Group#group.sig, @@ -386,7 +387,8 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> Error -> throw(Error) end end, - + couch_db:monitor(Db), + couch_db:close(Db), update_loop(RootDir, DbName, GroupId, Group2, NotifyPids). reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> @@ -396,14 +398,22 @@ reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> init_group(Db, Fd, reset_group(Group), nil). update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) -> - try update_group(Group) of - {ok, Group2} -> + {ok, Db}= couch_db:open(DbName, []), + Result = + try + update_group(Group#group{db=Db}) + catch + throw: restart -> restart + after + couch_db:close(Db) + end, + case Result of + {ok, Group2} -> HeaderData = {Sig, get_index_header_data(Group2)}, ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], garbage_collect(), - update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)) - catch + update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)); restart -> couch_file:close(Group#group.fd), start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids()) @@ -414,20 +424,23 @@ get_notify_pids(Wait) -> receive {Pid, get_updated} -> [Pid | get_notify_pids()]; + {'DOWN', _MonitorRef, _Type, _Pid, _Info} -> + ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []), + exit(normal); Else -> ?LOG_ERROR("Unexpected message in view updater: ~p", [Else]), exit({error, Else}) after Wait -> exit(wait_timeout) - end. + end. % then keep getting all available and return. get_notify_pids() -> receive {Pid, get_updated} -> [Pid | get_notify_pids()] - after 0 -> - [] - end. + after 0 -> + [] + end. update_group(#group{db=Db,current_seq=CurrentSeq, views=Views}=Group) -> ViewEmptyKVs = [{View, []} || View <- Views], -- cgit v1.2.3