From 5a9321814a727e8c010bf83f05572a341d55f26a Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Wed, 10 Dec 2008 01:13:17 +0000 Subject: view group state gen_server. thanks damien and davisp. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@724946 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/Makefile.am | 4 + src/couchdb/couch_db.erl | 5 +- src/couchdb/couch_db.hrl | 37 ++- src/couchdb/couch_db_updater.erl | 2 +- src/couchdb/couch_httpd.erl | 2 +- src/couchdb/couch_httpd_view.erl | 11 +- src/couchdb/couch_server.erl | 8 - src/couchdb/couch_view.erl | 551 ++++--------------------------------- src/couchdb/couch_view_group.erl | 192 +++++++++++++ src/couchdb/couch_view_updater.erl | 384 ++++++++++++++++++++++++++ 10 files changed, 686 insertions(+), 510 deletions(-) create mode 100644 src/couchdb/couch_view_group.erl create mode 100644 src/couchdb/couch_view_updater.erl (limited to 'src/couchdb') diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 2e4a85c8..a2099931 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -63,6 +63,8 @@ source_files = \ couch_stream.erl \ couch_util.erl \ couch_view.erl \ + couch_view_updater.erl \ + couch_view_group.erl \ couch_db_updater.erl EXTRA_DIST = $(source_files) couch_db.hrl @@ -92,6 +94,8 @@ compiled_files = \ couch_stream.beam \ couch_util.beam \ couch_view.beam \ + couch_view_updater.beam \ + couch_view_group.beam \ couch_db_updater.beam # doc_base = \ diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 7de5b8db..5da16b83 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -17,7 +17,7 @@ -export([open_ref_counted/3,num_refs/1,monitor/1]). -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). --export([get_missing_revs/2,name/1,doc_to_tree/1]). +-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1]). -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). @@ -145,6 +145,9 @@ purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> gen_server:call(UpdatePid, {purge_docs, IdsRevs}). +get_update_seq(#db{header=#db_header{update_seq=Seq}})-> + Seq. + get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})-> PurgeSeq. diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 7bf3cb9d..6a48ab1c 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -88,7 +88,7 @@ -record(user_ctx, - {name=nil, + {name=null, roles=[] }). @@ -152,6 +152,41 @@ include_docs = false }). +-record(group, + {sig=nil, + db=nil, + fd=nil, + name, + def_lang, + views, + id_btree=nil, + current_seq=0, + purge_seq=0, + query_server=nil + }). + +-record(view, + {id_num, + map_names=[], + def, + btree=nil, + reduce_funs=[] + }). + +-record(server,{ + root_dir = [], + dbname_regexp, + max_dbs_open=100, + current_dbs_open=0, + start_time="" + }). + +-record(index_header, + {seq=0, + purge_seq=0, + id_btree_state=nil, + view_states=nil + }). % small value used in revision trees to indicate the revision isn't stored -define(REV_MISSING, []). diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 67a6f624..68c3a1fc 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -129,7 +129,7 @@ handle_call({purge_docs, IdRevs}, _From, Db) -> Db#db{ fulldocinfo_by_id_btree = DocInfoByIdBTree2, docinfo_by_seq_btree = DocInfoBySeqBTree2, - update_seq = NewSeq, + update_seq = NewSeq + 1, header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}), ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index 7d621d25..0d489869 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -261,7 +261,7 @@ basic_username_pw(Req) -> [User, Pass] -> {User, Pass}; [User] -> - {User, <<"">>}; + {User, ""}; _ -> nil end; diff --git a/src/couchdb/couch_httpd_view.erl b/src/couchdb/couch_httpd_view.erl index 5b19af5d..f1c8616f 100644 --- a/src/couchdb/couch_httpd_view.erl +++ b/src/couchdb/couch_httpd_view.erl @@ -22,18 +22,19 @@ start_json_response/2,send_chunk/2,end_json_response/1]). design_doc_view(Req, Db, Id, ViewName, Keys) -> + #view_query_args{ + update = Update, + reduce = Reduce + } = QueryArgs = parse_view_query(Req, Keys), case couch_view:get_map_view({couch_db:name(Db), - <<"_design/", Id/binary>>, ViewName}) of + <<"_design/", Id/binary>>, ViewName, Update}) of {ok, View} -> - QueryArgs = parse_view_query(Req, Keys), output_map_view(Req, View, Db, QueryArgs, Keys); {not_found, Reason} -> case couch_view:get_reduce_view({couch_db:name(Db), <<"_design/", Id/binary>>, ViewName}) of {ok, View} -> - #view_query_args{ - reduce = Reduce - } = QueryArgs = parse_view_query(Req, Keys, true), + parse_view_query(Req, Keys, true), % just for validation case Reduce of false -> {reduce, _N, _Lang, MapView} = View, diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl index 34aa16b7..08f71f2b 100644 --- a/src/couchdb/couch_server.erl +++ b/src/couchdb/couch_server.erl @@ -22,14 +22,6 @@ -include("couch_db.hrl"). --record(server,{ - root_dir = [], - dbname_regexp, - max_dbs_open=100, - current_dbs_open=0, - start_time="" - }). - start() -> start(["default.ini"]). diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 4ebbb136..7c7730a7 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -13,45 +13,12 @@ -module(couch_view). -behaviour(gen_server). --export([start_link/0,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]). +-export([start_link/0,fold/4,fold/5,less_json/2,less_json_keys/2,expand_dups/2,detuple_kvs/2]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]). -export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, fold_reduce/7]). -include("couch_db.hrl"). - --record(group, - {sig=nil, - db=nil, - fd=nil, - name, - def_lang, - views, - id_btree=nil, - current_seq=0, - purge_seq=0, - query_server=nil - }). - --record(view, - {id_num, - map_names=[], - def, - btree=nil, - reduce_funs=[] - }). - --record(server, - {root_dir - }). - --record(index_header, - {seq=0, - purge_seq=0, - id_btree_state=nil, - view_states=nil - }). - start_link() -> gen_server:start_link({local, couch_view}, couch_view, [], []). @@ -59,41 +26,30 @@ get_temp_updater(DbName, Type, MapSrc, RedSrc) -> {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}), Pid. -get_updater(DbName, GroupId) -> - {ok, Pid} = gen_server:call(couch_view, {start_updater, DbName, GroupId}), +get_group_server(DbName, GroupId) -> + {ok, Pid} = gen_server:call(couch_view, {start_group_server, DbName, GroupId}), Pid. -get_updated_group(Pid) -> - Mref = erlang:monitor(process, Pid), - receive - {'DOWN', Mref, _, _, Reason} -> - throw(Reason) - after 0 -> - Pid ! {self(), get_updated}, - receive - {Pid, Response} -> - erlang:demonitor(Mref), - receive - {'DOWN', Mref, _, _, _} -> ok - after 0 -> ok - end, - Response; - {'DOWN', Mref, _, _, Reason} -> - throw(Reason) - end - end. +get_updated_group(DbName, GroupId, Update) -> + couch_view_group:request_group(get_group_server(DbName, GroupId), seq_for_update(DbName, Update)). + +get_updated_group(temp, DbName, Type, MapSrc, RedSrc, Update) -> + couch_view_group:request_group(get_temp_updater(DbName, Type, MapSrc, RedSrc), seq_for_update(DbName, Update)). get_row_count(#view{btree=Bt}) -> {ok, {Count, _Reds}} = couch_btree:full_reduce(Bt), {ok, Count}. get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) -> - {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)), + {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, MapSrc, RedSrc, true), {ok, {temp_reduce, View}}; get_reduce_view({DbName, GroupId, Name}) -> - {ok, #group{views=Views,def_lang=Lang}} = - get_updated_group(get_updater(DbName, GroupId)), - get_reduce_view0(Name, Lang, Views). + case get_updated_group(DbName, GroupId, true) of + {error, Reason} -> + Reason; + {ok, #group{views=Views,def_lang=Lang}} -> + get_reduce_view0(Name, Lang, Views) + end. get_reduce_view0(_Name, _Lang, []) -> {not_found, missing_named_view}; @@ -153,13 +109,26 @@ get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 -> N + 1; get_key_pos(Key, [_|Rest], N) -> get_key_pos(Key, Rest, N+1). + +seq_for_update(DbName, Update) -> + case Update of + true -> + {ok, #db{update_seq=CurrentSeq}} = couch_db:open(DbName, []), + CurrentSeq; + _Else -> + 0 + end. get_map_view({temp, DbName, Type, Src}) -> - {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])), + {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, Src, [], true), {ok, View}; -get_map_view({DbName, GroupId, Name}) -> - {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)), - get_map_view0(Name, Views). +get_map_view({DbName, GroupId, Name, Update}) -> + case get_updated_group(DbName, GroupId, Update) of + {error, Reason} -> + Reason; + {ok, #group{views=Views}} -> + get_map_view0(Name, Views) + end. get_map_view0(_Name, []) -> {not_found, missing_named_view}; @@ -183,37 +152,6 @@ reduce_to_count(Reductions) -> Count. -design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> - Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>), - {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}), - - % add the views to a dictionary object, with the map source as the key - DictBySrc = - lists:foldl( - fun({Name, {MRFuns}}, DictBySrcAcc) -> - MapSrc = proplists:get_value(<<"map">>, MRFuns), - RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null), - View = - case dict:find(MapSrc, DictBySrcAcc) of - {ok, View0} -> View0; - error -> #view{def=MapSrc} % create new view object - end, - View2 = - if RedSrc == null -> - View#view{map_names=[Name|View#view.map_names]}; - true -> - View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} - end, - dict:store(MapSrc, View2, DictBySrcAcc) - end, dict:new(), RawViews), - % number the views - {Views, _N} = lists:mapfoldl( - fun({_Src, View}, N) -> - {View#view{id_num=N},N+1} - end, 0, dict:to_list(DictBySrc)), - - Group = #group{name=Id, views=Views, def_lang=Language}, - Group#group{sig=erlang:md5(term_to_binary(Group))}. fold_fun(_Fun, [], _, Acc) -> {ok, Acc}; @@ -253,10 +191,10 @@ init([]) -> (_Else) -> ok end), - ets:new(couch_views_by_db, [bag, private, named_table]), - ets:new(couch_views_by_name, [set, protected, named_table]), - ets:new(couch_views_by_updater, [set, private, named_table]), - ets:new(couch_views_temp_fd_by_db, [set, protected, named_table]), + ets:new(couch_groups_by_db, [bag, private, named_table]), + ets:new(group_servers_by_name, [set, protected, named_table]), + ets:new(couch_groups_by_updater, [set, private, named_table]), + ets:new(couch_temp_group_fd_by_db, [set, protected, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir}}. @@ -268,9 +206,9 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r <> = erlang:md5(term_to_binary({Lang, MapSrc, RedSrc})), Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])), Pid = - case ets:lookup(couch_views_by_name, {DbName, Name}) of + case ets:lookup(group_servers_by_name, {DbName, Name}) of [] -> - case ets:lookup(couch_views_temp_fd_by_db, DbName) of + case ets:lookup(couch_temp_group_fd_by_db, DbName) of [] -> FileName = Root ++ "/." ++ binary_to_list(DbName) ++ "_temp", {ok, Fd} = couch_file:open(FileName, [create, overwrite]), @@ -279,21 +217,20 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r ok end, ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]), - NewPid = spawn_link(couch_view, start_temp_update_loop, - [DbName, Fd, Lang, MapSrc, RedSrc]), - true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}), + {ok, NewPid} = couch_view_group:start_link({DbName, Fd, Lang, MapSrc, RedSrc}), + true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}), add_to_ets(NewPid, DbName, Name), NewPid; [{_, ExistingPid0}] -> ExistingPid0 end, {reply, {ok, Pid}, Server}; -handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Server) -> +handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) -> Pid = - case ets:lookup(couch_views_by_name, {DbName, GroupId}) of + case ets:lookup(group_servers_by_name, {DbName, GroupId}) of [] -> - ?LOG_DEBUG("Spawning new update process for view group ~s in database ~s.", [GroupId, DbName]), - NewPid = spawn_link(couch_view, start_update_loop, [Root, DbName, GroupId]), + ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]), + {ok, NewPid} = couch_view_group:start_link({Root, DbName, GroupId}), add_to_ets(NewPid, DbName, GroupId), NewPid; [{_, ExistingPid0}] -> @@ -303,11 +240,11 @@ handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Serv handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> % shutdown all the updaters - Names = ets:lookup(couch_views_by_db, DbName), + Names = ets:lookup(couch_groups_by_db, DbName), lists:foreach( fun({_DbName, GroupId}) -> ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]), - [{_, Pid}] = ets:lookup(couch_views_by_name, {DbName, GroupId}), + [{_, Pid}] = ets:lookup(group_servers_by_name, {DbName, GroupId}), exit(Pid, kill), receive {'EXIT', Pid, _} -> delete_from_ets(Pid, DbName, GroupId) @@ -318,22 +255,23 @@ handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> {noreply, Server}. handle_info({'EXIT', _FromPid, normal}, Server) -> - {noreply, Server}; + {noreply, Server}; handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> - case ets:lookup(couch_views_by_updater, FromPid) of + ?LOG_DEBUG("Exit from process: ~p", [{FromPid, Reason}]), + case ets:lookup(couch_groups_by_updater, FromPid) of [] -> % non-updater linked process must have died, we propagate the error ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]), exit(Reason); [{_, {DbName, "_temp_" ++ _ = GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId), - [{_, Fd, Count}] = ets:lookup(couch_views_temp_fd_by_db, DbName), + [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName), case Count of 1 -> % Last ref couch_file:close(Fd), file:delete(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_temp"), - true = ets:delete(couch_views_temp_fd_by_db, DbName); + true = ets:delete(couch_temp_group_fd_by_db, DbName); _ -> - true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count - 1}) + true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count - 1}) end; [{_, {DbName, GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId) @@ -344,225 +282,21 @@ handle_info(Msg, _Server) -> exit({error, Msg}). add_to_ets(Pid, DbName, GroupId) -> - true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}), - true = ets:insert(couch_views_by_name, {{DbName, GroupId}, Pid}), - true = ets:insert(couch_views_by_db, {DbName, GroupId}). + true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}), + true = ets:insert(group_servers_by_name, {{DbName, GroupId}, Pid}), + true = ets:insert(couch_groups_by_db, {DbName, GroupId}). delete_from_ets(Pid, DbName, GroupId) -> - true = ets:delete(couch_views_by_updater, Pid), - true = ets:delete(couch_views_by_name, {DbName, GroupId}), - true = ets:delete_object(couch_views_by_db, {DbName, GroupId}). + true = ets:delete(couch_groups_by_updater, Pid), + true = ets:delete(group_servers_by_name, {DbName, GroupId}), + true = ets:delete_object(couch_groups_by_db, {DbName, GroupId}). code_change(_OldVsn, State, _Extra) -> {ok, State}. -start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> - NotifyPids = get_notify_pids(1000), - case couch_db:open(DbName, []) of - {ok, Db} -> - View = #view{map_names=["_temp"], - id_num=0, - btree=nil, - def=MapSrc, - reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, - Group = #group{name="_temp", - db=Db, - views=[View], - current_seq=0, - def_lang=Lang, - id_btree=nil}, - Group2 = init_group(Db, Fd, Group,nil), - couch_db:monitor(Db), - couch_db:close(Db), - temp_update_loop(DbName, Group2, NotifyPids); - Else -> - exit(Else) - end. -temp_update_loop(DbName, Group, NotifyPids) -> - {ok, Db} = couch_db:open(DbName, []), - {_Updated, Group2} = update_group(Group#group{db=Db}), - couch_db:close(Db), - [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], - garbage_collect(), - temp_update_loop(DbName, Group2, get_notify_pids(10000)). - - -reset_group(#group{views=Views}=Group) -> - Views2 = [View#view{btree=nil} || View <- Views], - Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, - id_btree=nil,views=Views2}. - -start_update_loop(RootDir, DbName, GroupId) -> - % wait for a notify request before doing anything. This way, we can just - % exit and any exits will be noticed by the callers. - start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)). - -start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> - {Db, Group} = - case (catch couch_db:open(DbName, [])) of - {ok, Db0} -> - case (catch couch_db:open_doc(Db0, GroupId)) of - {ok, Doc} -> - {Db0, design_doc_to_view_group(Doc)}; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end, - FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ - binary_to_list(GroupId) ++".view", - Group2 = - case couch_file:open(FileName) of - {ok, Fd} -> - Sig = Group#group.sig, - case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of - {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} -> - % sigs match! - DbPurgeSeq = couch_db:get_purge_seq(Db), - case (PurgeSeq == DbPurgeSeq) or ((PurgeSeq + 1) == DbPurgeSeq) of - true -> - % We can only use index with the same, or next purge seq as the - % db. - init_group(Db, Fd, Group, HeaderInfo); - false -> - reset_file(Db, Fd, DbName, Group) - end; - _ -> - reset_file(Db, Fd, DbName, Group) - end; - {error, enoent} -> - case couch_file:open(FileName, [create]) of - {ok, Fd} -> reset_file(Db, Fd, DbName, Group); - Error -> throw(Error) - end - end, - - couch_db:monitor(Db), - couch_db:close(Db), - update_loop(RootDir, DbName, GroupId, Group2, NotifyPids). - -reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> - ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), - ok = couch_file:truncate(Fd, 0), - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), - init_group(Db, Fd, reset_group(Group), nil). - -update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) -> - {ok, Db}= couch_db:open(DbName, []), - Result = - try - update_group(Group#group{db=Db}) - catch - throw: restart -> restart - after - couch_db:close(Db) - end, - case Result of - {same, Group2} -> - [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], - update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)); - {updated, Group2} -> - HeaderData = {Sig, get_index_header_data(Group2)}, - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), - [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], - garbage_collect(), - update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)); - restart -> - couch_file:close(Group#group.fd), - start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids()) - end. -% wait for the first request to come in. -get_notify_pids(Wait) -> - receive - {Pid, get_updated} -> - [Pid | get_notify_pids()]; - {'DOWN', _MonitorRef, _Type, _Pid, _Info} -> - ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []), - exit(db_shutdown); - Else -> - ?LOG_ERROR("Unexpected message in view updater: ~p", [Else]), - exit({error, Else}) - after Wait -> - exit(wait_timeout) - end. -% then keep getting all available and return. -get_notify_pids() -> - receive - {Pid, get_updated} -> - [Pid | get_notify_pids()] - after 0 -> - [] - end. - -purge(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> - {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), - Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], - {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), - - % now populate the dictionary with all the keys to delete - ViewKeysToRemoveDict = lists:foldl( - fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) -> - lists:foldl( - fun({ViewNum, RowKey}, ViewDictAcc2) -> - dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2) - end, ViewDictAcc, ViewNumRowKeys); - ({not_found, _}, ViewDictAcc) -> - ViewDictAcc - end, dict:new(), Lookups), - - % Now remove the values from the btrees - Views2 = lists:map( - fun(#view{id_num=Num,btree=Btree}=View) -> - case dict:find(Num, ViewKeysToRemoveDict) of - {ok, RemoveKeys} -> - {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys), - View#view{btree=Btree2}; - error -> % no keys to remove in this view - View - end - end, Views), - Group#group{id_btree=IdBtree2, - views=Views2, - purge_seq=couch_db:get_purge_seq(Db)}. - - -update_group(#group{db=Db,current_seq=CurrentSeq, - purge_seq=GroupPurgeSeq}=Group) -> - ViewEmptyKVs = [{View, []} || View <- Group#group.views], - % compute on all docs modified since we last computed. - DbPurgeSeq = couch_db:get_purge_seq(Db), - Group2 = - case DbPurgeSeq of - GroupPurgeSeq -> - Group; - DbPurgeSeq when GroupPurgeSeq + 1 == DbPurgeSeq -> - purge(Group); - _ -> - throw(restart) - end, - {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}} - = couch_db:enum_docs_since( - Db, - CurrentSeq, - fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, - {[], Group2, ViewEmptyKVs, [], CurrentSeq} - ), - {Group4, Results} = view_compute(Group3, UncomputedDocs), - {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), - couch_query_servers:stop_doc_map(Group4#group.query_server), - if CurrentSeq /= NewSeq -> - {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), - {updated, Group5#group{query_server=nil}}; - true -> - {same, Group4#group{query_server=nil}} - end. - delete_index_dir(RootDir, DbName) -> nuke_dir(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_design"). @@ -583,50 +317,6 @@ nuke_dir(Dir) -> ok = file:del_dir(Dir) end. -delete_index_file(RootDir, DbName, GroupId) -> - file:delete(RootDir ++ "/." ++ binary_to_list(DbName) - ++ binary_to_list(GroupId) ++ ".view"). - -init_group(Db, Fd, #group{views=Views}=Group, nil) -> - init_group(Db, Fd, Group, - #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), - id_btree_state=nil, view_states=[nil || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> - #index_header{seq=Seq, purge_seq=PurgeSeq, - id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, - {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), - Views2 = lists:zipwith( - fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> - FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], - ReduceFun = - fun(reduce, KVs) -> - KVs2 = expand_dups(KVs,[]), - KVs3 = detuple_kvs(KVs2,[]), - {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs3), - {length(KVs3), Reduced}; - (rereduce, Reds) -> - Count = lists:sum([Count0 || {Count0, _} <- Reds]), - UserReds = [UserRedsList || {_, UserRedsList} <- Reds], - {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, UserReds), - {Count, Reduced} - end, - {ok, Btree} = couch_btree:open(BtreeState, Fd, - [{less, fun less_json_keys/2},{reduce, ReduceFun}]), - View#view{btree=Btree} - end, - ViewStates, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree, views=Views2}. - - -get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree,views=Views}) -> - ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], - #index_header{seq=Seq, - purge_seq=PurgeSeq, - id_btree_state=couch_btree:get_state(IdBtree), - view_states=ViewStates}. - % keys come back in the language of btree - tuples. less_json_keys(A, B) -> less_json(tuple_to_list(A), tuple_to_list(B)). @@ -703,129 +393,4 @@ less_list([A|RestA], [B|RestB]) -> end end. -process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) -> - % This fun computes once for each document - #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo, - case DocId of - GroupId -> - % uh oh. this is the design doc with our definitions. See if - % anything in the definition changed. - case couch_db:open_doc(Db, DocInfo) of - {ok, Doc} -> - case design_doc_to_view_group(Doc) of - #group{sig=Sig} -> - % The same md5 signature, keep on computing - {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; - _ -> - throw(restart) - end; - {not_found, deleted} -> - throw(restart) - end; - <> -> % we skip design docs - {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; - _ -> - {Docs2, DocIdViewIdKeys2} = - if Deleted -> - {Docs, [{DocId, []} | DocIdViewIdKeys]}; - true -> - {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]), - {[Doc | Docs], DocIdViewIdKeys} - end, - case couch_util:should_flush() of - true -> - {Group1, Results} = view_compute(Group, Docs2), - {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2), - {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq), - garbage_collect(), - ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], - {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}}; - false -> - {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}} - end - end. -view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> - {ViewKVs, DocIdViewIdKeysAcc}; -view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> - {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []), - NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc], - view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). - - -view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> - {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; -view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> - % Take any identical keys and combine the values - ResultKVs2 = lists:foldl( - fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) -> - case Key == PrevKey of - true -> - case PrevVal of - {dups, Dups} -> - [{PrevKey, {dups, [Value|Dups]}} | AccRest]; - _ -> - [{PrevKey, {dups, [Value,PrevVal]}} | AccRest] - end; - false -> - [{Key,Value},{PrevKey,PrevVal}|AccRest] - end; - (KV, []) -> - [KV] - end, [], lists:sort(ResultKVs)), - NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2], - NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc], - NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2], - NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, - view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). - -view_compute(Group, []) -> - {Group, []}; -view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) -> - {ok, QueryServer} = - case QueryServerIn of - nil -> % doc map not started - Definitions = [View#view.def || View <- Group#group.views], - couch_query_servers:start_doc_map(DefLang, Definitions); - _ -> - {ok, QueryServerIn} - end, - {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs), - {Group#group{query_server=QueryServer}, Results}. - - - -write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> - #group{id_btree=IdBtree} = Group, - - AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []], - RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []], - LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys], - {ok, LookupResults, IdBtree2} - = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds), - KeysToRemoveByView = lists:foldl( - fun(LookupResult, KeysToRemoveByViewAcc) -> - case LookupResult of - {ok, {DocId, ViewIdKeys}} -> - lists:foldl( - fun({ViewId, Key}, KeysToRemoveByViewAcc2) -> - dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2) - end, - KeysToRemoveByViewAcc, ViewIdKeys); - {not_found, _} -> - KeysToRemoveByViewAcc - end - end, - dict:new(), LookupResults), - - Views2 = [ - begin - KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []), - {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove), - View#view{btree = ViewBtree2} - end - || - {View, AddKeyValues} <- ViewKeyValuesToAdd - ], - Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}, - {ok, Group2}. diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl new file mode 100644 index 00000000..84547095 --- /dev/null +++ b/src/couchdb/couch_view_group.erl @@ -0,0 +1,192 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_view_group). +-behaviour(gen_server). + +%% API +-export([start_link/1, request_group/2]). +% -export([design_doc_to_view_group/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("couch_db.hrl"). + +-record(group_state, { + spawn_fun, + target_seq=0, + group_seq=0, + group=nil, + updater_pid=nil, + waiting_list=[] +}). + +% api methods +request_group(Pid, Seq) -> + ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), + case gen_server:call(Pid, {request_group, Seq}, infinity) of + {ok, Group} -> + ?LOG_DEBUG("get_updated_group replied with group", []), + {ok, Group}; + Else -> + ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]), + Else + end. + + +% from template +start_link(InitArgs) -> + gen_server:start_link(couch_view_group, InitArgs, []). + +% init differentiates between temp and design_doc views. It creates a closure +% which spawns the appropriate view_updater. (It might also spawn the first +% view_updater run.) +init(InitArgs) -> + SpawnFun = fun() -> spawn_updater(InitArgs) end, + process_flag(trap_exit, true), + {ok, #group_state{spawn_fun=SpawnFun}}. + +% There are two sources of messages: couch_view, which requests an up to date +% view group, and the couch_view_updater, which when spawned, updates the +% group and sends it back here. We employ a caching mechanism, so that between +% database writes, we don't have to spawn a couch_view_updater with every view +% request. This should give us more control, and the ability to request view +% statuses eventually. + +% The caching mechanism: each request is submitted with a seq_id for the +% database at the time it was read. We guarantee to return a view from that +% sequence or newer. + +% If the request sequence is higher than our current high_target seq, we set +% that as the highest seqence. If the updater is not running, we launch it. + +handle_call({request_group, RequestSeq}, From, + #group_state{ + target_seq=TargetSeq, + spawn_fun=SpawnFun, + updater_pid=Up, + waiting_list=WaitList + }=State) when RequestSeq > TargetSeq, Up == nil -> + UpdaterPid = SpawnFun(), + {noreply, State#group_state{ + target_seq=RequestSeq, + updater_pid=UpdaterPid, + waiting_list=[{From,RequestSeq}|WaitList] + }, infinity}; + +handle_call({request_group, RequestSeq}, From, + #group_state{ + target_seq=TargetSeq, + waiting_list=WaitList + }=State) when RequestSeq > TargetSeq -> + {noreply, State#group_state{ + target_seq=RequestSeq, + waiting_list=[{From,RequestSeq}|WaitList] + }, infinity}; + + +% If the request seqence is less than or equal to the seq_id of a known Group, +% we respond with that Group. +handle_call({request_group, RequestSeq}, _From, + State=#group_state{ + group_seq=GroupSeq, + group=Group + }) when RequestSeq =< GroupSeq -> + {reply, {ok, Group}, State}; + +% Otherwise: TargetSeq => RequestSeq > GroupSeq +% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq +handle_call({request_group, RequestSeq}, From, + #group_state{ + waiting_list=WaitList + }=State) -> + {noreply, State#group_state{ + waiting_list=[{From, RequestSeq}|WaitList] + }, infinity}. + + +% When the updater finishes, it will return a group with a seq_id, we should +% store that group and seq_id in our state. If our high_target is higher than +% the returned group, start a new updater. + +handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, + State=#group_state{ + target_seq=TargetSeq, + waiting_list=WaitList, + spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq -> + StillWaiting = reply_with_group(Group, WaitList, []), + UpdaterPid = SpawnFun(), + {noreply, State#group_state{ + updater_pid=UpdaterPid, + waiting_list=StillWaiting, + group_seq=NewGroupSeq, + group=Group}}; + +handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, + State=#group_state{waiting_list=WaitList}) -> + StillWaiting = reply_with_group(Group, WaitList, []), + {noreply, State#group_state{ + updater_pid=nil, + waiting_list=StillWaiting, + group_seq=NewGroupSeq, + group=Group}}. + +handle_info({'EXIT', _FromPid, normal}, State) -> + {noreply, State}; + +handle_info({'EXIT', FromPid, Reason}, State) -> + ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]), + {stop, Reason, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(Reason, _State=#group_state{waiting_list=WaitList}) -> + lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +% error handling? the updater could die on us, we can save ourselves here. +% but we shouldn't, we could be dead for a reason, like the view got changed, or something. + + +%% Local Functions + +% reply_with_group/3 +% for each item in the WaitingList {Pid, Seq} +% if the Seq is =< GroupSeq, reply +reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq -> + gen_server:reply(Pid, {ok, Group}), + reply_with_group(Group, WaitList, StillWaiting); + +% else +% put it in the continuing waiting list +reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) -> + reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]); + +% return the still waiting list +reply_with_group(_Group, [], StillWaiting) -> + StillWaiting. + +spawn_updater({RootDir, DbName, GroupId}) -> + spawn_link(couch_view_updater, update, + [RootDir, DbName, GroupId, self()]); + +spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) -> + spawn_link(couch_view_updater, temp_update, + [DbName, Fd, Lang, MapSrc, RedSrc, self()]). + + diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl new file mode 100644 index 00000000..7f40af3b --- /dev/null +++ b/src/couchdb/couch_view_updater.erl @@ -0,0 +1,384 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_view_updater). + +-export([update/4, temp_update/6]). + +-include("couch_db.hrl"). + + + +update(RootDir, DbName, GroupId, NotifyPid) -> + {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId), + {ok, Db} = couch_db:open(DbName, []), + Result = update_group(Group#group{db=Db}), + ?LOG_DEBUG("update {Result} DONE ~p", [{Result}]), + couch_db:close(Db), + case Result of + {same, Group2} -> + gen_server:cast(NotifyPid, {new_group, Group2}); + {updated, Group2} -> + HeaderData = {Sig, get_index_header_data(Group2)}, + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), + gen_server:cast(NotifyPid, {new_group, Group2}) + end, + garbage_collect(). + +temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) -> + case couch_db:open(DbName, []) of + {ok, Db} -> + View = #view{map_names=["_temp"], + id_num=0, + btree=nil, + def=MapSrc, + reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, + Group = #group{name="_temp", + db=Db, + views=[View], + current_seq=0, + def_lang=Lang, + id_btree=nil}, + Group2 = init_group(Db, Fd, Group,nil), + couch_db:monitor(Db), + {_Updated, Group3} = update_group(Group2#group{db=Db}), + couch_db:close(Db), + gen_server:cast(NotifyPid, {new_group, Group3}), + garbage_collect(); + Else -> + exit(Else) + end. + + +update_group(#group{db=Db,current_seq=CurrentSeq}=Group) -> + ViewEmptyKVs = [{View, []} || View <- Group#group.views], + % compute on all docs modified since we last computed. + {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}} + = couch_db:enum_docs_since( + Db, + CurrentSeq, + fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, + {[], Group, ViewEmptyKVs, []} + ), + {Group4, Results} = view_compute(Group3, UncomputedDocs), + {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), + couch_query_servers:stop_doc_map(Group4#group.query_server), + NewSeq = couch_db:get_update_seq(Db), + if CurrentSeq /= NewSeq -> + {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), + {updated, Group5#group{query_server=nil}}; + true -> + {same, Group4#group{query_server=nil}} + end. + + +get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree,views=Views}) -> + ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], + #index_header{seq=Seq, + purge_seq=PurgeSeq, + id_btree_state=couch_btree:get_state(IdBtree), + view_states=ViewStates}. + + +purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> + {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), + Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], + {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), + + % now populate the dictionary with all the keys to delete + ViewKeysToRemoveDict = lists:foldl( + fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) -> + lists:foldl( + fun({ViewNum, RowKey}, ViewDictAcc2) -> + dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2) + end, ViewDictAcc, ViewNumRowKeys); + ({not_found, _}, ViewDictAcc) -> + ViewDictAcc + end, dict:new(), Lookups), + + % Now remove the values from the btrees + Views2 = lists:map( + fun(#view{id_num=Num,btree=Btree}=View) -> + case dict:find(Num, ViewKeysToRemoveDict) of + {ok, RemoveKeys} -> + {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys), + View#view{btree=Btree2}; + error -> % no keys to remove in this view + View + end + end, Views), + Group#group{id_btree=IdBtree2, + views=Views2, + purge_seq=couch_db:get_purge_seq(Db)}. + +process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) -> + % This fun computes once for each document + #doc_info{id=DocId, deleted=Deleted} = DocInfo, + case DocId of + GroupId -> + % uh oh. this is the design doc with our definitions. See if + % anything in the definition changed. + case couch_db:open_doc(Db, DocInfo) of + {ok, Doc} -> + case design_doc_to_view_group(Doc) of + #group{sig=Sig} -> + % The same md5 signature, keep on computing + {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; + _ -> + ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]), + throw(restart) + end; + {not_found, deleted} -> + ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]), + throw(restart) + end; + <> -> % we skip design docs + {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; + _ -> + {Docs2, DocIdViewIdKeys2} = + if Deleted -> + {Docs, [{DocId, []} | DocIdViewIdKeys]}; + true -> + {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]), + {[Doc | Docs], DocIdViewIdKeys} + end, + + case couch_util:should_flush() of + true -> + {Group1, Results} = view_compute(Group, Docs2), + {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2), + {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, + DocInfo#doc_info.update_seq), + garbage_collect(), + ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], + {ok, {[], Group2, ViewEmptyKeyValues, []}}; + false -> + {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2}} + end + end. + +view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> + {ViewKVs, DocIdViewIdKeysAcc}; +view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> + {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []), + NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc], + view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). + + +view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> + {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; +view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> + % Take any identical keys and combine the values + ResultKVs2 = lists:foldl( + fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) -> + case Key == PrevKey of + true -> + case PrevVal of + {dups, Dups} -> + [{PrevKey, {dups, [Value|Dups]}} | AccRest]; + _ -> + [{PrevKey, {dups, [Value,PrevVal]}} | AccRest] + end; + false -> + [{Key,Value},{PrevKey,PrevVal}|AccRest] + end; + (KV, []) -> + [KV] + end, [], lists:sort(ResultKVs)), + NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2], + NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc], + NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2], + NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, + view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). + +view_compute(Group, []) -> + {Group, []}; +view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) -> + {ok, QueryServer} = + case QueryServerIn of + nil -> % doc map not started + Definitions = [View#view.def || View <- Group#group.views], + couch_query_servers:start_doc_map(DefLang, Definitions); + _ -> + {ok, QueryServerIn} + end, + {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs), + {Group#group{query_server=QueryServer}, Results}. + + + +write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> + #group{id_btree=IdBtree} = Group, + + AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []], + RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []], + LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys], + {ok, LookupResults, IdBtree2} + = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds), + KeysToRemoveByView = lists:foldl( + fun(LookupResult, KeysToRemoveByViewAcc) -> + case LookupResult of + {ok, {DocId, ViewIdKeys}} -> + lists:foldl( + fun({ViewId, Key}, KeysToRemoveByViewAcc2) -> + dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2) + end, + KeysToRemoveByViewAcc, ViewIdKeys); + {not_found, _} -> + KeysToRemoveByViewAcc + end + end, + dict:new(), LookupResults), + + Views2 = [ + begin + KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []), + {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove), + View#view{btree = ViewBtree2} + end + || + {View, AddKeyValues} <- ViewKeyValuesToAdd + ], + Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}, + {ok, Group2}. + +prepare_group(RootDir, DbName, GroupId) -> + {Db, Group} = case (catch couch_db:open(DbName, [])) of + {ok, Db0} -> + case (catch couch_db:open_doc(Db0, GroupId)) of + {ok, Doc} -> + {Db0, design_doc_to_view_group(Doc)}; + Else -> + delete_index_file(RootDir, DbName, GroupId), + ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]), + exit(Else) + end; + Else -> + delete_index_file(RootDir, DbName, GroupId), + exit(Else) + end, + FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ + binary_to_list(GroupId) ++".view", + Group2 = + case couch_file:open(FileName) of + {ok, Fd} -> + Sig = Group#group.sig, + case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of + {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} -> + % sigs match! + DbPurgeSeq = couch_db:get_purge_seq(Db), + % We can only use index with the same, or next purge seq as the db. + if DbPurgeSeq == PurgeSeq -> + init_group(Db, Fd, Group, HeaderInfo); + DbPurgeSeq == PurgeSeq + 1 -> + ?LOG_DEBUG("Purging entries from view index.",[]), + purge_index(init_group(Db, Fd, Group, HeaderInfo)); + true -> + ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]), + reset_file(Db, Fd, DbName, Group) + end; + _ -> + reset_file(Db, Fd, DbName, Group) + end; + {error, enoent} -> + case couch_file:open(FileName, [create]) of + {ok, Fd} -> reset_file(Db, Fd, DbName, Group); + Error -> throw(Error) + end + end, + + couch_db:monitor(Db), + couch_db:close(Db), + {ok, Group2}. + +% maybe move to another module +design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> + Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>), + {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}), + + % add the views to a dictionary object, with the map source as the key + DictBySrc = + lists:foldl( + fun({Name, {MRFuns}}, DictBySrcAcc) -> + MapSrc = proplists:get_value(<<"map">>, MRFuns), + RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null), + View = + case dict:find(MapSrc, DictBySrcAcc) of + {ok, View0} -> View0; + error -> #view{def=MapSrc} % create new view object + end, + View2 = + if RedSrc == null -> + View#view{map_names=[Name|View#view.map_names]}; + true -> + View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} + end, + dict:store(MapSrc, View2, DictBySrcAcc) + end, dict:new(), RawViews), + % number the views + {Views, _N} = lists:mapfoldl( + fun({_Src, View}, N) -> + {View#view{id_num=N},N+1} + end, 0, dict:to_list(DictBySrc)), + + Group = #group{name=Id, views=Views, def_lang=Language}, + Group#group{sig=erlang:md5(term_to_binary(Group))}. + +reset_group(#group{views=Views}=Group) -> + Views2 = [View#view{btree=nil} || View <- Views], + Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, + id_btree=nil,views=Views2}. + +reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> + ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), + ok = couch_file:truncate(Fd, 0), + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), + init_group(Db, Fd, reset_group(Group), nil). + +delete_index_file(RootDir, DbName, GroupId) -> + file:delete(RootDir ++ "/." ++ binary_to_list(DbName) + ++ binary_to_list(GroupId) ++ ".view"). + +init_group(Db, Fd, #group{views=Views}=Group, nil) -> + init_group(Db, Fd, Group, + #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), + id_btree_state=nil, view_states=[nil || _ <- Views]}); +init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> + #index_header{seq=Seq, purge_seq=PurgeSeq, + id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, + {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), + Views2 = lists:zipwith( + fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> + FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], + ReduceFun = + fun(reduce, KVs) -> + KVs2 = couch_view:expand_dups(KVs,[]), + KVs3 = couch_view:detuple_kvs(KVs2,[]), + {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, + KVs3), + {length(KVs3), Reduced}; + (rereduce, Reds) -> + Count = lists:sum([Count0 || {Count0, _} <- Reds]), + UserReds = [UserRedsList || {_, UserRedsList} <- Reds], + {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, + UserReds), + {Count, Reduced} + end, + {ok, Btree} = couch_btree:open(BtreeState, Fd, + [{less, fun couch_view:less_json_keys/2}, + {reduce, ReduceFun}]), + View#view{btree=Btree} + end, + ViewStates, Views), + Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree, views=Views2}. \ No newline at end of file -- cgit v1.2.3