author    Damien F. Katz <damien@apache.org>  2009-05-25 19:52:28 +0000
committer Damien F. Katz <damien@apache.org>  2009-05-25 19:52:28 +0000
commit    16ccd4c0b8ae4272fa27d32948658b1424a291fc (patch)
tree      f6d59d017234409436091cc53938b27549d9b54f /src/couchdb/couch_db.erl
parent    4aac0f7c6dcd3f3a29cfe5e1bf2bee84b9bae9d5 (diff)
Merging new tail-append storage into trunk. Upgrades are automatic, but once a file has been opened by this version, older versions of CouchDB will not be able to open it. As a precaution, back up your production databases before upgrading.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@778485 13f79535-47bb-0310-9956-ffa450edef68
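As a sketch of the append-only model this commit introduces (assuming couch_file:append_term/2 returns the position a term was written at; the matching read side, couch_file:pread_term/2, appears in the diff below):

    % Hypothetical round trip: the document body is appended at the tail
    % of the file and read back by byte position. Because writes only go
    % to the end of the file, a crash mid-write cannot corrupt data that
    % was already committed.
    {ok, Fd} = couch_file:open("demo.couch", [create]),
    {ok, Pos} = couch_file:append_term(Fd, {body, <<"example">>}),
    {ok, {body, <<"example">>}} = couch_file:pread_term(Fd, Pos),
    couch_file:close(Fd).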
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--  src/couchdb/couch_db.erl  114
1 file changed, 41 insertions(+), 73 deletions(-)
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 29dbbd38..d0a4e34c 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -24,7 +24,7 @@
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/5]).
+-export([changes_since/5,read_doc/2]).
-include("couch_db.hrl").
@@ -50,6 +50,7 @@ open_db_file(Filepath, Options) ->
{ok, Fd} ->
?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
ok = file:rename(Filepath ++ ".compact", Filepath),
+ ok = couch_file:sync(Fd),
{ok, Fd};
{error, enoent} ->
{not_found, no_db_file}
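The added couch_file:sync/1 call makes the compaction switchover durable. Restated as a standalone sketch (helper name invented), the pattern is rename first, then sync, so the file's contents are known to be on disk before it is used as primary storage:

    % Promote a finished .compact file to primary storage. The sync
    % flushes the file's data to disk before the Fd is handed out;
    % without it, a crash shortly after the rename could surface a
    % partially written database file.
    swap_compact_file(Filepath, Fd) ->
        ok = file:rename(Filepath ++ ".compact", Filepath),
        ok = couch_file:sync(Fd),
        {ok, Fd}.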
@@ -154,7 +155,7 @@ increment_update_seq(#db{update_pid=UpdatePid}) ->
purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
-get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) ->
+get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
get_update_seq(#db{update_seq=Seq})->
@@ -565,93 +566,55 @@ flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd ->
% already written to our file, nothing to write
{Fd, StreamPointer, Len};
+flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
+ {NewStreamData, Len} =
+ couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
+ {Fd, NewStreamData, Len};
+
flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
- with_stream(Fd, fun(OutputStream) ->
- % written to a different file (or a closed file
- % instance, which will cause an error)
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, {NewStreamPointer, Len}, _EndSp} =
- couch_stream:foldl(OtherFd, StreamPointer, Len,
- fun(Bin, {BeginPointer, SizeAcc}) ->
- {ok, Pointer} = couch_stream:write(OutputStream, Bin),
- case SizeAcc of
- 0 -> % this was the first write, record the pointer
- {ok, {Pointer, size(Bin)}};
- _ ->
- {ok, {BeginPointer, SizeAcc + size(Bin)}}
- end
- end,
- {{0,0}, 0}),
- {Fd, NewStreamPointer, Len}
- end);
+ {NewStreamData, Len} =
+ couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+ {Fd, NewStreamData, Len};
flush_binary(Fd, Bin) when is_binary(Bin) ->
- with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, size(Bin)),
- {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
- {Fd, StreamPointer, size(Bin)}
+ with_stream(Fd, fun(OutputStream) ->
+ couch_stream:write(OutputStream, Bin)
end);
flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
- % max_attachment_chunk_size control the max we buffer in memory
- MaxChunkSize = list_to_integer(couch_config:get("couchdb",
- "max_attachment_chunk_size","4294967296")),
with_stream(Fd, fun(OutputStream) ->
% StreamFun(MaxChunkSize, WriterFun) must call WriterFun
- % once for each chunk of the attachment.
- WriterFun = make_writer_fun(OutputStream),
- {ok, {TotalLength, NewStreamPointer}} =
- StreamFun(MaxChunkSize, WriterFun, {0, nil}),
- {Fd, NewStreamPointer, TotalLength}
+ % once for each chunk of the attachment,
+ StreamFun(4096,
+ % WriterFun({Length, Binary}, State)
+ % WriterFun({0, _Footers}, State)
+ % Called with Length == 0 on the last time.
+ % WriterFun returns NewState.
+ fun({0, _Footers}, _) ->
+ ok;
+ ({_Length, Bin}, _) ->
+ couch_stream:write(OutputStream, Bin)
+ end, ok)
end);
flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, StreamPointer} =
- write_streamed_attachment(OutputStream, Fun, Len, nil),
- {Fd, StreamPointer, Len}
+ write_streamed_attachment(OutputStream, Fun, Len)
end).
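For reference, a toy StreamFun matching the WriterFun contract spelled out in the comments above (chunk contents invented): it calls WriterFun once per chunk, threading the state through, and finishes with the {0, Footers} call.

    % Pushes two chunks through WriterFun, then signals the end of the
    % attachment with a final zero-length call, as the contract requires.
    example_stream_fun(_MaxChunkSize, WriterFun, State0) ->
        State1 = WriterFun({5, <<"hello">>}, State0),
        State2 = WriterFun({6, <<" world">>}, State1),
        WriterFun({0, []}, State2).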
with_stream(Fd, Fun) ->
{ok, OutputStream} = couch_stream:open(Fd),
- Result = Fun(OutputStream),
- couch_stream:close(OutputStream),
- Result.
+ Fun(OutputStream),
+ {StreamInfo, Len} = couch_stream:close(OutputStream),
+ {Fd, StreamInfo, Len}.
-make_writer_fun(Stream) ->
- % WriterFun({Length, Binary}, State)
- % WriterFun({0, _Footers}, State)
- % Called with Length == 0 on the last time.
- % WriterFun returns NewState.
- fun
- ({0, _Footers}, {FinalLen, SpFin}) ->
- % last block, return the final tuple
- {ok, {FinalLen, SpFin}};
- ({Length, Bin}, {Total, nil}) ->
- % save StreamPointer
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, StreamPointer} = couch_stream:write(Stream, Bin),
- {Total+Length, StreamPointer};
- ({Length, Bin}, {Total, SpAcc}) ->
- % write the Bin to disk
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, _Sp} = couch_stream:write(Stream, Bin),
- {Total+Length, SpAcc}
- end.
-write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
- {ok, SpAcc};
-write_streamed_attachment(Stream, F, LenLeft, nil) ->
- Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, SpAcc} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc);
-write_streamed_attachment(Stream, F, LenLeft, SpAcc) ->
+write_streamed_attachment(_Stream, _F, 0) ->
+ ok;
+write_streamed_attachment(Stream, F, LenLeft) ->
Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, _} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc).
+ ok = couch_stream:write(Stream, check_bin_length(LenLeft, Bin)),
+ write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
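A sketch of the zero-arity reader fun this loop consumes (file-backed source assumed for illustration); each call must return the next chunk of the attachment body as a binary:

    % IoDevice should be opened with [read, binary] so file:read/2
    % returns binaries. Each invocation yields the next ChunkSize bytes.
    make_reader(IoDevice, ChunkSize) ->
        fun() ->
            {ok, Bin} = file:read(IoDevice, ChunkSize),
            Bin
        end.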
%% on rare occasions ibrowse seems to process a chunked response incorrectly
%% and include an extra "\r" in the last chunk. This code ensures that we
@@ -857,6 +820,12 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
+read_doc(Fd, Pos) when is_integer(Pos) ->
+ couch_file:pread_term(Fd, Pos);
+read_doc(Fd, OldStyleStreamPointer) ->
+ % 09 UPGRADE CODE
+ couch_stream:old_read_term(Fd, OldStyleStreamPointer).
+
doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
[Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
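A usage sketch for the newly exported read_doc/2 (offsets and pointers invented): both clauses stay live within a single upgraded database, since documents written before the upgrade keep their old-style stream pointers until they are rewritten.

    % New-format body: Bp is an integer offset into the file.
    {ok, Body} = couch_db:read_doc(Fd, 8192),
    % Pre-upgrade body: Bp is still an old-style stream pointer.
    {ok, OldBody} = couch_db:read_doc(Fd, OldStyleStreamPointer).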
@@ -869,14 +838,13 @@ doc_to_tree_simple(Doc, [RevId | Rest]) ->
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
{BodyData, BinValues} =
case Bp of
nil ->
{[], []};
_ ->
- {ok, {BodyData0, BinValues0}} =
- couch_stream:read_term( Db#db.summary_stream, Bp),
+ {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
{BodyData0,
[{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
end,