summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-04-01 20:32:15 +0000
committerDamien F. Katz <damien@apache.org>2008-04-01 20:32:15 +0000
commit042de2f5aeea9fb5be6768df934d61ba26985d5c (patch)
tree9471eb95045593332cf0df9ce5abefb190921781
parent504c93c4534f07affc2c933bd4d5d7f6075ea013 (diff)
Fix for runaway process in the view code and the so far untested storage compaction code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@643556 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_btree.erl9
-rw-r--r--src/couchdb/couch_db.erl259
-rw-r--r--src/couchdb/couch_stream.erl2
-rw-r--r--src/couchdb/couch_util.erl20
-rw-r--r--src/couchdb/couch_view.erl44
5 files changed, 241 insertions, 93 deletions
diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl
index 2ae837dd..62d42c09 100644
--- a/src/couchdb/couch_btree.erl
+++ b/src/couchdb/couch_btree.erl
@@ -12,7 +12,7 @@
-module(couch_btree).
--export([open/2, open/3, query_modify/4, add_remove/3, foldl/3, foldl/4]).
+-export([open/2, open/3, query_modify/4, add/2, add_remove/3, foldl/3, foldl/4]).
-export([foldr/3, foldr/4, fold/4, fold/5, row_count/1]).
-export([lookup/2, get_state/1, test/1, test/0]).
@@ -85,9 +85,12 @@ fold(Bt, Key, Dir, Fun, Acc) ->
{_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc),
{ok, Acc2}.
+add(Bt, InsertKeyValues) ->
+ add_remove(Bt, InsertKeyValues, []).
+
add_remove(Bt, InsertKeyValues, RemoveKeys) ->
- {Result, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys),
- {Result, Bt2}.
+ {ok, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys),
+ {ok, Bt2}.
query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) ->
#btree{root=Root} = Bt,
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index e567d27b..51d55822 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -13,20 +13,21 @@
-module(couch_db).
-behaviour(gen_server).
--export([open/2,create/2,create/3,get_doc_info/2]).
+-export([open/2,create/2,create/3,get_doc_info/2,start_compact/1]).
-export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
--export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]).
+-export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
-export([start_update_loop/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+-export([start_copy_compact_int/1,continue_copy_compact_int/2]).
-include("couch_db.hrl").
-record(db_header,
{write_version = 0,
- last_update_seq = 0,
+ update_seq = 0,
summary_stream_state = nil,
- docinfo_by_Id_btree_state = nil,
+ fulldocinfo_by_id_btree_state = nil,
docinfo_by_seq_btree_state = nil,
local_docs_btree_state = nil,
doc_count=0,
@@ -34,20 +35,24 @@
}).
-record(db,
- {main_pid,
- update_pid,
+ {main_pid=nil,
+ update_pid=nil,
+ compactor_pid=nil,
fd,
header = #db_header{},
summary_stream,
- docinfo_by_Id_btree,
+ fulldocinfo_by_id_btree,
docinfo_by_seq_btree,
local_docs_btree,
- last_update_seq,
+ update_seq,
doc_count,
doc_del_count,
name
}).
+% small value used in revision trees to indicate the revision isn't stored
+-define(REV_MISSING, []).
+
start_link(DbName, Filepath, Options) ->
case couch_file:open(Filepath, Options) of
{ok, Fd} ->
@@ -72,6 +77,9 @@ create(DbName, Filepath, Options) when is_list(Options) ->
open(DbName, Filepath) ->
start_link(DbName, Filepath, []).
+start_compact(MainPid) ->
+ gen_server:cast(MainPid, start_compact).
+
delete_doc(MainPid, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
{ok, [Result]} = update_docs(MainPid, DeletedDocs, [new_edits]),
@@ -128,15 +136,22 @@ get_full_doc_info(Db, Id) ->
get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) ->
get_full_doc_infos(get_db(MainPid), Ids);
get_full_doc_infos(#db{}=Db, Ids) ->
- couch_btree:lookup(Db#db.docinfo_by_Id_btree, Ids).
+ couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
get_db_info(MainPid) when is_pid(MainPid) ->
get_db_info(get_db(MainPid));
-get_db_info(#db{doc_count=Count, doc_del_count=DelCount, last_update_seq=SeqNum}) ->
+get_db_info(Db) ->
+ #db{fd=Fd,
+ compactor_pid=Compactor,
+ doc_count=Count,
+ doc_del_count=DelCount,
+ update_seq=SeqNum} = Db,
InfoList = [
{doc_count, Count},
{doc_del_count, DelCount},
- {last_update_seq, SeqNum}
+ {last_update_seq, SeqNum},
+ {compacting, Compactor==nil},
+ {size, couch_file:bytes(Fd)}
],
{ok, InfoList}.
@@ -315,21 +330,11 @@ enum_docs_since(MainPid, SinceSeq, InFun, Acc) ->
enum_docs(MainPid, StartId, Direction, InFun, InAcc) ->
Db = get_db(MainPid),
- couch_btree:fold(Db#db.docinfo_by_Id_btree, StartId, Direction, InFun, InAcc).
+ couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc).
enum_docs(MainPid, StartId, InFun, Ctx) ->
enum_docs(MainPid, StartId, fwd, InFun, Ctx).
-close(MainPid) ->
- Ref = erlang:monitor(process, MainPid),
- unlink(MainPid),
- exit(MainPid, normal),
- receive
- {'DOWN', Ref, process, MainPid, _Reason} ->
- ok
- end.
-
-
% server functions
init({DbName, Fd, Options}) ->
@@ -339,12 +344,16 @@ init({DbName, Fd, Options}) ->
% create a new header and writes it to the file
Header = #db_header{},
ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
- ok = couch_file:sync(Fd),
- init_main(DbName, Fd, Header);
+ ok = couch_file:sync(Fd);
false ->
- {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>),
- init_main(DbName, Fd, Header)
- end.
+ {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>)
+ end,
+
+ Db = init_db(DbName, Fd, Header),
+
+ UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
+
+ {ok, Db#db{update_pid=UpdatePid}}.
btree_by_seq_split(DocInfo) ->
#doc_info{
@@ -374,10 +383,10 @@ btree_by_name_join(Id, {Seq, Tree}) ->
#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
-init_main(DbName, Fd, Header) ->
+init_db(DbName, Fd, Header) ->
{ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
ok = couch_stream:set_min_buffer(SummaryStream, 10000),
- {ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_Id_btree_state, Fd,
+ {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
[{split, fun(V) -> btree_by_name_split(V) end},
{join, fun(K,V) -> btree_by_name_join(K,V) end}] ),
{ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
@@ -385,26 +394,22 @@ init_main(DbName, Fd, Header) ->
{join, fun(K,V) -> btree_by_seq_join(K,V) end}] ),
{ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
- Db = #db{
+ #db{
main_pid=self(),
fd=Fd,
header=Header,
summary_stream = SummaryStream,
- docinfo_by_Id_btree = IdBtree,
+ fulldocinfo_by_id_btree = IdBtree,
docinfo_by_seq_btree = SeqBtree,
local_docs_btree = LocalDocsBtree,
- last_update_seq = Header#db_header.last_update_seq,
+ update_seq = Header#db_header.update_seq,
doc_count = Header#db_header.doc_count,
doc_del_count = Header#db_header.doc_del_count,
name = DbName
- },
-
- UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
-
- {ok, Db#db{update_pid=UpdatePid}}.
+ }.
terminate(_Reason, Db) ->
- Db#db.update_pid ! close,
+ exit(Db#db.update_pid, kill),
couch_file:close(Db#db.fd).
handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
@@ -416,8 +421,17 @@ handle_call({db_updated, NewDb}, _From, _OldDb) ->
{reply, ok, NewDb}.
-handle_cast(foo, Main) ->
- {noreply, Main}.
+handle_cast(start_compact, #db{update_pid=Updater}=Db) ->
+ Updater ! compact,
+ {noreply, Db}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(Msg, Db) ->
+ couch_log:error("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
+ exit({error, Msg}).
+
%%% Internal function %%%
@@ -439,11 +453,40 @@ update_loop(Db) ->
Error ->
exit(Error) % we crashed
end;
- close ->
- % terminate loop
- exit(normal)
+ compact ->
+ case Db#db.compactor_pid of
+ nil ->
+ Pid = spawn_link(couch_db, start_copy_compact_int, [Db]),
+ Db2 = Db#db{compactor_pid=Pid},
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ update_loop(Db2);
+ _ ->
+ update_loop(Db) % already started
+ end;
+ {compact_done, #db{update_seq=CompactSeq}=NewDb} ->
+ case CompactSeq == Db#db.update_seq of
+ true ->
+ NewDb2 = swap_files(Db, NewDb),
+ update_loop(NewDb2#db{compactor_pid=nil});
+ false ->
+ Pid = spawn_link(couch_db, continue_copy_compact_int, [Db, NewDb]),
+ Db2 = Db#db{compactor_pid=Pid},
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ update_loop(Db2)
+ end;
+ Else ->
+ couch_log:error("Unknown message received in db ~s:~p", [Db#db.name, Else]),
+ exit({error, Else})
end.
+swap_files(#db{fd=OldFd, name=Name}=_DbOld, DbNew) ->
+ NormalFilename = couch_server:get_filename(Name),
+ true = file:rename(NormalFilename, NormalFilename ++ ".old"),
+ true = file:rename(NormalFilename ++ ".compact", NormalFilename),
+ couch_file:close(OldFd),
+ file:delete(NormalFilename ++ ".old"),
+ DbNew.
+
get_db(MainPid) ->
{ok, Db} = gen_server:call(MainPid, get_db),
Db.
@@ -466,7 +509,7 @@ open_doc_revs_int(Db, Id, Revs, Options) ->
FoundResults =
lists:map(fun({Rev, Value, FoundRevPath}) ->
case Value of
- 0 ->
+ ?REV_MISSING ->
% we have the rev in our list but know nothing about it
{{not_found, missing}, Rev};
{IsDeleted, SummaryPtr} ->
@@ -538,7 +581,7 @@ doc_to_tree(Doc) ->
doc_to_tree(Doc, [RevId]) ->
[{RevId, Doc, []}];
doc_to_tree(Doc, [RevId | Rest]) ->
- [{RevId, [], doc_to_tree(Doc, Rest)}].
+ [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
{BodyData, BinValues} =
@@ -613,9 +656,9 @@ new_index_entries([Id|RestIds], [RevTree|RestTrees], Seq0, DocCount, DelCount, A
update_docs_int(Db, DocsList, Options) ->
#db{
- docinfo_by_Id_btree = DocInfoByIdBTree,
+ fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
- last_update_seq = LastSeq,
+ update_seq = LastSeq,
doc_count = FullDocCount,
doc_del_count = FullDelCount
} = Db,
@@ -678,9 +721,9 @@ update_docs_int(Db, DocsList, Options) ->
{ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
Db3 = Db2#db{
- docinfo_by_Id_btree = DocInfoByIdBTree2,
+ fulldocinfo_by_id_btree = DocInfoByIdBTree2,
docinfo_by_seq_btree = DocInfoBySeqBTree2,
- last_update_seq = NewSeq,
+ update_seq = NewSeq,
doc_count = FullDocCount + NewDocsCount - OldCount,
doc_del_count = FullDelCount + NewDelCount - OldDelCount
},
@@ -689,7 +732,7 @@ update_docs_int(Db, DocsList, Options) ->
true ->
{ok, Db3};
false ->
- commit_outstanding(Db3)
+ {ok, commit_data(Db3)}
end.
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
@@ -697,20 +740,20 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
- BasedOnRev =
+ NewRev =
case Revs of
[] -> 0;
- [RevStr|_] -> list_to_integer(RevStr) - 1
+ [RevStr|_] -> list_to_integer(RevStr)
end,
OldRev =
case OldDocLookup of
{ok, {_, {OldRev0, _}}} -> OldRev0;
not_found -> 0
end,
- case OldRev == BasedOnRev of
+ case OldRev + 1 == NewRev of
true ->
case Delete of
- false -> {update, {Id, {OldRev+1, Body}}};
+ false -> {update, {Id, {NewRev, Body}}};
true -> {remove, Id}
end;
false ->
@@ -729,29 +772,111 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
-commit_outstanding(#db{fd=Fd, header=Header} = Db) ->
+commit_data(#db{fd=Fd, header=Header} = Db) ->
ok = couch_file:sync(Fd), % commit outstanding data
Header2 = Header#db_header{
- last_update_seq = Db#db.last_update_seq,
+ update_seq = Db#db.update_seq,
summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
- docinfo_by_Id_btree_state = couch_btree:get_state(Db#db.docinfo_by_Id_btree),
+ fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
doc_count = Db#db.doc_count,
doc_del_count = Db#db.doc_del_count
},
- ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
- ok = couch_file:sync(Fd), % commit header to disk
- Db2 = Db#db{
- header = Header2
- },
- {ok, Db2}.
+ if Header == Header2 ->
+ Db; % unchanged. nothing to do
+ true ->
+ ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
+ ok = couch_file:sync(Fd), % commit header to disk
+ Db#db{header = Header2}
+ end.
+copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
+ {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+ % copy the bin values
+ NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
+ {ok, NewBinSp} = couch_stream:copy_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, {Type, NewBinSp, Len}}
+ end, BinInfos),
+ % now write the document summary
+ {ok, _SummaryPointer} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}).
+
+copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+ [];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTrees]) ->
+ % This is a leaf node, copy it over
+ NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
+ [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTrees} | RestTrees]) ->
+ % inner node, only copy info/data from leaf nodes
+ [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTrees)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)].
+
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
+ Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
+ LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+ NewFullDocInfos = lists:map(
+ fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
+ Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+ end, LookupResults),
+ NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos],
+ {ok, DocInfoBTree} =
+ couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []),
+ {ok, FullDocInfoBTree} =
+ couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
+ NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}.
+
+
+
+copy_compact_docs(Db, NewDb) ->
+ EnumBySeqFun =
+ fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
+ case couch_util:should_flush() of
+ true ->
+ NewDb2 = copy_docs(Db, AccNewDb, lists:reverse(AccUncopied, DocInfo)),
+ {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
+ false ->
+ {ok, {AccNewDb, [DocInfo | AccUncopied]}}
+ end
+ end,
+ {ok, {NewDb2, Uncopied}} =
+ couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}),
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+ case Uncopied of
+ [#doc_info{update_seq=LastSeq} | _] ->
+ commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq},
+ lists:reverse(Uncopied)));
+ [] ->
+ NewDb2
+ end.
-handle_info(_Info, State) ->
- {noreply, State}.
+start_copy_compact_int(#db{name=Name}=Db) ->
+ couch_log:debug("New compaction process spawned for db \"%s\"", [Name]),
+ Filename = couch_server:get_compaction_filename(Name),
+ case couch_file:open(Filename) of
+ {ok, Fd} ->
+ couch_log:debug("Found existing compaction file for db \"%s\"", [Name]),
+ {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>);
+ {error, enoent} -> %
+ {ok, Fd} = couch_file:open(Filename, [create]),
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
+ ok = couch_file:sync(Fd)
+ end,
+ NewDb = init_db(Name, Fd, Header),
+ NewDb2 = copy_compact_docs(Db, NewDb),
+
+ % suck up all the local docs into memory and write them to the new db
+ {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+ fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+ {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
+ NewDb3 = commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}),
+
+ NewDb3#db.update_pid ! {compact_done, NewDb3}.
+
+continue_copy_compact_int(#db{name=Name}=Db, NewDb) ->
+ couch_log:debug("Continued compaction process spawned for db \"%s\"", [Name]),
+ NewDb2 = copy_compact_docs(Db, NewDb),
+ NewDb2#db.update_pid ! {compact_done, NewDb2}.
+ \ No newline at end of file
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index d5157b4d..ae1b4f2c 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -83,7 +83,7 @@ copy(#stream{pid = _Pid, fd = Fd}, Sp, Num, DestStream) ->
copy(Fd, Sp, Num, DestStream) ->
{ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK,
fun(Bin, AccPointer) ->
- {ok, NewPointer} = write(Bin, DestStream),
+ {ok, NewPointer} = write(DestStream, Bin),
if AccPointer == null -> NewPointer; true -> AccPointer end
end,
null),
diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl
index 42845fe0..f85cc834 100644
--- a/src/couchdb/couch_util.erl
+++ b/src/couchdb/couch_util.erl
@@ -14,7 +14,7 @@
-behaviour(gen_server).
-export([start_link/0,start_link/1]).
--export([parse_ini/1]).
+-export([parse_ini/1,should_flush/0, should_flush/1]).
-export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]).
-export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1, test/0]).
-export([encodeBase64/1, decodeBase64/1]).
@@ -22,6 +22,8 @@
-export([init/1, terminate/2, handle_call/3]).
-export([handle_cast/2,code_change/3,handle_info/2]).
+% arbitrarily chosen amount of memory to use before flushing to disk
+-define(FLUSH_MAX_MEM, 10000000).
start_link() ->
start_link("").
@@ -246,6 +248,22 @@ collate(A, B, Options) when is_list(A), is_list(B) ->
[2] -> 0
end.
+should_flush() ->
+ should_flush(?FLUSH_MAX_MEM).
+
+should_flush(MemThreshHold) ->
+ case process_info(self(), memory) of
+ {memory, Mem} when Mem > 2*MemThreshHold ->
+ garbage_collect(),
+ case process_info(self(), memory) of
+ {memory, Mem} when Mem > MemThreshHold ->
+ true;
+ _ ->
+ false
+ end;
+ _ ->
+ false
+ end.
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index 612eb5fd..97228530 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -20,9 +20,6 @@
-include("couch_db.hrl").
-% arbitrarily chosen amount of memory to use before flushing to disk
--define(FLUSH_MAX_MEM, 10000000).
-
-record(group,
{db,
fd,
@@ -68,12 +65,11 @@ get_updated_group(Pid) ->
receive
{Pid, Response} ->
erlang:demonitor(Mref),
- receive
- {'DOWN', Mref, _, _, _} ->
- Response
- after 0 ->
- Response
- end;
+ receive
+ {'DOWN', Mref, _, _, _} -> ok
+ after 0 -> ok
+ end,
+ Response;
{'DOWN', Mref, _, _, Reason} ->
throw(Reason)
end
@@ -201,7 +197,10 @@ handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
[{_, {DbName, GroupId}}] ->
delete_from_ets(FromPid, DbName, GroupId)
end,
- {noreply, Server}.
+ {noreply, Server};
+handle_info(Msg, _Server) ->
+ couch_log:error("Bad message received for view module: ~p", [Msg]),
+ exit({error, Msg}).
add_to_ets(Pid, DbName, GroupId) ->
true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}),
@@ -216,11 +215,6 @@ delete_from_ets(Pid, DbName, GroupId) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-start_update_loop(RootDir, DbName, GroupId) ->
- % wait for a notify request before doing anything. This way, we can just
- % exit and any exits will be noticed by the callers.
- start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
-
start_temp_update_loop(DbName, Fd, Lang, Query) ->
NotifyPids = get_notify_pids(1000),
@@ -243,7 +237,12 @@ temp_update_loop(Group, NotifyPids) ->
{ok, Group2} = update_group(Group),
[Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
garbage_collect(),
- temp_update_loop(Group2, get_notify_pids(100000)).
+ temp_update_loop(Group2, get_notify_pids(10000)).
+
+start_update_loop(RootDir, DbName, GroupId) ->
+ % wait for a notify request before doing anything. This way, we can just
+ % exit and any exits will be noticed by the callers.
+ start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
{Db, DefLang, Defs} =
@@ -284,13 +283,16 @@ update_loop(#group{fd=Fd}=Group, NotifyPids) ->
update_loop(Group2).
update_loop(Group) ->
- update_loop(Group, get_notify_pids()).
+ update_loop(Group, get_notify_pids(100000)).
% wait for the first request to come in.
get_notify_pids(Wait) ->
receive
{Pid, get_updated} ->
- [Pid | get_notify_pids()]
+ [Pid | get_notify_pids()];
+ Else ->
+ couch_log:error("Unexpected message in view updater: ~p", [Else]),
+ exit({error, Else})
after Wait ->
exit(wait_timeout)
end.
@@ -526,15 +528,15 @@ process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewId
{ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]),
{[Doc | Docs], DocIdViewIdKeys}
end,
- case process_info(self(), memory) of
- {memory, Mem} when Mem > ?FLUSH_MAX_MEM ->
+ case couch_util:should_flush() of
+ true ->
{Group1, Results} = view_compute(Group, Docs2),
{ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2),
{ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq),
garbage_collect(),
ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
{ok, {[], Group2, ViewEmptyKeyValues, [], Seq}};
- _Else ->
+ false ->
{ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}}
end
end.