summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_stream.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_stream.erl')
-rw-r--r--src/couchdb/couch_stream.erl46
1 files changed, 36 insertions, 10 deletions
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index 34dc5a07..65cf7126 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -24,7 +24,8 @@
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
--export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]).
+-export([open/1, close/1, write/2, foldl/4, foldl/5,
+ old_foldl/5,old_copy_to_new_stream/4]).
-export([copy_to_new_stream/3,old_read_term/2]).
-export([init/1, terminate/2, handle_call/3]).
-export([handle_cast/2,code_change/3,handle_info/2]).
@@ -37,7 +38,8 @@
buffer_list = [],
buffer_len = 0,
max_buffer = 4096,
- written_len = 0
+ written_len = 0,
+ md5
}).
@@ -79,6 +81,23 @@ foldl(Fd, [Pos|Rest], Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
+foldl(Fd, PosList, <<>>, Fun, Acc) ->
+ foldl(Fd, PosList, Fun, Acc);
+foldl(Fd, PosList, Md5, Fun, Acc) ->
+ foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc).
+
+
+foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
+ Md5 = erlang:md5_final(Md5Acc),
+ Acc;
+foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, Bin)),
+ Fun(Bin, Acc);
+foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
+
write(_Pid, <<>>) ->
ok;
write(Pid, Bin) ->
@@ -86,7 +105,7 @@ write(Pid, Bin) ->
init(Fd) ->
- {ok, #stream{fd = Fd}}.
+ {ok, #stream{fd=Fd, md5=erlang:md5_init()}}.
terminate(_Reason, _Stream) ->
ok.
@@ -99,14 +118,18 @@ handle_call({write, Bin}, _From, Stream) ->
written_pointers = Written,
buffer_len = BufferLen,
buffer_list = Buffer,
- max_buffer = Max} = Stream,
+ max_buffer = Max,
+ md5 = Md5} = Stream,
if BinSize + BufferLen > Max ->
- {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, [Bin])),
+ WriteBin = lists:reverse(Buffer, [Bin]),
+ Md5_2 = erlang:md5_update(Md5, WriteBin),
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
{reply, ok, Stream#stream{
written_len=WrittenLen + BufferLen + BinSize,
written_pointers=[Pos|Written],
buffer_list=[],
- buffer_len=0}};
+ buffer_len=0,
+ md5=Md5_2}};
true ->
{reply, ok, Stream#stream{
buffer_list=[Bin|Buffer],
@@ -118,14 +141,17 @@ handle_call(close, _From, Stream) ->
written_len = WrittenLen,
written_pointers = Written,
buffer_len = BufferLen,
- buffer_list = Buffer} = Stream,
+ buffer_list = Buffer,
+ md5 = Md5} = Stream,
case Buffer of
[] ->
- Result = {lists:reverse(Written), WrittenLen};
+ Result = {lists:reverse(Written), WrittenLen, erlang:md5_final(Md5)};
_ ->
- {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)),
- Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen}
+ WriteBin = lists:reverse(Buffer),
+ Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin)),
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
+ Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen, Md5Final}
end,
{stop, normal, Result, Stream}.