summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2008-12-10 01:13:17 +0000
committerJohn Christopher Anderson <jchris@apache.org>2008-12-10 01:13:17 +0000
commit5a9321814a727e8c010bf83f05572a341d55f26a (patch)
tree91b6233a3d81f9b29a34d9653fffbde284cdfa4b /src
parent6bacde0d941d209f41ad3ca8921e3a596a056c06 (diff)
view group state gen_server. thanks damien and davisp.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@724946 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/Makefile.am4
-rw-r--r--src/couchdb/couch_db.erl5
-rw-r--r--src/couchdb/couch_db.hrl37
-rw-r--r--src/couchdb/couch_db_updater.erl2
-rw-r--r--src/couchdb/couch_httpd.erl2
-rw-r--r--src/couchdb/couch_httpd_view.erl11
-rw-r--r--src/couchdb/couch_server.erl8
-rw-r--r--src/couchdb/couch_view.erl551
-rw-r--r--src/couchdb/couch_view_group.erl192
-rw-r--r--src/couchdb/couch_view_updater.erl384
10 files changed, 686 insertions, 510 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 2e4a85c8..a2099931 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -63,6 +63,8 @@ source_files = \
couch_stream.erl \
couch_util.erl \
couch_view.erl \
+ couch_view_updater.erl \
+ couch_view_group.erl \
couch_db_updater.erl
EXTRA_DIST = $(source_files) couch_db.hrl
@@ -92,6 +94,8 @@ compiled_files = \
couch_stream.beam \
couch_util.beam \
couch_view.beam \
+ couch_view_updater.beam \
+ couch_view_group.beam \
couch_db_updater.beam
# doc_base = \
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 7de5b8db..5da16b83 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -17,7 +17,7 @@
-export([open_ref_counted/3,num_refs/1,monitor/1]).
-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
--export([get_missing_revs/2,name/1,doc_to_tree/1]).
+-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1]).
-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
@@ -145,6 +145,9 @@ purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
+get_update_seq(#db{header=#db_header{update_seq=Seq}})->
+ Seq.
+
get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->
PurgeSeq.
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 7bf3cb9d..6a48ab1c 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -88,7 +88,7 @@
-record(user_ctx,
- {name=nil,
+ {name=null,
roles=[]
}).
@@ -152,6 +152,41 @@
include_docs = false
}).
+-record(group,
+ {sig=nil,
+ db=nil,
+ fd=nil,
+ name,
+ def_lang,
+ views,
+ id_btree=nil,
+ current_seq=0,
+ purge_seq=0,
+ query_server=nil
+ }).
+
+-record(view,
+ {id_num,
+ map_names=[],
+ def,
+ btree=nil,
+ reduce_funs=[]
+ }).
+
+-record(server,{
+ root_dir = [],
+ dbname_regexp,
+ max_dbs_open=100,
+ current_dbs_open=0,
+ start_time=""
+ }).
+
+-record(index_header,
+ {seq=0,
+ purge_seq=0,
+ id_btree_state=nil,
+ view_states=nil
+ }).
% small value used in revision trees to indicate the revision isn't stored
-define(REV_MISSING, []).
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 67a6f624..68c3a1fc 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -129,7 +129,7 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->
Db#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree2,
docinfo_by_seq_btree = DocInfoBySeqBTree2,
- update_seq = NewSeq,
+ update_seq = NewSeq + 1,
header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index 7d621d25..0d489869 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -261,7 +261,7 @@ basic_username_pw(Req) ->
[User, Pass] ->
{User, Pass};
[User] ->
- {User, <<"">>};
+ {User, ""};
_ ->
nil
end;
diff --git a/src/couchdb/couch_httpd_view.erl b/src/couchdb/couch_httpd_view.erl
index 5b19af5d..f1c8616f 100644
--- a/src/couchdb/couch_httpd_view.erl
+++ b/src/couchdb/couch_httpd_view.erl
@@ -22,18 +22,19 @@
start_json_response/2,send_chunk/2,end_json_response/1]).
design_doc_view(Req, Db, Id, ViewName, Keys) ->
+ #view_query_args{
+ update = Update,
+ reduce = Reduce
+ } = QueryArgs = parse_view_query(Req, Keys),
case couch_view:get_map_view({couch_db:name(Db),
- <<"_design/", Id/binary>>, ViewName}) of
+ <<"_design/", Id/binary>>, ViewName, Update}) of
{ok, View} ->
- QueryArgs = parse_view_query(Req, Keys),
output_map_view(Req, View, Db, QueryArgs, Keys);
{not_found, Reason} ->
case couch_view:get_reduce_view({couch_db:name(Db),
<<"_design/", Id/binary>>, ViewName}) of
{ok, View} ->
- #view_query_args{
- reduce = Reduce
- } = QueryArgs = parse_view_query(Req, Keys, true),
+ parse_view_query(Req, Keys, true), % just for validation
case Reduce of
false ->
{reduce, _N, _Lang, MapView} = View,
diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl
index 34aa16b7..08f71f2b 100644
--- a/src/couchdb/couch_server.erl
+++ b/src/couchdb/couch_server.erl
@@ -22,14 +22,6 @@
-include("couch_db.hrl").
--record(server,{
- root_dir = [],
- dbname_regexp,
- max_dbs_open=100,
- current_dbs_open=0,
- start_time=""
- }).
-
start() ->
start(["default.ini"]).
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index 4ebbb136..7c7730a7 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -13,45 +13,12 @@
-module(couch_view).
-behaviour(gen_server).
--export([start_link/0,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]).
+-export([start_link/0,fold/4,fold/5,less_json/2,less_json_keys/2,expand_dups/2,detuple_kvs/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]).
-export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, fold_reduce/7]).
-include("couch_db.hrl").
-
--record(group,
- {sig=nil,
- db=nil,
- fd=nil,
- name,
- def_lang,
- views,
- id_btree=nil,
- current_seq=0,
- purge_seq=0,
- query_server=nil
- }).
-
--record(view,
- {id_num,
- map_names=[],
- def,
- btree=nil,
- reduce_funs=[]
- }).
-
--record(server,
- {root_dir
- }).
-
--record(index_header,
- {seq=0,
- purge_seq=0,
- id_btree_state=nil,
- view_states=nil
- }).
-
start_link() ->
gen_server:start_link({local, couch_view}, couch_view, [], []).
@@ -59,41 +26,30 @@ get_temp_updater(DbName, Type, MapSrc, RedSrc) ->
{ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}),
Pid.
-get_updater(DbName, GroupId) ->
- {ok, Pid} = gen_server:call(couch_view, {start_updater, DbName, GroupId}),
+get_group_server(DbName, GroupId) ->
+ {ok, Pid} = gen_server:call(couch_view, {start_group_server, DbName, GroupId}),
Pid.
-get_updated_group(Pid) ->
- Mref = erlang:monitor(process, Pid),
- receive
- {'DOWN', Mref, _, _, Reason} ->
- throw(Reason)
- after 0 ->
- Pid ! {self(), get_updated},
- receive
- {Pid, Response} ->
- erlang:demonitor(Mref),
- receive
- {'DOWN', Mref, _, _, _} -> ok
- after 0 -> ok
- end,
- Response;
- {'DOWN', Mref, _, _, Reason} ->
- throw(Reason)
- end
- end.
+get_updated_group(DbName, GroupId, Update) ->
+ couch_view_group:request_group(get_group_server(DbName, GroupId), seq_for_update(DbName, Update)).
+
+get_updated_group(temp, DbName, Type, MapSrc, RedSrc, Update) ->
+ couch_view_group:request_group(get_temp_updater(DbName, Type, MapSrc, RedSrc), seq_for_update(DbName, Update)).
get_row_count(#view{btree=Bt}) ->
{ok, {Count, _Reds}} = couch_btree:full_reduce(Bt),
{ok, Count}.
get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) ->
- {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)),
+ {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, MapSrc, RedSrc, true),
{ok, {temp_reduce, View}};
get_reduce_view({DbName, GroupId, Name}) ->
- {ok, #group{views=Views,def_lang=Lang}} =
- get_updated_group(get_updater(DbName, GroupId)),
- get_reduce_view0(Name, Lang, Views).
+ case get_updated_group(DbName, GroupId, true) of
+ {error, Reason} ->
+ Reason;
+ {ok, #group{views=Views,def_lang=Lang}} ->
+ get_reduce_view0(Name, Lang, Views)
+ end.
get_reduce_view0(_Name, _Lang, []) ->
{not_found, missing_named_view};
@@ -153,13 +109,26 @@ get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 ->
N + 1;
get_key_pos(Key, [_|Rest], N) ->
get_key_pos(Key, Rest, N+1).
+
+seq_for_update(DbName, Update) ->
+ case Update of
+ true ->
+ {ok, #db{update_seq=CurrentSeq}} = couch_db:open(DbName, []),
+ CurrentSeq;
+ _Else ->
+ 0
+ end.
get_map_view({temp, DbName, Type, Src}) ->
- {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])),
+ {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, Src, [], true),
{ok, View};
-get_map_view({DbName, GroupId, Name}) ->
- {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)),
- get_map_view0(Name, Views).
+get_map_view({DbName, GroupId, Name, Update}) ->
+ case get_updated_group(DbName, GroupId, Update) of
+ {error, Reason} ->
+ Reason;
+ {ok, #group{views=Views}} ->
+ get_map_view0(Name, Views)
+ end.
get_map_view0(_Name, []) ->
{not_found, missing_named_view};
@@ -183,37 +152,6 @@ reduce_to_count(Reductions) ->
Count.
-design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
- Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
- {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
-
- % add the views to a dictionary object, with the map source as the key
- DictBySrc =
- lists:foldl(
- fun({Name, {MRFuns}}, DictBySrcAcc) ->
- MapSrc = proplists:get_value(<<"map">>, MRFuns),
- RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
- View =
- case dict:find(MapSrc, DictBySrcAcc) of
- {ok, View0} -> View0;
- error -> #view{def=MapSrc} % create new view object
- end,
- View2 =
- if RedSrc == null ->
- View#view{map_names=[Name|View#view.map_names]};
- true ->
- View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
- end,
- dict:store(MapSrc, View2, DictBySrcAcc)
- end, dict:new(), RawViews),
- % number the views
- {Views, _N} = lists:mapfoldl(
- fun({_Src, View}, N) ->
- {View#view{id_num=N},N+1}
- end, 0, dict:to_list(DictBySrc)),
-
- Group = #group{name=Id, views=Views, def_lang=Language},
- Group#group{sig=erlang:md5(term_to_binary(Group))}.
fold_fun(_Fun, [], _, Acc) ->
{ok, Acc};
@@ -253,10 +191,10 @@ init([]) ->
(_Else) ->
ok
end),
- ets:new(couch_views_by_db, [bag, private, named_table]),
- ets:new(couch_views_by_name, [set, protected, named_table]),
- ets:new(couch_views_by_updater, [set, private, named_table]),
- ets:new(couch_views_temp_fd_by_db, [set, protected, named_table]),
+ ets:new(couch_groups_by_db, [bag, private, named_table]),
+ ets:new(group_servers_by_name, [set, protected, named_table]),
+ ets:new(couch_groups_by_updater, [set, private, named_table]),
+ ets:new(couch_temp_group_fd_by_db, [set, protected, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir}}.
@@ -268,9 +206,9 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r
<<SigInt:128/integer>> = erlang:md5(term_to_binary({Lang, MapSrc, RedSrc})),
Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])),
Pid =
- case ets:lookup(couch_views_by_name, {DbName, Name}) of
+ case ets:lookup(group_servers_by_name, {DbName, Name}) of
[] ->
- case ets:lookup(couch_views_temp_fd_by_db, DbName) of
+ case ets:lookup(couch_temp_group_fd_by_db, DbName) of
[] ->
FileName = Root ++ "/." ++ binary_to_list(DbName) ++ "_temp",
{ok, Fd} = couch_file:open(FileName, [create, overwrite]),
@@ -279,21 +217,20 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r
ok
end,
?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]),
- NewPid = spawn_link(couch_view, start_temp_update_loop,
- [DbName, Fd, Lang, MapSrc, RedSrc]),
- true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}),
+ {ok, NewPid} = couch_view_group:start_link({DbName, Fd, Lang, MapSrc, RedSrc}),
+ true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}),
add_to_ets(NewPid, DbName, Name),
NewPid;
[{_, ExistingPid0}] ->
ExistingPid0
end,
{reply, {ok, Pid}, Server};
-handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->
+handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->
Pid =
- case ets:lookup(couch_views_by_name, {DbName, GroupId}) of
+ case ets:lookup(group_servers_by_name, {DbName, GroupId}) of
[] ->
- ?LOG_DEBUG("Spawning new update process for view group ~s in database ~s.", [GroupId, DbName]),
- NewPid = spawn_link(couch_view, start_update_loop, [Root, DbName, GroupId]),
+ ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]),
+ {ok, NewPid} = couch_view_group:start_link({Root, DbName, GroupId}),
add_to_ets(NewPid, DbName, GroupId),
NewPid;
[{_, ExistingPid0}] ->
@@ -303,11 +240,11 @@ handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Serv
handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
% shutdown all the updaters
- Names = ets:lookup(couch_views_by_db, DbName),
+ Names = ets:lookup(couch_groups_by_db, DbName),
lists:foreach(
fun({_DbName, GroupId}) ->
?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]),
- [{_, Pid}] = ets:lookup(couch_views_by_name, {DbName, GroupId}),
+ [{_, Pid}] = ets:lookup(group_servers_by_name, {DbName, GroupId}),
exit(Pid, kill),
receive {'EXIT', Pid, _} ->
delete_from_ets(Pid, DbName, GroupId)
@@ -318,22 +255,23 @@ handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
{noreply, Server}.
handle_info({'EXIT', _FromPid, normal}, Server) ->
- {noreply, Server};
+ {noreply, Server};
handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
- case ets:lookup(couch_views_by_updater, FromPid) of
+ ?LOG_DEBUG("Exit from process: ~p", [{FromPid, Reason}]),
+ case ets:lookup(couch_groups_by_updater, FromPid) of
[] -> % non-updater linked process must have died, we propagate the error
?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
exit(Reason);
[{_, {DbName, "_temp_" ++ _ = GroupId}}] ->
delete_from_ets(FromPid, DbName, GroupId),
- [{_, Fd, Count}] = ets:lookup(couch_views_temp_fd_by_db, DbName),
+ [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName),
case Count of
1 -> % Last ref
couch_file:close(Fd),
file:delete(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_temp"),
- true = ets:delete(couch_views_temp_fd_by_db, DbName);
+ true = ets:delete(couch_temp_group_fd_by_db, DbName);
_ ->
- true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count - 1})
+ true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count - 1})
end;
[{_, {DbName, GroupId}}] ->
delete_from_ets(FromPid, DbName, GroupId)
@@ -344,225 +282,21 @@ handle_info(Msg, _Server) ->
exit({error, Msg}).
add_to_ets(Pid, DbName, GroupId) ->
- true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}),
- true = ets:insert(couch_views_by_name, {{DbName, GroupId}, Pid}),
- true = ets:insert(couch_views_by_db, {DbName, GroupId}).
+ true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}),
+ true = ets:insert(group_servers_by_name, {{DbName, GroupId}, Pid}),
+ true = ets:insert(couch_groups_by_db, {DbName, GroupId}).
delete_from_ets(Pid, DbName, GroupId) ->
- true = ets:delete(couch_views_by_updater, Pid),
- true = ets:delete(couch_views_by_name, {DbName, GroupId}),
- true = ets:delete_object(couch_views_by_db, {DbName, GroupId}).
+ true = ets:delete(couch_groups_by_updater, Pid),
+ true = ets:delete(group_servers_by_name, {DbName, GroupId}),
+ true = ets:delete_object(couch_groups_by_db, {DbName, GroupId}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->
- NotifyPids = get_notify_pids(1000),
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- View = #view{map_names=["_temp"],
- id_num=0,
- btree=nil,
- def=MapSrc,
- reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
- Group = #group{name="_temp",
- db=Db,
- views=[View],
- current_seq=0,
- def_lang=Lang,
- id_btree=nil},
- Group2 = init_group(Db, Fd, Group,nil),
- couch_db:monitor(Db),
- couch_db:close(Db),
- temp_update_loop(DbName, Group2, NotifyPids);
- Else ->
- exit(Else)
- end.
-temp_update_loop(DbName, Group, NotifyPids) ->
- {ok, Db} = couch_db:open(DbName, []),
- {_Updated, Group2} = update_group(Group#group{db=Db}),
- couch_db:close(Db),
- [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
- garbage_collect(),
- temp_update_loop(DbName, Group2, get_notify_pids(10000)).
-
-
-reset_group(#group{views=Views}=Group) ->
- Views2 = [View#view{btree=nil} || View <- Views],
- Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
- id_btree=nil,views=Views2}.
-
-start_update_loop(RootDir, DbName, GroupId) ->
- % wait for a notify request before doing anything. This way, we can just
- % exit and any exits will be noticed by the callers.
- start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
-
-start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
- {Db, Group} =
- case (catch couch_db:open(DbName, [])) of
- {ok, Db0} ->
- case (catch couch_db:open_doc(Db0, GroupId)) of
- {ok, Doc} ->
- {Db0, design_doc_to_view_group(Doc)};
- Else ->
- delete_index_file(RootDir, DbName, GroupId),
- exit(Else)
- end;
- Else ->
- delete_index_file(RootDir, DbName, GroupId),
- exit(Else)
- end,
- FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++
- binary_to_list(GroupId) ++".view",
- Group2 =
- case couch_file:open(FileName) of
- {ok, Fd} ->
- Sig = Group#group.sig,
- case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
- {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->
- % sigs match!
- DbPurgeSeq = couch_db:get_purge_seq(Db),
- case (PurgeSeq == DbPurgeSeq) or ((PurgeSeq + 1) == DbPurgeSeq) of
- true ->
- % We can only use index with the same, or next purge seq as the
- % db.
- init_group(Db, Fd, Group, HeaderInfo);
- false ->
- reset_file(Db, Fd, DbName, Group)
- end;
- _ ->
- reset_file(Db, Fd, DbName, Group)
- end;
- {error, enoent} ->
- case couch_file:open(FileName, [create]) of
- {ok, Fd} -> reset_file(Db, Fd, DbName, Group);
- Error -> throw(Error)
- end
- end,
-
- couch_db:monitor(Db),
- couch_db:close(Db),
- update_loop(RootDir, DbName, GroupId, Group2, NotifyPids).
-
-reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
- ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
- ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
- init_group(Db, Fd, reset_group(Group), nil).
-
-update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) ->
- {ok, Db}= couch_db:open(DbName, []),
- Result =
- try
- update_group(Group#group{db=Db})
- catch
- throw: restart -> restart
- after
- couch_db:close(Db)
- end,
- case Result of
- {same, Group2} ->
- [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
- update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000));
- {updated, Group2} ->
- HeaderData = {Sig, get_index_header_data(Group2)},
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
- [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
- garbage_collect(),
- update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000));
- restart ->
- couch_file:close(Group#group.fd),
- start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids())
- end.
-% wait for the first request to come in.
-get_notify_pids(Wait) ->
- receive
- {Pid, get_updated} ->
- [Pid | get_notify_pids()];
- {'DOWN', _MonitorRef, _Type, _Pid, _Info} ->
- ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []),
- exit(db_shutdown);
- Else ->
- ?LOG_ERROR("Unexpected message in view updater: ~p", [Else]),
- exit({error, Else})
- after Wait ->
- exit(wait_timeout)
- end.
-% then keep getting all available and return.
-get_notify_pids() ->
- receive
- {Pid, get_updated} ->
- [Pid | get_notify_pids()]
- after 0 ->
- []
- end.
-
-purge(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
- {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
- Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
- {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
-
- % now populate the dictionary with all the keys to delete
- ViewKeysToRemoveDict = lists:foldl(
- fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) ->
- lists:foldl(
- fun({ViewNum, RowKey}, ViewDictAcc2) ->
- dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2)
- end, ViewDictAcc, ViewNumRowKeys);
- ({not_found, _}, ViewDictAcc) ->
- ViewDictAcc
- end, dict:new(), Lookups),
-
- % Now remove the values from the btrees
- Views2 = lists:map(
- fun(#view{id_num=Num,btree=Btree}=View) ->
- case dict:find(Num, ViewKeysToRemoveDict) of
- {ok, RemoveKeys} ->
- {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys),
- View#view{btree=Btree2};
- error -> % no keys to remove in this view
- View
- end
- end, Views),
- Group#group{id_btree=IdBtree2,
- views=Views2,
- purge_seq=couch_db:get_purge_seq(Db)}.
-
-
-update_group(#group{db=Db,current_seq=CurrentSeq,
- purge_seq=GroupPurgeSeq}=Group) ->
- ViewEmptyKVs = [{View, []} || View <- Group#group.views],
- % compute on all docs modified since we last computed.
- DbPurgeSeq = couch_db:get_purge_seq(Db),
- Group2 =
- case DbPurgeSeq of
- GroupPurgeSeq ->
- Group;
- DbPurgeSeq when GroupPurgeSeq + 1 == DbPurgeSeq ->
- purge(Group);
- _ ->
- throw(restart)
- end,
- {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}}
- = couch_db:enum_docs_since(
- Db,
- CurrentSeq,
- fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
- {[], Group2, ViewEmptyKVs, [], CurrentSeq}
- ),
- {Group4, Results} = view_compute(Group3, UncomputedDocs),
- {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
- couch_query_servers:stop_doc_map(Group4#group.query_server),
- if CurrentSeq /= NewSeq ->
- {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
- {updated, Group5#group{query_server=nil}};
- true ->
- {same, Group4#group{query_server=nil}}
- end.
-
delete_index_dir(RootDir, DbName) ->
nuke_dir(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_design").
@@ -583,50 +317,6 @@ nuke_dir(Dir) ->
ok = file:del_dir(Dir)
end.
-delete_index_file(RootDir, DbName, GroupId) ->
- file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
- ++ binary_to_list(GroupId) ++ ".view").
-
-init_group(Db, Fd, #group{views=Views}=Group, nil) ->
- init_group(Db, Fd, Group,
- #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
- id_btree_state=nil, view_states=[nil || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
- #index_header{seq=Seq, purge_seq=PurgeSeq,
- id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
- {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
- Views2 = lists:zipwith(
- fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
- FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
- ReduceFun =
- fun(reduce, KVs) ->
- KVs2 = expand_dups(KVs,[]),
- KVs3 = detuple_kvs(KVs2,[]),
- {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs3),
- {length(KVs3), Reduced};
- (rereduce, Reds) ->
- Count = lists:sum([Count0 || {Count0, _} <- Reds]),
- UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
- {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, UserReds),
- {Count, Reduced}
- end,
- {ok, Btree} = couch_btree:open(BtreeState, Fd,
- [{less, fun less_json_keys/2},{reduce, ReduceFun}]),
- View#view{btree=Btree}
- end,
- ViewStates, Views),
- Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
- id_btree=IdBtree, views=Views2}.
-
-
-get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
- id_btree=IdBtree,views=Views}) ->
- ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
- #index_header{seq=Seq,
- purge_seq=PurgeSeq,
- id_btree_state=couch_btree:get_state(IdBtree),
- view_states=ViewStates}.
-
% keys come back in the language of btree - tuples.
less_json_keys(A, B) ->
less_json(tuple_to_list(A), tuple_to_list(B)).
@@ -703,129 +393,4 @@ less_list([A|RestA], [B|RestB]) ->
end
end.
-process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) ->
- % This fun computes once for each document
- #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo,
- case DocId of
- GroupId ->
- % uh oh. this is the design doc with our definitions. See if
- % anything in the definition changed.
- case couch_db:open_doc(Db, DocInfo) of
- {ok, Doc} ->
- case design_doc_to_view_group(Doc) of
- #group{sig=Sig} ->
- % The same md5 signature, keep on computing
- {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
- _ ->
- throw(restart)
- end;
- {not_found, deleted} ->
- throw(restart)
- end;
- <<?DESIGN_DOC_PREFIX, _>> -> % we skip design docs
- {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
- _ ->
- {Docs2, DocIdViewIdKeys2} =
- if Deleted ->
- {Docs, [{DocId, []} | DocIdViewIdKeys]};
- true ->
- {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]),
- {[Doc | Docs], DocIdViewIdKeys}
- end,
- case couch_util:should_flush() of
- true ->
- {Group1, Results} = view_compute(Group, Docs2),
- {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2),
- {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq),
- garbage_collect(),
- ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
- {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}};
- false ->
- {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}}
- end
- end.
-view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->
- {ViewKVs, DocIdViewIdKeysAcc};
-view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) ->
- {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []),
- NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc],
- view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys).
-
-
-view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->
- {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};
-view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->
- % Take any identical keys and combine the values
- ResultKVs2 = lists:foldl(
- fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) ->
- case Key == PrevKey of
- true ->
- case PrevVal of
- {dups, Dups} ->
- [{PrevKey, {dups, [Value|Dups]}} | AccRest];
- _ ->
- [{PrevKey, {dups, [Value,PrevVal]}} | AccRest]
- end;
- false ->
- [{Key,Value},{PrevKey,PrevVal}|AccRest]
- end;
- (KV, []) ->
- [KV]
- end, [], lists:sort(ResultKVs)),
- NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2],
- NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc],
- NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2],
- NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,
- view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).
-
-view_compute(Group, []) ->
- {Group, []};
-view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) ->
- {ok, QueryServer} =
- case QueryServerIn of
- nil -> % doc map not started
- Definitions = [View#view.def || View <- Group#group.views],
- couch_query_servers:start_doc_map(DefLang, Definitions);
- _ ->
- {ok, QueryServerIn}
- end,
- {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs),
- {Group#group{query_server=QueryServer}, Results}.
-
-
-
-write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) ->
- #group{id_btree=IdBtree} = Group,
-
- AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []],
- RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []],
- LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys],
- {ok, LookupResults, IdBtree2}
- = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds),
- KeysToRemoveByView = lists:foldl(
- fun(LookupResult, KeysToRemoveByViewAcc) ->
- case LookupResult of
- {ok, {DocId, ViewIdKeys}} ->
- lists:foldl(
- fun({ViewId, Key}, KeysToRemoveByViewAcc2) ->
- dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2)
- end,
- KeysToRemoveByViewAcc, ViewIdKeys);
- {not_found, _} ->
- KeysToRemoveByViewAcc
- end
- end,
- dict:new(), LookupResults),
-
- Views2 = [
- begin
- KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []),
- {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove),
- View#view{btree = ViewBtree2}
- end
- ||
- {View, AddKeyValues} <- ViewKeyValuesToAdd
- ],
- Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},
- {ok, Group2}.
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
new file mode 100644
index 00000000..84547095
--- /dev/null
+++ b/src/couchdb/couch_view_group.erl
@@ -0,0 +1,192 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_view_group).
+-behaviour(gen_server).
+
+%% API
+-export([start_link/1, request_group/2]).
+% -export([design_doc_to_view_group/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("couch_db.hrl").
+
+-record(group_state, {
+ spawn_fun,
+ target_seq=0,
+ group_seq=0,
+ group=nil,
+ updater_pid=nil,
+ waiting_list=[]
+}).
+
+% api methods
+request_group(Pid, Seq) ->
+ ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
+ case gen_server:call(Pid, {request_group, Seq}, infinity) of
+ {ok, Group} ->
+ ?LOG_DEBUG("get_updated_group replied with group", []),
+ {ok, Group};
+ Else ->
+ ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]),
+ Else
+ end.
+
+
+% from template
+start_link(InitArgs) ->
+ gen_server:start_link(couch_view_group, InitArgs, []).
+
+% init differentiates between temp and design_doc views. It creates a closure
+% which spawns the appropriate view_updater. (It might also spawn the first
+% view_updater run.)
+init(InitArgs) ->
+ SpawnFun = fun() -> spawn_updater(InitArgs) end,
+ process_flag(trap_exit, true),
+ {ok, #group_state{spawn_fun=SpawnFun}}.
+
+% There are two sources of messages: couch_view, which requests an up to date
+% view group, and the couch_view_updater, which when spawned, updates the
+% group and sends it back here. We employ a caching mechanism, so that between
+% database writes, we don't have to spawn a couch_view_updater with every view
+% request. This should give us more control, and the ability to request view
+% statuses eventually.
+
+% The caching mechanism: each request is submitted with a seq_id for the
+% database at the time it was read. We guarantee to return a view from that
+% sequence or newer.
+
+% If the request sequence is higher than our current high_target seq, we set
+% that as the highest seqence. If the updater is not running, we launch it.
+
+handle_call({request_group, RequestSeq}, From,
+ #group_state{
+ target_seq=TargetSeq,
+ spawn_fun=SpawnFun,
+ updater_pid=Up,
+ waiting_list=WaitList
+ }=State) when RequestSeq > TargetSeq, Up == nil ->
+ UpdaterPid = SpawnFun(),
+ {noreply, State#group_state{
+ target_seq=RequestSeq,
+ updater_pid=UpdaterPid,
+ waiting_list=[{From,RequestSeq}|WaitList]
+ }, infinity};
+
+handle_call({request_group, RequestSeq}, From,
+ #group_state{
+ target_seq=TargetSeq,
+ waiting_list=WaitList
+ }=State) when RequestSeq > TargetSeq ->
+ {noreply, State#group_state{
+ target_seq=RequestSeq,
+ waiting_list=[{From,RequestSeq}|WaitList]
+ }, infinity};
+
+
+% If the request seqence is less than or equal to the seq_id of a known Group,
+% we respond with that Group.
+handle_call({request_group, RequestSeq}, _From,
+ State=#group_state{
+ group_seq=GroupSeq,
+ group=Group
+ }) when RequestSeq =< GroupSeq ->
+ {reply, {ok, Group}, State};
+
+% Otherwise: TargetSeq => RequestSeq > GroupSeq
+% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq
+handle_call({request_group, RequestSeq}, From,
+ #group_state{
+ waiting_list=WaitList
+ }=State) ->
+ {noreply, State#group_state{
+ waiting_list=[{From, RequestSeq}|WaitList]
+ }, infinity}.
+
+
+% When the updater finishes, it will return a group with a seq_id, we should
+% store that group and seq_id in our state. If our high_target is higher than
+% the returned group, start a new updater.
+
+handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},
+ State=#group_state{
+ target_seq=TargetSeq,
+ waiting_list=WaitList,
+ spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq ->
+ StillWaiting = reply_with_group(Group, WaitList, []),
+ UpdaterPid = SpawnFun(),
+ {noreply, State#group_state{
+ updater_pid=UpdaterPid,
+ waiting_list=StillWaiting,
+ group_seq=NewGroupSeq,
+ group=Group}};
+
+handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},
+ State=#group_state{waiting_list=WaitList}) ->
+ StillWaiting = reply_with_group(Group, WaitList, []),
+ {noreply, State#group_state{
+ updater_pid=nil,
+ waiting_list=StillWaiting,
+ group_seq=NewGroupSeq,
+ group=Group}}.
+
+handle_info({'EXIT', _FromPid, normal}, State) ->
+ {noreply, State};
+
+handle_info({'EXIT', FromPid, Reason}, State) ->
+ ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]),
+ {stop, Reason, State};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(Reason, _State=#group_state{waiting_list=WaitList}) ->
+ lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+% error handling? the updater could die on us, we can save ourselves here.
+% but we shouldn't, we could be dead for a reason, like the view got changed, or something.
+
+
+%% Local Functions
+
+% reply_with_group/3
+% for each item in the WaitingList {Pid, Seq}
+% if the Seq is =< GroupSeq, reply
+reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq ->
+ gen_server:reply(Pid, {ok, Group}),
+ reply_with_group(Group, WaitList, StillWaiting);
+
+% else
+% put it in the continuing waiting list
+reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) ->
+ reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]);
+
+% return the still waiting list
+reply_with_group(_Group, [], StillWaiting) ->
+ StillWaiting.
+
+spawn_updater({RootDir, DbName, GroupId}) ->
+ spawn_link(couch_view_updater, update,
+ [RootDir, DbName, GroupId, self()]);
+
+spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) ->
+ spawn_link(couch_view_updater, temp_update,
+ [DbName, Fd, Lang, MapSrc, RedSrc, self()]).
+
+
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
new file mode 100644
index 00000000..7f40af3b
--- /dev/null
+++ b/src/couchdb/couch_view_updater.erl
@@ -0,0 +1,384 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_view_updater).
+
+-export([update/4, temp_update/6]).
+
+-include("couch_db.hrl").
+
+
+
+update(RootDir, DbName, GroupId, NotifyPid) ->
+ {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId),
+ {ok, Db} = couch_db:open(DbName, []),
+ Result = update_group(Group#group{db=Db}),
+ ?LOG_DEBUG("update {Result} DONE ~p", [{Result}]),
+ couch_db:close(Db),
+ case Result of
+ {same, Group2} ->
+ gen_server:cast(NotifyPid, {new_group, Group2});
+ {updated, Group2} ->
+ HeaderData = {Sig, get_index_header_data(Group2)},
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
+ gen_server:cast(NotifyPid, {new_group, Group2})
+ end,
+ garbage_collect().
+
+temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) ->
+ case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ View = #view{map_names=["_temp"],
+ id_num=0,
+ btree=nil,
+ def=MapSrc,
+ reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
+ Group = #group{name="_temp",
+ db=Db,
+ views=[View],
+ current_seq=0,
+ def_lang=Lang,
+ id_btree=nil},
+ Group2 = init_group(Db, Fd, Group,nil),
+ couch_db:monitor(Db),
+ {_Updated, Group3} = update_group(Group2#group{db=Db}),
+ couch_db:close(Db),
+ gen_server:cast(NotifyPid, {new_group, Group3}),
+ garbage_collect();
+ Else ->
+ exit(Else)
+ end.
+
+
+update_group(#group{db=Db,current_seq=CurrentSeq}=Group) ->
+ ViewEmptyKVs = [{View, []} || View <- Group#group.views],
+ % compute on all docs modified since we last computed.
+ {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}
+ = couch_db:enum_docs_since(
+ Db,
+ CurrentSeq,
+ fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
+ {[], Group, ViewEmptyKVs, []}
+ ),
+ {Group4, Results} = view_compute(Group3, UncomputedDocs),
+ {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
+ couch_query_servers:stop_doc_map(Group4#group.query_server),
+ NewSeq = couch_db:get_update_seq(Db),
+ if CurrentSeq /= NewSeq ->
+ {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
+ {updated, Group5#group{query_server=nil}};
+ true ->
+ {same, Group4#group{query_server=nil}}
+ end.
+
+
+get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
+ id_btree=IdBtree,views=Views}) ->
+ ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
+ #index_header{seq=Seq,
+ purge_seq=PurgeSeq,
+ id_btree_state=couch_btree:get_state(IdBtree),
+ view_states=ViewStates}.
+
+
+purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
+ {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
+ Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
+ {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
+
+ % now populate the dictionary with all the keys to delete
+ ViewKeysToRemoveDict = lists:foldl(
+ fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) ->
+ lists:foldl(
+ fun({ViewNum, RowKey}, ViewDictAcc2) ->
+ dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2)
+ end, ViewDictAcc, ViewNumRowKeys);
+ ({not_found, _}, ViewDictAcc) ->
+ ViewDictAcc
+ end, dict:new(), Lookups),
+
+ % Now remove the values from the btrees
+ Views2 = lists:map(
+ fun(#view{id_num=Num,btree=Btree}=View) ->
+ case dict:find(Num, ViewKeysToRemoveDict) of
+ {ok, RemoveKeys} ->
+ {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys),
+ View#view{btree=Btree2};
+ error -> % no keys to remove in this view
+ View
+ end
+ end, Views),
+ Group#group{id_btree=IdBtree2,
+ views=Views2,
+ purge_seq=couch_db:get_purge_seq(Db)}.
+
+process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) ->
+ % This fun computes once for each document
+ #doc_info{id=DocId, deleted=Deleted} = DocInfo,
+ case DocId of
+ GroupId ->
+ % uh oh. this is the design doc with our definitions. See if
+ % anything in the definition changed.
+ case couch_db:open_doc(Db, DocInfo) of
+ {ok, Doc} ->
+ case design_doc_to_view_group(Doc) of
+ #group{sig=Sig} ->
+ % The same md5 signature, keep on computing
+ {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
+ _ ->
+ ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]),
+ throw(restart)
+ end;
+ {not_found, deleted} ->
+ ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]),
+ throw(restart)
+ end;
+ <<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs
+ {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
+ _ ->
+ {Docs2, DocIdViewIdKeys2} =
+ if Deleted ->
+ {Docs, [{DocId, []} | DocIdViewIdKeys]};
+ true ->
+ {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]),
+ {[Doc | Docs], DocIdViewIdKeys}
+ end,
+
+ case couch_util:should_flush() of
+ true ->
+ {Group1, Results} = view_compute(Group, Docs2),
+ {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2),
+ {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3,
+ DocInfo#doc_info.update_seq),
+ garbage_collect(),
+ ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
+ {ok, {[], Group2, ViewEmptyKeyValues, []}};
+ false ->
+ {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2}}
+ end
+ end.
+
+view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->
+ {ViewKVs, DocIdViewIdKeysAcc};
+view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) ->
+ {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []),
+ NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc],
+ view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys).
+
+
+view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->
+ {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};
+view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->
+ % Take any identical keys and combine the values
+ ResultKVs2 = lists:foldl(
+ fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) ->
+ case Key == PrevKey of
+ true ->
+ case PrevVal of
+ {dups, Dups} ->
+ [{PrevKey, {dups, [Value|Dups]}} | AccRest];
+ _ ->
+ [{PrevKey, {dups, [Value,PrevVal]}} | AccRest]
+ end;
+ false ->
+ [{Key,Value},{PrevKey,PrevVal}|AccRest]
+ end;
+ (KV, []) ->
+ [KV]
+ end, [], lists:sort(ResultKVs)),
+ NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2],
+ NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc],
+ NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2],
+ NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,
+ view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).
+
+view_compute(Group, []) ->
+ {Group, []};
+view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) ->
+ {ok, QueryServer} =
+ case QueryServerIn of
+ nil -> % doc map not started
+ Definitions = [View#view.def || View <- Group#group.views],
+ couch_query_servers:start_doc_map(DefLang, Definitions);
+ _ ->
+ {ok, QueryServerIn}
+ end,
+ {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs),
+ {Group#group{query_server=QueryServer}, Results}.
+
+
+
+write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) ->
+ #group{id_btree=IdBtree} = Group,
+
+ AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []],
+ RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []],
+ LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys],
+ {ok, LookupResults, IdBtree2}
+ = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds),
+ KeysToRemoveByView = lists:foldl(
+ fun(LookupResult, KeysToRemoveByViewAcc) ->
+ case LookupResult of
+ {ok, {DocId, ViewIdKeys}} ->
+ lists:foldl(
+ fun({ViewId, Key}, KeysToRemoveByViewAcc2) ->
+ dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2)
+ end,
+ KeysToRemoveByViewAcc, ViewIdKeys);
+ {not_found, _} ->
+ KeysToRemoveByViewAcc
+ end
+ end,
+ dict:new(), LookupResults),
+
+ Views2 = [
+ begin
+ KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []),
+ {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove),
+ View#view{btree = ViewBtree2}
+ end
+ ||
+ {View, AddKeyValues} <- ViewKeyValuesToAdd
+ ],
+ Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},
+ {ok, Group2}.
+
+prepare_group(RootDir, DbName, GroupId) ->
+ {Db, Group} = case (catch couch_db:open(DbName, [])) of
+ {ok, Db0} ->
+ case (catch couch_db:open_doc(Db0, GroupId)) of
+ {ok, Doc} ->
+ {Db0, design_doc_to_view_group(Doc)};
+ Else ->
+ delete_index_file(RootDir, DbName, GroupId),
+ ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]),
+ exit(Else)
+ end;
+ Else ->
+ delete_index_file(RootDir, DbName, GroupId),
+ exit(Else)
+ end,
+ FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++
+ binary_to_list(GroupId) ++".view",
+ Group2 =
+ case couch_file:open(FileName) of
+ {ok, Fd} ->
+ Sig = Group#group.sig,
+ case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
+ {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->
+ % sigs match!
+ DbPurgeSeq = couch_db:get_purge_seq(Db),
+ % We can only use index with the same, or next purge seq as the db.
+ if DbPurgeSeq == PurgeSeq ->
+ init_group(Db, Fd, Group, HeaderInfo);
+ DbPurgeSeq == PurgeSeq + 1 ->
+ ?LOG_DEBUG("Purging entries from view index.",[]),
+ purge_index(init_group(Db, Fd, Group, HeaderInfo));
+ true ->
+ ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]),
+ reset_file(Db, Fd, DbName, Group)
+ end;
+ _ ->
+ reset_file(Db, Fd, DbName, Group)
+ end;
+ {error, enoent} ->
+ case couch_file:open(FileName, [create]) of
+ {ok, Fd} -> reset_file(Db, Fd, DbName, Group);
+ Error -> throw(Error)
+ end
+ end,
+
+ couch_db:monitor(Db),
+ couch_db:close(Db),
+ {ok, Group2}.
+
+% maybe move to another module
+design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
+ Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
+ {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
+
+ % add the views to a dictionary object, with the map source as the key
+ DictBySrc =
+ lists:foldl(
+ fun({Name, {MRFuns}}, DictBySrcAcc) ->
+ MapSrc = proplists:get_value(<<"map">>, MRFuns),
+ RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
+ View =
+ case dict:find(MapSrc, DictBySrcAcc) of
+ {ok, View0} -> View0;
+ error -> #view{def=MapSrc} % create new view object
+ end,
+ View2 =
+ if RedSrc == null ->
+ View#view{map_names=[Name|View#view.map_names]};
+ true ->
+ View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
+ end,
+ dict:store(MapSrc, View2, DictBySrcAcc)
+ end, dict:new(), RawViews),
+ % number the views
+ {Views, _N} = lists:mapfoldl(
+ fun({_Src, View}, N) ->
+ {View#view{id_num=N},N+1}
+ end, 0, dict:to_list(DictBySrc)),
+
+ Group = #group{name=Id, views=Views, def_lang=Language},
+ Group#group{sig=erlang:md5(term_to_binary(Group))}.
+
+reset_group(#group{views=Views}=Group) ->
+ Views2 = [View#view{btree=nil} || View <- Views],
+ Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
+ id_btree=nil,views=Views2}.
+
+reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
+ ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
+ ok = couch_file:truncate(Fd, 0),
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+ init_group(Db, Fd, reset_group(Group), nil).
+
+delete_index_file(RootDir, DbName, GroupId) ->
+ file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
+ ++ binary_to_list(GroupId) ++ ".view").
+
+init_group(Db, Fd, #group{views=Views}=Group, nil) ->
+ init_group(Db, Fd, Group,
+ #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
+ id_btree_state=nil, view_states=[nil || _ <- Views]});
+init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
+ #index_header{seq=Seq, purge_seq=PurgeSeq,
+ id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
+ {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
+ Views2 = lists:zipwith(
+ fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
+ FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
+ ReduceFun =
+ fun(reduce, KVs) ->
+ KVs2 = couch_view:expand_dups(KVs,[]),
+ KVs3 = couch_view:detuple_kvs(KVs2,[]),
+ {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs,
+ KVs3),
+ {length(KVs3), Reduced};
+ (rereduce, Reds) ->
+ Count = lists:sum([Count0 || {Count0, _} <- Reds]),
+ UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
+ {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs,
+ UserReds),
+ {Count, Reduced}
+ end,
+ {ok, Btree} = couch_btree:open(BtreeState, Fd,
+ [{less, fun couch_view:less_json_keys/2},
+ {reduce, ReduceFun}]),
+ View#view{btree=Btree}
+ end,
+ ViewStates, Views),
+ Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
+ id_btree=IdBtree, views=Views2}. \ No newline at end of file