From 73da90b61d8f51a059700f2061c62f2f9d452662 Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Sat, 11 Apr 2009 22:20:22 +0000 Subject: refactor: extract method from doc_flush_binaries. add with_stream/2 to handle automatically opening and closing binary streams. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@764257 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db.erl | 109 +++++++++++++++++++++++++---------------------- 1 file changed, 58 insertions(+), 51 deletions(-) (limited to 'src/couchdb/couch_db.erl') diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index e7df2f25..a4b79ae5 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -550,61 +550,68 @@ write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, doc_flush_binaries(Doc, Fd) -> NewAttachments = lists:map( fun({Key, {Type, BinValue}}) -> - NewBinValue = - case BinValue of - {Fd0, StreamPointer, Len} when Fd0 == Fd -> - % already written to our file, nothing to write - {Fd, StreamPointer, Len}; - {OtherFd, StreamPointer, Len} -> - % written to a different file (or a closed file - % instance, which will cause an error) - {ok, OutputStream} = couch_stream:open(Fd), - ok = couch_stream:set_min_buffer(OutputStream, Len), - {ok, {NewStreamPointer, Len}, _EndSp} = - couch_stream:foldl(OtherFd, StreamPointer, Len, - fun(Bin, {BeginPointer, SizeAcc}) -> - {ok, Pointer} = couch_stream:write(OutputStream, Bin), - case SizeAcc of - 0 -> % this was the first write, record the pointer - {ok, {Pointer, size(Bin)}}; - _ -> - {ok, {BeginPointer, SizeAcc + size(Bin)}} - end - end, - {{0,0}, 0}), - couch_stream:close(OutputStream), - {Fd, NewStreamPointer, Len}; - Bin when is_binary(Bin) -> - {ok, OutputStream} = couch_stream:open(Fd), - ok = couch_stream:set_min_buffer(OutputStream, size(Bin)), - {ok, StreamPointer} = couch_stream:write(OutputStream, Bin), - couch_stream:close(OutputStream), - {Fd, StreamPointer, size(Bin)}; - {StreamFun, undefined} when is_function(StreamFun) -> - % max_attachment_chunk_size control the max we buffer in memory - MaxChunkSize = list_to_integer(couch_config:get("couchdb", - "max_attachment_chunk_size","4294967296")), - {ok, OutputStream} = couch_stream:open(Fd), - WriterFun = make_writer_fun(OutputStream), - % StreamFun(MaxChunkSize, WriterFun) - % will call our WriterFun - % once for each chunk of the attachment. - {ok, {TotalLength, NewStreamPointer}} = - StreamFun(MaxChunkSize, WriterFun, {0, nil}), - couch_stream:close(OutputStream), - {Fd, NewStreamPointer, TotalLength}; - {Fun, Len} when is_function(Fun) -> - {ok, OutputStream} = couch_stream:open(Fd), - ok = couch_stream:set_min_buffer(OutputStream, Len), - {ok, StreamPointer} = - write_streamed_attachment(OutputStream, Fun, Len, nil), - couch_stream:close(OutputStream), - {Fd, StreamPointer, Len} - end, + NewBinValue = flush_binary(Fd, BinValue), {Key, {Type, NewBinValue}} end, Doc#doc.attachments), Doc#doc{attachments = NewAttachments}. +flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd -> + % already written to our file, nothing to write + {Fd, StreamPointer, Len}; + +flush_binary(Fd, {OtherFd, StreamPointer, Len}) -> + with_stream(Fd, fun(OutputStream) -> + % written to a different file (or a closed file + % instance, which will cause an error) + ok = couch_stream:set_min_buffer(OutputStream, Len), + {ok, {NewStreamPointer, Len}, _EndSp} = + couch_stream:foldl(OtherFd, StreamPointer, Len, + fun(Bin, {BeginPointer, SizeAcc}) -> + {ok, Pointer} = couch_stream:write(OutputStream, Bin), + case SizeAcc of + 0 -> % this was the first write, record the pointer + {ok, {Pointer, size(Bin)}}; + _ -> + {ok, {BeginPointer, SizeAcc + size(Bin)}} + end + end, + {{0,0}, 0}), + {Fd, NewStreamPointer, Len} + end); + +flush_binary(Fd, Bin) when is_binary(Bin) -> + with_stream(Fd, fun(OutputStream) -> + ok = couch_stream:set_min_buffer(OutputStream, size(Bin)), + {ok, StreamPointer} = couch_stream:write(OutputStream, Bin), + {Fd, StreamPointer, size(Bin)} + end); + +flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) -> + % max_attachment_chunk_size control the max we buffer in memory + MaxChunkSize = list_to_integer(couch_config:get("couchdb", + "max_attachment_chunk_size","4294967296")), + with_stream(Fd, fun(OutputStream) -> + % StreamFun(MaxChunkSize, WriterFun) must call WriterFun + % once for each chunk of the attachment. + WriterFun = make_writer_fun(OutputStream), + {ok, {TotalLength, NewStreamPointer}} = + StreamFun(MaxChunkSize, WriterFun, {0, nil}), + {Fd, NewStreamPointer, TotalLength} + end); + +flush_binary(Fd, {Fun, Len}) when is_function(Fun) -> + with_stream(Fd, fun(OutputStream) -> + ok = couch_stream:set_min_buffer(OutputStream, Len), + {ok, StreamPointer} = + write_streamed_attachment(OutputStream, Fun, Len, nil), + {Fd, StreamPointer, Len} + end). + +with_stream(Fd, Fun) -> + {ok, OutputStream} = couch_stream:open(Fd), + Result = Fun(OutputStream), + couch_stream:close(OutputStream), + Result. make_writer_fun(Stream) -> % WriterFun({Length, Binary}, State) -- cgit v1.2.3