diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_db.erl | 2 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 5 | ||||
-rw-r--r-- | src/couchdb/couch_file.erl | 65 | ||||
-rw-r--r-- | src/couchdb/couch_view.erl | 58 | ||||
-rw-r--r-- | src/couchdb/couch_view_group.erl | 325 | ||||
-rw-r--r-- | src/couchdb/couch_view_updater.erl | 225 |
6 files changed, 338 insertions, 342 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 5da16b83..5a51a1ac 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -553,7 +553,7 @@ open_doc_int(Db, Id, Options) -> {ok, FullDocInfo} -> open_doc_int(Db, FullDocInfo, Options); not_found -> - throw({not_found, missing}) + {not_found, missing} end. doc_meta_info(DocInfo, RevTree, Options) -> diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 6a48ab1c..eb7bd9a1 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -35,7 +35,7 @@ end). -define(LOG_ERROR(Format, Args), - error_logger:info_report(couch_error, {Format, Args})). + error_logger:error_report(couch_error, {Format, Args})). -record(doc_info, { @@ -162,7 +162,8 @@ id_btree=nil, current_seq=0, purge_seq=0, - query_server=nil + query_server=nil, + commit_fun }). -record(view, diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index d844d5b2..efca523a 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -33,23 +33,14 @@ open(Filepath) -> open(Filepath, []). open(Filepath, Options) -> - case gen_server:start_link(couch_file, {Filepath, Options, self()}, []) of - {ok, FdPid} -> - % we got back an ok, but that doesn't really mean it was successful. - % Instead the true status has been sent back to us as a message. - % We do this because if the gen_server doesn't initialize properly, - % it generates a crash report that will get logged. This avoids - % that mess, because we don't want crash reports generated - % every time a file cannot be found. + case gen_server:start_link(couch_file, + {Filepath, Options, self(), Ref = make_ref()}, []) of + {ok, Fd} -> + {ok, Fd}; + ignore -> + % get the error receive - {FdPid, ok} -> - {ok, FdPid}; - {FdPid, Error} -> - case process_info(self(), trap_exit) of - {trap_exit, true} -> - receive {'EXIT', FdPid, _} -> ok end; - _ -> ok - end, + {Ref, Error} -> Error end; Error -> @@ -235,12 +226,12 @@ read_header(Fd, Prefix) -> ?LOG_INFO("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]), {ok, Header1} end; - {error, Error} -> + Error -> % error reading second header. It's ok, but log it. ?LOG_INFO("Secondary header corruption (error: ~p). Using primary header.", [Error]), {ok, Header1} end; - {error, Error} -> + Error -> % error reading primary header case extract_header(Prefix, Bin2) of {ok, Header2} -> @@ -250,7 +241,7 @@ read_header(Fd, Prefix) -> _ -> % error reading secondary header too % return the error, no need to log anything as the caller will be responsible for dealing with the error. - {error, Error} + Error end end, case Result of @@ -277,26 +268,20 @@ extract_header(Prefix, Bin) -> Header = binary_to_term(TermBin), {ok, Header}; false -> - {error, header_corrupt} + header_corrupt end; _ -> - {error, unknown_header_type} + unknown_header_type end. - -init_status_ok(ReturnPid, Fd) -> - ReturnPid ! {self(), ok}, % signal back ok - {ok, Fd}. - -init_status_error(ReturnPid, Error) -> - ReturnPid ! {self(), Error}, % signal back error status - gen_server:cast(self(), close), % tell ourself to close async - {ok, nil}. +init_status_error(ReturnPid, Ref, Error) -> + ReturnPid ! {Ref, Error}, + ignore. % server functions -init({Filepath, Options, ReturnPid}) -> +init({Filepath, Options, ReturnPid, Ref}) -> case lists:member(create, Options) of true -> filelib:ensure_dir(Filepath), @@ -312,16 +297,16 @@ init({Filepath, Options, ReturnPid}) -> true -> {ok, 0} = file:position(Fd, 0), ok = file:truncate(Fd), - init_status_ok(ReturnPid, Fd); + {ok, Fd}; false -> ok = file:close(Fd), - init_status_error(ReturnPid, file_exists) + init_status_error(ReturnPid, Ref, file_exists) end; false -> - init_status_ok(ReturnPid, Fd) + {ok, Fd} end; Error -> - init_status_error(ReturnPid, Error) + init_status_error(ReturnPid, Ref, Error) end; false -> % open in read mode first, so we don't create the file if it doesn't exist. @@ -329,15 +314,13 @@ init({Filepath, Options, ReturnPid}) -> {ok, Fd_Read} -> {ok, Fd} = file:open(Filepath, [read, write, raw, binary]), ok = file:close(Fd_Read), - init_status_ok(ReturnPid, Fd); + {ok, Fd}; Error -> - init_status_error(ReturnPid, Error) + init_status_error(ReturnPid, Ref, Error) end end. -terminate(_Reason, nil) -> - ok; terminate(_Reason, Fd) -> file:close(Fd), ok. @@ -403,9 +386,7 @@ code_change(_OldVsn, State, _Extra) -> handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, Fd) -> {MonitorRef, _RefCount} = erase(Pid), - maybe_close_async(Fd); -handle_info(Info, Fd) -> - exit({error, {Info, Fd}}). + maybe_close_async(Fd). diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 7c7730a7..84f327e3 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -27,8 +27,12 @@ get_temp_updater(DbName, Type, MapSrc, RedSrc) -> Pid. get_group_server(DbName, GroupId) -> - {ok, Pid} = gen_server:call(couch_view, {start_group_server, DbName, GroupId}), - Pid. + case gen_server:call(couch_view, {start_group_server, DbName, GroupId}) of + {ok, Pid} -> + Pid; + Error -> + throw(Error) + end. get_updated_group(DbName, GroupId, Update) -> couch_view_group:request_group(get_group_server(DbName, GroupId), seq_for_update(DbName, Update)). @@ -45,10 +49,10 @@ get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) -> {ok, {temp_reduce, View}}; get_reduce_view({DbName, GroupId, Name}) -> case get_updated_group(DbName, GroupId, true) of - {error, Reason} -> - Reason; {ok, #group{views=Views,def_lang=Lang}} -> - get_reduce_view0(Name, Lang, Views) + get_reduce_view0(Name, Lang, Views); + Error -> + Error end. get_reduce_view0(_Name, _Lang, []) -> @@ -124,10 +128,10 @@ get_map_view({temp, DbName, Type, Src}) -> {ok, View}; get_map_view({DbName, GroupId, Name, Update}) -> case get_updated_group(DbName, GroupId, Update) of - {error, Reason} -> - Reason; {ok, #group{views=Views}} -> - get_map_view0(Name, Views) + get_map_view0(Name, Views); + Error -> + Error end. get_map_view0(_Name, []) -> @@ -217,7 +221,7 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r ok end, ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]), - {ok, NewPid} = couch_view_group:start_link({DbName, Fd, Lang, MapSrc, RedSrc}), + {ok, NewPid} = couch_view_group:start_link({temp_view, DbName, Fd, Lang, MapSrc, RedSrc}), true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}), add_to_ets(NewPid, DbName, Name), NewPid; @@ -226,17 +230,19 @@ handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{r end, {reply, {ok, Pid}, Server}; handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) -> - Pid = case ets:lookup(group_servers_by_name, {DbName, GroupId}) of [] -> ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]), - {ok, NewPid} = couch_view_group:start_link({Root, DbName, GroupId}), - add_to_ets(NewPid, DbName, GroupId), - NewPid; - [{_, ExistingPid0}] -> - ExistingPid0 - end, - {reply, {ok, Pid}, Server}. + case couch_view_group:start_link({view, Root, DbName, GroupId}) of + {ok, NewPid} -> + add_to_ets(NewPid, DbName, GroupId), + {reply, {ok, NewPid}, Server}; + Error -> + {reply, Error, Server} + end; + [{_, ExistingPid}] -> + {reply, {ok, ExistingPid}, Server} + end. handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> % shutdown all the updaters @@ -254,14 +260,15 @@ handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> file:delete(Root ++ "/." ++ binary_to_list(DbName) ++ "_temp"), {noreply, Server}. -handle_info({'EXIT', _FromPid, normal}, Server) -> - {noreply, Server}; handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> - ?LOG_DEBUG("Exit from process: ~p", [{FromPid, Reason}]), case ets:lookup(couch_groups_by_updater, FromPid) of - [] -> % non-updater linked process must have died, we propagate the error - ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]), - exit(Reason); + [] -> + if Reason /= normal -> + % non-updater linked process died, we propagate the error + ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]), + exit(Reason); + true -> ok + end; [{_, {DbName, "_temp_" ++ _ = GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId), [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName), @@ -276,10 +283,7 @@ handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> [{_, {DbName, GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId) end, - {noreply, Server}; -handle_info(Msg, _Server) -> - ?LOG_ERROR("Bad message received for view module: ~p", [Msg]), - exit({error, Msg}). + {noreply, Server}. add_to_ets(Pid, DbName, GroupId) -> true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}), diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 84547095..cbbac92a 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -15,7 +15,7 @@ %% API -export([start_link/1, request_group/2]). -% -export([design_doc_to_view_group/1]). +-export([design_doc_to_view_group/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -24,10 +24,9 @@ -include("couch_db.hrl"). -record(group_state, { - spawn_fun, - target_seq=0, - group_seq=0, - group=nil, + db_name, + init_args, + group, updater_pid=nil, waiting_list=[] }). @@ -37,7 +36,6 @@ request_group(Pid, Seq) -> ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), case gen_server:call(Pid, {request_group, Seq}, infinity) of {ok, Group} -> - ?LOG_DEBUG("get_updated_group replied with group", []), {ok, Group}; Else -> ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]), @@ -47,15 +45,34 @@ request_group(Pid, Seq) -> % from template start_link(InitArgs) -> - gen_server:start_link(couch_view_group, InitArgs, []). + case gen_server:start_link(couch_view_group, + {InitArgs, self(), Ref = make_ref()}, []) of + {ok, Pid} -> {ok, Pid}; + ignore -> receive {Ref, Error} -> Error end; + Error -> Error + end. % init differentiates between temp and design_doc views. It creates a closure % which spawns the appropriate view_updater. (It might also spawn the first % view_updater run.) -init(InitArgs) -> - SpawnFun = fun() -> spawn_updater(InitArgs) end, +init({InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), - {ok, #group_state{spawn_fun=SpawnFun}}. + case prepare_group(InitArgs, false) of + {ok, #group{db=Db}=Group} -> + couch_db:monitor(Db), + Pid = spawn_link(fun()-> couch_view_updater:update(Group) end), + {ok, #group_state{ + db_name=couch_db:name(Db), + init_args=InitArgs, + updater_pid = Pid, + group=Group}}; + Error -> + ReturnPid ! {Ref, Error}, + ignore + end. + + + % There are two sources of messages: couch_view, which requests an up to date % view group, and the couch_view_updater, which when spawned, updates the @@ -73,87 +90,92 @@ init(InitArgs) -> handle_call({request_group, RequestSeq}, From, #group_state{ - target_seq=TargetSeq, - spawn_fun=SpawnFun, - updater_pid=Up, - waiting_list=WaitList - }=State) when RequestSeq > TargetSeq, Up == nil -> - UpdaterPid = SpawnFun(), - {noreply, State#group_state{ - target_seq=RequestSeq, - updater_pid=UpdaterPid, - waiting_list=[{From,RequestSeq}|WaitList] - }, infinity}; - -handle_call({request_group, RequestSeq}, From, - #group_state{ - target_seq=TargetSeq, + db_name=DbName, + group=#group{current_seq=Seq}=Group, + updater_pid=nil, waiting_list=WaitList - }=State) when RequestSeq > TargetSeq -> + }=State) when RequestSeq > Seq -> + {ok, Db} = couch_db:open(DbName, []), + Group2 = Group#group{db=Db}, + Pid = spawn_link(fun()-> couch_view_updater:update(Group2) end), + {noreply, State#group_state{ - target_seq=RequestSeq, + updater_pid=Pid, + group=Group2, waiting_list=[{From,RequestSeq}|WaitList] - }, infinity}; + }, infinity}; % If the request seqence is less than or equal to the seq_id of a known Group, % we respond with that Group. handle_call({request_group, RequestSeq}, _From, - State=#group_state{ - group_seq=GroupSeq, - group=Group - }) when RequestSeq =< GroupSeq -> + #group_state{group=#group{current_seq=GroupSeq}=Group}=State) + when RequestSeq =< GroupSeq -> {reply, {ok, Group}, State}; % Otherwise: TargetSeq => RequestSeq > GroupSeq % We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq handle_call({request_group, RequestSeq}, From, - #group_state{ - waiting_list=WaitList - }=State) -> + #group_state{waiting_list=WaitList}=State) -> {noreply, State#group_state{ waiting_list=[{From, RequestSeq}|WaitList] - }, infinity}. - - -% When the updater finishes, it will return a group with a seq_id, we should -% store that group and seq_id in our state. If our high_target is higher than -% the returned group, start a new updater. - -handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, - State=#group_state{ - target_seq=TargetSeq, - waiting_list=WaitList, - spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq -> - StillWaiting = reply_with_group(Group, WaitList, []), - UpdaterPid = SpawnFun(), - {noreply, State#group_state{ - updater_pid=UpdaterPid, - waiting_list=StillWaiting, - group_seq=NewGroupSeq, - group=Group}}; - -handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, - State=#group_state{waiting_list=WaitList}) -> - StillWaiting = reply_with_group(Group, WaitList, []), - {noreply, State#group_state{ - updater_pid=nil, - waiting_list=StillWaiting, - group_seq=NewGroupSeq, - group=Group}}. - + }, infinity}. + + +handle_cast(foo, State) -> + {ok, State}. + + +handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, + #group_state{db_name=DbName, + updater_pid=UpPid, + waiting_list=WaitList}=State) when UpPid == FromPid -> + ok = couch_db:close(Db), + case reply_with_group(Group, WaitList, []) of + [] -> + {noreply, State#group_state{waiting_list=[], + group=Group#group{db=nil}, + updater_pid=nil}}; + StillWaiting -> + % we still have some waiters, reopen the database and reupdate the index + {ok, Db2} = couch_db:open(DbName, []), + Group2 = Group#group{db=Db2}, + Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end), + {noreply, State#group_state{waiting_list=StillWaiting, + group=Group2, + updater_pid=Pid}} + end; + +handle_info({'EXIT', FromPid, reset}, + #group_state{ + init_args=InitArgs, + updater_pid=UpPid, + group=Group}=State) when UpPid == FromPid -> + ok = couch_db:close(Group#group.db), + case prepare_group(InitArgs, true) of + {ok, ResetGroup} -> + Pid = spawn_link(fun()-> couch_view_updater:update(ResetGroup) end), + {noreply, State#group_state{ + updater_pid=Pid, + group=ResetGroup}}; + Error -> + {stop, normal, reply_all(State, Error)} + end; + handle_info({'EXIT', _FromPid, normal}, State) -> {noreply, State}; handle_info({'EXIT', FromPid, Reason}, State) -> - ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]), + ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]), {stop, Reason, State}; -handle_info(_Info, State) -> - {noreply, State}. +handle_info({'DOWN',_,_,_,_}, State) -> + ?LOG_INFO("Shutting down view group server, monitored db is closing.", []), + {stop, normal, reply_all(State, shutdown)}. + -terminate(Reason, _State=#group_state{waiting_list=WaitList}) -> - lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList), +terminate(Reason, #group_state{group=#group{fd=Fd}}=State) -> + reply_all(State, Reason), ok. code_change(_OldVsn, State, _Extra) -> @@ -181,12 +203,165 @@ reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) -> reply_with_group(_Group, [], StillWaiting) -> StillWaiting. -spawn_updater({RootDir, DbName, GroupId}) -> - spawn_link(couch_view_updater, update, - [RootDir, DbName, GroupId, self()]); +reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> + [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], + State#group_state{waiting_list=[]}. + +prepare_group({view, RootDir, DbName, GroupId}, ForceReset)-> + case open_db_group(DbName, GroupId) of + {ok, Db, #group{sig=Sig}=Group0} -> + case open_index_file(RootDir, DbName, GroupId) of + {ok, Fd} -> + Group = Group0#group{ + commit_fun = fun(GroupIn) -> + Header = {Sig, get_index_header_data(GroupIn)}, + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Header) + end}, + if ForceReset -> + {ok, reset_file(Db, Fd, DbName, Group)}; + true -> + case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of + {ok, {Sig, HeaderInfo}} -> + % sigs match! + {ok, init_group(Db, Fd, Group, HeaderInfo)}; + _ -> + {ok, reset_file(Db, Fd, DbName, Group)} + end + end; + Error -> + catch delete_index_file(RootDir, DbName, GroupId), + Error + end; + Error -> + catch delete_index_file(RootDir, DbName, GroupId), + Error + end; +prepare_group({temp_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) -> + case couch_db:open(DbName, []) of + {ok, Db} -> + View = #view{map_names=["_temp"], + id_num=0, + btree=nil, + def=MapSrc, + reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, + {ok, init_group(Db, Fd, #group{name="_temp", db=Db, views=[View], + def_lang=Lang, commit_fun=fun(_G) -> ok end}, nil)}; + Error -> + Error + end. + + +get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree,views=Views}) -> + ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], + #index_header{seq=Seq, + purge_seq=PurgeSeq, + id_btree_state=couch_btree:get_state(IdBtree), + view_states=ViewStates}. + -spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) -> - spawn_link(couch_view_updater, temp_update, - [DbName, Fd, Lang, MapSrc, RedSrc, self()]). +open_index_file(RootDir, DbName, GroupId) -> + FileName = RootDir ++ "/." ++ ?b2l(DbName) ++ ?b2l(GroupId) ++".view", + case couch_file:open(FileName) of + {ok, Fd} -> {ok, Fd}; + {error, enoent} -> couch_file:open(FileName, [create]); + Error -> Error + end. +open_db_group(DbName, GroupId) -> + case couch_db:open(DbName, []) of + {ok, Db} -> + case couch_db:open_doc(Db, GroupId) of + {ok, Doc} -> + {ok, Db, design_doc_to_view_group(Doc)}; + Else -> + couch_db:close(Db), + Else + end; + Else -> + Else + end. + +% maybe move to another module +design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> + Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>), + {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}), + + % add the views to a dictionary object, with the map source as the key + DictBySrc = + lists:foldl( + fun({Name, {MRFuns}}, DictBySrcAcc) -> + MapSrc = proplists:get_value(<<"map">>, MRFuns), + RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null), + View = + case dict:find(MapSrc, DictBySrcAcc) of + {ok, View0} -> View0; + error -> #view{def=MapSrc} % create new view object + end, + View2 = + if RedSrc == null -> + View#view{map_names=[Name|View#view.map_names]}; + true -> + View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} + end, + dict:store(MapSrc, View2, DictBySrcAcc) + end, dict:new(), RawViews), + % number the views + {Views, _N} = lists:mapfoldl( + fun({_Src, View}, N) -> + {View#view{id_num=N},N+1} + end, 0, dict:to_list(DictBySrc)), + + Group = #group{name=Id, views=Views, def_lang=Language}, + Group#group{sig=erlang:md5(term_to_binary(Group))}. + +reset_group(#group{views=Views}=Group) -> + Views2 = [View#view{btree=nil} || View <- Views], + Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, + id_btree=nil,views=Views2}. + +reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> + ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), + ok = couch_file:truncate(Fd, 0), + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), + init_group(Db, Fd, reset_group(Group), nil). + +delete_index_file(RootDir, DbName, GroupId) -> + file:delete(RootDir ++ "/." ++ binary_to_list(DbName) + ++ binary_to_list(GroupId) ++ ".view"). + +init_group(Db, Fd, #group{views=Views}=Group, nil) -> + init_group(Db, Fd, Group, + #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), + id_btree_state=nil, view_states=[nil || _ <- Views]}); +init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> + #index_header{seq=Seq, purge_seq=PurgeSeq, + id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, + {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), + Views2 = lists:zipwith( + fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> + FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], + ReduceFun = + fun(reduce, KVs) -> + KVs2 = couch_view:expand_dups(KVs,[]), + KVs3 = couch_view:detuple_kvs(KVs2,[]), + {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, + KVs3), + {length(KVs3), Reduced}; + (rereduce, Reds) -> + Count = lists:sum([Count0 || {Count0, _} <- Reds]), + UserReds = [UserRedsList || {_, UserRedsList} <- Reds], + {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, + UserReds), + {Count, Reduced} + end, + {ok, Btree} = couch_btree:open(BtreeState, Fd, + [{less, fun couch_view:less_json_keys/2}, + {reduce, ReduceFun}]), + View#view{btree=Btree} + end, + ViewStates, Views), + Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree, views=Views2}. + diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 35bc5ffe..0532258c 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -12,83 +12,49 @@ -module(couch_view_updater). --export([update/4, temp_update/6]). +-export([update/1]). -include("couch_db.hrl"). - - -update(RootDir, DbName, GroupId, NotifyPid) -> - {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId), - {ok, Db} = couch_db:open(DbName, []), - Result = update_group(Group#group{db=Db}), - couch_db:close(Db), - case Result of - {same, Group2} -> - gen_server:cast(NotifyPid, {new_group, Group2}); - {updated, Group2} -> - HeaderData = {Sig, get_index_header_data(Group2)}, - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), - gen_server:cast(NotifyPid, {new_group, Group2}) +update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq, + commit_fun=CommitFun}=Group) -> + ?LOG_DEBUG("Starting index update.",[]), + DbPurgeSeq = couch_db:get_purge_seq(Db), + Group2 = + if DbPurgeSeq == PurgeSeq -> + Group; + DbPurgeSeq == PurgeSeq + 1 -> + ?LOG_DEBUG("Purging entries from view index.",[]), + purge_index(Group); + true -> + ?LOG_DEBUG("Resetting view index due to lost purge entries.",[]), + exit(reset) end, - garbage_collect(). -temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) -> - case couch_db:open(DbName, []) of - {ok, Db} -> - View = #view{map_names=["_temp"], - id_num=0, - btree=nil, - def=MapSrc, - reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, - Group = #group{name="_temp", - db=Db, - views=[View], - current_seq=0, - def_lang=Lang, - id_btree=nil}, - Group2 = init_group(Db, Fd, Group,nil), - couch_db:monitor(Db), - {_Updated, Group3} = update_group(Group2#group{db=Db}), - couch_db:close(Db), - gen_server:cast(NotifyPid, {new_group, Group3}), - garbage_collect(); - Else -> - exit(Else) - end. - - -update_group(#group{db=Db,current_seq=CurrentSeq}=Group) -> - ViewEmptyKVs = [{View, []} || View <- Group#group.views], + ViewEmptyKVs = [{View, []} || View <- Group2#group.views], % compute on all docs modified since we last computed. {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}} = couch_db:enum_docs_since( Db, - CurrentSeq, + Seq, fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, - {[], Group, ViewEmptyKVs, []} + {[], Group2, ViewEmptyKVs, []} ), {Group4, Results} = view_compute(Group3, UncomputedDocs), - {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), + {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results( + UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), couch_query_servers:stop_doc_map(Group4#group.query_server), NewSeq = couch_db:get_update_seq(Db), - if CurrentSeq /= NewSeq -> - {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), - {updated, Group5#group{query_server=nil}}; + if Seq /= NewSeq -> + {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, + NewSeq), + ok = CommitFun(Group5), + exit({new_group, Group5#group{query_server=nil}}); true -> - {same, Group4#group{query_server=nil}} + exit({new_group, Group4#group{query_server=nil}}) end. -get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree,views=Views}) -> - ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], - #index_header{seq=Seq, - purge_seq=PurgeSeq, - id_btree_state=couch_btree:get_state(IdBtree), - view_states=ViewStates}. - - purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], @@ -120,7 +86,8 @@ purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> views=Views2, purge_seq=couch_db:get_purge_seq(Db)}. -process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) -> +process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, + DocIdViewIdKeys}) -> % This fun computes once for each document #doc_info{id=DocId, deleted=Deleted} = DocInfo, case DocId of @@ -129,17 +96,15 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, Doc % anything in the definition changed. case couch_db:open_doc(Db, DocInfo) of {ok, Doc} -> - case design_doc_to_view_group(Doc) of + case couch_view_group:design_doc_to_view_group(Doc) of #group{sig=Sig} -> % The same md5 signature, keep on computing {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; _ -> - ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]), - throw(restart) + exit(reset) end; {not_found, deleted} -> - ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]), - throw(restart) + exit(reset) end; <<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; @@ -250,134 +215,4 @@ write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> ], Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}, {ok, Group2}. - -prepare_group(RootDir, DbName, GroupId) -> - {Db, Group} = case (catch couch_db:open(DbName, [])) of - {ok, Db0} -> - case (catch couch_db:open_doc(Db0, GroupId)) of - {ok, Doc} -> - {Db0, design_doc_to_view_group(Doc)}; - Else -> - delete_index_file(RootDir, DbName, GroupId), - ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]), - exit(Else) - end; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end, - FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ - binary_to_list(GroupId) ++".view", - Group2 = - case couch_file:open(FileName) of - {ok, Fd} -> - Sig = Group#group.sig, - case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of - {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} -> - % sigs match! - DbPurgeSeq = couch_db:get_purge_seq(Db), - % We can only use index with the same, or next purge seq as the db. - if DbPurgeSeq == PurgeSeq -> - init_group(Db, Fd, Group, HeaderInfo); - DbPurgeSeq == PurgeSeq + 1 -> - ?LOG_DEBUG("Purging entries from view index.",[]), - purge_index(init_group(Db, Fd, Group, HeaderInfo)); - true -> - ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]), - reset_file(Db, Fd, DbName, Group) - end; - _ -> - reset_file(Db, Fd, DbName, Group) - end; - {error, enoent} -> - case couch_file:open(FileName, [create]) of - {ok, Fd} -> reset_file(Db, Fd, DbName, Group); - Error -> throw(Error) - end - end, - - couch_db:monitor(Db), - couch_db:close(Db), - {ok, Group2}. - -% maybe move to another module -design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> - Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>), - {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}), - % add the views to a dictionary object, with the map source as the key - DictBySrc = - lists:foldl( - fun({Name, {MRFuns}}, DictBySrcAcc) -> - MapSrc = proplists:get_value(<<"map">>, MRFuns), - RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null), - View = - case dict:find(MapSrc, DictBySrcAcc) of - {ok, View0} -> View0; - error -> #view{def=MapSrc} % create new view object - end, - View2 = - if RedSrc == null -> - View#view{map_names=[Name|View#view.map_names]}; - true -> - View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} - end, - dict:store(MapSrc, View2, DictBySrcAcc) - end, dict:new(), RawViews), - % number the views - {Views, _N} = lists:mapfoldl( - fun({_Src, View}, N) -> - {View#view{id_num=N},N+1} - end, 0, dict:to_list(DictBySrc)), - - Group = #group{name=Id, views=Views, def_lang=Language}, - Group#group{sig=erlang:md5(term_to_binary(Group))}. - -reset_group(#group{views=Views}=Group) -> - Views2 = [View#view{btree=nil} || View <- Views], - Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, - id_btree=nil,views=Views2}. - -reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> - ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), - ok = couch_file:truncate(Fd, 0), - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), - init_group(Db, Fd, reset_group(Group), nil). - -delete_index_file(RootDir, DbName, GroupId) -> - file:delete(RootDir ++ "/." ++ binary_to_list(DbName) - ++ binary_to_list(GroupId) ++ ".view"). - -init_group(Db, Fd, #group{views=Views}=Group, nil) -> - init_group(Db, Fd, Group, - #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), - id_btree_state=nil, view_states=[nil || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> - #index_header{seq=Seq, purge_seq=PurgeSeq, - id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, - {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), - Views2 = lists:zipwith( - fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> - FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], - ReduceFun = - fun(reduce, KVs) -> - KVs2 = couch_view:expand_dups(KVs,[]), - KVs3 = couch_view:detuple_kvs(KVs2,[]), - {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, - KVs3), - {length(KVs3), Reduced}; - (rereduce, Reds) -> - Count = lists:sum([Count0 || {Count0, _} <- Reds]), - UserReds = [UserRedsList || {_, UserRedsList} <- Reds], - {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, - UserReds), - {Count, Reduced} - end, - {ok, Btree} = couch_btree:open(BtreeState, Fd, - [{less, fun couch_view:less_json_keys/2}, - {reduce, ReduceFun}]), - View#view{btree=Btree} - end, - ViewStates, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree, views=Views2}.
\ No newline at end of file |