summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2009-04-11 22:20:22 +0000
committerJohn Christopher Anderson <jchris@apache.org>2009-04-11 22:20:22 +0000
commit73da90b61d8f51a059700f2061c62f2f9d452662 (patch)
tree07512b67e95d6799bbec7b5a49a718460ca03e4c
parent1df631f8653fefdda8d852b39f4ca9aec46f69da (diff)
refactor: extract method from doc_flush_binaries. add with_stream/2 to handle automatically opening and closing binary streams.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@764257 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_db.erl109
1 files changed, 58 insertions, 51 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index e7df2f25..a4b79ae5 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -550,61 +550,68 @@ write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
doc_flush_binaries(Doc, Fd) ->
NewAttachments = lists:map(
fun({Key, {Type, BinValue}}) ->
- NewBinValue =
- case BinValue of
- {Fd0, StreamPointer, Len} when Fd0 == Fd ->
- % already written to our file, nothing to write
- {Fd, StreamPointer, Len};
- {OtherFd, StreamPointer, Len} ->
- % written to a different file (or a closed file
- % instance, which will cause an error)
- {ok, OutputStream} = couch_stream:open(Fd),
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, {NewStreamPointer, Len}, _EndSp} =
- couch_stream:foldl(OtherFd, StreamPointer, Len,
- fun(Bin, {BeginPointer, SizeAcc}) ->
- {ok, Pointer} = couch_stream:write(OutputStream, Bin),
- case SizeAcc of
- 0 -> % this was the first write, record the pointer
- {ok, {Pointer, size(Bin)}};
- _ ->
- {ok, {BeginPointer, SizeAcc + size(Bin)}}
- end
- end,
- {{0,0}, 0}),
- couch_stream:close(OutputStream),
- {Fd, NewStreamPointer, Len};
- Bin when is_binary(Bin) ->
- {ok, OutputStream} = couch_stream:open(Fd),
- ok = couch_stream:set_min_buffer(OutputStream, size(Bin)),
- {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
- couch_stream:close(OutputStream),
- {Fd, StreamPointer, size(Bin)};
- {StreamFun, undefined} when is_function(StreamFun) ->
- % max_attachment_chunk_size control the max we buffer in memory
- MaxChunkSize = list_to_integer(couch_config:get("couchdb",
- "max_attachment_chunk_size","4294967296")),
- {ok, OutputStream} = couch_stream:open(Fd),
- WriterFun = make_writer_fun(OutputStream),
- % StreamFun(MaxChunkSize, WriterFun)
- % will call our WriterFun
- % once for each chunk of the attachment.
- {ok, {TotalLength, NewStreamPointer}} =
- StreamFun(MaxChunkSize, WriterFun, {0, nil}),
- couch_stream:close(OutputStream),
- {Fd, NewStreamPointer, TotalLength};
- {Fun, Len} when is_function(Fun) ->
- {ok, OutputStream} = couch_stream:open(Fd),
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, StreamPointer} =
- write_streamed_attachment(OutputStream, Fun, Len, nil),
- couch_stream:close(OutputStream),
- {Fd, StreamPointer, Len}
- end,
+ NewBinValue = flush_binary(Fd, BinValue),
{Key, {Type, NewBinValue}}
end, Doc#doc.attachments),
Doc#doc{attachments = NewAttachments}.
+flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd ->
+ % already written to our file, nothing to write
+ {Fd, StreamPointer, Len};
+
+flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
+ with_stream(Fd, fun(OutputStream) ->
+ % written to a different file (or a closed file
+ % instance, which will cause an error)
+ ok = couch_stream:set_min_buffer(OutputStream, Len),
+ {ok, {NewStreamPointer, Len}, _EndSp} =
+ couch_stream:foldl(OtherFd, StreamPointer, Len,
+ fun(Bin, {BeginPointer, SizeAcc}) ->
+ {ok, Pointer} = couch_stream:write(OutputStream, Bin),
+ case SizeAcc of
+ 0 -> % this was the first write, record the pointer
+ {ok, {Pointer, size(Bin)}};
+ _ ->
+ {ok, {BeginPointer, SizeAcc + size(Bin)}}
+ end
+ end,
+ {{0,0}, 0}),
+ {Fd, NewStreamPointer, Len}
+ end);
+
+flush_binary(Fd, Bin) when is_binary(Bin) ->
+ with_stream(Fd, fun(OutputStream) ->
+ ok = couch_stream:set_min_buffer(OutputStream, size(Bin)),
+ {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
+ {Fd, StreamPointer, size(Bin)}
+ end);
+
+flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
+ % max_attachment_chunk_size control the max we buffer in memory
+ MaxChunkSize = list_to_integer(couch_config:get("couchdb",
+ "max_attachment_chunk_size","4294967296")),
+ with_stream(Fd, fun(OutputStream) ->
+ % StreamFun(MaxChunkSize, WriterFun) must call WriterFun
+ % once for each chunk of the attachment.
+ WriterFun = make_writer_fun(OutputStream),
+ {ok, {TotalLength, NewStreamPointer}} =
+ StreamFun(MaxChunkSize, WriterFun, {0, nil}),
+ {Fd, NewStreamPointer, TotalLength}
+ end);
+
+flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
+ with_stream(Fd, fun(OutputStream) ->
+ ok = couch_stream:set_min_buffer(OutputStream, Len),
+ {ok, StreamPointer} =
+ write_streamed_attachment(OutputStream, Fun, Len, nil),
+ {Fd, StreamPointer, Len}
+ end).
+
+with_stream(Fd, Fun) ->
+ {ok, OutputStream} = couch_stream:open(Fd),
+ Result = Fun(OutputStream),
+ couch_stream:close(OutputStream),
+ Result.
make_writer_fun(Stream) ->
% WriterFun({Length, Binary}, State)