summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-12-12 05:23:37 +0000
committerDamien F. Katz <damien@apache.org>2008-12-12 05:23:37 +0000
commit56a3ee28e006aa42150482e1c3f91dc1906273f9 (patch)
tree139a7b75db8c0e4b10d7a581e3f9f54936ea5746 /src
parentae3a9d4a0f06ef9eb6fcb0ce44e719bfc5bebbbd (diff)
modifications to view server to keep the file descriptor open for the life of the view group.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@725909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_db.erl2
-rw-r--r--src/couchdb/couch_db.hrl5
-rw-r--r--src/couchdb/couch_file.erl65
-rw-r--r--src/couchdb/couch_view.erl58
-rw-r--r--src/couchdb/couch_view_group.erl325
-rw-r--r--src/couchdb/couch_view_updater.erl225
6 files changed, 338 insertions, 342 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 5da16b83..5a51a1ac 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -553,7 +553,7 @@ open_doc_int(Db, Id, Options) ->
{ok, FullDocInfo} ->
open_doc_int(Db, FullDocInfo, Options);
not_found ->
- throw({not_found, missing})
+ {not_found, missing}
end.
doc_meta_info(DocInfo, RevTree, Options) ->
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 6a48ab1c..eb7bd9a1 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -35,7 +35,7 @@
end).
-define(LOG_ERROR(Format, Args),
- error_logger:info_report(couch_error, {Format, Args})).
+ error_logger:error_report(couch_error, {Format, Args})).
-record(doc_info,
{
@@ -162,7 +162,8 @@
id_btree=nil,
current_seq=0,
purge_seq=0,
- query_server=nil
+ query_server=nil,
+ commit_fun
}).
-record(view,
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index d844d5b2..efca523a 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -33,23 +33,14 @@ open(Filepath) ->
open(Filepath, []).
open(Filepath, Options) ->
- case gen_server:start_link(couch_file, {Filepath, Options, self()}, []) of
- {ok, FdPid} ->
- % we got back an ok, but that doesn't really mean it was successful.
- % Instead the true status has been sent back to us as a message.
- % We do this because if the gen_server doesn't initialize properly,
- % it generates a crash report that will get logged. This avoids
- % that mess, because we don't want crash reports generated
- % every time a file cannot be found.
+ case gen_server:start_link(couch_file,
+ {Filepath, Options, self(), Ref = make_ref()}, []) of
+ {ok, Fd} ->
+ {ok, Fd};
+ ignore ->
+ % get the error
receive
- {FdPid, ok} ->
- {ok, FdPid};
- {FdPid, Error} ->
- case process_info(self(), trap_exit) of
- {trap_exit, true} ->
- receive {'EXIT', FdPid, _} -> ok end;
- _ -> ok
- end,
+ {Ref, Error} ->
Error
end;
Error ->
@@ -235,12 +226,12 @@ read_header(Fd, Prefix) ->
?LOG_INFO("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
{ok, Header1}
end;
- {error, Error} ->
+ Error ->
% error reading second header. It's ok, but log it.
?LOG_INFO("Secondary header corruption (error: ~p). Using primary header.", [Error]),
{ok, Header1}
end;
- {error, Error} ->
+ Error ->
% error reading primary header
case extract_header(Prefix, Bin2) of
{ok, Header2} ->
@@ -250,7 +241,7 @@ read_header(Fd, Prefix) ->
_ ->
% error reading secondary header too
% return the error, no need to log anything as the caller will be responsible for dealing with the error.
- {error, Error}
+ Error
end
end,
case Result of
@@ -277,26 +268,20 @@ extract_header(Prefix, Bin) ->
Header = binary_to_term(TermBin),
{ok, Header};
false ->
- {error, header_corrupt}
+ header_corrupt
end;
_ ->
- {error, unknown_header_type}
+ unknown_header_type
end.
-
-init_status_ok(ReturnPid, Fd) ->
- ReturnPid ! {self(), ok}, % signal back ok
- {ok, Fd}.
-
-init_status_error(ReturnPid, Error) ->
- ReturnPid ! {self(), Error}, % signal back error status
- gen_server:cast(self(), close), % tell ourself to close async
- {ok, nil}.
+init_status_error(ReturnPid, Ref, Error) ->
+ ReturnPid ! {Ref, Error},
+ ignore.
% server functions
-init({Filepath, Options, ReturnPid}) ->
+init({Filepath, Options, ReturnPid, Ref}) ->
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
@@ -312,16 +297,16 @@ init({Filepath, Options, ReturnPid}) ->
true ->
{ok, 0} = file:position(Fd, 0),
ok = file:truncate(Fd),
- init_status_ok(ReturnPid, Fd);
+ {ok, Fd};
false ->
ok = file:close(Fd),
- init_status_error(ReturnPid, file_exists)
+ init_status_error(ReturnPid, Ref, file_exists)
end;
false ->
- init_status_ok(ReturnPid, Fd)
+ {ok, Fd}
end;
Error ->
- init_status_error(ReturnPid, Error)
+ init_status_error(ReturnPid, Ref, Error)
end;
false ->
% open in read mode first, so we don't create the file if it doesn't exist.
@@ -329,15 +314,13 @@ init({Filepath, Options, ReturnPid}) ->
{ok, Fd_Read} ->
{ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
ok = file:close(Fd_Read),
- init_status_ok(ReturnPid, Fd);
+ {ok, Fd};
Error ->
- init_status_error(ReturnPid, Error)
+ init_status_error(ReturnPid, Ref, Error)
end
end.
-terminate(_Reason, nil) ->
- ok;
terminate(_Reason, Fd) ->
file:close(Fd),
ok.
@@ -403,9 +386,7 @@ code_change(_OldVsn, State, _Extra) ->
handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, Fd) ->
{MonitorRef, _RefCount} = erase(Pid),
- maybe_close_async(Fd);
-handle_info(Info, Fd) ->
- exit({error, {Info, Fd}}).
+ maybe_close_async(Fd).
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index 7c7730a7..84f327e3 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -27,8 +27,12 @@ get_temp_updater(DbName, Type, MapSrc, RedSrc) ->
Pid.
get_group_server(DbName, GroupId) ->
- {ok, Pid} = gen_server:call(couch_view, {start_group_server, DbName, GroupId}),
- Pid.
+ case gen_server:call(couch_view, {start_group_server, DbName, GroupId}) of
+ {ok, Pid} ->
+ Pid;
+ Error ->
+ throw(Error)
+ end.
get_updated_group(DbName, GroupId, Update) ->
couch_view_group:request_group(get_group_server(DbName, GroupId), seq_for_update(DbName, Update)).
@@ -45,10 +49,10 @@ get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) ->
{ok, {temp_reduce, View}};
get_reduce_view({DbName, GroupId, Name}) ->
case get_updated_group(DbName, GroupId, true) of
- {error, Reason} ->
- Reason;
{ok, #group{views=Views,def_lang=Lang}} ->
- get_reduce_view0(Name, Lang, Views)
+ get_reduce_view0(Name, Lang, Views);
+ Error ->
+ Error
end.
get_reduce_view0(_Name, _Lang, []) ->
@@ -124,10 +128,10 @@ get_map_view({temp, DbName, Type, Src}) ->
{ok, View};
get_map_view({DbName, GroupId, Name, Update}) ->
case get_updated_group(DbName, GroupId, Update) of
- {error, Reason} ->
- Reason;
{ok, #group{views=Views}} ->
- get_map_view0(Name, Views)
+ get_map_view0(Name, Views);
+ Error ->
+ Error
end.
get_map_view0(_Name, []) ->
@@ -217,7 +221,7 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r
ok
end,
?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]),
- {ok, NewPid} = couch_view_group:start_link({DbName, Fd, Lang, MapSrc, RedSrc}),
+ {ok, NewPid} = couch_view_group:start_link({temp_view, DbName, Fd, Lang, MapSrc, RedSrc}),
true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}),
add_to_ets(NewPid, DbName, Name),
NewPid;
@@ -226,17 +230,19 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r
end,
{reply, {ok, Pid}, Server};
handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->
- Pid =
case ets:lookup(group_servers_by_name, {DbName, GroupId}) of
[] ->
?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]),
- {ok, NewPid} = couch_view_group:start_link({Root, DbName, GroupId}),
- add_to_ets(NewPid, DbName, GroupId),
- NewPid;
- [{_, ExistingPid0}] ->
- ExistingPid0
- end,
- {reply, {ok, Pid}, Server}.
+ case couch_view_group:start_link({view, Root, DbName, GroupId}) of
+ {ok, NewPid} ->
+ add_to_ets(NewPid, DbName, GroupId),
+ {reply, {ok, NewPid}, Server};
+ Error ->
+ {reply, Error, Server}
+ end;
+ [{_, ExistingPid}] ->
+ {reply, {ok, ExistingPid}, Server}
+ end.
handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
% shutdown all the updaters
@@ -254,14 +260,15 @@ handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
file:delete(Root ++ "/." ++ binary_to_list(DbName) ++ "_temp"),
{noreply, Server}.
-handle_info({'EXIT', _FromPid, normal}, Server) ->
- {noreply, Server};
handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
- ?LOG_DEBUG("Exit from process: ~p", [{FromPid, Reason}]),
case ets:lookup(couch_groups_by_updater, FromPid) of
- [] -> % non-updater linked process must have died, we propagate the error
- ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
- exit(Reason);
+ [] ->
+ if Reason /= normal ->
+ % non-updater linked process died, we propagate the error
+ ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
+ exit(Reason);
+ true -> ok
+ end;
[{_, {DbName, "_temp_" ++ _ = GroupId}}] ->
delete_from_ets(FromPid, DbName, GroupId),
[{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName),
@@ -276,10 +283,7 @@ handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
[{_, {DbName, GroupId}}] ->
delete_from_ets(FromPid, DbName, GroupId)
end,
- {noreply, Server};
-handle_info(Msg, _Server) ->
- ?LOG_ERROR("Bad message received for view module: ~p", [Msg]),
- exit({error, Msg}).
+ {noreply, Server}.
add_to_ets(Pid, DbName, GroupId) ->
true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}),
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 84547095..cbbac92a 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -15,7 +15,7 @@
%% API
-export([start_link/1, request_group/2]).
-% -export([design_doc_to_view_group/1]).
+-export([design_doc_to_view_group/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -24,10 +24,9 @@
-include("couch_db.hrl").
-record(group_state, {
- spawn_fun,
- target_seq=0,
- group_seq=0,
- group=nil,
+ db_name,
+ init_args,
+ group,
updater_pid=nil,
waiting_list=[]
}).
@@ -37,7 +36,6 @@ request_group(Pid, Seq) ->
?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
case gen_server:call(Pid, {request_group, Seq}, infinity) of
{ok, Group} ->
- ?LOG_DEBUG("get_updated_group replied with group", []),
{ok, Group};
Else ->
?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]),
@@ -47,15 +45,34 @@ request_group(Pid, Seq) ->
% from template
start_link(InitArgs) ->
- gen_server:start_link(couch_view_group, InitArgs, []).
+ case gen_server:start_link(couch_view_group,
+ {InitArgs, self(), Ref = make_ref()}, []) of
+ {ok, Pid} -> {ok, Pid};
+ ignore -> receive {Ref, Error} -> Error end;
+ Error -> Error
+ end.
% init differentiates between temp and design_doc views. It creates a closure
% which spawns the appropriate view_updater. (It might also spawn the first
% view_updater run.)
-init(InitArgs) ->
- SpawnFun = fun() -> spawn_updater(InitArgs) end,
+init({InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
- {ok, #group_state{spawn_fun=SpawnFun}}.
+ case prepare_group(InitArgs, false) of
+ {ok, #group{db=Db}=Group} ->
+ couch_db:monitor(Db),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Group) end),
+ {ok, #group_state{
+ db_name=couch_db:name(Db),
+ init_args=InitArgs,
+ updater_pid = Pid,
+ group=Group}};
+ Error ->
+ ReturnPid ! {Ref, Error},
+ ignore
+ end.
+
+
+
% There are two sources of messages: couch_view, which requests an up to date
% view group, and the couch_view_updater, which when spawned, updates the
@@ -73,87 +90,92 @@ init(InitArgs) ->
handle_call({request_group, RequestSeq}, From,
#group_state{
- target_seq=TargetSeq,
- spawn_fun=SpawnFun,
- updater_pid=Up,
- waiting_list=WaitList
- }=State) when RequestSeq > TargetSeq, Up == nil ->
- UpdaterPid = SpawnFun(),
- {noreply, State#group_state{
- target_seq=RequestSeq,
- updater_pid=UpdaterPid,
- waiting_list=[{From,RequestSeq}|WaitList]
- }, infinity};
-
-handle_call({request_group, RequestSeq}, From,
- #group_state{
- target_seq=TargetSeq,
+ db_name=DbName,
+ group=#group{current_seq=Seq}=Group,
+ updater_pid=nil,
waiting_list=WaitList
- }=State) when RequestSeq > TargetSeq ->
+ }=State) when RequestSeq > Seq ->
+ {ok, Db} = couch_db:open(DbName, []),
+ Group2 = Group#group{db=Db},
+ Pid = spawn_link(fun()-> couch_view_updater:update(Group2) end),
+
{noreply, State#group_state{
- target_seq=RequestSeq,
+ updater_pid=Pid,
+ group=Group2,
waiting_list=[{From,RequestSeq}|WaitList]
- }, infinity};
+ }, infinity};
% If the request seqence is less than or equal to the seq_id of a known Group,
% we respond with that Group.
handle_call({request_group, RequestSeq}, _From,
- State=#group_state{
- group_seq=GroupSeq,
- group=Group
- }) when RequestSeq =< GroupSeq ->
+ #group_state{group=#group{current_seq=GroupSeq}=Group}=State)
+ when RequestSeq =< GroupSeq ->
{reply, {ok, Group}, State};
% Otherwise: TargetSeq => RequestSeq > GroupSeq
% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq
handle_call({request_group, RequestSeq}, From,
- #group_state{
- waiting_list=WaitList
- }=State) ->
+ #group_state{waiting_list=WaitList}=State) ->
{noreply, State#group_state{
waiting_list=[{From, RequestSeq}|WaitList]
- }, infinity}.
-
-
-% When the updater finishes, it will return a group with a seq_id, we should
-% store that group and seq_id in our state. If our high_target is higher than
-% the returned group, start a new updater.
-
-handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},
- State=#group_state{
- target_seq=TargetSeq,
- waiting_list=WaitList,
- spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq ->
- StillWaiting = reply_with_group(Group, WaitList, []),
- UpdaterPid = SpawnFun(),
- {noreply, State#group_state{
- updater_pid=UpdaterPid,
- waiting_list=StillWaiting,
- group_seq=NewGroupSeq,
- group=Group}};
-
-handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},
- State=#group_state{waiting_list=WaitList}) ->
- StillWaiting = reply_with_group(Group, WaitList, []),
- {noreply, State#group_state{
- updater_pid=nil,
- waiting_list=StillWaiting,
- group_seq=NewGroupSeq,
- group=Group}}.
-
+ }, infinity}.
+
+
+handle_cast(foo, State) ->
+ {ok, State}.
+
+
+handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
+ #group_state{db_name=DbName,
+ updater_pid=UpPid,
+ waiting_list=WaitList}=State) when UpPid == FromPid ->
+ ok = couch_db:close(Db),
+ case reply_with_group(Group, WaitList, []) of
+ [] ->
+ {noreply, State#group_state{waiting_list=[],
+ group=Group#group{db=nil},
+ updater_pid=nil}};
+ StillWaiting ->
+ % we still have some waiters, reopen the database and reupdate the index
+ {ok, Db2} = couch_db:open(DbName, []),
+ Group2 = Group#group{db=Db2},
+ Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end),
+ {noreply, State#group_state{waiting_list=StillWaiting,
+ group=Group2,
+ updater_pid=Pid}}
+ end;
+
+handle_info({'EXIT', FromPid, reset},
+ #group_state{
+ init_args=InitArgs,
+ updater_pid=UpPid,
+ group=Group}=State) when UpPid == FromPid ->
+ ok = couch_db:close(Group#group.db),
+ case prepare_group(InitArgs, true) of
+ {ok, ResetGroup} ->
+ Pid = spawn_link(fun()-> couch_view_updater:update(ResetGroup) end),
+ {noreply, State#group_state{
+ updater_pid=Pid,
+ group=ResetGroup}};
+ Error ->
+ {stop, normal, reply_all(State, Error)}
+ end;
+
handle_info({'EXIT', _FromPid, normal}, State) ->
{noreply, State};
handle_info({'EXIT', FromPid, Reason}, State) ->
- ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]),
+ ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]),
{stop, Reason, State};
-handle_info(_Info, State) ->
- {noreply, State}.
+handle_info({'DOWN',_,_,_,_}, State) ->
+ ?LOG_INFO("Shutting down view group server, monitored db is closing.", []),
+ {stop, normal, reply_all(State, shutdown)}.
+
-terminate(Reason, _State=#group_state{waiting_list=WaitList}) ->
- lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList),
+terminate(Reason, #group_state{group=#group{fd=Fd}}=State) ->
+ reply_all(State, Reason),
ok.
code_change(_OldVsn, State, _Extra) ->
@@ -181,12 +203,165 @@ reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) ->
reply_with_group(_Group, [], StillWaiting) ->
StillWaiting.
-spawn_updater({RootDir, DbName, GroupId}) ->
- spawn_link(couch_view_updater, update,
- [RootDir, DbName, GroupId, self()]);
+reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->
+ [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList],
+ State#group_state{waiting_list=[]}.
+
+prepare_group({view, RootDir, DbName, GroupId}, ForceReset)->
+ case open_db_group(DbName, GroupId) of
+ {ok, Db, #group{sig=Sig}=Group0} ->
+ case open_index_file(RootDir, DbName, GroupId) of
+ {ok, Fd} ->
+ Group = Group0#group{
+ commit_fun = fun(GroupIn) ->
+ Header = {Sig, get_index_header_data(GroupIn)},
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Header)
+ end},
+ if ForceReset ->
+ {ok, reset_file(Db, Fd, DbName, Group)};
+ true ->
+ case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
+ {ok, {Sig, HeaderInfo}} ->
+ % sigs match!
+ {ok, init_group(Db, Fd, Group, HeaderInfo)};
+ _ ->
+ {ok, reset_file(Db, Fd, DbName, Group)}
+ end
+ end;
+ Error ->
+ catch delete_index_file(RootDir, DbName, GroupId),
+ Error
+ end;
+ Error ->
+ catch delete_index_file(RootDir, DbName, GroupId),
+ Error
+ end;
+prepare_group({temp_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) ->
+ case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ View = #view{map_names=["_temp"],
+ id_num=0,
+ btree=nil,
+ def=MapSrc,
+ reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
+ {ok, init_group(Db, Fd, #group{name="_temp", db=Db, views=[View],
+ def_lang=Lang, commit_fun=fun(_G) -> ok end}, nil)};
+ Error ->
+ Error
+ end.
+
+
+get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
+ id_btree=IdBtree,views=Views}) ->
+ ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
+ #index_header{seq=Seq,
+ purge_seq=PurgeSeq,
+ id_btree_state=couch_btree:get_state(IdBtree),
+ view_states=ViewStates}.
+
-spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) ->
- spawn_link(couch_view_updater, temp_update,
- [DbName, Fd, Lang, MapSrc, RedSrc, self()]).
+open_index_file(RootDir, DbName, GroupId) ->
+ FileName = RootDir ++ "/." ++ ?b2l(DbName) ++ ?b2l(GroupId) ++".view",
+ case couch_file:open(FileName) of
+ {ok, Fd} -> {ok, Fd};
+ {error, enoent} -> couch_file:open(FileName, [create]);
+ Error -> Error
+ end.
+open_db_group(DbName, GroupId) ->
+ case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ case couch_db:open_doc(Db, GroupId) of
+ {ok, Doc} ->
+ {ok, Db, design_doc_to_view_group(Doc)};
+ Else ->
+ couch_db:close(Db),
+ Else
+ end;
+ Else ->
+ Else
+ end.
+
+% maybe move to another module
+design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
+ Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
+ {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
+
+ % add the views to a dictionary object, with the map source as the key
+ DictBySrc =
+ lists:foldl(
+ fun({Name, {MRFuns}}, DictBySrcAcc) ->
+ MapSrc = proplists:get_value(<<"map">>, MRFuns),
+ RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
+ View =
+ case dict:find(MapSrc, DictBySrcAcc) of
+ {ok, View0} -> View0;
+ error -> #view{def=MapSrc} % create new view object
+ end,
+ View2 =
+ if RedSrc == null ->
+ View#view{map_names=[Name|View#view.map_names]};
+ true ->
+ View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
+ end,
+ dict:store(MapSrc, View2, DictBySrcAcc)
+ end, dict:new(), RawViews),
+ % number the views
+ {Views, _N} = lists:mapfoldl(
+ fun({_Src, View}, N) ->
+ {View#view{id_num=N},N+1}
+ end, 0, dict:to_list(DictBySrc)),
+
+ Group = #group{name=Id, views=Views, def_lang=Language},
+ Group#group{sig=erlang:md5(term_to_binary(Group))}.
+
+reset_group(#group{views=Views}=Group) ->
+ Views2 = [View#view{btree=nil} || View <- Views],
+ Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
+ id_btree=nil,views=Views2}.
+
+reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
+ ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
+ ok = couch_file:truncate(Fd, 0),
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+ init_group(Db, Fd, reset_group(Group), nil).
+
+delete_index_file(RootDir, DbName, GroupId) ->
+ file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
+ ++ binary_to_list(GroupId) ++ ".view").
+
+init_group(Db, Fd, #group{views=Views}=Group, nil) ->
+ init_group(Db, Fd, Group,
+ #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
+ id_btree_state=nil, view_states=[nil || _ <- Views]});
+init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
+ #index_header{seq=Seq, purge_seq=PurgeSeq,
+ id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
+ {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
+ Views2 = lists:zipwith(
+ fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
+ FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
+ ReduceFun =
+ fun(reduce, KVs) ->
+ KVs2 = couch_view:expand_dups(KVs,[]),
+ KVs3 = couch_view:detuple_kvs(KVs2,[]),
+ {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs,
+ KVs3),
+ {length(KVs3), Reduced};
+ (rereduce, Reds) ->
+ Count = lists:sum([Count0 || {Count0, _} <- Reds]),
+ UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
+ {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs,
+ UserReds),
+ {Count, Reduced}
+ end,
+ {ok, Btree} = couch_btree:open(BtreeState, Fd,
+ [{less, fun couch_view:less_json_keys/2},
+ {reduce, ReduceFun}]),
+ View#view{btree=Btree}
+ end,
+ ViewStates, Views),
+ Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
+ id_btree=IdBtree, views=Views2}.
+
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index 35bc5ffe..0532258c 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -12,83 +12,49 @@
-module(couch_view_updater).
--export([update/4, temp_update/6]).
+-export([update/1]).
-include("couch_db.hrl").
-
-
-update(RootDir, DbName, GroupId, NotifyPid) ->
- {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId),
- {ok, Db} = couch_db:open(DbName, []),
- Result = update_group(Group#group{db=Db}),
- couch_db:close(Db),
- case Result of
- {same, Group2} ->
- gen_server:cast(NotifyPid, {new_group, Group2});
- {updated, Group2} ->
- HeaderData = {Sig, get_index_header_data(Group2)},
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
- gen_server:cast(NotifyPid, {new_group, Group2})
+update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq,
+ commit_fun=CommitFun}=Group) ->
+ ?LOG_DEBUG("Starting index update.",[]),
+ DbPurgeSeq = couch_db:get_purge_seq(Db),
+ Group2 =
+ if DbPurgeSeq == PurgeSeq ->
+ Group;
+ DbPurgeSeq == PurgeSeq + 1 ->
+ ?LOG_DEBUG("Purging entries from view index.",[]),
+ purge_index(Group);
+ true ->
+ ?LOG_DEBUG("Resetting view index due to lost purge entries.",[]),
+ exit(reset)
end,
- garbage_collect().
-temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) ->
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- View = #view{map_names=["_temp"],
- id_num=0,
- btree=nil,
- def=MapSrc,
- reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
- Group = #group{name="_temp",
- db=Db,
- views=[View],
- current_seq=0,
- def_lang=Lang,
- id_btree=nil},
- Group2 = init_group(Db, Fd, Group,nil),
- couch_db:monitor(Db),
- {_Updated, Group3} = update_group(Group2#group{db=Db}),
- couch_db:close(Db),
- gen_server:cast(NotifyPid, {new_group, Group3}),
- garbage_collect();
- Else ->
- exit(Else)
- end.
-
-
-update_group(#group{db=Db,current_seq=CurrentSeq}=Group) ->
- ViewEmptyKVs = [{View, []} || View <- Group#group.views],
+ ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
% compute on all docs modified since we last computed.
{ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}
= couch_db:enum_docs_since(
Db,
- CurrentSeq,
+ Seq,
fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
- {[], Group, ViewEmptyKVs, []}
+ {[], Group2, ViewEmptyKVs, []}
),
{Group4, Results} = view_compute(Group3, UncomputedDocs),
- {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
+ {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(
+ UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
couch_query_servers:stop_doc_map(Group4#group.query_server),
NewSeq = couch_db:get_update_seq(Db),
- if CurrentSeq /= NewSeq ->
- {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
- {updated, Group5#group{query_server=nil}};
+ if Seq /= NewSeq ->
+ {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2,
+ NewSeq),
+ ok = CommitFun(Group5),
+ exit({new_group, Group5#group{query_server=nil}});
true ->
- {same, Group4#group{query_server=nil}}
+ exit({new_group, Group4#group{query_server=nil}})
end.
-get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
- id_btree=IdBtree,views=Views}) ->
- ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
- #index_header{seq=Seq,
- purge_seq=PurgeSeq,
- id_btree_state=couch_btree:get_state(IdBtree),
- view_states=ViewStates}.
-
-
purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
@@ -120,7 +86,8 @@ purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
views=Views2,
purge_seq=couch_db:get_purge_seq(Db)}.
-process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) ->
+process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs,
+ DocIdViewIdKeys}) ->
% This fun computes once for each document
#doc_info{id=DocId, deleted=Deleted} = DocInfo,
case DocId of
@@ -129,17 +96,15 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, Doc
% anything in the definition changed.
case couch_db:open_doc(Db, DocInfo) of
{ok, Doc} ->
- case design_doc_to_view_group(Doc) of
+ case couch_view_group:design_doc_to_view_group(Doc) of
#group{sig=Sig} ->
% The same md5 signature, keep on computing
{ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
_ ->
- ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]),
- throw(restart)
+ exit(reset)
end;
{not_found, deleted} ->
- ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]),
- throw(restart)
+ exit(reset)
end;
<<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs
{ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
@@ -250,134 +215,4 @@ write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) ->
],
Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},
{ok, Group2}.
-
-prepare_group(RootDir, DbName, GroupId) ->
- {Db, Group} = case (catch couch_db:open(DbName, [])) of
- {ok, Db0} ->
- case (catch couch_db:open_doc(Db0, GroupId)) of
- {ok, Doc} ->
- {Db0, design_doc_to_view_group(Doc)};
- Else ->
- delete_index_file(RootDir, DbName, GroupId),
- ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]),
- exit(Else)
- end;
- Else ->
- delete_index_file(RootDir, DbName, GroupId),
- exit(Else)
- end,
- FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++
- binary_to_list(GroupId) ++".view",
- Group2 =
- case couch_file:open(FileName) of
- {ok, Fd} ->
- Sig = Group#group.sig,
- case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
- {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->
- % sigs match!
- DbPurgeSeq = couch_db:get_purge_seq(Db),
- % We can only use index with the same, or next purge seq as the db.
- if DbPurgeSeq == PurgeSeq ->
- init_group(Db, Fd, Group, HeaderInfo);
- DbPurgeSeq == PurgeSeq + 1 ->
- ?LOG_DEBUG("Purging entries from view index.",[]),
- purge_index(init_group(Db, Fd, Group, HeaderInfo));
- true ->
- ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]),
- reset_file(Db, Fd, DbName, Group)
- end;
- _ ->
- reset_file(Db, Fd, DbName, Group)
- end;
- {error, enoent} ->
- case couch_file:open(FileName, [create]) of
- {ok, Fd} -> reset_file(Db, Fd, DbName, Group);
- Error -> throw(Error)
- end
- end,
-
- couch_db:monitor(Db),
- couch_db:close(Db),
- {ok, Group2}.
-
-% maybe move to another module
-design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
- Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
- {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
- % add the views to a dictionary object, with the map source as the key
- DictBySrc =
- lists:foldl(
- fun({Name, {MRFuns}}, DictBySrcAcc) ->
- MapSrc = proplists:get_value(<<"map">>, MRFuns),
- RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
- View =
- case dict:find(MapSrc, DictBySrcAcc) of
- {ok, View0} -> View0;
- error -> #view{def=MapSrc} % create new view object
- end,
- View2 =
- if RedSrc == null ->
- View#view{map_names=[Name|View#view.map_names]};
- true ->
- View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
- end,
- dict:store(MapSrc, View2, DictBySrcAcc)
- end, dict:new(), RawViews),
- % number the views
- {Views, _N} = lists:mapfoldl(
- fun({_Src, View}, N) ->
- {View#view{id_num=N},N+1}
- end, 0, dict:to_list(DictBySrc)),
-
- Group = #group{name=Id, views=Views, def_lang=Language},
- Group#group{sig=erlang:md5(term_to_binary(Group))}.
-
-reset_group(#group{views=Views}=Group) ->
- Views2 = [View#view{btree=nil} || View <- Views],
- Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
- id_btree=nil,views=Views2}.
-
-reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
- ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
- ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
- init_group(Db, Fd, reset_group(Group), nil).
-
-delete_index_file(RootDir, DbName, GroupId) ->
- file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
- ++ binary_to_list(GroupId) ++ ".view").
-
-init_group(Db, Fd, #group{views=Views}=Group, nil) ->
- init_group(Db, Fd, Group,
- #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
- id_btree_state=nil, view_states=[nil || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
- #index_header{seq=Seq, purge_seq=PurgeSeq,
- id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
- {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
- Views2 = lists:zipwith(
- fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
- FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
- ReduceFun =
- fun(reduce, KVs) ->
- KVs2 = couch_view:expand_dups(KVs,[]),
- KVs3 = couch_view:detuple_kvs(KVs2,[]),
- {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs,
- KVs3),
- {length(KVs3), Reduced};
- (rereduce, Reds) ->
- Count = lists:sum([Count0 || {Count0, _} <- Reds]),
- UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
- {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs,
- UserReds),
- {Count, Reduced}
- end,
- {ok, Btree} = couch_btree:open(BtreeState, Fd,
- [{less, fun couch_view:less_json_keys/2},
- {reduce, ReduceFun}]),
- View#view{btree=Btree}
- end,
- ViewStates, Views),
- Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
- id_btree=IdBtree, views=Views2}. \ No newline at end of file