From 16ccd4c0b8ae4272fa27d32948658b1424a291fc Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Mon, 25 May 2009 19:52:28 +0000 Subject: Merging new tail append storage into trunk. Upgrades are automatic, once opened by this version old versions of CouchDB will not be able to open the files. As a precaution, you should back-up your production databases before upgrading. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@778485 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db.erl | 114 +++++++++++++++++------------------------------ 1 file changed, 41 insertions(+), 73 deletions(-) (limited to 'src/couchdb/couch_db.erl') diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 29dbbd38..d0a4e34c 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -24,7 +24,7 @@ -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). -export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). --export([changes_since/5]). +-export([changes_since/5,read_doc/2]). -include("couch_db.hrl"). @@ -50,6 +50,7 @@ open_db_file(Filepath, Options) -> {ok, Fd} -> ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]), ok = file:rename(Filepath ++ ".compact", Filepath), + ok = couch_file:sync(Fd), {ok, Fd}; {error, enoent} -> {not_found, no_db_file} @@ -154,7 +155,7 @@ increment_update_seq(#db{update_pid=UpdatePid}) -> purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> gen_server:call(UpdatePid, {purge_docs, IdsRevs}). -get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) -> +get_committed_update_seq(#db{committed_update_seq=Seq}) -> Seq. get_update_seq(#db{update_seq=Seq})-> @@ -565,93 +566,55 @@ flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd -> % already written to our file, nothing to write {Fd, StreamPointer, Len}; +flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) -> + {NewStreamData, Len} = + couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd), + {Fd, NewStreamData, Len}; + flush_binary(Fd, {OtherFd, StreamPointer, Len}) -> - with_stream(Fd, fun(OutputStream) -> - % written to a different file (or a closed file - % instance, which will cause an error) - ok = couch_stream:set_min_buffer(OutputStream, Len), - {ok, {NewStreamPointer, Len}, _EndSp} = - couch_stream:foldl(OtherFd, StreamPointer, Len, - fun(Bin, {BeginPointer, SizeAcc}) -> - {ok, Pointer} = couch_stream:write(OutputStream, Bin), - case SizeAcc of - 0 -> % this was the first write, record the pointer - {ok, {Pointer, size(Bin)}}; - _ -> - {ok, {BeginPointer, SizeAcc + size(Bin)}} - end - end, - {{0,0}, 0}), - {Fd, NewStreamPointer, Len} - end); + {NewStreamData, Len} = + couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd), + {Fd, NewStreamData, Len}; flush_binary(Fd, Bin) when is_binary(Bin) -> - with_stream(Fd, fun(OutputStream) -> - ok = couch_stream:set_min_buffer(OutputStream, size(Bin)), - {ok, StreamPointer} = couch_stream:write(OutputStream, Bin), - {Fd, StreamPointer, size(Bin)} + with_stream(Fd, fun(OutputStream) -> + couch_stream:write(OutputStream, Bin) end); flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) -> - % max_attachment_chunk_size control the max we buffer in memory - MaxChunkSize = list_to_integer(couch_config:get("couchdb", - "max_attachment_chunk_size","4294967296")), with_stream(Fd, fun(OutputStream) -> % StreamFun(MaxChunkSize, WriterFun) must call WriterFun - % once for each chunk of the attachment. - WriterFun = make_writer_fun(OutputStream), - {ok, {TotalLength, NewStreamPointer}} = - StreamFun(MaxChunkSize, WriterFun, {0, nil}), - {Fd, NewStreamPointer, TotalLength} + % once for each chunk of the attachment, + StreamFun(4096, + % WriterFun({Length, Binary}, State) + % WriterFun({0, _Footers}, State) + % Called with Length == 0 on the last time. + % WriterFun returns NewState. + fun({0, _Footers}, _) -> + ok; + ({_Length, Bin}, _) -> + couch_stream:write(OutputStream, Bin) + end, ok) end); flush_binary(Fd, {Fun, Len}) when is_function(Fun) -> with_stream(Fd, fun(OutputStream) -> - ok = couch_stream:set_min_buffer(OutputStream, Len), - {ok, StreamPointer} = - write_streamed_attachment(OutputStream, Fun, Len, nil), - {Fd, StreamPointer, Len} + write_streamed_attachment(OutputStream, Fun, Len) end). with_stream(Fd, Fun) -> {ok, OutputStream} = couch_stream:open(Fd), - Result = Fun(OutputStream), - couch_stream:close(OutputStream), - Result. + Fun(OutputStream), + {StreamInfo, Len} = couch_stream:close(OutputStream), + {Fd, StreamInfo, Len}. -make_writer_fun(Stream) -> - % WriterFun({Length, Binary}, State) - % WriterFun({0, _Footers}, State) - % Called with Length == 0 on the last time. - % WriterFun returns NewState. - fun - ({0, _Footers}, {FinalLen, SpFin}) -> - % last block, return the final tuple - {ok, {FinalLen, SpFin}}; - ({Length, Bin}, {Total, nil}) -> - % save StreamPointer - ok = couch_stream:set_min_buffer(Stream, Length), - {ok, StreamPointer} = couch_stream:write(Stream, Bin), - {Total+Length, StreamPointer}; - ({Length, Bin}, {Total, SpAcc}) -> - % write the Bin to disk - ok = couch_stream:set_min_buffer(Stream, Length), - {ok, _Sp} = couch_stream:write(Stream, Bin), - {Total+Length, SpAcc} - end. -write_streamed_attachment(_Stream, _F, 0, SpAcc) -> - {ok, SpAcc}; -write_streamed_attachment(Stream, F, LenLeft, nil) -> - Bin = F(), - TruncatedBin = check_bin_length(LenLeft, Bin), - {ok, SpAcc} = couch_stream:write(Stream, TruncatedBin), - write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc); -write_streamed_attachment(Stream, F, LenLeft, SpAcc) -> +write_streamed_attachment(_Stream, _F, 0) -> + ok; +write_streamed_attachment(Stream, F, LenLeft) -> Bin = F(), - TruncatedBin = check_bin_length(LenLeft, Bin), - {ok, _} = couch_stream:write(Stream, TruncatedBin), - write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc). + ok = couch_stream:write(Stream, check_bin_length(LenLeft, Bin)), + write_streamed_attachment(Stream, F, LenLeft - size(Bin)). %% on rare occasions ibrowse seems to process a chunked response incorrectly %% and include an extra "\r" in the last chunk. This code ensures that we @@ -857,6 +820,12 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre true -> [{local_seq, Seq}] end. +read_doc(Fd, Pos) when is_integer(Pos) -> + couch_file:pread_term(Fd, Pos); +read_doc(Fd, OldStyleStreamPointer) -> + % 09 UPGRADE CODE + couch_stream:old_read_term(Fd, OldStyleStreamPointer). + doc_to_tree(#doc{revs={Start, RevIds}}=Doc) -> [Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)), @@ -869,14 +838,13 @@ doc_to_tree_simple(Doc, [RevId | Rest]) -> [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}]. -make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) -> +make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) -> {BodyData, BinValues} = case Bp of nil -> {[], []}; _ -> - {ok, {BodyData0, BinValues0}} = - couch_stream:read_term( Db#db.summary_stream, Bp), + {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp), {BodyData0, [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]} end, -- cgit v1.2.3