diff options
Diffstat (limited to 'apps/couch/src/couch_view_group.erl')
-rw-r--r-- | apps/couch/src/couch_view_group.erl | 631 |
1 files changed, 631 insertions, 0 deletions
diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl new file mode 100644 index 00000000..87e4cd2e --- /dev/null +++ b/apps/couch/src/couch_view_group.erl @@ -0,0 +1,631 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_view_group). +-behaviour(gen_server). + +%% API +-export([start_link/1, request_group/2, trigger_group_update/2, request_group_info/1]). +-export([open_db_group/2, open_temp_group/5, design_doc_to_view_group/1,design_root/2]). + +%% Exports for the compactor +-export([get_index_header_data/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("couch_db.hrl"). + +-record(group_state, { + type, + db_name, + init_args, + group, + updater_pid=nil, + compactor_pid=nil, + waiting_commit=false, + waiting_list=[], + ref_counter=nil +}). + +% api methods +request_group(Pid, Seq) -> + ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), + case gen_server:call(Pid, {request_group, Seq}, infinity) of + {ok, Group, _RefCounter} -> + {ok, Group}; + Error -> + ?LOG_DEBUG("request_group Error ~p", [Error]), + throw(Error) + end. + +request_group_info(Pid) -> + case gen_server:call(Pid, request_group_info) of + {ok, GroupInfoList} -> + {ok, GroupInfoList}; + Error -> + throw(Error) + end. + +trigger_group_update(Pid, RequestSeq) -> + gen_server:cast(Pid, {update_group, RequestSeq}). + +% from template +start_link(InitArgs) -> + case gen_server:start_link(couch_view_group, + {InitArgs, self(), Ref = make_ref()}, []) of + {ok, Pid} -> + {ok, Pid}; + ignore -> + receive + {Ref, Pid, Error} -> + case process_info(self(), trap_exit) of + {trap_exit, true} -> receive {'EXIT', Pid, _} -> ok end; + {trap_exit, false} -> ok + end, + Error + end; + Error -> + Error + end. + +% init creates a closure which spawns the appropriate view_updater. +init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> + process_flag(trap_exit, true), + case prepare_group(InitArgs, false) of + {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} -> + case Seq > couch_db:get_update_seq(Db) of + true -> + ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, + couch_db:close(Db), + ignore; + _ -> + try couch_db:monitor(Db) after couch_db:close(Db) end, + {ok, #group_state{ + db_name=DbName, + init_args=InitArgs, + group=Group, + ref_counter=erlang:monitor(process,Fd)}} + end; + Error -> + ReturnPid ! {Ref, self(), Error}, + ignore + end. + + + + +% There are two sources of messages: couch_view, which requests an up to date +% view group, and the couch_view_updater, which when spawned, updates the +% group and sends it back here. We employ a caching mechanism, so that between +% database writes, we don't have to spawn a couch_view_updater with every view +% request. + +% The caching mechanism: each request is submitted with a seq_id for the +% database at the time it was read. We guarantee to return a view from that +% sequence or newer. + +% If the request sequence is higher than our current high_target seq, we set +% that as the highest seqence. If the updater is not running, we launch it. + +handle_call({request_group, RequestSeq}, From, + #group_state{ + db_name=DbName, + group=#group{current_seq=Seq}=Group, + updater_pid=nil, + waiting_list=WaitList + }=State) when RequestSeq > Seq -> + Owner = self(), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end), + + {noreply, State#group_state{ + updater_pid=Pid, + waiting_list=[{From,RequestSeq}|WaitList] + }, infinity}; + + +% If the request seqence is less than or equal to the seq_id of a known Group, +% we respond with that Group. +handle_call({request_group, RequestSeq}, _From, #group_state{ + group = #group{current_seq=GroupSeq} = Group, + ref_counter = RefCounter + } = State) when RequestSeq =< GroupSeq -> + {reply, {ok, Group, RefCounter}, State}; + +% Otherwise: TargetSeq => RequestSeq > GroupSeq +% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq +handle_call({request_group, RequestSeq}, From, + #group_state{waiting_list=WaitList}=State) -> + {noreply, State#group_state{ + waiting_list=[{From, RequestSeq}|WaitList] + }, infinity}; + +handle_call(request_group_info, _From, State) -> + GroupInfo = get_group_info(State), + {reply, {ok, GroupInfo}, State}; + +handle_call({start_compact, CompactFun}, _From, #group_state{compactor_pid=nil} + = State) -> + #group_state{ + group = #group{name = GroupId, sig = GroupSig} = Group, + init_args = {RootDir, DbName, _} + } = State, + ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), + {ok, Db} = couch_db:open_int(DbName, []), + {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), + NewGroup = reset_file(Db, Fd, DbName, Group), + couch_db:close(Db), + Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end), + {reply, {ok, Pid}, State#group_state{compactor_pid = Pid}}; +handle_call({start_compact, _}, _From, #group_state{compactor_pid=Pid} = State) -> + %% compact already running, this is a no-op + {reply, {ok, Pid}, State}; + +handle_call({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, _From, + #group_state{group = #group{current_seq=OldSeq}} = State) + when NewSeq >= OldSeq -> + #group_state{ + group = #group{name=GroupId, fd=OldFd, sig=GroupSig}, + init_args = {RootDir, DbName, _}, + updater_pid = UpdaterPid, + compactor_pid = CompactorPid, + ref_counter = RefCounter + } = State, + + ?LOG_INFO("View index compaction complete for ~s ~s", [DbName, GroupId]), + FileName = index_file_name(RootDir, DbName, GroupSig), + CompactName = index_file_name(compact, RootDir, DbName, GroupSig), + ok = couch_file:delete(RootDir, FileName), + ok = file:rename(CompactName, FileName), + + %% if an updater is running, kill it and start a new one + NewUpdaterPid = + if is_pid(UpdaterPid) -> + unlink(UpdaterPid), + exit(UpdaterPid, view_compaction_complete), + Owner = self(), + spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end); + true -> + nil + end, + + %% cleanup old group + unlink(CompactorPid), + receive {'EXIT', CompactorPid, normal} -> ok after 0 -> ok end, + unlink(OldFd), + erlang:demonitor(RefCounter), + + self() ! delayed_commit, + {reply, ok, State#group_state{ + group=NewGroup, + ref_counter=erlang:monitor(process,NewFd), + compactor_pid=nil, + updater_pid=NewUpdaterPid + }}; +handle_call({compact_done, NewGroup}, _From, State) -> + #group_state{ + group = #group{name = GroupId, current_seq = CurrentSeq}, + init_args={_RootDir, DbName, _} + } = State, + ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ + "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), + {reply, update, State}. + +handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid} + = State) -> + #group_state{ + db_name = DbName, + waiting_commit = WaitingCommit + } = State, + NewSeq = NewGroup#group.current_seq, + ?LOG_DEBUG("checkpointing view update at seq ~p for ~s ~s", [NewSeq, + DbName, NewGroup#group.name]), + if not WaitingCommit -> + erlang:send_after(1000, self(), delayed_commit); + true -> ok + end, + {noreply, State#group_state{group=NewGroup, waiting_commit=true}}; +handle_cast({partial_update, _, _}, State) -> + %% message from an old (probably pre-compaction) updater; ignore + {noreply, State}. + +handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> + {ok, Db} = couch_db:open_int(DbName, []), + CommittedSeq = couch_db:get_committed_update_seq(Db), + couch_db:close(Db), + if CommittedSeq >= Group#group.current_seq -> + % save the header + Header = {Group#group.sig, get_index_header_data(Group)}, + ok = couch_file:write_header(Group#group.fd, Header), + {noreply, State#group_state{waiting_commit=false}}; + true -> + % We can't commit the header because the database seq that's fully + % committed to disk is still behind us. If we committed now and the + % database lost those changes our view could be forever out of sync + % with the database. But a crash before we commit these changes, no big + % deal, we only lose incremental changes since last committal. + erlang:send_after(1000, self(), delayed_commit), + {noreply, State#group_state{waiting_commit=true}} + end; + +handle_info({'EXIT', FromPid, {new_group, Group}}, + #group_state{db_name=DbName, + updater_pid=UpPid, + ref_counter=RefCounter, + waiting_list=WaitList, + waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> + if not WaitingCommit -> + erlang:send_after(1000, self(), delayed_commit); + true -> ok + end, + case reply_with_group(Group, WaitList, [], RefCounter) of + [] -> + {noreply, State#group_state{waiting_commit=true, waiting_list=[], + group=Group, updater_pid=nil}}; + StillWaiting -> + % we still have some waiters, reopen the database and reupdate the index + Owner = self(), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end), + {noreply, State#group_state{waiting_commit=true, + waiting_list=StillWaiting, updater_pid=Pid}} + end; +handle_info({'EXIT', _, {new_group, _}}, State) -> + %% message from an old (probably pre-compaction) updater; ignore + {noreply, State}; + +handle_info({'EXIT', UpPid, reset}, + #group_state{init_args=InitArgs, updater_pid=UpPid} = State) -> + case prepare_group(InitArgs, true) of + {ok, Db, ResetGroup} -> + Owner = self(), + couch_db:close(Db), + Pid = spawn_link(fun() -> + couch_view_updater:update(Owner, ResetGroup, Db#db.name) + end), + {noreply, State#group_state{ + updater_pid=Pid, + group=ResetGroup}}; + Error -> + {stop, normal, reply_all(State, Error)} + end; +handle_info({'EXIT', _, reset}, State) -> + %% message from an old (probably pre-compaction) updater; ignore + {noreply, State}; + +handle_info({'EXIT', _FromPid, normal}, State) -> + {noreply, State}; + +handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) -> + ?LOG_DEBUG("Uncaught throw() in linked pid: ~p", [{FromPid, Reason}]), + {stop, Reason, State}; + +handle_info({'EXIT', FromPid, Reason}, State) -> + ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]), + {stop, Reason, State}; + +handle_info({'DOWN',_,_,Pid,Reason}, #group_state{group=G}=State) -> + ?LOG_INFO("Shutting down group server ~p, db ~p closing w/ reason~n~p", + [G#group.name, Pid, Reason]), + {stop, normal, reply_all(State, shutdown)}. + + +terminate(Reason, #group_state{updater_pid=Update, compactor_pid=Compact}=S) -> + reply_all(S, Reason), + couch_util:shutdown_sync(Update), + couch_util:shutdown_sync(Compact), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% Local Functions + +% reply_with_group/3 +% for each item in the WaitingList {Pid, Seq} +% if the Seq is =< GroupSeq, reply +reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], + StillWaiting, RefCounter) when Seq =< GroupSeq -> + gen_server:reply(Pid, {ok, Group, RefCounter}), + reply_with_group(Group, WaitList, StillWaiting, RefCounter); + +% else +% put it in the continuing waiting list +reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting, RefCounter) -> + reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting], RefCounter); + +% return the still waiting list +reply_with_group(_Group, [], StillWaiting, _RefCounter) -> + StillWaiting. + +reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> + [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], + State#group_state{waiting_list=[]}. + +prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> + case couch_db:open_int(DbName, []) of + {ok, Db} -> + case open_index_file(RootDir, DbName, Sig) of + {ok, Fd} -> + if ForceReset -> + % this can happen if we missed a purge + {ok, Db, reset_file(Db, Fd, DbName, Group)}; + true -> + % 09 UPGRADE CODE + ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), + case (catch couch_file:read_header(Fd)) of + {ok, {Sig, HeaderInfo}} -> + % sigs match! + {ok, Db, init_group(Db, Fd, Group, HeaderInfo)}; + _ -> + % this happens on a new file + {ok, Db, reset_file(Db, Fd, DbName, Group)} + end + end; + Error -> + catch delete_index_file(RootDir, DbName, Sig), + Error + end; + Else -> + Else + end. + +get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree,views=Views}) -> + ViewStates = [ + {couch_btree:get_state(V#view.btree), V#view.update_seq, V#view.purge_seq} || V <- Views + ], + #index_header{ + seq=Seq, + purge_seq=PurgeSeq, + id_btree_state=couch_btree:get_state(IdBtree), + view_states=ViewStates + }. + +hex_sig(GroupSig) -> + couch_util:to_hex(?b2l(GroupSig)). + +design_root(RootDir, DbName) -> + RootDir ++ "/." ++ ?b2l(DbName) ++ "_design/". + +index_file_name(RootDir, DbName, GroupSig) -> + design_root(RootDir, DbName) ++ hex_sig(GroupSig) ++".view". + +index_file_name(compact, RootDir, DbName, GroupSig) -> + design_root(RootDir, DbName) ++ hex_sig(GroupSig) ++".compact.view". + + +open_index_file(RootDir, DbName, GroupSig) -> + FileName = index_file_name(RootDir, DbName, GroupSig), + case couch_file:open(FileName) of + {ok, Fd} -> {ok, Fd}; + {error, enoent} -> couch_file:open(FileName, [create]); + Error -> Error + end. + +open_index_file(compact, RootDir, DbName, GroupSig) -> + FileName = index_file_name(compact, RootDir, DbName, GroupSig), + case couch_file:open(FileName) of + {ok, Fd} -> {ok, Fd}; + {error, enoent} -> couch_file:open(FileName, [create]); + Error -> Error + end. + +open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) -> + case couch_db:open_int(DbName, []) of + {ok, Db} -> + View = #view{map_names=[<<"_temp">>], + id_num=0, + btree=nil, + def=MapSrc, + reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end, + options=DesignOptions}, + couch_db:close(Db), + {ok, set_view_sig(#group{name = <<"_temp">>,lib={[]}, views=[View], + def_lang=Language, design_options=DesignOptions})}; + Error -> + Error + end. + +set_view_sig(#group{ + views=Views, + lib={[]}, + def_lang=Language, + design_options=DesignOptions}=G) -> + ViewInfo = [old_view_format(V) || V <- Views], + G#group{sig=couch_util:md5(term_to_binary({ViewInfo, Language, DesignOptions}))}; +set_view_sig(#group{ + views=Views, + lib=Lib, + def_lang=Language, + design_options=DesignOptions}=G) -> + ViewInfo = [old_view_format(V) || V <- Views], + G#group{sig=couch_util:md5(term_to_binary({ViewInfo, Language, DesignOptions, sort_lib(Lib)}))}. + +% Use the old view record format so group sig's don't change +old_view_format(View) -> + { + view, + View#view.id_num, + View#view.map_names, + View#view.def, + View#view.btree, + View#view.reduce_funs, + View#view.options + }. + +sort_lib({Lib}) -> + sort_lib(Lib, []). +sort_lib([], LAcc) -> + lists:keysort(1, LAcc); +sort_lib([{LName, {LObj}}|Rest], LAcc) -> + LSorted = sort_lib(LObj, []), % descend into nested object + sort_lib(Rest, [{LName, LSorted}|LAcc]); +sort_lib([{LName, LCode}|Rest], LAcc) -> + sort_lib(Rest, [{LName, LCode}|LAcc]). + +open_db_group(DbName, GroupId) -> + {Pid, Ref} = spawn_monitor(fun() -> + exit(try + fabric:open_doc(mem3:dbname(DbName), GroupId, []) + catch error:database_does_not_exist -> + {ok, Db} = couch_db:open(DbName, []), + couch_db:open_doc(Db, GroupId) + end) + end), + receive {'DOWN', Ref, process, Pid, {ok, Doc}} -> + {ok, design_doc_to_view_group(Doc)}; + {'DOWN', Ref, process, Pid, Error} -> + Error + end. + +get_group_info(State) -> + #group_state{ + group=Group, + updater_pid=UpdaterPid, + compactor_pid=CompactorPid, + waiting_commit=WaitingCommit, + waiting_list=WaitersList + } = State, + #group{ + fd = Fd, + sig = GroupSig, + def_lang = Lang, + views = Views, + current_seq=CurrentSeq, + purge_seq=PurgeSeq + } = Group, + {ok, Size} = couch_file:bytes(Fd), + [ + {signature, ?l2b(hex_sig(GroupSig))}, + {language, Lang}, + {disk_size, Size}, + {data_size, compute_data_size(Views)}, + {updater_running, UpdaterPid /= nil}, + {compact_running, CompactorPid /= nil}, + {waiting_commit, WaitingCommit}, + {waiting_clients, length(WaitersList)}, + {update_seq, CurrentSeq}, + {purge_seq, PurgeSeq} + ]. + +compute_data_size(ViewList) -> + lists:foldl(fun(#view{btree=Btree}, Acc) -> + {ok, {_, _, Size}} = couch_btree:full_reduce(Btree), + Size + Acc + end, 0, ViewList). + + +% maybe move to another module +design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> + Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>), + {DesignOptions} = couch_util:get_value(<<"options">>, Fields, {[]}), + {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}), + Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}), + % add the views to a dictionary object, with the map source as the key + DictBySrc = + lists:foldl( + fun({Name, {MRFuns}}, DictBySrcAcc) -> + case couch_util:get_value(<<"map">>, MRFuns) of + undefined -> DictBySrcAcc; + MapSrc -> + RedSrc = couch_util:get_value(<<"reduce">>, MRFuns, null), + {ViewOptions} = couch_util:get_value(<<"options">>, MRFuns, {[]}), + View = + case dict:find({MapSrc, ViewOptions}, DictBySrcAcc) of + {ok, View0} -> View0; + error -> #view{def=MapSrc, options=ViewOptions} % create new view object + end, + View2 = + if RedSrc == null -> + View#view{map_names=[Name|View#view.map_names]}; + true -> + View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} + end, + dict:store({MapSrc, ViewOptions}, View2, DictBySrcAcc) + end + end, dict:new(), RawViews), + % number the views + {Views, _N} = lists:mapfoldl( + fun({_Src, View}, N) -> + {View#view{id_num=N},N+1} + end, 0, lists:sort(dict:to_list(DictBySrc))), + set_view_sig(#group{name=Id, lib=Lib, views=Views, def_lang=Language, design_options=DesignOptions}). + +reset_group(#group{views=Views}=Group) -> + Views2 = [View#view{btree=nil} || View <- Views], + Group#group{fd=nil,query_server=nil,current_seq=0, + id_btree=nil,views=Views2}. + +reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> + ?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]), + ok = couch_file:truncate(Fd, 0), + ok = couch_file:write_header(Fd, {Sig, nil}), + init_group(Db, Fd, reset_group(Group), nil). + +delete_index_file(RootDir, DbName, GroupSig) -> + couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)). + +init_group(Db, Fd, #group{views=Views}=Group, nil) -> + init_group(Db, Fd, Group, + #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), + id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]}); +init_group(_Db, Fd, #group{def_lang=Lang,views=Views}= + Group, IndexHeader) -> + #index_header{seq=Seq, purge_seq=PurgeSeq, + id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, + StateUpdate = fun + ({_, _, _}=State) -> State; + (State) -> {State, 0, 0} + end, + ViewStates2 = lists:map(StateUpdate, ViewStates), + {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), + Views2 = lists:zipwith( + fun({BTState, USeq, PSeq}, #view{reduce_funs=RedFuns,options=Options}=View) -> + FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], + ReduceFun = + fun(reduce, KVs) -> + KVs2 = couch_view:expand_dups(KVs,[]), + KVs3 = couch_view:detuple_kvs(KVs2,[]), + {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, + KVs3), + {length(KVs3), Reduced, couch_view:data_size(KVs3, Reduced)}; + (rereduce, Reds) -> + Count = lists:sum(extract(Reds, counts)), + DataSize = lists:sum(extract(Reds, data_size)), + UserReds = extract(Reds, user_reds), + {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, + UserReds), + {Count, Reduced, DataSize} + end, + + case couch_util:get_value(<<"collation">>, Options, <<"default">>) of + <<"default">> -> + Less = fun couch_view:less_json_ids/2; + <<"raw">> -> + Less = fun(A,B) -> A < B end + end, + {ok, Btree} = couch_btree:open(BTState, Fd, + [{less, Less}, {reduce, ReduceFun}] + ), + View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq} + end, + ViewStates2, Views), + Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, + views=Views2}. + +extract(Reds, counts) -> + [element(1, R) || R <- Reds]; +extract(Reds, user_reds) -> + [element(2, R) || R <- Reds]; +extract(Reds, data_size) -> + lists:map(fun({_, _}) -> 0; ({_, _, Size}) -> Size end, Reds). |