From cdf43ab5a1d5ea21e42302c848fe4f07150e6947 Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Thu, 9 Apr 2009 21:37:23 +0000 Subject: Fix for attachment sparseness bug COUCHDB-220 by giving each attachment its own stream and calling set_min_buffer instead of ensure_buffer. Also fixed spurious couch_file crash messages by putting the statistics decrement code into a separate monitoring process. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@763816 13f79535-47bb-0310-9956-ffa450edef68 --- share/www/script/test/attachments.js | 33 +++++++++++++++++++++++++ src/couchdb/couch_db.erl | 46 +++++++++++------------------------ src/couchdb/couch_file.erl | 26 ++++++++++++++------ src/couchdb/couch_stats_collector.erl | 8 ++++-- src/couchdb/couch_stream.erl | 1 + 5 files changed, 72 insertions(+), 42 deletions(-) diff --git a/share/www/script/test/attachments.js b/share/www/script/test/attachments.js index 4b43e31a..d9560bce 100644 --- a/share/www/script/test/attachments.js +++ b/share/www/script/test/attachments.js @@ -142,5 +142,38 @@ couchTests.attachments= function(debug) { var xhr = CouchDB.request("GET", "/test_suite_db/bin_doc4/attachment.txt"); T(xhr.status == 200); T(xhr.responseText == "This is a string"); + + + // Attachment sparseness COUCHDB-220 + + var docs = [] + for (var i = 0; i < 5; i++) { + var doc = { + _id: (i).toString(), + _attachments:{ + "foo.txt": { + content_type:"text/plain", + data: "VGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHRleHQ=" + } + } + } + docs.push(doc) + } + + db.bulkSave(docs); + + var before = db.info().disk_size; + + // Compact it. 
+ T(db.compact().ok); + T(db.last_req.status == 202); + // compaction isn't instantaneous, loop until done + while (db.info().compact_running) {}; + + var after = db.info().disk_size; + + // Compaction should reduce the database slightly, but not + // orders of magnitude (unless attachments introduce sparseness) + T(after > before * 0.1, "before: " + before + " after: " + after); }; diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 681bc2ab..18cb98ee 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -548,34 +548,7 @@ write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, doc_flush_binaries(Doc, Fd) -> - % calc size of binaries to write out - Bins = Doc#doc.attachments, - PreAllocSize = lists:foldl( - fun(BinValue, SizeAcc) -> - case BinValue of - {_Key, {_Type, {Fd0, _StreamPointer, _Len}}} when Fd0 == Fd -> - % already written to our file, nothing to write - SizeAcc; - {_Key, {_Type, {_OtherFd, _StreamPointer, Len}}} -> - % written to a different file - SizeAcc + Len; - {_Key, {_Type, Bin}} when is_binary(Bin) -> - % we have a new binary to write - SizeAcc + size(Bin); - {_Key, {_Type, {Fun, undefined}}} when is_function(Fun) -> - % function without a known length - % we'll have to alloc as we go with this one, for now, nothing - SizeAcc; - {_Key, {_Type, {Fun, Len}}} when is_function(Fun) -> - % function to yield binary data with known length - SizeAcc + Len - end - end, - 0, Bins), - - {ok, OutputStream} = couch_stream:open(Fd), - ok = couch_stream:ensure_buffer(OutputStream, PreAllocSize), - NewBins = lists:map( + NewAttachments = lists:map( fun({Key, {Type, BinValue}}) -> NewBinValue = case BinValue of @@ -585,6 +558,8 @@ doc_flush_binaries(Doc, Fd) -> {OtherFd, StreamPointer, Len} -> % written to a different file (or a closed file % instance, which will cause an error) + {ok, OutputStream} = couch_stream:open(Fd), + ok = couch_stream:set_min_buffer(OutputStream, Len), {ok, {NewStreamPointer, Len}, 
_EndSp} = couch_stream:foldl(OtherFd, StreamPointer, Len, fun(Bin, {BeginPointer, SizeAcc}) -> @@ -597,31 +572,38 @@ doc_flush_binaries(Doc, Fd) -> end end, {{0,0}, 0}), + couch_stream:close(OutputStream), {Fd, NewStreamPointer, Len}; Bin when is_binary(Bin) -> + {ok, OutputStream} = couch_stream:open(Fd), + ok = couch_stream:set_min_buffer(OutputStream, size(Bin)), {ok, StreamPointer} = couch_stream:write(OutputStream, Bin), + couch_stream:close(OutputStream), {Fd, StreamPointer, size(Bin)}; {StreamFun, undefined} when is_function(StreamFun) -> % max_attachment_chunk_size control the max we buffer in memory MaxChunkSize = list_to_integer(couch_config:get("couchdb", "max_attachment_chunk_size","4294967296")), + {ok, OutputStream} = couch_stream:open(Fd), WriterFun = make_writer_fun(OutputStream), % StreamFun(MaxChunkSize, WriterFun) % will call our WriterFun % once for each chunk of the attachment. {ok, {TotalLength, NewStreamPointer}} = StreamFun(MaxChunkSize, WriterFun, {0, nil}), + couch_stream:close(OutputStream), {Fd, NewStreamPointer, TotalLength}; {Fun, Len} when is_function(Fun) -> + {ok, OutputStream} = couch_stream:open(Fd), + ok = couch_stream:set_min_buffer(OutputStream, Len), {ok, StreamPointer} = write_streamed_attachment(OutputStream, Fun, Len, nil), + couch_stream:close(OutputStream), {Fd, StreamPointer, Len} end, {Key, {Type, NewBinValue}} - end, Bins), - - {ok, _FinalPos} = couch_stream:close(OutputStream), - Doc#doc{attachments = NewBins}. + end, Doc#doc.attachments), + Doc#doc{attachments = NewAttachments}. 
make_writer_fun(Stream) -> diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index ecab455b..93c43434 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -269,7 +269,6 @@ init_status_error(ReturnPid, Ref, Error) -> % server functions init({Filepath, Options, ReturnPid, Ref}) -> - process_flag(trap_exit, true), case lists:member(create, Options) of true -> filelib:ensure_dir(Filepath), @@ -285,14 +284,14 @@ init({Filepath, Options, ReturnPid, Ref}) -> true -> {ok, 0} = file:position(Fd, 0), ok = file:truncate(Fd), - catch couch_stats_collector:increment({couchdb, open_os_files}), + track_stats(), {ok, Fd}; false -> ok = file:close(Fd), init_status_error(ReturnPid, Ref, file_exists) end; false -> - catch couch_stats_collector:increment({couchdb, open_os_files}), + track_stats(), {ok, Fd} end; Error -> @@ -304,7 +303,7 @@ init({Filepath, Options, ReturnPid, Ref}) -> {ok, Fd_Read} -> {ok, Fd} = file:open(Filepath, [read, write, raw, binary]), ok = file:close(Fd_Read), - catch couch_stats_collector:increment({couchdb, open_os_files}), + track_stats(), {ok, Fd}; Error -> init_status_error(ReturnPid, Ref, Error) @@ -312,10 +311,21 @@ init({Filepath, Options, ReturnPid, Ref}) -> end. -terminate(_Reason, _Fd) -> - catch couch_stats_collector:decrement({couchdb, open_os_files}), +terminate(_Reason, _Fd) -> ok. +track_stats() -> + try couch_stats_collector:increment({couchdb, open_os_files}) of + ok -> + Self = self(), + spawn( + fun() -> + erlang:monitor(process, Self), + receive {'DOWN', _, _, _, _} -> ok end, + couch_stats_collector:decrement({couchdb, open_os_files}) + end) + catch _ -> ok + end. handle_call({pread, Pos, Bytes}, _From, Fd) -> {reply, file:pread(Fd, Pos, Bytes), Fd}; @@ -349,5 +359,5 @@ handle_cast(close, Fd) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_info({'EXIT', _, Reason}, Fd) -> - {stop, Reason, Fd}. +handle_info(foo, _Fd) -> + ok. 
diff --git a/src/couchdb/couch_stats_collector.erl b/src/couchdb/couch_stats_collector.erl index 8d0234bb..854fffb0 100644 --- a/src/couchdb/couch_stats_collector.erl +++ b/src/couchdb/couch_stats_collector.erl @@ -56,13 +56,17 @@ increment({Module, Key}) when is_integer(Key) -> increment({Module, list_to_atom(integer_to_list(Key))}); increment(Key) -> case catch ets:update_counter(?HIT_COUNTER_TABLE, Key, 1) of - {'EXIT', {badarg, _}} -> ets:insert(?HIT_COUNTER_TABLE, {Key, 1}); + {'EXIT', {badarg, _}} -> + true = ets:insert(?HIT_COUNTER_TABLE, {Key, 1}), + ok; _ -> ok end. decrement(Key) -> case catch ets:update_counter(?HIT_COUNTER_TABLE, Key, -1) of - {'EXIT', {badarg, _}} -> ets:insert(?HIT_COUNTER_TABLE, {Key, -1}); + {'EXIT', {badarg, _}} -> + true = ets:insert(?HIT_COUNTER_TABLE, {Key, -1}), + ok; _ -> ok end. diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index d6f72696..addf9acf 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -38,6 +38,7 @@ current_pos = 0, bytes_remaining = 0, next_alloc = 0, + first_alloc = 0, min_alloc = 16#00010000 }). -- cgit v1.2.3