| author | John Christopher Anderson <jchris@apache.org> | 2008-12-10 01:13:17 +0000 |
|---|---|---|
| committer | John Christopher Anderson <jchris@apache.org> | 2008-12-10 01:13:17 +0000 |
| commit | 5a9321814a727e8c010bf83f05572a341d55f26a | |
| tree | 91b6233a3d81f9b29a34d9653fffbde284cdfa4b | |
| parent | 6bacde0d941d209f41ad3ca8921e3a596a056c06 | |
view group state gen_server. thanks damien and davisp.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@724946 13f79535-47bb-0310-9956-ffa450edef68
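This change replaces couch_view's hand-rolled update loop with a per-design-document gen_server (couch_view_group) that caches the most recently built `#group{}` and spawns couch_view_updater to bring the index up to a requested database sequence. Below is a minimal sketch, not part of the patch, of the caller-facing path after the change, modeled on `couch_httpd_view:design_doc_view/5` in the diff; the module, database, design-document, and view names are hypothetical. Passing `Update = false` maps the request to sequence 0 (see `seq_for_update/2` below), so the group server can answer from whatever it has already built.

```erlang
-module(view_group_example).   % hypothetical module, for illustration only
-export([example_map_view/0]).

example_map_view() ->
    DbName   = <<"example_db">>,   % hypothetical database name
    DesignId = <<"example">>,      % hypothetical design doc name (without the _design/ prefix)
    ViewName = <<"by_key">>,       % hypothetical view name
    Update   = true,               % ask for an index at least as new as the current db seq
    case couch_view:get_map_view({DbName, <<"_design/", DesignId/binary>>, ViewName, Update}) of
    {ok, View} ->
        % the group server replied with (or waited for) an up-to-date view
        couch_view:get_row_count(View);
    {not_found, Reason} ->
        {error, Reason};
    Error ->
        Error
    end.
```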
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | share/www/script/couch_tests.js | 9 |
| -rw-r--r-- | src/couchdb/Makefile.am | 4 |
| -rw-r--r-- | src/couchdb/couch_db.erl | 5 |
| -rw-r--r-- | src/couchdb/couch_db.hrl | 37 |
| -rw-r--r-- | src/couchdb/couch_db_updater.erl | 2 |
| -rw-r--r-- | src/couchdb/couch_httpd.erl | 2 |
| -rw-r--r-- | src/couchdb/couch_httpd_view.erl | 11 |
| -rw-r--r-- | src/couchdb/couch_server.erl | 8 |
| -rw-r--r-- | src/couchdb/couch_view.erl | 551 |
| -rw-r--r-- | src/couchdb/couch_view_group.erl | 192 |
| -rw-r--r-- | src/couchdb/couch_view_updater.erl | 384 |

11 files changed, 694 insertions, 511 deletions
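One behavioural change worth noting before the diff: purging documents now also advances the database update_seq (the couch_db_updater.erl hunk below sets `update_seq = NewSeq + 1`), and couch_tests.js gains assertions for both sequences. The following is a hedged sketch of the same check from Erlang, using only couch_db functions present after this patch; the `{DocId, [Rev]}` shape passed to `purge_docs/2` is inferred from the JSON body in the test, and the module itself is hypothetical.

```erlang
-module(purge_seq_check).      % hypothetical module, for illustration only
-export([check_purge_bumps_seqs/3]).

check_purge_bumps_seqs(DbName, DocId, Rev) ->
    {ok, Db} = couch_db:open(DbName, []),
    SeqBefore   = couch_db:get_update_seq(Db),   % reads the committed header (sketch assumption)
    PurgeBefore = couch_db:get_purge_seq(Db),
    _ = couch_db:purge_docs(Db, [{DocId, [Rev]}]),
    couch_db:close(Db),
    % re-open to read the updated header
    {ok, Db2} = couch_db:open(DbName, []),
    true = (couch_db:get_update_seq(Db2) =:= SeqBefore + 1),
    true = (couch_db:get_purge_seq(Db2)  =:= PurgeBefore + 1),
    couch_db:close(Db2).
```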
diff --git a/share/www/script/couch_tests.js b/share/www/script/couch_tests.js index 64b12f6b..ba88281f 100644 --- a/share/www/script/couch_tests.js +++ b/share/www/script/couch_tests.js @@ -1905,6 +1905,7 @@ var tests = {      }      T(db.view("test/single_doc").total_rows == 1); +    var info = db.info();      var doc1 = db.open("1");      var doc2 = db.open("2"); @@ -1913,7 +1914,13 @@ var tests = {        body: JSON.stringify({"1":[doc1._rev], "2":[doc2._rev]}),      });      T(xhr.status == 200); -     + +    var newInfo = db.info(); +    // purging increments the update sequence +    T(info.update_seq+1 == newInfo.update_seq); +    // and it increments the purge_seq +    T(info.purge_seq+1 == newInfo.purge_seq); +      var result = JSON.parse(xhr.responseText);      T(result.purged["1"][0] == doc1._rev);      T(result.purged["2"][0] == doc2._rev); diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 2e4a85c8..a2099931 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -63,6 +63,8 @@ source_files = \      couch_stream.erl \      couch_util.erl \      couch_view.erl \ +    couch_view_updater.erl \ +    couch_view_group.erl \      couch_db_updater.erl  EXTRA_DIST = $(source_files) couch_db.hrl @@ -92,6 +94,8 @@ compiled_files = \      couch_stream.beam \      couch_util.beam \      couch_view.beam \ +    couch_view_updater.beam \ +    couch_view_group.beam \      couch_db_updater.beam  # doc_base = \ diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 7de5b8db..5da16b83 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -17,7 +17,7 @@  -export([open_ref_counted/3,num_refs/1,monitor/1]).  -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).  -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). --export([get_missing_revs/2,name/1,doc_to_tree/1]). +-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1]).  -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).  -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).  -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). @@ -145,6 +145,9 @@ purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->      gen_server:call(UpdatePid, {purge_docs, IdsRevs}). +get_update_seq(#db{header=#db_header{update_seq=Seq}})-> +    Seq. +      get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->      PurgeSeq. diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 7bf3cb9d..6a48ab1c 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -88,7 +88,7 @@  -record(user_ctx, -    {name=nil, +    {name=null,      roles=[]      }). @@ -152,6 +152,41 @@      include_docs = false  }). +-record(group, +    {sig=nil, +    db=nil, +    fd=nil, +    name, +    def_lang, +    views, +    id_btree=nil, +    current_seq=0, +    purge_seq=0, +    query_server=nil +    }). + +-record(view, +    {id_num, +    map_names=[], +    def, +    btree=nil, +    reduce_funs=[] +    }). + +-record(server,{ +    root_dir = [], +    dbname_regexp, +    max_dbs_open=100, +    current_dbs_open=0, +    start_time="" +    }). + +-record(index_header, +    {seq=0, +    purge_seq=0, +    id_btree_state=nil, +    view_states=nil +    }).  % small value used in revision trees to indicate the revision isn't stored  -define(REV_MISSING, []). 
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 67a6f624..68c3a1fc 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -129,7 +129,7 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->          Db#db{              fulldocinfo_by_id_btree = DocInfoByIdBTree2,              docinfo_by_seq_btree = DocInfoBySeqBTree2, -            update_seq = NewSeq, +            update_seq = NewSeq + 1,              header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),      ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index 7d621d25..0d489869 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -261,7 +261,7 @@ basic_username_pw(Req) ->          [User, Pass] ->              {User, Pass};          [User] -> -            {User, <<"">>}; +            {User, ""};          _ ->              nil          end; diff --git a/src/couchdb/couch_httpd_view.erl b/src/couchdb/couch_httpd_view.erl index 5b19af5d..f1c8616f 100644 --- a/src/couchdb/couch_httpd_view.erl +++ b/src/couchdb/couch_httpd_view.erl @@ -22,18 +22,19 @@      start_json_response/2,send_chunk/2,end_json_response/1]).  design_doc_view(Req, Db, Id, ViewName, Keys) -> +    #view_query_args{ +        update = Update, +        reduce = Reduce +    } = QueryArgs = parse_view_query(Req, Keys),      case couch_view:get_map_view({couch_db:name(Db),  -            <<"_design/", Id/binary>>, ViewName}) of +            <<"_design/", Id/binary>>, ViewName, Update}) of      {ok, View} ->     -        QueryArgs = parse_view_query(Req, Keys),          output_map_view(Req, View, Db, QueryArgs, Keys);      {not_found, Reason} ->          case couch_view:get_reduce_view({couch_db:name(Db),                  <<"_design/", Id/binary>>, ViewName}) of          {ok, View} -> -            #view_query_args{ -                reduce = Reduce -            } = QueryArgs = parse_view_query(Req, Keys, true), +            parse_view_query(Req, Keys, true), % just for validation              case Reduce of              false ->                  {reduce, _N, _Lang, MapView} = View, diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl index 34aa16b7..08f71f2b 100644 --- a/src/couchdb/couch_server.erl +++ b/src/couchdb/couch_server.erl @@ -22,14 +22,6 @@  -include("couch_db.hrl"). --record(server,{ -    root_dir = [], -    dbname_regexp, -    max_dbs_open=100, -    current_dbs_open=0, -    start_time="" -    }). -  start() ->      start(["default.ini"]). diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 4ebbb136..7c7730a7 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -13,45 +13,12 @@  -module(couch_view).  -behaviour(gen_server). --export([start_link/0,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]). +-export([start_link/0,fold/4,fold/5,less_json/2,less_json_keys/2,expand_dups/2,detuple_kvs/2]).  -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]).  -export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, fold_reduce/7]).  -include("couch_db.hrl"). - --record(group, -    {sig=nil, -    db=nil, -    fd=nil, -    name, -    def_lang, -    views, -    id_btree=nil, -    current_seq=0, -    purge_seq=0, -    query_server=nil -    }). - --record(view, -    {id_num, -    map_names=[], -    def, -    btree=nil, -    reduce_funs=[] -    }). 
- --record(server, -    {root_dir -    }). - --record(index_header, -    {seq=0, -    purge_seq=0, -    id_btree_state=nil, -    view_states=nil -    }). -  start_link() ->      gen_server:start_link({local, couch_view}, couch_view, [], []). @@ -59,41 +26,30 @@ get_temp_updater(DbName, Type, MapSrc, RedSrc) ->      {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}),      Pid. -get_updater(DbName, GroupId) -> -    {ok, Pid} = gen_server:call(couch_view, {start_updater, DbName, GroupId}), +get_group_server(DbName, GroupId) -> +    {ok, Pid} = gen_server:call(couch_view, {start_group_server, DbName, GroupId}),      Pid. -get_updated_group(Pid) -> -    Mref = erlang:monitor(process, Pid), -    receive -    {'DOWN', Mref, _, _, Reason} -> -        throw(Reason) -    after 0 -> -        Pid ! {self(), get_updated}, -        receive -        {Pid, Response} -> -            erlang:demonitor(Mref), -            receive -                {'DOWN', Mref, _, _, _} -> ok -                after 0 -> ok -            end, -            Response; -        {'DOWN', Mref, _, _, Reason} -> -            throw(Reason) -        end -    end. +get_updated_group(DbName, GroupId, Update) -> +    couch_view_group:request_group(get_group_server(DbName, GroupId), seq_for_update(DbName, Update)). + +get_updated_group(temp, DbName, Type, MapSrc, RedSrc, Update) -> +    couch_view_group:request_group(get_temp_updater(DbName, Type, MapSrc, RedSrc), seq_for_update(DbName, Update)).  get_row_count(#view{btree=Bt}) ->      {ok, {Count, _Reds}} = couch_btree:full_reduce(Bt),      {ok, Count}.  get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) -> -    {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)), +    {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, MapSrc, RedSrc, true),      {ok, {temp_reduce, View}};  get_reduce_view({DbName, GroupId, Name}) -> -    {ok, #group{views=Views,def_lang=Lang}} = -            get_updated_group(get_updater(DbName, GroupId)), -    get_reduce_view0(Name, Lang, Views). +    case get_updated_group(DbName, GroupId, true) of +    {error, Reason} -> +        Reason; +    {ok, #group{views=Views,def_lang=Lang}} -> +        get_reduce_view0(Name, Lang, Views) +    end.  get_reduce_view0(_Name, _Lang, []) ->      {not_found, missing_named_view}; @@ -153,13 +109,26 @@ get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 ->      N + 1;  get_key_pos(Key, [_|Rest], N) ->      get_key_pos(Key, Rest, N+1). +        +seq_for_update(DbName, Update) -> +    case Update of +    true -> +        {ok, #db{update_seq=CurrentSeq}} = couch_db:open(DbName, []), +        CurrentSeq; +    _Else -> +        0 +    end.    get_map_view({temp, DbName, Type, Src}) -> -    {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])), +    {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, Src, [], true),      {ok, View}; -get_map_view({DbName, GroupId, Name}) -> -    {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)), -    get_map_view0(Name, Views). +get_map_view({DbName, GroupId, Name, Update}) -> +    case get_updated_group(DbName, GroupId, Update) of +    {error, Reason} -> +        Reason; +    {ok, #group{views=Views}} -> +        get_map_view0(Name, Views) +    end.  get_map_view0(_Name, []) ->      {not_found, missing_named_view}; @@ -183,37 +152,6 @@ reduce_to_count(Reductions) ->      Count. 
-design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> -    Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>), -    {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}), -             -    % add the views to a dictionary object, with the map source as the key -    DictBySrc = -    lists:foldl( -        fun({Name, {MRFuns}}, DictBySrcAcc) -> -            MapSrc = proplists:get_value(<<"map">>, MRFuns), -            RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null), -            View = -            case dict:find(MapSrc, DictBySrcAcc) of -                {ok, View0} -> View0; -                error -> #view{def=MapSrc} % create new view object -            end, -            View2 = -            if RedSrc == null -> -                View#view{map_names=[Name|View#view.map_names]}; -            true -> -                View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} -            end, -            dict:store(MapSrc, View2, DictBySrcAcc) -        end, dict:new(), RawViews), -    % number the views -    {Views, _N} = lists:mapfoldl( -        fun({_Src, View}, N) -> -            {View#view{id_num=N},N+1} -        end, 0, dict:to_list(DictBySrc)), -     -    Group = #group{name=Id, views=Views, def_lang=Language}, -    Group#group{sig=erlang:md5(term_to_binary(Group))}.  fold_fun(_Fun, [], _, Acc) ->      {ok, Acc}; @@ -253,10 +191,10 @@ init([]) ->          (_Else) ->              ok          end), -    ets:new(couch_views_by_db, [bag, private, named_table]), -    ets:new(couch_views_by_name, [set, protected, named_table]), -    ets:new(couch_views_by_updater, [set, private, named_table]), -    ets:new(couch_views_temp_fd_by_db, [set, protected, named_table]), +    ets:new(couch_groups_by_db, [bag, private, named_table]), +    ets:new(group_servers_by_name, [set, protected, named_table]), +    ets:new(couch_groups_by_updater, [set, private, named_table]), +    ets:new(couch_temp_group_fd_by_db, [set, protected, named_table]),      process_flag(trap_exit, true),      {ok, #server{root_dir=RootDir}}. @@ -268,9 +206,9 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r      <<SigInt:128/integer>> = erlang:md5(term_to_binary({Lang, MapSrc, RedSrc})),      Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])),      Pid =  -    case ets:lookup(couch_views_by_name, {DbName, Name}) of +    case ets:lookup(group_servers_by_name, {DbName, Name}) of      [] -> -        case ets:lookup(couch_views_temp_fd_by_db, DbName) of +        case ets:lookup(couch_temp_group_fd_by_db, DbName) of          [] ->              FileName = Root ++ "/." 
++ binary_to_list(DbName) ++ "_temp",              {ok, Fd} = couch_file:open(FileName, [create, overwrite]), @@ -279,21 +217,20 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r              ok          end,          ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]), -        NewPid = spawn_link(couch_view, start_temp_update_loop, -                    [DbName, Fd, Lang, MapSrc, RedSrc]), -        true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}), +        {ok, NewPid} = couch_view_group:start_link({DbName, Fd, Lang, MapSrc, RedSrc}), +        true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}),          add_to_ets(NewPid, DbName, Name),          NewPid;      [{_, ExistingPid0}] ->          ExistingPid0      end,      {reply, {ok, Pid}, Server}; -handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Server) -> +handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->      Pid =  -    case ets:lookup(couch_views_by_name, {DbName, GroupId}) of +    case ets:lookup(group_servers_by_name, {DbName, GroupId}) of      [] -> -        ?LOG_DEBUG("Spawning new update process for view group ~s in database ~s.", [GroupId, DbName]), -        NewPid = spawn_link(couch_view, start_update_loop, [Root, DbName, GroupId]), +        ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]), +        {ok, NewPid} = couch_view_group:start_link({Root, DbName, GroupId}),          add_to_ets(NewPid, DbName, GroupId),          NewPid;      [{_, ExistingPid0}] -> @@ -303,11 +240,11 @@ handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Serv  handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->      % shutdown all the updaters -    Names = ets:lookup(couch_views_by_db, DbName), +    Names = ets:lookup(couch_groups_by_db, DbName),      lists:foreach(          fun({_DbName, GroupId}) ->              ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]), -            [{_, Pid}] = ets:lookup(couch_views_by_name, {DbName, GroupId}), +            [{_, Pid}] = ets:lookup(group_servers_by_name, {DbName, GroupId}),              exit(Pid, kill),              receive {'EXIT', Pid, _} ->                  delete_from_ets(Pid, DbName, GroupId) @@ -318,22 +255,23 @@ handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->      {noreply, Server}.  handle_info({'EXIT', _FromPid, normal}, Server) -> -    {noreply, Server}; +   {noreply, Server};  handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> -    case ets:lookup(couch_views_by_updater, FromPid) of +    ?LOG_DEBUG("Exit from process: ~p", [{FromPid, Reason}]), +    case ets:lookup(couch_groups_by_updater, FromPid) of      [] -> % non-updater linked process must have died, we propagate the error          ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),          exit(Reason);      [{_, {DbName, "_temp_" ++ _ = GroupId}}] ->          delete_from_ets(FromPid, DbName, GroupId), -        [{_, Fd, Count}] = ets:lookup(couch_views_temp_fd_by_db, DbName), +        [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName),          case Count of          1 -> % Last ref              couch_file:close(Fd),              file:delete(RootDir ++ "/." 
++ binary_to_list(DbName) ++ "_temp"), -            true = ets:delete(couch_views_temp_fd_by_db, DbName); +            true = ets:delete(couch_temp_group_fd_by_db, DbName);          _ -> -            true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count - 1}) +            true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count - 1})          end;      [{_, {DbName, GroupId}}] ->          delete_from_ets(FromPid, DbName, GroupId) @@ -344,225 +282,21 @@ handle_info(Msg, _Server) ->      exit({error, Msg}).  add_to_ets(Pid, DbName, GroupId) -> -    true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}), -    true = ets:insert(couch_views_by_name, {{DbName, GroupId}, Pid}), -    true = ets:insert(couch_views_by_db, {DbName, GroupId}). +    true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}), +    true = ets:insert(group_servers_by_name, {{DbName, GroupId}, Pid}), +    true = ets:insert(couch_groups_by_db, {DbName, GroupId}).  delete_from_ets(Pid, DbName, GroupId) -> -    true = ets:delete(couch_views_by_updater, Pid), -    true = ets:delete(couch_views_by_name, {DbName, GroupId}), -    true = ets:delete_object(couch_views_by_db, {DbName, GroupId}). +    true = ets:delete(couch_groups_by_updater, Pid), +    true = ets:delete(group_servers_by_name, {DbName, GroupId}), +    true = ets:delete_object(couch_groups_by_db, {DbName, GroupId}).  code_change(_OldVsn, State, _Extra) ->      {ok, State}. -start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> -    NotifyPids = get_notify_pids(1000), -    case couch_db:open(DbName, []) of -    {ok, Db} -> -        View = #view{map_names=["_temp"], -            id_num=0, -            btree=nil, -            def=MapSrc, -            reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, -        Group = #group{name="_temp", -            db=Db, -            views=[View], -            current_seq=0, -            def_lang=Lang, -            id_btree=nil}, -        Group2 = init_group(Db, Fd, Group,nil), -        couch_db:monitor(Db), -        couch_db:close(Db), -        temp_update_loop(DbName, Group2, NotifyPids); -    Else -> -        exit(Else) -    end. -temp_update_loop(DbName, Group, NotifyPids) -> -    {ok, Db} = couch_db:open(DbName, []), -    {_Updated, Group2} = update_group(Group#group{db=Db}), -    couch_db:close(Db), -    [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], -    garbage_collect(), -    temp_update_loop(DbName, Group2, get_notify_pids(10000)). - - -reset_group(#group{views=Views}=Group) -> -    Views2 = [View#view{btree=nil} || View <- Views], -    Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, -            id_btree=nil,views=Views2}. - -start_update_loop(RootDir, DbName, GroupId) -> -    % wait for a notify request before doing anything. This way, we can just -    % exit and any exits will be noticed by the callers. -    start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)). -     -start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> -    {Db, Group} = -    case (catch couch_db:open(DbName, [])) of -    {ok, Db0} -> -        case (catch couch_db:open_doc(Db0, GroupId)) of -        {ok, Doc} -> -            {Db0, design_doc_to_view_group(Doc)}; -        Else -> -            delete_index_file(RootDir, DbName, GroupId), -            exit(Else) -        end; -    Else -> -        delete_index_file(RootDir, DbName, GroupId), -        exit(Else) -    end, -    FileName = RootDir ++ "/." 
++ binary_to_list(DbName) ++  -            binary_to_list(GroupId) ++".view", -    Group2 = -    case couch_file:open(FileName) of -    {ok, Fd} -> -        Sig = Group#group.sig, -        case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of -        {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} -> -            % sigs match! -            DbPurgeSeq = couch_db:get_purge_seq(Db), -            case (PurgeSeq == DbPurgeSeq) or ((PurgeSeq + 1) == DbPurgeSeq) of -            true -> -                % We can only use index with the same, or next purge seq as the -                % db. -                init_group(Db, Fd, Group, HeaderInfo); -            false -> -                reset_file(Db, Fd, DbName, Group) -            end; -        _ -> -            reset_file(Db, Fd, DbName, Group) -        end; -    {error, enoent} -> -        case couch_file:open(FileName, [create]) of -        {ok, Fd} -> reset_file(Db, Fd, DbName, Group); -        Error    -> throw(Error) -        end -    end, -     -    couch_db:monitor(Db), -    couch_db:close(Db), -    update_loop(RootDir, DbName, GroupId, Group2, NotifyPids). - -reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> -    ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), -    ok = couch_file:truncate(Fd, 0), -    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), -    init_group(Db, Fd, reset_group(Group), nil). - -update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) -> -    {ok, Db}= couch_db:open(DbName, []), -    Result = -    try -        update_group(Group#group{db=Db}) -    catch -        throw: restart -> restart -    after -        couch_db:close(Db) -    end, -    case Result of -    {same, Group2} -> -        [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], -        update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)); -    {updated, Group2} -> -        HeaderData = {Sig, get_index_header_data(Group2)}, -        ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), -        [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], -        garbage_collect(), -        update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)); -    restart -> -        couch_file:close(Group#group.fd), -        start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids()) -    end. -% wait for the first request to come in. -get_notify_pids(Wait) -> -    receive -    {Pid, get_updated} -> -        [Pid | get_notify_pids()]; -    {'DOWN', _MonitorRef, _Type, _Pid, _Info} -> -        ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []), -        exit(db_shutdown); -    Else -> -        ?LOG_ERROR("Unexpected message in view updater: ~p", [Else]), -        exit({error, Else}) -    after Wait -> -        exit(wait_timeout) -    end. -% then keep getting all available and return. -get_notify_pids() -> -    receive -    {Pid, get_updated} -> -        [Pid | get_notify_pids()] -    after 0 -> -        [] -    end. 
-     -purge(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> -    {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), -    Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], -    {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), - -    % now populate the dictionary with all the keys to delete -    ViewKeysToRemoveDict = lists:foldl( -        fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) -> -            lists:foldl( -                fun({ViewNum, RowKey}, ViewDictAcc2) -> -                    dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2) -                end, ViewDictAcc, ViewNumRowKeys); -        ({not_found, _}, ViewDictAcc) -> -            ViewDictAcc -        end, dict:new(), Lookups), -     -    % Now remove the values from the btrees -    Views2 = lists:map( -        fun(#view{id_num=Num,btree=Btree}=View) -> -            case dict:find(Num, ViewKeysToRemoveDict) of -            {ok, RemoveKeys} -> -                {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys), -                View#view{btree=Btree2}; -            error -> % no keys to remove in this view -                View -            end -        end, Views), -    Group#group{id_btree=IdBtree2, -            views=Views2, -            purge_seq=couch_db:get_purge_seq(Db)}. -     -     -update_group(#group{db=Db,current_seq=CurrentSeq, -        purge_seq=GroupPurgeSeq}=Group) -> -    ViewEmptyKVs = [{View, []} || View <- Group#group.views], -    % compute on all docs modified since we last computed. -    DbPurgeSeq = couch_db:get_purge_seq(Db), -    Group2 = -    case DbPurgeSeq of -    GroupPurgeSeq ->  -        Group; -    DbPurgeSeq when GroupPurgeSeq + 1 == DbPurgeSeq -> -        purge(Group); -    _ -> -        throw(restart) -    end, -    {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}} -        = couch_db:enum_docs_since( -            Db, -            CurrentSeq, -            fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, -            {[], Group2, ViewEmptyKVs, [], CurrentSeq} -            ), -    {Group4, Results} = view_compute(Group3, UncomputedDocs), -    {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), -    couch_query_servers:stop_doc_map(Group4#group.query_server), -    if CurrentSeq /= NewSeq -> -        {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), -        {updated, Group5#group{query_server=nil}}; -    true -> -        {same, Group4#group{query_server=nil}} -    end. -      delete_index_dir(RootDir, DbName) ->      nuke_dir(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_design"). @@ -583,50 +317,6 @@ nuke_dir(Dir) ->          ok = file:del_dir(Dir)      end. -delete_index_file(RootDir, DbName, GroupId) -> -    file:delete(RootDir ++ "/." ++ binary_to_list(DbName) -            ++ binary_to_list(GroupId) ++ ".view"). 
- -init_group(Db, Fd, #group{views=Views}=Group, nil) -> -    init_group(Db, Fd, Group, -        #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), -            id_btree_state=nil, view_states=[nil || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> -     #index_header{seq=Seq, purge_seq=PurgeSeq, -            id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, -    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), -    Views2 = lists:zipwith( -        fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> -            FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], -            ReduceFun =  -                fun(reduce, KVs) -> -                    KVs2 = expand_dups(KVs,[]), -                    KVs3 = detuple_kvs(KVs2,[]), -                    {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs3), -                    {length(KVs3), Reduced}; -                (rereduce, Reds) -> -                    Count = lists:sum([Count0 || {Count0, _} <- Reds]), -                    UserReds = [UserRedsList || {_, UserRedsList} <- Reds], -                    {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, UserReds), -                    {Count, Reduced} -                end, -            {ok, Btree} = couch_btree:open(BtreeState, Fd, -                        [{less, fun less_json_keys/2},{reduce, ReduceFun}]), -            View#view{btree=Btree} -        end, -        ViewStates, Views), -    Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, -        id_btree=IdBtree, views=Views2}. - - -get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,  -            id_btree=IdBtree,views=Views}) -> -    ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], -    #index_header{seq=Seq, -            purge_seq=PurgeSeq, -            id_btree_state=couch_btree:get_state(IdBtree), -            view_states=ViewStates}. -  % keys come back in the language of btree - tuples.  less_json_keys(A, B) ->      less_json(tuple_to_list(A), tuple_to_list(B)). @@ -703,129 +393,4 @@ less_list([A|RestA], [B|RestB]) ->          end      end. -process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) -> -    % This fun computes once for each document -    #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo, -    case DocId of -    GroupId -> -        % uh oh. this is the design doc with our definitions. See if -        % anything in the definition changed. 
-        case couch_db:open_doc(Db, DocInfo) of -        {ok, Doc} -> -            case design_doc_to_view_group(Doc) of -            #group{sig=Sig} -> -                % The same md5 signature, keep on computing -                {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; -            _ -> -                throw(restart) -            end; -        {not_found, deleted} -> -            throw(restart) -        end; -    <<?DESIGN_DOC_PREFIX, _>> -> % we skip design docs -        {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; -    _ -> -        {Docs2, DocIdViewIdKeys2} = -        if Deleted -> -            {Docs, [{DocId, []} | DocIdViewIdKeys]}; -        true -> -            {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]), -            {[Doc | Docs], DocIdViewIdKeys} -        end, -        case couch_util:should_flush() of -        true -> -            {Group1, Results} = view_compute(Group, Docs2), -            {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2), -            {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq), -            garbage_collect(), -            ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], -            {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}}; -        false -> -            {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}} -        end -    end. -view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> -    {ViewKVs, DocIdViewIdKeysAcc}; -view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> -    {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []), -    NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc], -    view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). - - -view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> -    {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; -view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> -    % Take any identical keys and combine the values -    ResultKVs2 = lists:foldl( -        fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) -> -            case Key == PrevKey of -            true -> -                case PrevVal of -                {dups, Dups} -> -                    [{PrevKey, {dups, [Value|Dups]}} | AccRest]; -                _ -> -                    [{PrevKey, {dups, [Value,PrevVal]}} | AccRest] -                end; -            false -> -                [{Key,Value},{PrevKey,PrevVal}|AccRest] -            end; -        (KV, []) -> -           [KV]  -        end, [], lists:sort(ResultKVs)), -    NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2], -    NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc], -    NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2], -    NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, -    view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). 
- -view_compute(Group, []) -> -    {Group, []}; -view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) -> -    {ok, QueryServer} = -    case QueryServerIn of -    nil -> % doc map not started -        Definitions = [View#view.def || View <- Group#group.views], -        couch_query_servers:start_doc_map(DefLang, Definitions); -    _ -> -        {ok, QueryServerIn} -    end, -    {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs), -    {Group#group{query_server=QueryServer}, Results}. - - - -write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> -    #group{id_btree=IdBtree} = Group, - -    AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []], -    RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []], -    LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys], -    {ok, LookupResults, IdBtree2} -        = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds), -    KeysToRemoveByView = lists:foldl( -        fun(LookupResult, KeysToRemoveByViewAcc) -> -            case LookupResult of -            {ok, {DocId, ViewIdKeys}} -> -                lists:foldl( -                    fun({ViewId, Key}, KeysToRemoveByViewAcc2) -> -                        dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2) -                    end, -                    KeysToRemoveByViewAcc, ViewIdKeys); -            {not_found, _} -> -                KeysToRemoveByViewAcc -            end -        end, -        dict:new(), LookupResults), - -    Views2 = [ -        begin -            KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []), -            {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove), -            View#view{btree = ViewBtree2} -        end -    || -        {View, AddKeyValues} <- ViewKeyValuesToAdd -    ], -    Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}, -    {ok, Group2}. diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl new file mode 100644 index 00000000..84547095 --- /dev/null +++ b/src/couchdb/couch_view_group.erl @@ -0,0 +1,192 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License.  You may obtain a copy of +% the License at +% +%   http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_view_group). +-behaviour(gen_server). + +%% API +-export([start_link/1, request_group/2]). +% -export([design_doc_to_view_group/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, +	 terminate/2, code_change/3]). + +-include("couch_db.hrl"). +	  +-record(group_state, { +    spawn_fun, +    target_seq=0, +    group_seq=0, +    group=nil, +    updater_pid=nil, +    waiting_list=[] +}). 
+ +% api methods +request_group(Pid, Seq) -> +    ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), +    case gen_server:call(Pid, {request_group, Seq}, infinity) of +    {ok, Group} -> +        ?LOG_DEBUG("get_updated_group replied with group", []), +        {ok, Group}; +    Else -> +        ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]), +        Else +    end. + + +% from template +start_link(InitArgs) -> +    gen_server:start_link(couch_view_group, InitArgs, []). + +% init differentiates between temp and design_doc views. It creates a closure +% which spawns the appropriate view_updater. (It might also spawn the first +% view_updater run.) +init(InitArgs) -> +    SpawnFun = fun() -> spawn_updater(InitArgs) end, +    process_flag(trap_exit, true), +    {ok, #group_state{spawn_fun=SpawnFun}}. + +% There are two sources of messages: couch_view, which requests an up to date +% view group, and the couch_view_updater, which when spawned, updates the +% group and sends it back here. We employ a caching mechanism, so that between +% database writes, we don't have to spawn a couch_view_updater with every view +% request. This should give us more control, and the ability to request view +% statuses eventually. + +% The caching mechanism: each request is submitted with a seq_id for the +% database at the time it was read. We guarantee to return a view from that +% sequence or newer. + +% If the request sequence is higher than our current high_target seq, we set +% that as the highest seqence. If the updater is not running, we launch it. + +handle_call({request_group, RequestSeq}, From,  +        #group_state{ +            target_seq=TargetSeq,  +            spawn_fun=SpawnFun, +            updater_pid=Up, +            waiting_list=WaitList +            }=State) when RequestSeq > TargetSeq, Up == nil  ->     +    UpdaterPid = SpawnFun(), +    {noreply, State#group_state{ +        target_seq=RequestSeq,  +        updater_pid=UpdaterPid, +        waiting_list=[{From,RequestSeq}|WaitList] +    }, infinity}; + +handle_call({request_group, RequestSeq}, From,  +        #group_state{ +            target_seq=TargetSeq, +            waiting_list=WaitList +            }=State) when RequestSeq > TargetSeq  -> +    {noreply, State#group_state{ +        target_seq=RequestSeq,  +        waiting_list=[{From,RequestSeq}|WaitList] +    }, infinity}; +         + +% If the request seqence is less than or equal to the seq_id of a known Group, +% we respond with that Group. +handle_call({request_group, RequestSeq}, _From,  +        State=#group_state{ +            group_seq=GroupSeq, +            group=Group  +            }) when RequestSeq =< GroupSeq  -> +    {reply, {ok, Group}, State}; + +% Otherwise: TargetSeq => RequestSeq > GroupSeq +% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq +handle_call({request_group, RequestSeq}, From, +    #group_state{ +        waiting_list=WaitList +        }=State) -> +    {noreply, State#group_state{ +        waiting_list=[{From, RequestSeq}|WaitList] +    }, infinity}. + + +% When the updater finishes, it will return a group with a seq_id, we should +% store that group and seq_id in our state. If our high_target is higher than +% the returned group, start a new updater. 
+ +handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},  +        State=#group_state{ +            target_seq=TargetSeq,  +            waiting_list=WaitList, +            spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq -> +    StillWaiting = reply_with_group(Group, WaitList, []), +    UpdaterPid = SpawnFun(), +    {noreply, State#group_state{  +        updater_pid=UpdaterPid, +        waiting_list=StillWaiting, +        group_seq=NewGroupSeq, +        group=Group}}; +         +handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, +        State=#group_state{waiting_list=WaitList}) -> +    StillWaiting = reply_with_group(Group, WaitList, []), +    {noreply, State#group_state{ +        updater_pid=nil, +        waiting_list=StillWaiting, +        group_seq=NewGroupSeq, +        group=Group}}. +    +handle_info({'EXIT', _FromPid, normal}, State) -> +    {noreply, State}; +     +handle_info({'EXIT', FromPid, Reason}, State) -> +    ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]), +    {stop, Reason, State}; +     +handle_info(_Info, State) -> +    {noreply, State}. + +terminate(Reason, _State=#group_state{waiting_list=WaitList}) -> +    lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList),     +    ok. + +code_change(_OldVsn, State, _Extra) -> +    {ok, State}. + +% error handling? the updater could die on us, we can save ourselves here. +% but we shouldn't, we could be dead for a reason, like the view got changed, or something. + + +%% Local Functions + +% reply_with_group/3 +% for each item in the WaitingList {Pid, Seq} +% if the Seq is =< GroupSeq, reply +reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq -> +    gen_server:reply(Pid, {ok, Group}), +    reply_with_group(Group, WaitList, StillWaiting); + +% else +% put it in the continuing waiting list     +reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) -> +    reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]); + +% return the still waiting list +reply_with_group(_Group, [], StillWaiting) -> +    StillWaiting. + +spawn_updater({RootDir, DbName, GroupId}) ->  +    spawn_link(couch_view_updater, update, +        [RootDir, DbName, GroupId, self()]); + +spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) -> +    spawn_link(couch_view_updater, temp_update, +        [DbName, Fd, Lang, MapSrc, RedSrc, self()]). +     + diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl new file mode 100644 index 00000000..7f40af3b --- /dev/null +++ b/src/couchdb/couch_view_updater.erl @@ -0,0 +1,384 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License.  You may obtain a copy of +% the License at +% +%   http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_view_updater). + +-export([update/4, temp_update/6]). + +-include("couch_db.hrl"). 
+ + + +update(RootDir, DbName, GroupId, NotifyPid) -> +    {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId), +    {ok, Db} = couch_db:open(DbName, []), +    Result = update_group(Group#group{db=Db}), +    ?LOG_DEBUG("update {Result} DONE ~p", [{Result}]),     +    couch_db:close(Db), +    case Result of +    {same, Group2} -> +        gen_server:cast(NotifyPid, {new_group, Group2}); +    {updated, Group2} -> +        HeaderData = {Sig, get_index_header_data(Group2)}, +        ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), +        gen_server:cast(NotifyPid, {new_group, Group2}) +    end, +    garbage_collect(). +     +temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) -> +    case couch_db:open(DbName, []) of +    {ok, Db} -> +        View = #view{map_names=["_temp"], +            id_num=0, +            btree=nil, +            def=MapSrc, +            reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, +        Group = #group{name="_temp", +            db=Db, +            views=[View], +            current_seq=0, +            def_lang=Lang, +            id_btree=nil}, +        Group2 = init_group(Db, Fd, Group,nil), +        couch_db:monitor(Db), +        {_Updated, Group3} = update_group(Group2#group{db=Db}), +        couch_db:close(Db), +        gen_server:cast(NotifyPid, {new_group, Group3}), +        garbage_collect(); +    Else -> +        exit(Else) +    end. + + +update_group(#group{db=Db,current_seq=CurrentSeq}=Group) -> +    ViewEmptyKVs = [{View, []} || View <- Group#group.views], +    % compute on all docs modified since we last computed. +    {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}} +        = couch_db:enum_docs_since( +            Db, +            CurrentSeq, +            fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, +            {[], Group, ViewEmptyKVs, []} +            ), +    {Group4, Results} = view_compute(Group3, UncomputedDocs), +    {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), +    couch_query_servers:stop_doc_map(Group4#group.query_server), +    NewSeq = couch_db:get_update_seq(Db), +    if CurrentSeq /= NewSeq -> +        {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), +        {updated, Group5#group{query_server=nil}}; +    true -> +        {same, Group4#group{query_server=nil}} +    end. + + +get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,  +            id_btree=IdBtree,views=Views}) -> +    ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], +    #index_header{seq=Seq, +            purge_seq=PurgeSeq, +            id_btree_state=couch_btree:get_state(IdBtree), +            view_states=ViewStates}. 
+ + +purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> +    {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), +    Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], +    {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), + +    % now populate the dictionary with all the keys to delete +    ViewKeysToRemoveDict = lists:foldl( +        fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) -> +            lists:foldl( +                fun({ViewNum, RowKey}, ViewDictAcc2) -> +                    dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2) +                end, ViewDictAcc, ViewNumRowKeys); +        ({not_found, _}, ViewDictAcc) -> +            ViewDictAcc +        end, dict:new(), Lookups), + +    % Now remove the values from the btrees +    Views2 = lists:map( +        fun(#view{id_num=Num,btree=Btree}=View) -> +            case dict:find(Num, ViewKeysToRemoveDict) of +            {ok, RemoveKeys} -> +                {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys), +                View#view{btree=Btree2}; +            error -> % no keys to remove in this view +                View +            end +        end, Views), +    Group#group{id_btree=IdBtree2, +            views=Views2, +            purge_seq=couch_db:get_purge_seq(Db)}. + +process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) -> +    % This fun computes once for each document         +    #doc_info{id=DocId, deleted=Deleted} = DocInfo, +    case DocId of +    GroupId -> +        % uh oh. this is the design doc with our definitions. See if +        % anything in the definition changed. +        case couch_db:open_doc(Db, DocInfo) of +        {ok, Doc} -> +            case design_doc_to_view_group(Doc) of +            #group{sig=Sig} -> +                % The same md5 signature, keep on computing +                {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; +            _ -> +                ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]), +                throw(restart) +            end; +        {not_found, deleted} -> +            ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]), +            throw(restart) +        end; +    <<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs +        {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; +    _ -> +        {Docs2, DocIdViewIdKeys2} = +        if Deleted -> +            {Docs, [{DocId, []} | DocIdViewIdKeys]}; +        true -> +            {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]), +            {[Doc | Docs], DocIdViewIdKeys} +        end, +         +        case couch_util:should_flush() of +        true -> +            {Group1, Results} = view_compute(Group, Docs2), +            {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2), +            {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, +                    DocInfo#doc_info.update_seq), +            garbage_collect(), +            ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], +            {ok, {[], Group2, ViewEmptyKeyValues, []}}; +        false -> +            {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2}} +        end +    end. 
+ +view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> +    {ViewKVs, DocIdViewIdKeysAcc}; +view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> +    {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []), +    NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc], +    view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). + + +view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> +    {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; +view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> +    % Take any identical keys and combine the values +    ResultKVs2 = lists:foldl( +        fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) -> +            case Key == PrevKey of +            true -> +                case PrevVal of +                {dups, Dups} -> +                    [{PrevKey, {dups, [Value|Dups]}} | AccRest]; +                _ -> +                    [{PrevKey, {dups, [Value,PrevVal]}} | AccRest] +                end; +            false -> +                [{Key,Value},{PrevKey,PrevVal}|AccRest] +            end; +        (KV, []) -> +           [KV]  +        end, [], lists:sort(ResultKVs)), +    NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2], +    NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc], +    NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2], +    NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, +    view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). + +view_compute(Group, []) -> +    {Group, []}; +view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) -> +    {ok, QueryServer} = +    case QueryServerIn of +    nil -> % doc map not started +        Definitions = [View#view.def || View <- Group#group.views], +        couch_query_servers:start_doc_map(DefLang, Definitions); +    _ -> +        {ok, QueryServerIn} +    end, +    {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs), +    {Group#group{query_server=QueryServer}, Results}. 
+ + + +write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> +    #group{id_btree=IdBtree} = Group, + +    AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []], +    RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []], +    LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys], +    {ok, LookupResults, IdBtree2} +        = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds), +    KeysToRemoveByView = lists:foldl( +        fun(LookupResult, KeysToRemoveByViewAcc) -> +            case LookupResult of +            {ok, {DocId, ViewIdKeys}} -> +                lists:foldl( +                    fun({ViewId, Key}, KeysToRemoveByViewAcc2) -> +                        dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2) +                    end, +                    KeysToRemoveByViewAcc, ViewIdKeys); +            {not_found, _} -> +                KeysToRemoveByViewAcc +            end +        end, +        dict:new(), LookupResults), + +    Views2 = [ +        begin +            KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []), +            {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove), +            View#view{btree = ViewBtree2} +        end +    || +        {View, AddKeyValues} <- ViewKeyValuesToAdd +    ], +    Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}, +    {ok, Group2}. +     +prepare_group(RootDir, DbName, GroupId) -> +    {Db, Group} = case (catch couch_db:open(DbName, [])) of +    {ok, Db0} -> +        case (catch couch_db:open_doc(Db0, GroupId)) of +        {ok, Doc} -> +            {Db0, design_doc_to_view_group(Doc)}; +        Else -> +            delete_index_file(RootDir, DbName, GroupId), +            ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]),     +            exit(Else) +        end; +    Else -> +        delete_index_file(RootDir, DbName, GroupId), +        exit(Else) +    end, +    FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++  +            binary_to_list(GroupId) ++".view", +    Group2 = +    case couch_file:open(FileName) of +    {ok, Fd} -> +        Sig = Group#group.sig, +        case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of +        {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} -> +            % sigs match! +            DbPurgeSeq = couch_db:get_purge_seq(Db), +            % We can only use index with the same, or next purge seq as the db. +            if DbPurgeSeq == PurgeSeq -> +                init_group(Db, Fd, Group, HeaderInfo); +            DbPurgeSeq == PurgeSeq + 1 -> +                ?LOG_DEBUG("Purging entries from view index.",[]), +                purge_index(init_group(Db, Fd, Group, HeaderInfo)); +            true -> +                ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]), +                reset_file(Db, Fd, DbName, Group) +            end; +        _ -> +            reset_file(Db, Fd, DbName, Group) +        end; +    {error, enoent} -> +        case couch_file:open(FileName, [create]) of +        {ok, Fd} -> reset_file(Db, Fd, DbName, Group); +        Error    -> throw(Error) +        end +    end, + +    couch_db:monitor(Db), +    couch_db:close(Db), +    {ok, Group2}. 
+ +% maybe move to another module +design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> +    Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>), +    {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}), + +    % add the views to a dictionary object, with the map source as the key +    DictBySrc = +    lists:foldl( +        fun({Name, {MRFuns}}, DictBySrcAcc) -> +            MapSrc = proplists:get_value(<<"map">>, MRFuns), +            RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null), +            View = +            case dict:find(MapSrc, DictBySrcAcc) of +                {ok, View0} -> View0; +                error -> #view{def=MapSrc} % create new view object +            end, +            View2 = +            if RedSrc == null -> +                View#view{map_names=[Name|View#view.map_names]}; +            true -> +                View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} +            end, +            dict:store(MapSrc, View2, DictBySrcAcc) +        end, dict:new(), RawViews), +    % number the views +    {Views, _N} = lists:mapfoldl( +        fun({_Src, View}, N) -> +            {View#view{id_num=N},N+1} +        end, 0, dict:to_list(DictBySrc)), + +    Group = #group{name=Id, views=Views, def_lang=Language}, +    Group#group{sig=erlang:md5(term_to_binary(Group))}. + +reset_group(#group{views=Views}=Group) -> +    Views2 = [View#view{btree=nil} || View <- Views], +    Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, +            id_btree=nil,views=Views2}. + +reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> +    ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), +    ok = couch_file:truncate(Fd, 0), +    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), +    init_group(Db, Fd, reset_group(Group), nil). + +delete_index_file(RootDir, DbName, GroupId) -> +    file:delete(RootDir ++ "/." ++ binary_to_list(DbName) +            ++ binary_to_list(GroupId) ++ ".view"). 
+ +init_group(Db, Fd, #group{views=Views}=Group, nil) -> +    init_group(Db, Fd, Group, +        #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), +            id_btree_state=nil, view_states=[nil || _ <- Views]}); +init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> +     #index_header{seq=Seq, purge_seq=PurgeSeq, +            id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, +    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), +    Views2 = lists:zipwith( +        fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> +            FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], +            ReduceFun =  +                fun(reduce, KVs) -> +                    KVs2 = couch_view:expand_dups(KVs,[]), +                    KVs3 = couch_view:detuple_kvs(KVs2,[]), +                    {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs,  +                        KVs3), +                    {length(KVs3), Reduced}; +                (rereduce, Reds) -> +                    Count = lists:sum([Count0 || {Count0, _} <- Reds]), +                    UserReds = [UserRedsList || {_, UserRedsList} <- Reds], +                    {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, +                        UserReds), +                    {Count, Reduced} +                end, +            {ok, Btree} = couch_btree:open(BtreeState, Fd, +                        [{less, fun couch_view:less_json_keys/2}, +                            {reduce, ReduceFun}]), +            View#view{btree=Btree} +        end, +        ViewStates, Views), +    Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, +        id_btree=IdBtree, views=Views2}.
\ No newline at end of file

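For reference, the scheduling rule that couch_view_group applies to `{request_group, Seq}` calls (spelled out in the handle_call clauses above) can be reduced to a small decision function: answer immediately if the cached group already covers the requested sequence, spawn an updater and queue the caller if nothing is running, otherwise just queue. The module below is purely illustrative and not part of the patch; the argument names mirror the `#group_state{}` fields.

```erlang
-module(group_request_rule).   % hypothetical module, for illustration only
-export([decide/4]).

%% decide(RequestSeq, TargetSeq, GroupSeq, UpdaterPid) -> reply | spawn_updater_and_wait | wait
decide(RequestSeq, _TargetSeq, GroupSeq, _UpdaterPid) when RequestSeq =< GroupSeq ->
    reply;                     % the cached group already covers this sequence
decide(RequestSeq, TargetSeq, _GroupSeq, nil) when RequestSeq > TargetSeq ->
    spawn_updater_and_wait;    % no updater running: start one and queue the caller
decide(_RequestSeq, _TargetSeq, _GroupSeq, _UpdaterPid) ->
    wait.                      % an updater is already catching up: queue the caller
```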