From 042de2f5aeea9fb5be6768df934d61ba26985d5c Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Tue, 1 Apr 2008 20:32:15 +0000 Subject: Fix for runaway process in the view code and the so far untested storage compaction code. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@643556 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_btree.erl | 9 +- src/couchdb/couch_db.erl | 259 ++++++++++++++++++++++++++++++++----------- src/couchdb/couch_stream.erl | 2 +- src/couchdb/couch_util.erl | 20 +++- src/couchdb/couch_view.erl | 44 ++++---- 5 files changed, 241 insertions(+), 93 deletions(-) diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl index 2ae837dd..62d42c09 100644 --- a/src/couchdb/couch_btree.erl +++ b/src/couchdb/couch_btree.erl @@ -12,7 +12,7 @@ -module(couch_btree). --export([open/2, open/3, query_modify/4, add_remove/3, foldl/3, foldl/4]). +-export([open/2, open/3, query_modify/4, add/2, add_remove/3, foldl/3, foldl/4]). -export([foldr/3, foldr/4, fold/4, fold/5, row_count/1]). -export([lookup/2, get_state/1, test/1, test/0]). @@ -85,9 +85,12 @@ fold(Bt, Key, Dir, Fun, Acc) -> {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc), {ok, Acc2}. +add(Bt, InsertKeyValues) -> + add_remove(Bt, InsertKeyValues, []). + add_remove(Bt, InsertKeyValues, RemoveKeys) -> - {Result, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys), - {Result, Bt2}. + {ok, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys), + {ok, Bt2}. query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) -> #btree{root=Root} = Bt, diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index e567d27b..51d55822 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -13,20 +13,21 @@ -module(couch_db). -behaviour(gen_server). --export([open/2,create/2,create/3,get_doc_info/2]). +-export([open/2,create/2,create/3,get_doc_info/2,start_compact/1]). -export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]). --export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]). +-export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]). -export([start_update_loop/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). +-export([start_copy_compact_int/1,continue_copy_compact_int/2]). -include("couch_db.hrl"). -record(db_header, {write_version = 0, - last_update_seq = 0, + update_seq = 0, summary_stream_state = nil, - docinfo_by_Id_btree_state = nil, + fulldocinfo_by_id_btree_state = nil, docinfo_by_seq_btree_state = nil, local_docs_btree_state = nil, doc_count=0, @@ -34,20 +35,24 @@ }). -record(db, - {main_pid, - update_pid, + {main_pid=nil, + update_pid=nil, + compactor_pid=nil, fd, header = #db_header{}, summary_stream, - docinfo_by_Id_btree, + fulldocinfo_by_id_btree, docinfo_by_seq_btree, local_docs_btree, - last_update_seq, + update_seq, doc_count, doc_del_count, name }). +% small value used in revision trees to indicate the revision isn't stored +-define(REV_MISSING, []). + start_link(DbName, Filepath, Options) -> case couch_file:open(Filepath, Options) of {ok, Fd} -> @@ -72,6 +77,9 @@ create(DbName, Filepath, Options) when is_list(Options) -> open(DbName, Filepath) -> start_link(DbName, Filepath, []). +start_compact(MainPid) -> + gen_server:cast(MainPid, start_compact). + delete_doc(MainPid, Id, Revisions) -> DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions], {ok, [Result]} = update_docs(MainPid, DeletedDocs, [new_edits]), @@ -128,15 +136,22 @@ get_full_doc_info(Db, Id) -> get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) -> get_full_doc_infos(get_db(MainPid), Ids); get_full_doc_infos(#db{}=Db, Ids) -> - couch_btree:lookup(Db#db.docinfo_by_Id_btree, Ids). + couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids). get_db_info(MainPid) when is_pid(MainPid) -> get_db_info(get_db(MainPid)); -get_db_info(#db{doc_count=Count, doc_del_count=DelCount, last_update_seq=SeqNum}) -> +get_db_info(Db) -> + #db{fd=Fd, + compactor_pid=Compactor, + doc_count=Count, + doc_del_count=DelCount, + update_seq=SeqNum} = Db, InfoList = [ {doc_count, Count}, {doc_del_count, DelCount}, - {last_update_seq, SeqNum} + {last_update_seq, SeqNum}, + {compacting, Compactor==nil}, + {size, couch_file:bytes(Fd)} ], {ok, InfoList}. @@ -315,21 +330,11 @@ enum_docs_since(MainPid, SinceSeq, InFun, Acc) -> enum_docs(MainPid, StartId, Direction, InFun, InAcc) -> Db = get_db(MainPid), - couch_btree:fold(Db#db.docinfo_by_Id_btree, StartId, Direction, InFun, InAcc). + couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc). enum_docs(MainPid, StartId, InFun, Ctx) -> enum_docs(MainPid, StartId, fwd, InFun, Ctx). -close(MainPid) -> - Ref = erlang:monitor(process, MainPid), - unlink(MainPid), - exit(MainPid, normal), - receive - {'DOWN', Ref, process, MainPid, _Reason} -> - ok - end. - - % server functions init({DbName, Fd, Options}) -> @@ -339,12 +344,16 @@ init({DbName, Fd, Options}) -> % create a new header and writes it to the file Header = #db_header{}, ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header), - ok = couch_file:sync(Fd), - init_main(DbName, Fd, Header); + ok = couch_file:sync(Fd); false -> - {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>), - init_main(DbName, Fd, Header) - end. + {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>) + end, + + Db = init_db(DbName, Fd, Header), + + UpdatePid = spawn_link(couch_db, start_update_loop, [Db]), + + {ok, Db#db{update_pid=UpdatePid}}. btree_by_seq_split(DocInfo) -> #doc_info{ @@ -374,10 +383,10 @@ btree_by_name_join(Id, {Seq, Tree}) -> #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}. -init_main(DbName, Fd, Header) -> +init_db(DbName, Fd, Header) -> {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd), ok = couch_stream:set_min_buffer(SummaryStream, 10000), - {ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_Id_btree_state, Fd, + {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd, [{split, fun(V) -> btree_by_name_split(V) end}, {join, fun(K,V) -> btree_by_name_join(K,V) end}] ), {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd, @@ -385,26 +394,22 @@ init_main(DbName, Fd, Header) -> {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ), {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), - Db = #db{ + #db{ main_pid=self(), fd=Fd, header=Header, summary_stream = SummaryStream, - docinfo_by_Id_btree = IdBtree, + fulldocinfo_by_id_btree = IdBtree, docinfo_by_seq_btree = SeqBtree, local_docs_btree = LocalDocsBtree, - last_update_seq = Header#db_header.last_update_seq, + update_seq = Header#db_header.update_seq, doc_count = Header#db_header.doc_count, doc_del_count = Header#db_header.doc_del_count, name = DbName - }, - - UpdatePid = spawn_link(couch_db, start_update_loop, [Db]), - - {ok, Db#db{update_pid=UpdatePid}}. + }. terminate(_Reason, Db) -> - Db#db.update_pid ! close, + exit(Db#db.update_pid, kill), couch_file:close(Db#db.fd). handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) -> @@ -416,8 +421,17 @@ handle_call({db_updated, NewDb}, _From, _OldDb) -> {reply, ok, NewDb}. -handle_cast(foo, Main) -> - {noreply, Main}. +handle_cast(start_compact, #db{update_pid=Updater}=Db) -> + Updater ! compact, + {noreply, Db}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(Msg, Db) -> + couch_log:error("Bad message received for db ~s: ~p", [Db#db.name, Msg]), + exit({error, Msg}). + %%% Internal function %%% @@ -439,11 +453,40 @@ update_loop(Db) -> Error -> exit(Error) % we crashed end; - close -> - % terminate loop - exit(normal) + compact -> + case Db#db.compactor_pid of + nil -> + Pid = spawn_link(couch_db, start_copy_compact_int, [Db]), + Db2 = Db#db{compactor_pid=Pid}, + ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + update_loop(Db2); + _ -> + update_loop(Db) % already started + end; + {compact_done, #db{update_seq=CompactSeq}=NewDb} -> + case CompactSeq == Db#db.update_seq of + true -> + NewDb2 = swap_files(Db, NewDb), + update_loop(NewDb2#db{compactor_pid=nil}); + false -> + Pid = spawn_link(couch_db, continue_copy_compact_int, [Db, NewDb]), + Db2 = Db#db{compactor_pid=Pid}, + ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + update_loop(Db2) + end; + Else -> + couch_log:error("Unknown message received in db ~s:~p", [Db#db.name, Else]), + exit({error, Else}) end. +swap_files(#db{fd=OldFd, name=Name}=_DbOld, DbNew) -> + NormalFilename = couch_server:get_filename(Name), + true = file:rename(NormalFilename, NormalFilename ++ ".old"), + true = file:rename(NormalFilename ++ ".compact", NormalFilename), + couch_file:close(OldFd), + file:delete(NormalFilename ++ ".old"), + DbNew. + get_db(MainPid) -> {ok, Db} = gen_server:call(MainPid, get_db), Db. @@ -466,7 +509,7 @@ open_doc_revs_int(Db, Id, Revs, Options) -> FoundResults = lists:map(fun({Rev, Value, FoundRevPath}) -> case Value of - 0 -> + ?REV_MISSING -> % we have the rev in our list but know nothing about it {{not_found, missing}, Rev}; {IsDeleted, SummaryPtr} -> @@ -538,7 +581,7 @@ doc_to_tree(Doc) -> doc_to_tree(Doc, [RevId]) -> [{RevId, Doc, []}]; doc_to_tree(Doc, [RevId | Rest]) -> - [{RevId, [], doc_to_tree(Doc, Rest)}]. + [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}]. make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) -> {BodyData, BinValues} = @@ -613,9 +656,9 @@ new_index_entries([Id|RestIds], [RevTree|RestTrees], Seq0, DocCount, DelCount, A update_docs_int(Db, DocsList, Options) -> #db{ - docinfo_by_Id_btree = DocInfoByIdBTree, + fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, - last_update_seq = LastSeq, + update_seq = LastSeq, doc_count = FullDocCount, doc_del_count = FullDelCount } = Db, @@ -678,9 +721,9 @@ update_docs_int(Db, DocsList, Options) -> {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []), Db3 = Db2#db{ - docinfo_by_Id_btree = DocInfoByIdBTree2, + fulldocinfo_by_id_btree = DocInfoByIdBTree2, docinfo_by_seq_btree = DocInfoBySeqBTree2, - last_update_seq = NewSeq, + update_seq = NewSeq, doc_count = FullDocCount + NewDocsCount - OldCount, doc_del_count = FullDelCount + NewDelCount - OldDelCount }, @@ -689,7 +732,7 @@ update_docs_int(Db, DocsList, Options) -> true -> {ok, Db3}; false -> - commit_outstanding(Db3) + {ok, commit_data(Db3)} end. update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> @@ -697,20 +740,20 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) -> - BasedOnRev = + NewRev = case Revs of [] -> 0; - [RevStr|_] -> list_to_integer(RevStr) - 1 + [RevStr|_] -> list_to_integer(RevStr) end, OldRev = case OldDocLookup of {ok, {_, {OldRev0, _}}} -> OldRev0; not_found -> 0 end, - case OldRev == BasedOnRev of + case OldRev + 1 == NewRev of true -> case Delete of - false -> {update, {Id, {OldRev+1, Body}}}; + false -> {update, {Id, {NewRev, Body}}}; true -> {remove, Id} end; false -> @@ -729,29 +772,111 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> -commit_outstanding(#db{fd=Fd, header=Header} = Db) -> +commit_data(#db{fd=Fd, header=Header} = Db) -> ok = couch_file:sync(Fd), % commit outstanding data Header2 = Header#db_header{ - last_update_seq = Db#db.last_update_seq, + update_seq = Db#db.update_seq, summary_stream_state = couch_stream:get_state(Db#db.summary_stream), docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), - docinfo_by_Id_btree_state = couch_btree:get_state(Db#db.docinfo_by_Id_btree), + fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), doc_count = Db#db.doc_count, doc_del_count = Db#db.doc_del_count }, - ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2), - ok = couch_file:sync(Fd), % commit header to disk - Db2 = Db#db{ - header = Header2 - }, - {ok, Db2}. + if Header == Header2 -> + Db; % unchanged. nothing to do + true -> + ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2), + ok = couch_file:sync(Fd), % commit header to disk + Db#db{header = Header2} + end. +copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> + {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp), + % copy the bin values + NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) -> + {ok, NewBinSp} = couch_stream:copy_stream(SrcFd, BinSp, Len, DestFd), + {Name, {Type, NewBinSp, Len}} + end, BinInfos), + % now write the document summary + {ok, _SummaryPointer} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}). + +copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) -> + []; +copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTrees]) -> + % This is a leaf node, copy it over + NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), + [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)]; +copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTrees} | RestTrees]) -> + % inner node, only copy info/data from leaf nodes + [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTrees)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)]. + +copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) -> + Ids = [Id || #doc_info{id=Id} <- InfoBySeq], + LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), + NewFullDocInfos = lists:map( + fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> + Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)} + end, LookupResults), + NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos], + {ok, DocInfoBTree} = + couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []), + {ok, FullDocInfoBTree} = + couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []), + NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}. + + + +copy_compact_docs(Db, NewDb) -> + EnumBySeqFun = + fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) -> + case couch_util:should_flush() of + true -> + NewDb2 = copy_docs(Db, AccNewDb, lists:reverse(AccUncopied, DocInfo)), + {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}}; + false -> + {ok, {AccNewDb, [DocInfo | AccUncopied]}} + end + end, + {ok, {NewDb2, Uncopied}} = + couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}), -code_change(_OldVsn, State, _Extra) -> - {ok, State}. + case Uncopied of + [#doc_info{update_seq=LastSeq} | _] -> + commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq}, + lists:reverse(Uncopied))); + [] -> + NewDb2 + end. -handle_info(_Info, State) -> - {noreply, State}. +start_copy_compact_int(#db{name=Name}=Db) -> + couch_log:debug("New compaction process spawned for db \"%s\"", [Name]), + Filename = couch_server:get_compaction_filename(Name), + case couch_file:open(Filename) of + {ok, Fd} -> + couch_log:debug("Found existing compaction file for db \"%s\"", [Name]), + {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>); + {error, enoent} -> % + {ok, Fd} = couch_file:open(Filename, [create]), + Header = #db_header{}, + ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header), + ok = couch_file:sync(Fd) + end, + NewDb = init_db(Name, Fd, Header), + NewDb2 = copy_compact_docs(Db, NewDb), + + % suck up all the local docs into memory and write them to the new db + {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, + fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), + {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs), + NewDb3 = commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}), + + NewDb3#db.update_pid ! {compact_done, NewDb3}. + +continue_copy_compact_int(#db{name=Name}=Db, NewDb) -> + couch_log:debug("Continued compaction process spawned for db \"%s\"", [Name]), + NewDb2 = copy_compact_docs(Db, NewDb), + NewDb2#db.update_pid ! {compact_done, NewDb2}. + \ No newline at end of file diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index d5157b4d..ae1b4f2c 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -83,7 +83,7 @@ copy(#stream{pid = _Pid, fd = Fd}, Sp, Num, DestStream) -> copy(Fd, Sp, Num, DestStream) -> {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, AccPointer) -> - {ok, NewPointer} = write(Bin, DestStream), + {ok, NewPointer} = write(DestStream, Bin), if AccPointer == null -> NewPointer; true -> AccPointer end end, null), diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl index 42845fe0..f85cc834 100644 --- a/src/couchdb/couch_util.erl +++ b/src/couchdb/couch_util.erl @@ -14,7 +14,7 @@ -behaviour(gen_server). -export([start_link/0,start_link/1]). --export([parse_ini/1]). +-export([parse_ini/1,should_flush/0, should_flush/1]). -export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]). -export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1, test/0]). -export([encodeBase64/1, decodeBase64/1]). @@ -22,6 +22,8 @@ -export([init/1, terminate/2, handle_call/3]). -export([handle_cast/2,code_change/3,handle_info/2]). +% arbitrarily chosen amount of memory to use before flushing to disk +-define(FLUSH_MAX_MEM, 10000000). start_link() -> start_link(""). @@ -246,6 +248,22 @@ collate(A, B, Options) when is_list(A), is_list(B) -> [2] -> 0 end. +should_flush() -> + should_flush(?FLUSH_MAX_MEM). + +should_flush(MemThreshHold) -> + case process_info(self(), memory) of + {memory, Mem} when Mem > 2*MemThreshHold -> + garbage_collect(), + case process_info(self(), memory) of + {memory, Mem} when Mem > MemThreshHold -> + true; + _ -> + false + end; + _ -> + false + end. diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 612eb5fd..97228530 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -20,9 +20,6 @@ -include("couch_db.hrl"). -% arbitrarily chosen amount of memory to use before flushing to disk --define(FLUSH_MAX_MEM, 10000000). - -record(group, {db, fd, @@ -68,12 +65,11 @@ get_updated_group(Pid) -> receive {Pid, Response} -> erlang:demonitor(Mref), - receive - {'DOWN', Mref, _, _, _} -> - Response - after 0 -> - Response - end; + receive + {'DOWN', Mref, _, _, _} -> ok + after 0 -> ok + end, + Response; {'DOWN', Mref, _, _, Reason} -> throw(Reason) end @@ -201,7 +197,10 @@ handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> [{_, {DbName, GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId) end, - {noreply, Server}. + {noreply, Server}; +handle_info(Msg, _Server) -> + couch_log:error("Bad message received for view module: ~p", [Msg]), + exit({error, Msg}). add_to_ets(Pid, DbName, GroupId) -> true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}), @@ -216,11 +215,6 @@ delete_from_ets(Pid, DbName, GroupId) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -start_update_loop(RootDir, DbName, GroupId) -> - % wait for a notify request before doing anything. This way, we can just - % exit and any exits will be noticed by the callers. - start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)). - start_temp_update_loop(DbName, Fd, Lang, Query) -> NotifyPids = get_notify_pids(1000), @@ -243,7 +237,12 @@ temp_update_loop(Group, NotifyPids) -> {ok, Group2} = update_group(Group), [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], garbage_collect(), - temp_update_loop(Group2, get_notify_pids(100000)). + temp_update_loop(Group2, get_notify_pids(10000)). + +start_update_loop(RootDir, DbName, GroupId) -> + % wait for a notify request before doing anything. This way, we can just + % exit and any exits will be noticed by the callers. + start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)). start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> {Db, DefLang, Defs} = @@ -284,13 +283,16 @@ update_loop(#group{fd=Fd}=Group, NotifyPids) -> update_loop(Group2). update_loop(Group) -> - update_loop(Group, get_notify_pids()). + update_loop(Group, get_notify_pids(100000)). % wait for the first request to come in. get_notify_pids(Wait) -> receive {Pid, get_updated} -> - [Pid | get_notify_pids()] + [Pid | get_notify_pids()]; + Else -> + couch_log:error("Unexpected message in view updater: ~p", [Else]), + exit({error, Else}) after Wait -> exit(wait_timeout) end. @@ -526,15 +528,15 @@ process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewId {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]), {[Doc | Docs], DocIdViewIdKeys} end, - case process_info(self(), memory) of - {memory, Mem} when Mem > ?FLUSH_MAX_MEM -> + case couch_util:should_flush() of + true -> {Group1, Results} = view_compute(Group, Docs2), {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2), {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq), garbage_collect(), ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}}; - _Else -> + false -> {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}} end end. -- cgit v1.2.3