From 16ccd4c0b8ae4272fa27d32948658b1424a291fc Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Mon, 25 May 2009 19:52:28 +0000 Subject: Merging new tail append storage into trunk. Upgrades are automatic, once opened by this version old versions of CouchDB will not be able to open the files. As a precaution, you should back-up your production databases before upgrading. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@778485 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db.erl | 114 ++++------- src/couchdb/couch_db.hrl | 10 +- src/couchdb/couch_db_updater.erl | 148 +++++++++----- src/couchdb/couch_doc.erl | 18 +- src/couchdb/couch_file.erl | 421 ++++++++++++++++++++++++++------------- src/couchdb/couch_httpd_db.erl | 7 +- src/couchdb/couch_stream.erl | 316 +++++++++++++---------------- src/couchdb/couch_view_group.erl | 10 +- 8 files changed, 571 insertions(+), 473 deletions(-) (limited to 'src') diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 29dbbd38..d0a4e34c 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -24,7 +24,7 @@ -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). -export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). --export([changes_since/5]). +-export([changes_since/5,read_doc/2]). -include("couch_db.hrl"). @@ -50,6 +50,7 @@ open_db_file(Filepath, Options) -> {ok, Fd} -> ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]), ok = file:rename(Filepath ++ ".compact", Filepath), + ok = couch_file:sync(Fd), {ok, Fd}; {error, enoent} -> {not_found, no_db_file} @@ -154,7 +155,7 @@ increment_update_seq(#db{update_pid=UpdatePid}) -> purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> gen_server:call(UpdatePid, {purge_docs, IdsRevs}). -get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) -> +get_committed_update_seq(#db{committed_update_seq=Seq}) -> Seq. get_update_seq(#db{update_seq=Seq})-> @@ -565,93 +566,55 @@ flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd -> % already written to our file, nothing to write {Fd, StreamPointer, Len}; +flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) -> + {NewStreamData, Len} = + couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd), + {Fd, NewStreamData, Len}; + flush_binary(Fd, {OtherFd, StreamPointer, Len}) -> - with_stream(Fd, fun(OutputStream) -> - % written to a different file (or a closed file - % instance, which will cause an error) - ok = couch_stream:set_min_buffer(OutputStream, Len), - {ok, {NewStreamPointer, Len}, _EndSp} = - couch_stream:foldl(OtherFd, StreamPointer, Len, - fun(Bin, {BeginPointer, SizeAcc}) -> - {ok, Pointer} = couch_stream:write(OutputStream, Bin), - case SizeAcc of - 0 -> % this was the first write, record the pointer - {ok, {Pointer, size(Bin)}}; - _ -> - {ok, {BeginPointer, SizeAcc + size(Bin)}} - end - end, - {{0,0}, 0}), - {Fd, NewStreamPointer, Len} - end); + {NewStreamData, Len} = + couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd), + {Fd, NewStreamData, Len}; flush_binary(Fd, Bin) when is_binary(Bin) -> - with_stream(Fd, fun(OutputStream) -> - ok = couch_stream:set_min_buffer(OutputStream, size(Bin)), - {ok, StreamPointer} = couch_stream:write(OutputStream, Bin), - {Fd, StreamPointer, size(Bin)} + with_stream(Fd, fun(OutputStream) -> + couch_stream:write(OutputStream, Bin) end); flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) -> - % max_attachment_chunk_size control the max we buffer in memory - MaxChunkSize = list_to_integer(couch_config:get("couchdb", - "max_attachment_chunk_size","4294967296")), with_stream(Fd, fun(OutputStream) -> % StreamFun(MaxChunkSize, WriterFun) must call WriterFun - % once for each chunk of the attachment. - WriterFun = make_writer_fun(OutputStream), - {ok, {TotalLength, NewStreamPointer}} = - StreamFun(MaxChunkSize, WriterFun, {0, nil}), - {Fd, NewStreamPointer, TotalLength} + % once for each chunk of the attachment, + StreamFun(4096, + % WriterFun({Length, Binary}, State) + % WriterFun({0, _Footers}, State) + % Called with Length == 0 on the last time. + % WriterFun returns NewState. + fun({0, _Footers}, _) -> + ok; + ({_Length, Bin}, _) -> + couch_stream:write(OutputStream, Bin) + end, ok) end); flush_binary(Fd, {Fun, Len}) when is_function(Fun) -> with_stream(Fd, fun(OutputStream) -> - ok = couch_stream:set_min_buffer(OutputStream, Len), - {ok, StreamPointer} = - write_streamed_attachment(OutputStream, Fun, Len, nil), - {Fd, StreamPointer, Len} + write_streamed_attachment(OutputStream, Fun, Len) end). with_stream(Fd, Fun) -> {ok, OutputStream} = couch_stream:open(Fd), - Result = Fun(OutputStream), - couch_stream:close(OutputStream), - Result. + Fun(OutputStream), + {StreamInfo, Len} = couch_stream:close(OutputStream), + {Fd, StreamInfo, Len}. -make_writer_fun(Stream) -> - % WriterFun({Length, Binary}, State) - % WriterFun({0, _Footers}, State) - % Called with Length == 0 on the last time. - % WriterFun returns NewState. - fun - ({0, _Footers}, {FinalLen, SpFin}) -> - % last block, return the final tuple - {ok, {FinalLen, SpFin}}; - ({Length, Bin}, {Total, nil}) -> - % save StreamPointer - ok = couch_stream:set_min_buffer(Stream, Length), - {ok, StreamPointer} = couch_stream:write(Stream, Bin), - {Total+Length, StreamPointer}; - ({Length, Bin}, {Total, SpAcc}) -> - % write the Bin to disk - ok = couch_stream:set_min_buffer(Stream, Length), - {ok, _Sp} = couch_stream:write(Stream, Bin), - {Total+Length, SpAcc} - end. -write_streamed_attachment(_Stream, _F, 0, SpAcc) -> - {ok, SpAcc}; -write_streamed_attachment(Stream, F, LenLeft, nil) -> - Bin = F(), - TruncatedBin = check_bin_length(LenLeft, Bin), - {ok, SpAcc} = couch_stream:write(Stream, TruncatedBin), - write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc); -write_streamed_attachment(Stream, F, LenLeft, SpAcc) -> +write_streamed_attachment(_Stream, _F, 0) -> + ok; +write_streamed_attachment(Stream, F, LenLeft) -> Bin = F(), - TruncatedBin = check_bin_length(LenLeft, Bin), - {ok, _} = couch_stream:write(Stream, TruncatedBin), - write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc). + ok = couch_stream:write(Stream, check_bin_length(LenLeft, Bin)), + write_streamed_attachment(Stream, F, LenLeft - size(Bin)). %% on rare occasions ibrowse seems to process a chunked response incorrectly %% and include an extra "\r" in the last chunk. This code ensures that we @@ -857,6 +820,12 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre true -> [{local_seq, Seq}] end. +read_doc(Fd, Pos) when is_integer(Pos) -> + couch_file:pread_term(Fd, Pos); +read_doc(Fd, OldStyleStreamPointer) -> + % 09 UPGRADE CODE + couch_stream:old_read_term(Fd, OldStyleStreamPointer). + doc_to_tree(#doc{revs={Start, RevIds}}=Doc) -> [Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)), @@ -869,14 +838,13 @@ doc_to_tree_simple(Doc, [RevId | Rest]) -> [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}]. -make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) -> +make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) -> {BodyData, BinValues} = case Bp of nil -> {[], []}; _ -> - {ok, {BodyData0, BinValues0}} = - couch_stream:read_term( Db#db.summary_stream, Bp), + {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp), {BodyData0, [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]} end, diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 0f7e344e..0d28b2fd 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -109,13 +109,12 @@ % if the disk revision is incremented, then new upgrade logic will need to be % added to couch_db_updater:init_db. --define(DISK_VERSION_0_9, 1). --define(LATEST_DISK_VERSION, 2). +-define(LATEST_DISK_VERSION, 3). -record(db_header, {disk_version = ?LATEST_DISK_VERSION, update_seq = 0, - summary_stream_state = nil, + unused = 0, fulldocinfo_by_id_btree_state = nil, docinfo_by_seq_btree_state = nil, local_docs_btree_state = nil, @@ -133,7 +132,7 @@ fd, fd_ref_counter, header = #db_header{}, - summary_stream, + committed_update_seq, fulldocinfo_by_id_btree, docinfo_by_seq_btree, local_docs_btree, @@ -145,7 +144,8 @@ admins_ptr = nil, user_ctx = #user_ctx{}, waiting_delayed_commit = nil, - revs_limit = 1000 + revs_limit = 1000, + fsync_options = [] }). diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index b1cb9037..31ddbf8c 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -19,18 +19,18 @@ -include("couch_db.hrl"). --define(HEADER_SIG, <<$g, $m, $k, 0>>). init({MainPid, DbName, Filepath, Fd, Options}) -> case lists:member(create, Options) of true -> % create a new header and writes it to the file Header = #db_header{}, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header), + ok = couch_file:write_header(Fd, Header), % delete any old compaction files that might be hanging around file:delete(Filepath ++ ".compact"); false -> - {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG) + ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE + {ok, Header} = couch_file:read_header(Fd) end, Db = init_db(DbName, Filepath, Fd, Header), @@ -56,7 +56,7 @@ handle_call({update_docs, DocActions, Options}, _From, Db) -> end; handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) -> {reply, ok, Db}; % no data waiting, return ok immediately -handle_call(full_commit, _From, Db) -> +handle_call(full_commit, _From, Db) -> {reply, ok, commit_data(Db)}; % commit the data and return ok handle_call(increment_update_seq, _From, Db) -> Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), @@ -158,7 +158,7 @@ handle_cast(start_compact, Db) -> end; handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> {ok, NewFd} = couch_file:open(CompactFilepath), - {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG), + {ok, NewHeader} = couch_file:read_header(NewFd), #db{update_seq=NewSeq} = NewDb = init_db(Db#db.name, Filepath, NewFd, NewHeader), unlink(NewFd), @@ -191,7 +191,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> end. handle_info(delayed_commit, Db) -> - {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}. + {noreply, commit_data(Db)}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -214,6 +214,7 @@ btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) -> [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} || {Rev, Seq, Bp} <- DeletedRevInfos]}; btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) -> + % 09 UPGRADE CODE % this is the 0.9.0 and earlier by_seq record. It's missing the body pointers % and individual seq nums for conflicts that are currently in the index, % meaning the filtered _changes api will not work except for on main docs. @@ -244,6 +245,7 @@ btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) -> (_RevId, ?REV_MISSING) -> ?REV_MISSING; (_RevId, {IsDeleted, BodyPointer}) -> + % 09 UPGRADE CODE % this is the 0.9.0 and earlier rev info record. It's missing the seq % nums, which means couchdb will sometimes reexamine unchanged % documents with the _changes API. @@ -280,17 +282,27 @@ less_docid(nil, _) -> true; % nil - special key sorts before all less_docid({}, _) -> false; % {} -> special key sorts after all less_docid(A, B) -> A < B. + init_db(DbName, Filepath, Fd, Header0) -> - case element(2, Header0) of - ?DISK_VERSION_0_9 -> ok; % no problem, all records upgrade on the fly - ?LATEST_DISK_VERSION -> ok; + Header1 = simple_upgrade_record(Header0, #db_header{}), + Header = + case element(2, Header1) of + 1 -> Header1#db_header{unused = 0}; % 0.9 + 2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10 + ?LATEST_DISK_VERSION -> Header1; _ -> throw({database_disk_version_error, "Incorrect disk header version"}) end, - Header = simple_upgrade_record(Header0, #db_header{}), - {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd), - ok = couch_stream:set_min_buffer(SummaryStream, 10000), Less = fun less_docid/2, - + + {ok, FsyncOptions} = couch_util:parse_term( + couch_config:get("couchdb", "fsync_options", + "[before_header, after_header, on_file_open]")), + + case lists:member(on_file_open, FsyncOptions) of + true -> ok = couch_file:sync(Fd); + _ -> ok + end, + {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd, [{split, fun(X) -> btree_by_id_split(X) end}, {join, fun(X,Y) -> btree_by_id_join(X,Y) end}, @@ -308,7 +320,6 @@ init_db(DbName, Filepath, Fd, Header0) -> AdminsPtr -> {ok, Admins} = couch_file:pread_term(Fd, AdminsPtr) end, - % convert start time tuple to microsecs and store as a binary string {MegaSecs, Secs, MicroSecs} = now(), StartTime = ?l2b(io_lib:format("~p", @@ -319,22 +330,22 @@ init_db(DbName, Filepath, Fd, Header0) -> fd=Fd, fd_ref_counter = RefCntr, header=Header, - summary_stream = SummaryStream, fulldocinfo_by_id_btree = IdBtree, docinfo_by_seq_btree = SeqBtree, local_docs_btree = LocalDocsBtree, + committed_update_seq = Header#db_header.update_seq, update_seq = Header#db_header.update_seq, name = DbName, filepath = Filepath, admins = Admins, admins_ptr = AdminsPtr, instance_start_time = StartTime, - revs_limit = Header#db_header.revs_limit + revs_limit = Header#db_header.revs_limit, + fsync_options = FsyncOptions }. -close_db(#db{ fd_ref_counter = RefCntr, summary_stream = SummaryStream}) -> - couch_stream:close(SummaryStream), +close_db(#db{fd_ref_counter = RefCntr}) -> couch_ref_counter:drop(RefCntr). @@ -387,7 +398,7 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []), throw(retry) end, - {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}), + {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}), {IsDeleted, NewSummaryPointer, UpdateSeq}; _ -> Value @@ -549,18 +560,18 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> commit_data(Db) -> commit_data(Db, false). - -commit_data(#db{fd=Fd, header=Header} = Db, Delay) -> - Header2 = Header#db_header{ +db_to_header(Db, Header) -> + Header#db_header{ update_seq = Db#db.update_seq, - summary_stream_state = couch_stream:get_state(Db#db.summary_stream), docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), admins_ptr = Db#db.admins_ptr, - revs_limit = Db#db.revs_limit - }, - if Header == Header2 -> + revs_limit = Db#db.revs_limit}. + +commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) -> + Header = db_to_header(Db, OldHeader), + if OldHeader == Header -> Db; Delay and (Db#db.waiting_delayed_commit == nil) -> Db#db{waiting_delayed_commit= @@ -575,43 +586,75 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) -> end; true -> ok end, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2), - Db#db{waiting_delayed_commit=nil,header=Header2} + case lists:member(before_header, FsyncOptions) of + true -> ok = couch_file:sync(Fd); + _ -> ok + end, + + ok = couch_file:write_header(Fd, Header), + + case lists:member(after_header, FsyncOptions) of + true -> ok = couch_file:sync(Fd); + _ -> ok + end, + + Db#db{waiting_delayed_commit=nil, + header=Header, + committed_update_seq=Db#db.update_seq} end. -copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> - {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp), + +copy_doc_attachments(SrcFd, SrcSp, DestFd) -> + {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp), % copy the bin values - NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) -> - {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd), - {Name, {Type, NewBinSp, Len}} + NewBinInfos = lists:map( + fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null -> + % 09 UPGRADE CODE + {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd), + {Name, {Type, NewBinSp, Len}}; + ({Name, {Type, BinSp, Len}}) -> + {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), + {Name, {Type, NewBinSp, Len}} end, BinInfos), - % now write the document summary - {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}), - Sp. + {BodyData, NewBinInfos}. -copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) -> +copy_rev_tree_attachments(_SrcFd, _DestFd, []) -> []; -copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) -> +copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) -> % root nner node, only copy info/data from leaf nodes - [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]), - [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; -copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) -> + [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]), + [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)]; +copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) -> % This is a leaf node, copy it over - NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), - [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; -copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) -> + DocBody = copy_doc_attachments(SrcFd, Sp, DestFd), + [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)]; +copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) -> % inner node, only copy info/data from leaf nodes - [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]. + [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)]. + -copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) -> +copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) -> Ids = [Id || #doc_info{id=Id} <- InfoBySeq], LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), + + % write out the attachments NewFullDocInfos0 = lists:map( fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> - Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)} + Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)} end, LookupResults), - NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0), + % write out the docs + % we do this in 2 stages so the docs are written out contiguously, making + % view indexing and replication faster. + NewFullDocInfos1 = lists:map( + fun(#full_doc_info{rev_tree=RevTree}=Info) -> + Info#full_doc_info{rev_tree=couch_key_tree:map_leafs( + fun(_Key, {IsDel, DocBody, Seq}) -> + {ok, Pos} = couch_file:append_term(DestFd, DocBody), + {IsDel, Pos, Seq} + end, RevTree)} + end, NewFullDocInfos0), + + NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1), NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos], RemoveSeqs = case Retry of @@ -633,7 +676,9 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, Info -copy_compact(Db, NewDb, Retry) -> +copy_compact(Db, NewDb0, Retry) -> + FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header], + NewDb = NewDb0#db{fsync_options=FsyncOptions}, TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq), EnumBySeqFun = fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) -> @@ -677,15 +722,14 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) -> {ok, Fd} -> couch_task_status:add_task(<<"Database Compaction">>, <>, <<"Starting">>), Retry = true, - {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG); + {ok, Header} = couch_file:read_header(Fd); {error, enoent} -> couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>), {ok, Fd} = couch_file:open(CompactFile, [create]), Retry = false, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{}) + ok = couch_file:write_header(Fd, Header=#db_header{}) end, NewDb = init_db(Name, CompactFile, Fd, Header), - unlink(Fd), NewDb2 = copy_compact(Db, NewDb, Retry), gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}), diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index f3a003e1..8018a1e4 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -252,13 +252,12 @@ to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) -> bin_foldl(Bin, Fun, Acc) when is_binary(Bin) -> - case Fun(Bin, Acc) of - {ok, Acc2} -> {ok, Acc2}; - {done, Acc2} -> {ok, Acc2} - end; -bin_foldl({Fd, Sp, Len}, Fun, Acc) -> - {ok, Acc2, _Sp2} = couch_stream:foldl(Fd, Sp, Len, Fun, Acc), - {ok, Acc2}. + Fun(Bin, Acc); +bin_foldl({Fd, Sp, Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null -> + % 09 UPGRADE CODE + couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc); +bin_foldl({Fd, Sp, _Len}, Fun, Acc) -> + couch_stream:foldl(Fd, Sp, Fun, Acc). bin_size(Bin) when is_binary(Bin) -> size(Bin); @@ -267,9 +266,8 @@ bin_size({_Fd, _Sp, Len}) -> bin_to_binary(Bin) when is_binary(Bin) -> Bin; -bin_to_binary({Fd, Sp, Len}) -> - {ok, Bin, _Sp2} = couch_stream:read(Fd, Sp, Len), - Bin. +bin_to_binary({Fd, Sp, _Len}) -> + couch_stream:foldl(Fd, Sp, fun(Bin, Acc) -> [Bin|Acc] end, []). get_validate_doc_fun(#doc{body={Props}}) -> Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index 430aa6b7..c773be98 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -15,10 +15,16 @@ -include("couch_db.hrl"). --define(HEADER_SIZE, 2048). % size of each segment of the doubly written header +-define(SIZE_BLOCK, 4096). + +-record(file, { + fd, + tail_append_begin=0 % 09 UPGRADE CODE + }). --export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]). --export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]). +-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]). +-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]). +-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). %%---------------------------------------------------------------------- @@ -51,39 +57,6 @@ open(Filepath, Options) -> end. -%%---------------------------------------------------------------------- -%% Args: Pos is the offset from the beginning of the file, Bytes is -%% is the number of bytes to read. -%% Returns: {ok, Binary} where Binary is a binary data from disk -%% or {error, Reason}. -%%---------------------------------------------------------------------- - -pread(Fd, Pos, Bytes) when Bytes > 0 -> - gen_server:call(Fd, {pread, Pos, Bytes}, infinity). - - -%%---------------------------------------------------------------------- -%% Args: Pos is the offset from the beginning of the file, Bin is -%% is the binary to write -%% Returns: ok -%% or {error, Reason}. -%%---------------------------------------------------------------------- - -pwrite(Fd, Pos, Bin) -> - gen_server:call(Fd, {pwrite, Pos, Bin}, infinity). - -%%---------------------------------------------------------------------- -%% Purpose: To append a segment of zeros to the end of the file. -%% Args: Bytes is the number of bytes to append to the file. -%% Returns: {ok, Pos} where Pos is the file offset to the beginning of -%% the new segments. -%% or {error, Reason}. -%%---------------------------------------------------------------------- - -expand(Fd, Bytes) when Bytes > 0 -> - gen_server:call(Fd, {expand, Bytes}, infinity). - - %%---------------------------------------------------------------------- %% Purpose: To append an Erlang term to the end of the file. %% Args: Erlang term to serialize and append to the file. @@ -93,7 +66,7 @@ expand(Fd, Bytes) when Bytes > 0 -> %%---------------------------------------------------------------------- append_term(Fd, Term) -> - append_binary(Fd, term_to_binary(Term, [compressed])). + append_binary(Fd, term_to_binary(Term)). %%---------------------------------------------------------------------- @@ -105,7 +78,8 @@ append_term(Fd, Term) -> %%---------------------------------------------------------------------- append_binary(Fd, Bin) -> - gen_server:call(Fd, {append_bin, Bin}, infinity). + Size = iolist_size(Bin), + gen_server:call(Fd, {append_bin, [<>, Bin]}, infinity). %%---------------------------------------------------------------------- @@ -115,10 +89,12 @@ append_binary(Fd, Bin) -> %% or {error, Reason}. %%---------------------------------------------------------------------- + pread_term(Fd, Pos) -> {ok, Bin} = pread_binary(Fd, Pos), {ok, binary_to_term(Bin)}. + %%---------------------------------------------------------------------- %% Purpose: Reads a binrary from a file that was written with append_binary %% Args: Pos, the offset into the file where the term is serialized. @@ -127,8 +103,26 @@ pread_term(Fd, Pos) -> %%---------------------------------------------------------------------- pread_binary(Fd, Pos) -> - gen_server:call(Fd, {pread_bin, Pos}, infinity). - + {ok, L} = pread_iolist(Fd, Pos), + {ok, iolist_to_binary(L)}. + +pread_iolist(Fd, Pos) -> + {ok, LenIolist, NextPos} =read_raw_iolist(Fd, Pos, 4), + <> = iolist_to_binary(LenIolist), + {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len), + {ok, Iolist}. + +read_raw_iolist(Fd, Pos, Len) -> + BlockOffset = Pos rem ?SIZE_BLOCK, + TotalBytes = calculate_total_read_len(BlockOffset, Len), + {ok, <>, HasPrefixes} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity), + if HasPrefixes -> + {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}; + true -> + % 09 UPGRADE CODE + <> = RawBin, + {ok, [ReturnBin], Pos + Len} + end. %%---------------------------------------------------------------------- %% Purpose: The length of a file, in bytes. @@ -167,35 +161,153 @@ close(Fd) -> catch unlink(Fd), Result. +% 09 UPGRADE CODE +old_pread(Fd, Pos, Len) -> + {ok, <>, false} = gen_server:call(Fd, {pread, Pos, Len}, infinity), + {ok, RawBin}. -write_header(Fd, Prefix, Data) -> - TermBin = term_to_binary(Data), - % the size of all the bytes written to the header, including the md5 signature (16 bytes) - FilledSize = size(Prefix) + size(TermBin) + 16, - {TermBin2, FilledSize2} = - case FilledSize > ?HEADER_SIZE of +% 09 UPGRADE CODE +upgrade_old_header(Fd, Sig) -> + gen_server:call(Fd, {upgrade_old_header, Sig}, infinity). + + +read_header(Fd) -> + case gen_server:call(Fd, find_header, infinity) of + {ok, Bin} -> + {ok, binary_to_term(Bin)}; + Else -> + Else + end. + +write_header(Fd, Data) -> + Bin = term_to_binary(Data), + Md5 = erlang:md5(Bin), + % now we assemble the final header binary and write to disk + FinalBin = <>, + gen_server:call(Fd, {write_header, FinalBin}, infinity). + + + + +init_status_error(ReturnPid, Ref, Error) -> + ReturnPid ! {Ref, self(), Error}, + ignore. + +% server functions + +init({Filepath, Options, ReturnPid, Ref}) -> + case lists:member(create, Options) of true -> - % too big! - {ok, Pos} = append_binary(Fd, TermBin), - PtrBin = term_to_binary({pointer_to_header_data, Pos}), - {PtrBin, size(Prefix) + size(PtrBin) + 16}; + filelib:ensure_dir(Filepath), + case file:open(Filepath, [read, write, raw, binary]) of + {ok, Fd} -> + {ok, Length} = file:position(Fd, eof), + case Length > 0 of + true -> + % this means the file already exists and has data. + % FYI: We don't differentiate between empty files and non-existant + % files here. + case lists:member(overwrite, Options) of + true -> + {ok, 0} = file:position(Fd, 0), + ok = file:truncate(Fd), + ok = file:sync(Fd), + couch_stats_collector:track_process_count( + {couchdb, open_os_files}), + {ok, #file{fd=Fd}}; + false -> + ok = file:close(Fd), + init_status_error(ReturnPid, Ref, file_exists) + end; + false -> + couch_stats_collector:track_process_count( + {couchdb, open_os_files}), + {ok, #file{fd=Fd}} + end; + Error -> + init_status_error(ReturnPid, Ref, Error) + end; false -> - {TermBin, FilledSize} + % open in read mode first, so we don't create the file if it doesn't exist. + case file:open(Filepath, [read, raw]) of + {ok, Fd_Read} -> + {ok, Fd} = file:open(Filepath, [read, write, raw, binary]), + ok = file:close(Fd_Read), + couch_stats_collector:track_process_count({couchdb, open_os_files}), + {ok, #file{fd=Fd}}; + Error -> + init_status_error(ReturnPid, Ref, Error) + end + end. + + +terminate(_Reason, _Fd) -> + ok. + + +handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) -> + {ok, Bin} = file:pread(Fd, Pos, Bytes), + {reply, {ok, Bin, Pos >= TailAppendBegin}, File}; +handle_call(bytes, _From, #file{fd=Fd}=File) -> + {reply, file:position(Fd, eof), File}; +handle_call(sync, _From, #file{fd=Fd}=File) -> + {reply, file:sync(Fd), File}; +handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) -> + {ok, Pos} = file:position(Fd, Pos), + {reply, file:truncate(Fd), File}; +handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) -> + {ok, Pos} = file:position(Fd, eof), + Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin), + case file:pwrite(Fd, Pos, Blocks) of + ok -> + {reply, {ok, Pos}, File}; + Error -> + {reply, Error, File} + end; +handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) -> + {ok, Pos} = file:position(Fd, eof), + BinSize = size(Bin), + case Pos rem ?SIZE_BLOCK of + 0 -> + Padding = <<>>; + BlockOffset -> + Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>> end, - ok = sync(Fd), - % pad out the header with zeros, then take the md5 hash - PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>, - Sig = erlang:md5([TermBin2, PadZeros]), - % now we assemble the final header binary and write to disk - WriteBin = <>, - ?HEADER_SIZE = size(WriteBin), % sanity check - DblWriteBin = [WriteBin, WriteBin], - ok = pwrite(Fd, 0, DblWriteBin), - ok = sync(Fd). + FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])], + {reply, file:pwrite(Fd, Pos, FinalBin), File}; + + +handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) -> + case (catch read_old_header(Fd, Prefix)) of + {ok, Header} -> + {ok, TailAppendBegin} = file:position(Fd, eof), + Bin = term_to_binary(Header), + Md5 = erlang:md5(Bin), + % now we assemble the final header binary and write to disk + FinalBin = <>, + {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File), + ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin), + {reply, ok, File#file{tail_append_begin=TailAppendBegin}}; + _Error -> + case (catch read_old_header(Fd, <<"upgraded">>)) of + {ok, TailAppendBegin} -> + {reply, ok, File#file{tail_append_begin = TailAppendBegin}}; + _Error2 -> + {reply, ok, File} + end + end; + +handle_call(find_header, _From, #file{fd=Fd}=File) -> + {ok, Pos} = file:position(Fd, eof), + {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}. + +% 09 UPGRADE CODE +-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header -read_header(Fd, Prefix) -> - {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)), +% 09 UPGRADE CODE +read_old_header(Fd, Prefix) -> + {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)), <> = Bin, Result = % read the first header @@ -238,6 +350,7 @@ read_header(Fd, Prefix) -> Result end. +% 09 UPGRADE CODE extract_header(Prefix, Bin) -> SizeOfPrefix = size(Prefix), SizeOfTermBin = ?HEADER_SIZE - @@ -260,88 +373,35 @@ extract_header(Prefix, Bin) -> _ -> unknown_header_type end. + - -init_status_error(ReturnPid, Ref, Error) -> - ReturnPid ! {Ref, self(), Error}, - ignore. - -% server functions - -init({Filepath, Options, ReturnPid, Ref}) -> - case lists:member(create, Options) of +% 09 UPGRADE CODE +write_old_header(Fd, Prefix, Data) -> + TermBin = term_to_binary(Data), + % the size of all the bytes written to the header, including the md5 signature (16 bytes) + FilledSize = size(Prefix) + size(TermBin) + 16, + {TermBin2, FilledSize2} = + case FilledSize > ?HEADER_SIZE of true -> - filelib:ensure_dir(Filepath), - case file:open(Filepath, [read, write, raw, binary]) of - {ok, Fd} -> - {ok, Length} = file:position(Fd, eof), - case Length > 0 of - true -> - % this means the file already exists and has data. - % FYI: We don't differentiate between empty files and non-existant - % files here. - case lists:member(overwrite, Options) of - true -> - {ok, 0} = file:position(Fd, 0), - ok = file:truncate(Fd), - couch_stats_collector:track_process_count( - {couchdb, open_os_files}), - {ok, Fd}; - false -> - ok = file:close(Fd), - init_status_error(ReturnPid, Ref, file_exists) - end; - false -> - couch_stats_collector:track_process_count( - {couchdb, open_os_files}), - {ok, Fd} - end; - Error -> - init_status_error(ReturnPid, Ref, Error) - end; + % too big! + {ok, Pos} = append_binary(Fd, TermBin), + PtrBin = term_to_binary({pointer_to_header_data, Pos}), + {PtrBin, size(Prefix) + size(PtrBin) + 16}; false -> - % open in read mode first, so we don't create the file if it doesn't exist. - case file:open(Filepath, [read, raw]) of - {ok, Fd_Read} -> - {ok, Fd} = file:open(Filepath, [read, write, raw, binary]), - ok = file:close(Fd_Read), - couch_stats_collector:track_process_count({couchdb, open_os_files}), - {ok, Fd}; - Error -> - init_status_error(ReturnPid, Ref, Error) - end - end. - - -terminate(_Reason, _Fd) -> - ok. - - -handle_call({pread, Pos, Bytes}, _From, Fd) -> - {reply, file:pread(Fd, Pos, Bytes), Fd}; -handle_call({pwrite, Pos, Bin}, _From, Fd) -> - {reply, file:pwrite(Fd, Pos, Bin), Fd}; -handle_call({expand, Num}, _From, Fd) -> - {ok, Pos} = file:position(Fd, eof), - {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd}; -handle_call(bytes, _From, Fd) -> - {reply, file:position(Fd, eof), Fd}; -handle_call(sync, _From, Fd) -> - {reply, file:sync(Fd), Fd}; -handle_call({truncate, Pos}, _From, Fd) -> - {ok, Pos} = file:position(Fd, Pos), - {reply, file:truncate(Fd), Fd}; -handle_call({append_bin, Bin}, _From, Fd) -> - Len = size(Bin), - Bin2 = <>, - {ok, Pos} = file:position(Fd, eof), - {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd}; -handle_call({pread_bin, Pos}, _From, Fd) -> - {ok, <>} = file:pread(Fd, Pos, 4), - {ok, Bin} = file:pread(Fd, Pos + 4, TermLen), - {reply, {ok, Bin}, Fd}. - + {TermBin, FilledSize} + end, + ok = file:sync(Fd), + % pad out the header with zeros, then take the md5 hash + PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>, + Sig = erlang:md5([TermBin2, PadZeros]), + % now we assemble the final header binary and write to disk + WriteBin = <>, + ?HEADER_SIZE = size(WriteBin), % sanity check + DblWriteBin = [WriteBin, WriteBin], + ok = file:pwrite(Fd, 0, DblWriteBin), + ok = file:sync(Fd). + handle_cast(close, Fd) -> {stop,normal,Fd}. @@ -351,3 +411,82 @@ code_change(_OldVsn, State, _Extra) -> handle_info({'EXIT', _, Reason}, Fd) -> {stop, Reason, Fd}. + + +find_header(_Fd, -1) -> + no_valid_header; +find_header(Fd, Block) -> + case (catch load_header(Fd, Block)) of + {ok, Bin} -> + {ok, Bin}; + _Error -> + find_header(Fd, Block -1) + end. + +load_header(Fd, Block) -> + {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1), + {ok, <>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4), + TotalBytes = calculate_total_read_len(1, HeaderLen), + {ok, <>} = + file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes), + <> = + iolist_to_binary(remove_block_prefixes(1, RawBin)), + Md5Sig = erlang:md5(HeaderBin), + {ok, HeaderBin}. + +calculate_total_read_len(0, FinalLen) -> + calculate_total_read_len(1, FinalLen) + 1; +calculate_total_read_len(BlockOffset, FinalLen) -> + case ?SIZE_BLOCK - BlockOffset of + BlockLeft when BlockLeft >= FinalLen -> + FinalLen; + BlockLeft -> + FinalLen + ((FinalLen - BlockLeft) div (?SIZE_BLOCK -1)) + + if ((FinalLen - BlockLeft) rem (?SIZE_BLOCK -1)) == 0 -> 0; + true -> 1 end + end. + +remove_block_prefixes(_BlockOffset, <<>>) -> + []; +remove_block_prefixes(0, <<_BlockPrefix,Rest/binary>>) -> + remove_block_prefixes(1, Rest); +remove_block_prefixes(BlockOffset, Bin) -> + BlockBytesAvailable = ?SIZE_BLOCK - BlockOffset, + case size(Bin) of + Size when Size > BlockBytesAvailable -> + <> = Bin, + [DataBlock | remove_block_prefixes(0, Rest)]; + _Size -> + [Bin] + end. + +make_blocks(_BlockOffset, []) -> + []; +make_blocks(0, IoList) -> + [<<0>> | make_blocks(1, IoList)]; +make_blocks(BlockOffset, IoList) -> + case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of + {Begin, End} -> + [Begin | make_blocks(0, End)]; + _Size -> + IoList + end. + +split_iolist(List, 0, BeginAcc) -> + {lists:reverse(BeginAcc), List}; +split_iolist([], SplitAt, _BeginAcc) -> + SplitAt; +split_iolist([<> | Rest], SplitAt, BeginAcc) when SplitAt > size(Bin) -> + split_iolist(Rest, SplitAt - size(Bin), [Bin | BeginAcc]); +split_iolist([<> | Rest], SplitAt, BeginAcc) -> + <> = Bin, + split_iolist([End | Rest], 0, [Begin | BeginAcc]); +split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) -> + case split_iolist(Sublist, SplitAt, BeginAcc) of + {Begin, End} -> + {Begin, [End | Rest]}; + Len -> + split_iolist(Rest, SplitAt - Len, [Sublist | BeginAcc]) + end; +split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) -> + split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]). diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 03cba11e..cb8c205f 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -723,12 +723,7 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> % {"Content-Length", integer_to_list(couch_doc:bin_size(Bin))} ]), couch_doc:bin_foldl(Bin, - fun(BinSegment, []) -> - send_chunk(Resp, BinSegment), - {ok, []} - end, - [] - ), + fun(BinSegment, _) -> send_chunk(Resp, BinSegment) end,[]), send_chunk(Resp, "") end; diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index d6f72696..bf9fd3c2 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -13,14 +13,6 @@ -module(couch_stream). -behaviour(gen_server). --export([test/1]). --export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]). --export([copy/4, copy_to_new_stream/4]). --export([ensure_buffer/2, set_min_buffer/2]). --export([init/1, terminate/2, handle_call/3]). --export([handle_cast/2,code_change/3,handle_info/2]). - --include("couch_db.hrl"). -define(FILE_POINTER_BYTES, 8). -define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)). @@ -32,125 +24,111 @@ -define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data +-export([test/0]). +-export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]). +-export([copy_to_new_stream/3,old_read_term/2]). +-export([init/1, terminate/2, handle_call/3]). +-export([handle_cast/2,code_change/3,handle_info/2]). --record(write_stream, - {fd = 0, - current_pos = 0, - bytes_remaining = 0, - next_alloc = 0, - min_alloc = 16#00010000 - }). +-include("couch_db.hrl"). -record(stream, - { - pid, - fd + {fd = 0, + written_pointers=[], + buffer_list = [], + buffer_len = 0, + max_buffer = 4096, + written_len = 0 }). %%% Interface functions %%% open(Fd) -> - open(nil, Fd). + gen_server:start_link(couch_stream, Fd, []). -open(nil, Fd) -> - open({0,0}, Fd); -open(State, Fd) -> - {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []), - {ok, #stream{pid = Pid, fd = Fd}}. - -close(#stream{pid = Pid, fd = _Fd}) -> +close(Pid) -> gen_server:call(Pid, close, infinity). -get_state(#stream{pid = Pid, fd = _Fd}) -> - gen_server:call(Pid, get_state, infinity). - -ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) -> - gen_server:call(Pid, {ensure_buffer, Bytes}). - -set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) -> - gen_server:call(Pid, {set_min_buffer, Bytes}). - -read(#stream{pid = _Pid, fd = Fd}, Sp, Num) -> - read(Fd, Sp, Num); -read(Fd, Sp, Num) -> - {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []), - Bin = list_to_binary(lists:reverse(RevBin)), - {ok, Bin, Sp2}. - -copy_to_new_stream(Src, Sp, Len, DestFd) -> +copy_to_new_stream(Fd, PosList, DestFd) -> {ok, Dest} = open(DestFd), - ok = set_min_buffer(Dest, 0), - {ok, NewSp} = copy(Src, Sp, Len, Dest), - close(Dest), - {ok, NewSp}. - -copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) -> - copy(Fd, Sp, Len, DestStream); -copy(Fd, Sp, Len, DestStream) -> - {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK, - fun(Bin, AccPointer) -> - {ok, NewPointer} = write(DestStream, Bin), - {ok, if AccPointer == null -> NewPointer; true -> AccPointer end} - end, - null), - {ok, NewSp}. - -foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) -> - foldl(Fd, Sp, Num, Fun, Acc); -foldl(Fd, Sp, Num, Fun, Acc) -> - {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc). - -read_term(#stream{pid = _Pid, fd = Fd}, Sp) -> - read_term(Fd, Sp); -read_term(Fd, Sp) -> - {ok, <>, Sp2} - = read(Fd, Sp, ?STREAM_OFFSET_BYTES), - {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen), - {ok, binary_to_term(Bin)}. + foldl(Fd, PosList, + fun(Bin, _) -> + ok = write(Dest, Bin) + end, ok), + close(Dest). -write_term(Stream, Term) -> - Bin = term_to_binary(Term), - Size = size(Bin), - Bin2 = <>, - write(Stream, Bin2). -write(#stream{}, <<>>) -> - {ok, {0,0}}; -write(#stream{pid = Pid}, Bin) when is_binary(Bin) -> +% 09 UPGRADE CODE +old_copy_to_new_stream(Fd, Pos, Len, DestFd) -> + {ok, Dest} = open(DestFd), + old_foldl(Fd, Pos, Len, + fun(Bin, _) -> + ok = write(Dest, Bin) + end, ok), + close(Dest). + +% 09 UPGRADE CODE +old_foldl(_Fd, null, 0, _Fun, Acc) -> + Acc; +old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)-> + old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc). + +foldl(_Fd, [], _Fun, Acc) -> + Acc; +foldl(Fd, [Pos|Rest], Fun, Acc) -> + {ok, Bin} = couch_file:pread_iolist(Fd, Pos), + foldl(Fd, Rest, Fun, Fun(Bin, Acc)). + +write(_Pid, <<>>) -> + ok; +write(Pid, Bin) -> gen_server:call(Pid, {write, Bin}, infinity). -init({{Pos, BytesRemaining}, Fd}) -> - {ok, #write_stream - {fd = Fd, - current_pos = Pos, - bytes_remaining = BytesRemaining - }}. +init(Fd) -> + {ok, #stream{fd = Fd}}. terminate(_Reason, _Stream) -> ok. -handle_call(get_state, _From, Stream) -> - #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream, - {reply, {Pos, BytesRemaining}, Stream}; -handle_call({set_min_buffer, MinBuffer}, _From, Stream) -> - {reply, ok, Stream#write_stream{min_alloc = MinBuffer}}; -% set next_alloc if we need more room -handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) -> - #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream, - case BytesRemainingInCurrentBuffer < BufferSizeRequested of - true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer; - false -> NextAlloc = 0 % enough room in current segment - end, - {reply, ok, Stream#write_stream{next_alloc = NextAlloc}}; handle_call({write, Bin}, _From, Stream) -> - % ensure init is called first so we can get a pointer to the begining of the binary - {ok, Sp, Stream2} = write_data(Stream, Bin), - {reply, {ok, Sp}, Stream2}; + BinSize = iolist_size(Bin), + #stream{ + fd = Fd, + written_len = WrittenLen, + written_pointers = Written, + buffer_len = BufferLen, + buffer_list = Buffer, + max_buffer = Max} = Stream, + if BinSize + BufferLen > Max -> + {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, Bin)), + {reply, ok, Stream#stream{ + written_len=WrittenLen + BufferLen + BinSize, + written_pointers=[Pos|Written], + buffer_list=[], + buffer_len=0}}; + true -> + {reply, ok, Stream#stream{ + buffer_list=[Bin|Buffer], + buffer_len=BufferLen + BinSize}} + end; handle_call(close, _From, Stream) -> - #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream, - {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}. + #stream{ + fd = Fd, + written_len = WrittenLen, + written_pointers = Written, + buffer_len = BufferLen, + buffer_list = Buffer} = Stream, + + case Buffer of + [] -> + Result = {Written, WrittenLen}; + _ -> + {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)), + Result = {[Pos|Written], WrittenLen + BufferLen} + end, + {stop, normal, Result, Stream}. handle_cast(_Msg, State) -> {noreply,State}. @@ -160,14 +138,27 @@ code_change(_OldVsn, State, _Extra) -> handle_info(_Info, State) -> {noreply, State}. + + -%%% Internal function %%% +% 09 UPGRADE CODE +old_read_term(Fd, Sp) -> + {ok, <>, Sp2} + = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES), + {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen), + {ok, binary_to_term(Bin)}. -stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) -> +old_read(Fd, Sp, Num) -> + {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []), + Bin = list_to_binary(lists:reverse(RevBin)), + {ok, Bin, Sp2}. + +% 09 UPGRADE CODE +old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) -> {ok, Acc, Sp}; -stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) -> +old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) -> {ok, <>} - = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES), + = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES), Sp = {NextPos, NextOffset}, % Check NextPos is past current Pos (this is always true in a stream) % Guards against potential infinite loops caused by corruption. @@ -175,86 +166,47 @@ stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) -> true -> ok; false -> throw({error, stream_corruption}) end, - stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc); -stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) -> + old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc); +old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) -> ReadAmount = lists:min([MaxChunk, Num, Offset]), - {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount), + {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount), Sp = {Pos + ReadAmount, Offset - ReadAmount}, - case Fun(Bin, Acc) of - {ok, Acc2} -> - stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2); - {stop, Acc2} -> - {ok, Acc2, Sp} - end. - -write_data(Stream, <<>>) -> - {ok, {0,0}, Stream}; -write_data(#write_stream{bytes_remaining=0} = Stream, Bin) -> - #write_stream { - fd = Fd, - current_pos = CurrentPos, - next_alloc = NextAlloc, - min_alloc = MinAlloc - }= Stream, - - NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]), - % no space in the current segment, must alloc a new segment - {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES), - - case CurrentPos of - 0 -> - ok; - _ -> - ok = couch_file:pwrite(Fd, CurrentPos, <>) - end, - Stream2 = Stream#write_stream{ - current_pos=NewPos, - bytes_remaining=NewSize, - next_alloc=0}, - write_data(Stream2, Bin); -write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) -> - BytesToWrite = lists:min([size(Bin), BytesRemaining]), - {WriteBin, Rest} = split_binary(Bin, BytesToWrite), - ok = couch_file:pwrite(Fd, Pos, WriteBin), - Stream2 = Stream#write_stream{ - bytes_remaining=BytesRemaining - BytesToWrite, - current_pos=Pos + BytesToWrite - }, - {ok, _, Stream3} = write_data(Stream2, Rest), - {ok, {Pos, BytesRemaining}, Stream3}. + old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)). %%% Tests %%% - -test(Term) -> - {ok, Fd} = couch_file:open("foo", [write]), - {ok, Stream} = open({0,0}, Fd), - {ok, Pos} = write_term(Stream, Term), - {ok, Pos2} = write_term(Stream, {Term, Term}), - close(Stream), +read_all(Fd, PosList) -> + iolist_to_binary(foldl(Fd, PosList, + fun(Bin, Acc) -> + [Bin, Acc] + end, [])). + + +test() -> + {ok, Fd} = couch_file:open("foo", [create,overwrite]), + ok = couch_file:write_header(Fd, {howdy, howdy}), + Bin = <<"damienkatz">>, + {ok, Pos} = couch_file:append_binary(Fd, Bin), + {ok, Bin} = couch_file:pread_binary(Fd, Pos), + {ok, {howdy, howdy}} = couch_file:read_header(Fd), + ok = couch_file:write_header(Fd, {foo, foo}), + {ok, {foo, foo}} = couch_file:read_header(Fd), + + {ok, Stream} = open(Fd), + ok = write(Stream, <<"food">>), + ok = write(Stream, <<"foob">>), + {PosList, 8} = close(Stream), + <<"foodfoob">> = read_all(Fd, PosList), + {ok, Stream2} = open(Fd), + OneBits = <<1:(8*10)>>, + ZeroBits = <<0:(8*10)>>, + ok = write(Stream2, OneBits), + ok = write(Stream2, ZeroBits), + {PosList2, 20} = close(Stream2), + AllBits = iolist_to_binary([OneBits,ZeroBits]), + AllBits = read_all(Fd, PosList2), couch_file:close(Fd), - {ok, Fd2} = couch_file:open("foo", [read, write]), - {ok, Stream2} = open({0,0}, Fd2), - {ok, Term1} = read_term(Fd2, Pos), - io:format("Term1: ~w ~n",[Term1]), - {ok, Term2} = read_term(Fd2, Pos2), - io:format("Term2: ~w ~n",[Term2]), - {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []), - deep_read_test(Fd2, PointerList), - close(Stream2), - couch_file:close(Fd2). - -deep_read_test(_Fd, []) -> - ok; -deep_read_test(Fd, [Pointer | RestPointerList]) -> - {ok, _Term} = read_term(Fd, Pointer), - deep_read_test(Fd, RestPointerList). - -deep_write_test(_Stream, _Term, 0, PointerList) -> - {ok, PointerList}; -deep_write_test(Stream, Term, N, PointerList) -> - WriteList = lists:duplicate(random:uniform(N), Term), - {ok, Pointer} = write_term(Stream, WriteList), - deep_write_test(Stream, Term, N-1, [Pointer | PointerList]). + PosList2. + diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index af4ea814..28679927 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -205,7 +205,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> if CommittedSeq >= Group#group.current_seq -> % save the header Header = {Group#group.sig, get_index_header_data(Group)}, - ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header), + ok = couch_file:write_header(Group#group.fd, Header), {noreply, State#group_state{waiting_commit=false}}; true -> % We can't commit the header because the database seq that's fully @@ -261,7 +261,7 @@ handle_info({'EXIT', FromPid, reset}, handle_info({'EXIT', _FromPid, normal}, State) -> {noreply, State}; -handle_info({'EXIT', FromPid, {{nocatch, Reason}, Trace}}, State) -> +handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) -> ?LOG_DEBUG("Uncaught throw() in linked pid: ~p", [{FromPid, Reason}]), {stop, Reason, State}; @@ -313,7 +313,9 @@ prepare_group({view, RootDir, DbName, GroupId}, ForceReset)-> if ForceReset -> {ok, reset_file(Db, Fd, DbName, Group)}; true -> - case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of + % 09 UPGRADE CODE + ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), + case (catch couch_file:read_header(Fd)) of {ok, {Sig, HeaderInfo}} -> % sigs match! {ok, init_group(Db, Fd, Group, HeaderInfo)}; @@ -417,7 +419,7 @@ reset_group(#group{views=Views}=Group) -> reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), ok = couch_file:truncate(Fd, 0), - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), + ok = couch_file:write_header(Fd, {Sig, nil}), init_group(Db, Fd, reset_group(Group), nil). delete_index_file(RootDir, DbName, GroupId) -> -- cgit v1.2.3