summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--src/couchdb/couch_db.erl114
1 files changed, 41 insertions, 73 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 29dbbd38..d0a4e34c 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -24,7 +24,7 @@
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/5]).
+-export([changes_since/5,read_doc/2]).
-include("couch_db.hrl").
@@ -50,6 +50,7 @@ open_db_file(Filepath, Options) ->
{ok, Fd} ->
?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
ok = file:rename(Filepath ++ ".compact", Filepath),
+ ok = couch_file:sync(Fd),
{ok, Fd};
{error, enoent} ->
{not_found, no_db_file}
@@ -154,7 +155,7 @@ increment_update_seq(#db{update_pid=UpdatePid}) ->
purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
-get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) ->
+get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
get_update_seq(#db{update_seq=Seq})->
@@ -565,93 +566,55 @@ flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd ->
% already written to our file, nothing to write
{Fd, StreamPointer, Len};
+flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
+ {NewStreamData, Len} =
+ couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
+ {Fd, NewStreamData, Len};
+
flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
- with_stream(Fd, fun(OutputStream) ->
- % written to a different file (or a closed file
- % instance, which will cause an error)
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, {NewStreamPointer, Len}, _EndSp} =
- couch_stream:foldl(OtherFd, StreamPointer, Len,
- fun(Bin, {BeginPointer, SizeAcc}) ->
- {ok, Pointer} = couch_stream:write(OutputStream, Bin),
- case SizeAcc of
- 0 -> % this was the first write, record the pointer
- {ok, {Pointer, size(Bin)}};
- _ ->
- {ok, {BeginPointer, SizeAcc + size(Bin)}}
- end
- end,
- {{0,0}, 0}),
- {Fd, NewStreamPointer, Len}
- end);
+ {NewStreamData, Len} =
+ couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+ {Fd, NewStreamData, Len};
flush_binary(Fd, Bin) when is_binary(Bin) ->
- with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, size(Bin)),
- {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
- {Fd, StreamPointer, size(Bin)}
+ with_stream(Fd, fun(OutputStream) ->
+ couch_stream:write(OutputStream, Bin)
end);
flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
- % max_attachment_chunk_size control the max we buffer in memory
- MaxChunkSize = list_to_integer(couch_config:get("couchdb",
- "max_attachment_chunk_size","4294967296")),
with_stream(Fd, fun(OutputStream) ->
% StreamFun(MaxChunkSize, WriterFun) must call WriterFun
- % once for each chunk of the attachment.
- WriterFun = make_writer_fun(OutputStream),
- {ok, {TotalLength, NewStreamPointer}} =
- StreamFun(MaxChunkSize, WriterFun, {0, nil}),
- {Fd, NewStreamPointer, TotalLength}
+ % once for each chunk of the attachment,
+ StreamFun(4096,
+ % WriterFun({Length, Binary}, State)
+ % WriterFun({0, _Footers}, State)
+ % Called with Length == 0 on the last time.
+ % WriterFun returns NewState.
+ fun({0, _Footers}, _) ->
+ ok;
+ ({_Length, Bin}, _) ->
+ couch_stream:write(OutputStream, Bin)
+ end, ok)
end);
flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, StreamPointer} =
- write_streamed_attachment(OutputStream, Fun, Len, nil),
- {Fd, StreamPointer, Len}
+ write_streamed_attachment(OutputStream, Fun, Len)
end).
with_stream(Fd, Fun) ->
{ok, OutputStream} = couch_stream:open(Fd),
- Result = Fun(OutputStream),
- couch_stream:close(OutputStream),
- Result.
+ Fun(OutputStream),
+ {StreamInfo, Len} = couch_stream:close(OutputStream),
+ {Fd, StreamInfo, Len}.
-make_writer_fun(Stream) ->
- % WriterFun({Length, Binary}, State)
- % WriterFun({0, _Footers}, State)
- % Called with Length == 0 on the last time.
- % WriterFun returns NewState.
- fun
- ({0, _Footers}, {FinalLen, SpFin}) ->
- % last block, return the final tuple
- {ok, {FinalLen, SpFin}};
- ({Length, Bin}, {Total, nil}) ->
- % save StreamPointer
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, StreamPointer} = couch_stream:write(Stream, Bin),
- {Total+Length, StreamPointer};
- ({Length, Bin}, {Total, SpAcc}) ->
- % write the Bin to disk
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, _Sp} = couch_stream:write(Stream, Bin),
- {Total+Length, SpAcc}
- end.
-write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
- {ok, SpAcc};
-write_streamed_attachment(Stream, F, LenLeft, nil) ->
- Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, SpAcc} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc);
-write_streamed_attachment(Stream, F, LenLeft, SpAcc) ->
+write_streamed_attachment(_Stream, _F, 0) ->
+ ok;
+write_streamed_attachment(Stream, F, LenLeft) ->
Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, _} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc).
+ ok = couch_stream:write(Stream, check_bin_length(LenLeft, Bin)),
+ write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
%% on rare occasions ibrowse seems to process a chunked response incorrectly
%% and include an extra "\r" in the last chunk. This code ensures that we
@@ -857,6 +820,12 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
+read_doc(Fd, Pos) when is_integer(Pos) ->
+ couch_file:pread_term(Fd, Pos);
+read_doc(Fd, OldStyleStreamPointer) ->
+ % 09 UPGRADE CODE
+ couch_stream:old_read_term(Fd, OldStyleStreamPointer).
+
doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
[Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
@@ -869,14 +838,13 @@ doc_to_tree_simple(Doc, [RevId | Rest]) ->
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
{BodyData, BinValues} =
case Bp of
nil ->
{[], []};
_ ->
- {ok, {BodyData0, BinValues0}} =
- couch_stream:read_term( Db#db.summary_stream, Bp),
+ {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
{BodyData0,
[{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
end,