summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-05-25 19:52:28 +0000
committerDamien F. Katz <damien@apache.org>2009-05-25 19:52:28 +0000
commit16ccd4c0b8ae4272fa27d32948658b1424a291fc (patch)
treef6d59d017234409436091cc53938b27549d9b54f /src
parent4aac0f7c6dcd3f3a29cfe5e1bf2bee84b9bae9d5 (diff)
Merging new tail append storage into trunk. Upgrades are automatic, once opened by this version old versions of CouchDB will not be able to open the files. As a precaution, you should back-up your production databases before upgrading.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@778485 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_db.erl114
-rw-r--r--src/couchdb/couch_db.hrl10
-rw-r--r--src/couchdb/couch_db_updater.erl148
-rw-r--r--src/couchdb/couch_doc.erl18
-rw-r--r--src/couchdb/couch_file.erl421
-rw-r--r--src/couchdb/couch_httpd_db.erl7
-rw-r--r--src/couchdb/couch_stream.erl316
-rw-r--r--src/couchdb/couch_view_group.erl10
8 files changed, 571 insertions, 473 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 29dbbd38..d0a4e34c 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -24,7 +24,7 @@
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/5]).
+-export([changes_since/5,read_doc/2]).
-include("couch_db.hrl").
@@ -50,6 +50,7 @@ open_db_file(Filepath, Options) ->
{ok, Fd} ->
?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
ok = file:rename(Filepath ++ ".compact", Filepath),
+ ok = couch_file:sync(Fd),
{ok, Fd};
{error, enoent} ->
{not_found, no_db_file}
@@ -154,7 +155,7 @@ increment_update_seq(#db{update_pid=UpdatePid}) ->
purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
-get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) ->
+get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
get_update_seq(#db{update_seq=Seq})->
@@ -565,93 +566,55 @@ flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd ->
% already written to our file, nothing to write
{Fd, StreamPointer, Len};
+flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
+ {NewStreamData, Len} =
+ couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
+ {Fd, NewStreamData, Len};
+
flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
- with_stream(Fd, fun(OutputStream) ->
- % written to a different file (or a closed file
- % instance, which will cause an error)
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, {NewStreamPointer, Len}, _EndSp} =
- couch_stream:foldl(OtherFd, StreamPointer, Len,
- fun(Bin, {BeginPointer, SizeAcc}) ->
- {ok, Pointer} = couch_stream:write(OutputStream, Bin),
- case SizeAcc of
- 0 -> % this was the first write, record the pointer
- {ok, {Pointer, size(Bin)}};
- _ ->
- {ok, {BeginPointer, SizeAcc + size(Bin)}}
- end
- end,
- {{0,0}, 0}),
- {Fd, NewStreamPointer, Len}
- end);
+ {NewStreamData, Len} =
+ couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+ {Fd, NewStreamData, Len};
flush_binary(Fd, Bin) when is_binary(Bin) ->
- with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, size(Bin)),
- {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
- {Fd, StreamPointer, size(Bin)}
+ with_stream(Fd, fun(OutputStream) ->
+ couch_stream:write(OutputStream, Bin)
end);
flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
- % max_attachment_chunk_size control the max we buffer in memory
- MaxChunkSize = list_to_integer(couch_config:get("couchdb",
- "max_attachment_chunk_size","4294967296")),
with_stream(Fd, fun(OutputStream) ->
% StreamFun(MaxChunkSize, WriterFun) must call WriterFun
- % once for each chunk of the attachment.
- WriterFun = make_writer_fun(OutputStream),
- {ok, {TotalLength, NewStreamPointer}} =
- StreamFun(MaxChunkSize, WriterFun, {0, nil}),
- {Fd, NewStreamPointer, TotalLength}
+ % once for each chunk of the attachment,
+ StreamFun(4096,
+ % WriterFun({Length, Binary}, State)
+ % WriterFun({0, _Footers}, State)
+ % Called with Length == 0 on the last time.
+ % WriterFun returns NewState.
+ fun({0, _Footers}, _) ->
+ ok;
+ ({_Length, Bin}, _) ->
+ couch_stream:write(OutputStream, Bin)
+ end, ok)
end);
flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, StreamPointer} =
- write_streamed_attachment(OutputStream, Fun, Len, nil),
- {Fd, StreamPointer, Len}
+ write_streamed_attachment(OutputStream, Fun, Len)
end).
with_stream(Fd, Fun) ->
{ok, OutputStream} = couch_stream:open(Fd),
- Result = Fun(OutputStream),
- couch_stream:close(OutputStream),
- Result.
+ Fun(OutputStream),
+ {StreamInfo, Len} = couch_stream:close(OutputStream),
+ {Fd, StreamInfo, Len}.
-make_writer_fun(Stream) ->
- % WriterFun({Length, Binary}, State)
- % WriterFun({0, _Footers}, State)
- % Called with Length == 0 on the last time.
- % WriterFun returns NewState.
- fun
- ({0, _Footers}, {FinalLen, SpFin}) ->
- % last block, return the final tuple
- {ok, {FinalLen, SpFin}};
- ({Length, Bin}, {Total, nil}) ->
- % save StreamPointer
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, StreamPointer} = couch_stream:write(Stream, Bin),
- {Total+Length, StreamPointer};
- ({Length, Bin}, {Total, SpAcc}) ->
- % write the Bin to disk
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, _Sp} = couch_stream:write(Stream, Bin),
- {Total+Length, SpAcc}
- end.
-write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
- {ok, SpAcc};
-write_streamed_attachment(Stream, F, LenLeft, nil) ->
- Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, SpAcc} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc);
-write_streamed_attachment(Stream, F, LenLeft, SpAcc) ->
+write_streamed_attachment(_Stream, _F, 0) ->
+ ok;
+write_streamed_attachment(Stream, F, LenLeft) ->
Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, _} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc).
+ ok = couch_stream:write(Stream, check_bin_length(LenLeft, Bin)),
+ write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
%% on rare occasions ibrowse seems to process a chunked response incorrectly
%% and include an extra "\r" in the last chunk. This code ensures that we
@@ -857,6 +820,12 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
+read_doc(Fd, Pos) when is_integer(Pos) ->
+ couch_file:pread_term(Fd, Pos);
+read_doc(Fd, OldStyleStreamPointer) ->
+ % 09 UPGRADE CODE
+ couch_stream:old_read_term(Fd, OldStyleStreamPointer).
+
doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
[Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
@@ -869,14 +838,13 @@ doc_to_tree_simple(Doc, [RevId | Rest]) ->
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
{BodyData, BinValues} =
case Bp of
nil ->
{[], []};
_ ->
- {ok, {BodyData0, BinValues0}} =
- couch_stream:read_term( Db#db.summary_stream, Bp),
+ {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
{BodyData0,
[{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
end,
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 0f7e344e..0d28b2fd 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -109,13 +109,12 @@
% if the disk revision is incremented, then new upgrade logic will need to be
% added to couch_db_updater:init_db.
--define(DISK_VERSION_0_9, 1).
--define(LATEST_DISK_VERSION, 2).
+-define(LATEST_DISK_VERSION, 3).
-record(db_header,
{disk_version = ?LATEST_DISK_VERSION,
update_seq = 0,
- summary_stream_state = nil,
+ unused = 0,
fulldocinfo_by_id_btree_state = nil,
docinfo_by_seq_btree_state = nil,
local_docs_btree_state = nil,
@@ -133,7 +132,7 @@
fd,
fd_ref_counter,
header = #db_header{},
- summary_stream,
+ committed_update_seq,
fulldocinfo_by_id_btree,
docinfo_by_seq_btree,
local_docs_btree,
@@ -145,7 +144,8 @@
admins_ptr = nil,
user_ctx = #user_ctx{},
waiting_delayed_commit = nil,
- revs_limit = 1000
+ revs_limit = 1000,
+ fsync_options = []
}).
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index b1cb9037..31ddbf8c 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -19,18 +19,18 @@
-include("couch_db.hrl").
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
init({MainPid, DbName, Filepath, Fd, Options}) ->
case lists:member(create, Options) of
true ->
% create a new header and writes it to the file
Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+ ok = couch_file:write_header(Fd, Header),
% delete any old compaction files that might be hanging around
file:delete(Filepath ++ ".compact");
false ->
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+ ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
+ {ok, Header} = couch_file:read_header(Fd)
end,
Db = init_db(DbName, Filepath, Fd, Header),
@@ -56,7 +56,7 @@ handle_call({update_docs, DocActions, Options}, _From, Db) ->
end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
+handle_call(full_commit, _From, Db) ->
{reply, ok, commit_data(Db)}; % commit the data and return ok
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
@@ -158,7 +158,7 @@ handle_cast(start_compact, Db) ->
end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+ {ok, NewHeader} = couch_file:read_header(NewFd),
#db{update_seq=NewSeq} = NewDb =
init_db(Db#db.name, Filepath, NewFd, NewHeader),
unlink(NewFd),
@@ -191,7 +191,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
end.
handle_info(delayed_commit, Db) ->
- {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}.
+ {noreply, commit_data(Db)}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -214,6 +214,7 @@ btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
[#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
{Rev, Seq, Bp} <- DeletedRevInfos]};
btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
+ % 09 UPGRADE CODE
% this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
% and individual seq nums for conflicts that are currently in the index,
% meaning the filtered _changes api will not work except for on main docs.
@@ -244,6 +245,7 @@ btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
(_RevId, ?REV_MISSING) ->
?REV_MISSING;
(_RevId, {IsDeleted, BodyPointer}) ->
+ % 09 UPGRADE CODE
% this is the 0.9.0 and earlier rev info record. It's missing the seq
% nums, which means couchdb will sometimes reexamine unchanged
% documents with the _changes API.
@@ -280,17 +282,27 @@ less_docid(nil, _) -> true; % nil - special key sorts before all
less_docid({}, _) -> false; % {} -> special key sorts after all
less_docid(A, B) -> A < B.
+
init_db(DbName, Filepath, Fd, Header0) ->
- case element(2, Header0) of
- ?DISK_VERSION_0_9 -> ok; % no problem, all records upgrade on the fly
- ?LATEST_DISK_VERSION -> ok;
+ Header1 = simple_upgrade_record(Header0, #db_header{}),
+ Header =
+ case element(2, Header1) of
+ 1 -> Header1#db_header{unused = 0}; % 0.9
+ 2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
+ ?LATEST_DISK_VERSION -> Header1;
_ -> throw({database_disk_version_error, "Incorrect disk header version"})
end,
- Header = simple_upgrade_record(Header0, #db_header{}),
- {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
- ok = couch_stream:set_min_buffer(SummaryStream, 10000),
Less = fun less_docid/2,
-
+
+ {ok, FsyncOptions} = couch_util:parse_term(
+ couch_config:get("couchdb", "fsync_options",
+ "[before_header, after_header, on_file_open]")),
+
+ case lists:member(on_file_open, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
{ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
[{split, fun(X) -> btree_by_id_split(X) end},
{join, fun(X,Y) -> btree_by_id_join(X,Y) end},
@@ -308,7 +320,6 @@ init_db(DbName, Filepath, Fd, Header0) ->
AdminsPtr ->
{ok, Admins} = couch_file:pread_term(Fd, AdminsPtr)
end,
-
% convert start time tuple to microsecs and store as a binary string
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
@@ -319,22 +330,22 @@ init_db(DbName, Filepath, Fd, Header0) ->
fd=Fd,
fd_ref_counter = RefCntr,
header=Header,
- summary_stream = SummaryStream,
fulldocinfo_by_id_btree = IdBtree,
docinfo_by_seq_btree = SeqBtree,
local_docs_btree = LocalDocsBtree,
+ committed_update_seq = Header#db_header.update_seq,
update_seq = Header#db_header.update_seq,
name = DbName,
filepath = Filepath,
admins = Admins,
admins_ptr = AdminsPtr,
instance_start_time = StartTime,
- revs_limit = Header#db_header.revs_limit
+ revs_limit = Header#db_header.revs_limit,
+ fsync_options = FsyncOptions
}.
-close_db(#db{ fd_ref_counter = RefCntr, summary_stream = SummaryStream}) ->
- couch_stream:close(SummaryStream),
+close_db(#db{fd_ref_counter = RefCntr}) ->
couch_ref_counter:drop(RefCntr).
@@ -387,7 +398,7 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
throw(retry)
end,
- {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+ {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
{IsDeleted, NewSummaryPointer, UpdateSeq};
_ ->
Value
@@ -549,18 +560,18 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
commit_data(Db) ->
commit_data(Db, false).
-
-commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
- Header2 = Header#db_header{
+db_to_header(Db, Header) ->
+ Header#db_header{
update_seq = Db#db.update_seq,
- summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
admins_ptr = Db#db.admins_ptr,
- revs_limit = Db#db.revs_limit
- },
- if Header == Header2 ->
+ revs_limit = Db#db.revs_limit}.
+
+commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
+ Header = db_to_header(Db, OldHeader),
+ if OldHeader == Header ->
Db;
Delay and (Db#db.waiting_delayed_commit == nil) ->
Db#db{waiting_delayed_commit=
@@ -575,43 +586,75 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
end;
true -> ok
end,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
- Db#db{waiting_delayed_commit=nil,header=Header2}
+ case lists:member(before_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
+ ok = couch_file:write_header(Fd, Header),
+
+ case lists:member(after_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
+ Db#db{waiting_delayed_commit=nil,
+ header=Header,
+ committed_update_seq=Db#db.update_seq}
end.
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
- {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+
+copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
% copy the bin values
- NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
- {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
- {Name, {Type, NewBinSp, Len}}
+ NewBinInfos = lists:map(
+ fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
+ % 09 UPGRADE CODE
+ {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, {Type, NewBinSp, Len}};
+ ({Name, {Type, BinSp, Len}}) ->
+ {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, {Type, NewBinSp, Len}}
end, BinInfos),
- % now write the document summary
- {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
- Sp.
+ {BodyData, NewBinInfos}.
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
[];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
% root nner node, only copy info/data from leaf nodes
- [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
- [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
+ [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
+ [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
% This is a leaf node, copy it over
- NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
- [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+ DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
+ [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
% inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+ [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].
+
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+
+ % write out the attachments
NewFullDocInfos0 = lists:map(
fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+ Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
end, LookupResults),
- NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
+ % write out the docs
+ % we do this in 2 stages so the docs are written out contiguously, making
+ % view indexing and replication faster.
+ NewFullDocInfos1 = lists:map(
+ fun(#full_doc_info{rev_tree=RevTree}=Info) ->
+ Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
+ fun(_Key, {IsDel, DocBody, Seq}) ->
+ {ok, Pos} = couch_file:append_term(DestFd, DocBody),
+ {IsDel, Pos, Seq}
+ end, RevTree)}
+ end, NewFullDocInfos0),
+
+ NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
RemoveSeqs =
case Retry of
@@ -633,7 +676,9 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, Info
-copy_compact(Db, NewDb, Retry) ->
+copy_compact(Db, NewDb0, Retry) ->
+ FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
+ NewDb = NewDb0#db{fsync_options=FsyncOptions},
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
EnumBySeqFun =
fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
@@ -677,15 +722,14 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
{ok, Fd} ->
couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
Retry = true,
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+ {ok, Header} = couch_file:read_header(Fd);
{error, enoent} ->
couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
+ ok = couch_file:write_header(Fd, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
- unlink(Fd),
NewDb2 = copy_compact(Db, NewDb, Retry),
gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index f3a003e1..8018a1e4 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -252,13 +252,12 @@ to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) ->
bin_foldl(Bin, Fun, Acc) when is_binary(Bin) ->
- case Fun(Bin, Acc) of
- {ok, Acc2} -> {ok, Acc2};
- {done, Acc2} -> {ok, Acc2}
- end;
-bin_foldl({Fd, Sp, Len}, Fun, Acc) ->
- {ok, Acc2, _Sp2} = couch_stream:foldl(Fd, Sp, Len, Fun, Acc),
- {ok, Acc2}.
+ Fun(Bin, Acc);
+bin_foldl({Fd, Sp, Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null ->
+ % 09 UPGRADE CODE
+ couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc);
+bin_foldl({Fd, Sp, _Len}, Fun, Acc) ->
+ couch_stream:foldl(Fd, Sp, Fun, Acc).
bin_size(Bin) when is_binary(Bin) ->
size(Bin);
@@ -267,9 +266,8 @@ bin_size({_Fd, _Sp, Len}) ->
bin_to_binary(Bin) when is_binary(Bin) ->
Bin;
-bin_to_binary({Fd, Sp, Len}) ->
- {ok, Bin, _Sp2} = couch_stream:read(Fd, Sp, Len),
- Bin.
+bin_to_binary({Fd, Sp, _Len}) ->
+ couch_stream:foldl(Fd, Sp, fun(Bin, Acc) -> [Bin|Acc] end, []).
get_validate_doc_fun(#doc{body={Props}}) ->
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 430aa6b7..c773be98 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -15,10 +15,16 @@
-include("couch_db.hrl").
--define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+-define(SIZE_BLOCK, 4096).
+
+-record(file, {
+ fd,
+ tail_append_begin=0 % 09 UPGRADE CODE
+ }).
--export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
--export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
+-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
+-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
%%----------------------------------------------------------------------
@@ -52,39 +58,6 @@ open(Filepath, Options) ->
%%----------------------------------------------------------------------
-%% Args: Pos is the offset from the beginning of the file, Bytes is
-%% is the number of bytes to read.
-%% Returns: {ok, Binary} where Binary is a binary data from disk
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pread(Fd, Pos, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {pread, Pos, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
-%% Args: Pos is the offset from the beginning of the file, Bin is
-%% is the binary to write
-%% Returns: ok
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pwrite(Fd, Pos, Bin) ->
- gen_server:call(Fd, {pwrite, Pos, Bin}, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: To append a segment of zeros to the end of the file.
-%% Args: Bytes is the number of bytes to append to the file.
-%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
-%% the new segments.
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-expand(Fd, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {expand, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
%% Purpose: To append an Erlang term to the end of the file.
%% Args: Erlang term to serialize and append to the file.
%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
@@ -93,7 +66,7 @@ expand(Fd, Bytes) when Bytes > 0 ->
%%----------------------------------------------------------------------
append_term(Fd, Term) ->
- append_binary(Fd, term_to_binary(Term, [compressed])).
+ append_binary(Fd, term_to_binary(Term)).
%%----------------------------------------------------------------------
@@ -105,7 +78,8 @@ append_term(Fd, Term) ->
%%----------------------------------------------------------------------
append_binary(Fd, Bin) ->
- gen_server:call(Fd, {append_bin, Bin}, infinity).
+ Size = iolist_size(Bin),
+ gen_server:call(Fd, {append_bin, [<<Size:32/integer>>, Bin]}, infinity).
%%----------------------------------------------------------------------
@@ -115,10 +89,12 @@ append_binary(Fd, Bin) ->
%% or {error, Reason}.
%%----------------------------------------------------------------------
+
pread_term(Fd, Pos) ->
{ok, Bin} = pread_binary(Fd, Pos),
{ok, binary_to_term(Bin)}.
+
%%----------------------------------------------------------------------
%% Purpose: Reads a binrary from a file that was written with append_binary
%% Args: Pos, the offset into the file where the term is serialized.
@@ -127,8 +103,26 @@ pread_term(Fd, Pos) ->
%%----------------------------------------------------------------------
pread_binary(Fd, Pos) ->
- gen_server:call(Fd, {pread_bin, Pos}, infinity).
-
+ {ok, L} = pread_iolist(Fd, Pos),
+ {ok, iolist_to_binary(L)}.
+
+pread_iolist(Fd, Pos) ->
+ {ok, LenIolist, NextPos} =read_raw_iolist(Fd, Pos, 4),
+ <<Len:32/integer>> = iolist_to_binary(LenIolist),
+ {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len),
+ {ok, Iolist}.
+
+read_raw_iolist(Fd, Pos, Len) ->
+ BlockOffset = Pos rem ?SIZE_BLOCK,
+ TotalBytes = calculate_total_read_len(BlockOffset, Len),
+ {ok, <<RawBin:TotalBytes/binary>>, HasPrefixes} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity),
+ if HasPrefixes ->
+ {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes};
+ true ->
+ % 09 UPGRADE CODE
+ <<ReturnBin:Len/binary, _/binary>> = RawBin,
+ {ok, [ReturnBin], Pos + Len}
+ end.
%%----------------------------------------------------------------------
%% Purpose: The length of a file, in bytes.
@@ -167,35 +161,153 @@ close(Fd) ->
catch unlink(Fd),
Result.
+% 09 UPGRADE CODE
+old_pread(Fd, Pos, Len) ->
+ {ok, <<RawBin:Len/binary>>, false} = gen_server:call(Fd, {pread, Pos, Len}, infinity),
+ {ok, RawBin}.
-write_header(Fd, Prefix, Data) ->
- TermBin = term_to_binary(Data),
- % the size of all the bytes written to the header, including the md5 signature (16 bytes)
- FilledSize = size(Prefix) + size(TermBin) + 16,
- {TermBin2, FilledSize2} =
- case FilledSize > ?HEADER_SIZE of
+% 09 UPGRADE CODE
+upgrade_old_header(Fd, Sig) ->
+ gen_server:call(Fd, {upgrade_old_header, Sig}, infinity).
+
+
+read_header(Fd) ->
+ case gen_server:call(Fd, find_header, infinity) of
+ {ok, Bin} ->
+ {ok, binary_to_term(Bin)};
+ Else ->
+ Else
+ end.
+
+write_header(Fd, Data) ->
+ Bin = term_to_binary(Data),
+ Md5 = erlang:md5(Bin),
+ % now we assemble the final header binary and write to disk
+ FinalBin = <<Md5/binary, Bin/binary>>,
+ gen_server:call(Fd, {write_header, FinalBin}, infinity).
+
+
+
+
+init_status_error(ReturnPid, Ref, Error) ->
+ ReturnPid ! {Ref, self(), Error},
+ ignore.
+
+% server functions
+
+init({Filepath, Options, ReturnPid, Ref}) ->
+ case lists:member(create, Options) of
true ->
- % too big!
- {ok, Pos} = append_binary(Fd, TermBin),
- PtrBin = term_to_binary({pointer_to_header_data, Pos}),
- {PtrBin, size(Prefix) + size(PtrBin) + 16};
+ filelib:ensure_dir(Filepath),
+ case file:open(Filepath, [read, write, raw, binary]) of
+ {ok, Fd} ->
+ {ok, Length} = file:position(Fd, eof),
+ case Length > 0 of
+ true ->
+ % this means the file already exists and has data.
+ % FYI: We don't differentiate between empty files and non-existant
+ % files here.
+ case lists:member(overwrite, Options) of
+ true ->
+ {ok, 0} = file:position(Fd, 0),
+ ok = file:truncate(Fd),
+ ok = file:sync(Fd),
+ couch_stats_collector:track_process_count(
+ {couchdb, open_os_files}),
+ {ok, #file{fd=Fd}};
+ false ->
+ ok = file:close(Fd),
+ init_status_error(ReturnPid, Ref, file_exists)
+ end;
+ false ->
+ couch_stats_collector:track_process_count(
+ {couchdb, open_os_files}),
+ {ok, #file{fd=Fd}}
+ end;
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
+ end;
false ->
- {TermBin, FilledSize}
+ % open in read mode first, so we don't create the file if it doesn't exist.
+ case file:open(Filepath, [read, raw]) of
+ {ok, Fd_Read} ->
+ {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
+ ok = file:close(Fd_Read),
+ couch_stats_collector:track_process_count({couchdb, open_os_files}),
+ {ok, #file{fd=Fd}};
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
+ end
+ end.
+
+
+terminate(_Reason, _Fd) ->
+ ok.
+
+
+handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
+ {ok, Bin} = file:pread(Fd, Pos, Bytes),
+ {reply, {ok, Bin, Pos >= TailAppendBegin}, File};
+handle_call(bytes, _From, #file{fd=Fd}=File) ->
+ {reply, file:position(Fd, eof), File};
+handle_call(sync, _From, #file{fd=Fd}=File) ->
+ {reply, file:sync(Fd), File};
+handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, Pos),
+ {reply, file:truncate(Fd), File};
+handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
+ case file:pwrite(Fd, Pos, Blocks) of
+ ok ->
+ {reply, {ok, Pos}, File};
+ Error ->
+ {reply, Error, File}
+ end;
+handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ BinSize = size(Bin),
+ case Pos rem ?SIZE_BLOCK of
+ 0 ->
+ Padding = <<>>;
+ BlockOffset ->
+ Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
end,
- ok = sync(Fd),
- % pad out the header with zeros, then take the md5 hash
- PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
- Sig = erlang:md5([TermBin2, PadZeros]),
- % now we assemble the final header binary and write to disk
- WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
- ?HEADER_SIZE = size(WriteBin), % sanity check
- DblWriteBin = [WriteBin, WriteBin],
- ok = pwrite(Fd, 0, DblWriteBin),
- ok = sync(Fd).
+ FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])],
+ {reply, file:pwrite(Fd, Pos, FinalBin), File};
+
+
+handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
+ case (catch read_old_header(Fd, Prefix)) of
+ {ok, Header} ->
+ {ok, TailAppendBegin} = file:position(Fd, eof),
+ Bin = term_to_binary(Header),
+ Md5 = erlang:md5(Bin),
+ % now we assemble the final header binary and write to disk
+ FinalBin = <<Md5/binary, Bin/binary>>,
+ {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File),
+ ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin),
+ {reply, ok, File#file{tail_append_begin=TailAppendBegin}};
+ _Error ->
+ case (catch read_old_header(Fd, <<"upgraded">>)) of
+ {ok, TailAppendBegin} ->
+ {reply, ok, File#file{tail_append_begin = TailAppendBegin}};
+ _Error2 ->
+ {reply, ok, File}
+ end
+ end;
+
+handle_call(find_header, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+
+% 09 UPGRADE CODE
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
-read_header(Fd, Prefix) ->
- {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+% 09 UPGRADE CODE
+read_old_header(Fd, Prefix) ->
+ {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)),
<<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
Result =
% read the first header
@@ -238,6 +350,7 @@ read_header(Fd, Prefix) ->
Result
end.
+% 09 UPGRADE CODE
extract_header(Prefix, Bin) ->
SizeOfPrefix = size(Prefix),
SizeOfTermBin = ?HEADER_SIZE -
@@ -260,88 +373,35 @@ extract_header(Prefix, Bin) ->
_ ->
unknown_header_type
end.
+
-
-init_status_error(ReturnPid, Ref, Error) ->
- ReturnPid ! {Ref, self(), Error},
- ignore.
-
-% server functions
-
-init({Filepath, Options, ReturnPid, Ref}) ->
- case lists:member(create, Options) of
+% 09 UPGRADE CODE
+write_old_header(Fd, Prefix, Data) ->
+ TermBin = term_to_binary(Data),
+ % the size of all the bytes written to the header, including the md5 signature (16 bytes)
+ FilledSize = size(Prefix) + size(TermBin) + 16,
+ {TermBin2, FilledSize2} =
+ case FilledSize > ?HEADER_SIZE of
true ->
- filelib:ensure_dir(Filepath),
- case file:open(Filepath, [read, write, raw, binary]) of
- {ok, Fd} ->
- {ok, Length} = file:position(Fd, eof),
- case Length > 0 of
- true ->
- % this means the file already exists and has data.
- % FYI: We don't differentiate between empty files and non-existant
- % files here.
- case lists:member(overwrite, Options) of
- true ->
- {ok, 0} = file:position(Fd, 0),
- ok = file:truncate(Fd),
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
- {ok, Fd};
- false ->
- ok = file:close(Fd),
- init_status_error(ReturnPid, Ref, file_exists)
- end;
- false ->
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
- {ok, Fd}
- end;
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end;
+ % too big!
+ {ok, Pos} = append_binary(Fd, TermBin),
+ PtrBin = term_to_binary({pointer_to_header_data, Pos}),
+ {PtrBin, size(Prefix) + size(PtrBin) + 16};
false ->
- % open in read mode first, so we don't create the file if it doesn't exist.
- case file:open(Filepath, [read, raw]) of
- {ok, Fd_Read} ->
- {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
- ok = file:close(Fd_Read),
- couch_stats_collector:track_process_count({couchdb, open_os_files}),
- {ok, Fd};
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end
- end.
-
-
-terminate(_Reason, _Fd) ->
- ok.
-
-
-handle_call({pread, Pos, Bytes}, _From, Fd) ->
- {reply, file:pread(Fd, Pos, Bytes), Fd};
-handle_call({pwrite, Pos, Bin}, _From, Fd) ->
- {reply, file:pwrite(Fd, Pos, Bin), Fd};
-handle_call({expand, Num}, _From, Fd) ->
- {ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
-handle_call(bytes, _From, Fd) ->
- {reply, file:position(Fd, eof), Fd};
-handle_call(sync, _From, Fd) ->
- {reply, file:sync(Fd), Fd};
-handle_call({truncate, Pos}, _From, Fd) ->
- {ok, Pos} = file:position(Fd, Pos),
- {reply, file:truncate(Fd), Fd};
-handle_call({append_bin, Bin}, _From, Fd) ->
- Len = size(Bin),
- Bin2 = <<Len:32, Bin/binary>>,
- {ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
-handle_call({pread_bin, Pos}, _From, Fd) ->
- {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4),
- {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
- {reply, {ok, Bin}, Fd}.
-
+ {TermBin, FilledSize}
+ end,
+ ok = file:sync(Fd),
+ % pad out the header with zeros, then take the md5 hash
+ PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
+ Sig = erlang:md5([TermBin2, PadZeros]),
+ % now we assemble the final header binary and write to disk
+ WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
+ ?HEADER_SIZE = size(WriteBin), % sanity check
+ DblWriteBin = [WriteBin, WriteBin],
+ ok = file:pwrite(Fd, 0, DblWriteBin),
+ ok = file:sync(Fd).
+
handle_cast(close, Fd) ->
{stop,normal,Fd}.
@@ -351,3 +411,82 @@ code_change(_OldVsn, State, _Extra) ->
handle_info({'EXIT', _, Reason}, Fd) ->
{stop, Reason, Fd}.
+
+
+find_header(_Fd, -1) ->
+ no_valid_header;
+find_header(Fd, Block) ->
+ case (catch load_header(Fd, Block)) of
+ {ok, Bin} ->
+ {ok, Bin};
+ _Error ->
+ find_header(Fd, Block -1)
+ end.
+
+load_header(Fd, Block) ->
+ {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1),
+ {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4),
+ TotalBytes = calculate_total_read_len(1, HeaderLen),
+ {ok, <<RawBin:TotalBytes/binary>>} =
+ file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes),
+ <<Md5Sig:16/binary, HeaderBin/binary>> =
+ iolist_to_binary(remove_block_prefixes(1, RawBin)),
+ Md5Sig = erlang:md5(HeaderBin),
+ {ok, HeaderBin}.
+
+calculate_total_read_len(0, FinalLen) ->
+ calculate_total_read_len(1, FinalLen) + 1;
+calculate_total_read_len(BlockOffset, FinalLen) ->
+ case ?SIZE_BLOCK - BlockOffset of
+ BlockLeft when BlockLeft >= FinalLen ->
+ FinalLen;
+ BlockLeft ->
+ FinalLen + ((FinalLen - BlockLeft) div (?SIZE_BLOCK -1)) +
+ if ((FinalLen - BlockLeft) rem (?SIZE_BLOCK -1)) == 0 -> 0;
+ true -> 1 end
+ end.
+
+remove_block_prefixes(_BlockOffset, <<>>) ->
+ [];
+remove_block_prefixes(0, <<_BlockPrefix,Rest/binary>>) ->
+ remove_block_prefixes(1, Rest);
+remove_block_prefixes(BlockOffset, Bin) ->
+ BlockBytesAvailable = ?SIZE_BLOCK - BlockOffset,
+ case size(Bin) of
+ Size when Size > BlockBytesAvailable ->
+ <<DataBlock:BlockBytesAvailable/binary,Rest/binary>> = Bin,
+ [DataBlock | remove_block_prefixes(0, Rest)];
+ _Size ->
+ [Bin]
+ end.
+
+make_blocks(_BlockOffset, []) ->
+ [];
+make_blocks(0, IoList) ->
+ [<<0>> | make_blocks(1, IoList)];
+make_blocks(BlockOffset, IoList) ->
+ case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
+ {Begin, End} ->
+ [Begin | make_blocks(0, End)];
+ _Size ->
+ IoList
+ end.
+
+split_iolist(List, 0, BeginAcc) ->
+ {lists:reverse(BeginAcc), List};
+split_iolist([], SplitAt, _BeginAcc) ->
+ SplitAt;
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) when SplitAt > size(Bin) ->
+ split_iolist(Rest, SplitAt - size(Bin), [Bin | BeginAcc]);
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) ->
+ <<Begin:SplitAt/binary,End/binary>> = Bin,
+ split_iolist([End | Rest], 0, [Begin | BeginAcc]);
+split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
+ case split_iolist(Sublist, SplitAt, BeginAcc) of
+ {Begin, End} ->
+ {Begin, [End | Rest]};
+ Len ->
+ split_iolist(Rest, SplitAt - Len, [Sublist | BeginAcc])
+ end;
+split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
+ split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 03cba11e..cb8c205f 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -723,12 +723,7 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) ->
% {"Content-Length", integer_to_list(couch_doc:bin_size(Bin))}
]),
couch_doc:bin_foldl(Bin,
- fun(BinSegment, []) ->
- send_chunk(Resp, BinSegment),
- {ok, []}
- end,
- []
- ),
+ fun(BinSegment, _) -> send_chunk(Resp, BinSegment) end,[]),
send_chunk(Resp, "")
end;
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index d6f72696..bf9fd3c2 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -13,14 +13,6 @@
-module(couch_stream).
-behaviour(gen_server).
--export([test/1]).
--export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
--export([copy/4, copy_to_new_stream/4]).
--export([ensure_buffer/2, set_min_buffer/2]).
--export([init/1, terminate/2, handle_call/3]).
--export([handle_cast/2,code_change/3,handle_info/2]).
-
--include("couch_db.hrl").
-define(FILE_POINTER_BYTES, 8).
-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
@@ -32,125 +24,111 @@
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
+-export([test/0]).
+-export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]).
+-export([copy_to_new_stream/3,old_read_term/2]).
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
--record(write_stream,
- {fd = 0,
- current_pos = 0,
- bytes_remaining = 0,
- next_alloc = 0,
- min_alloc = 16#00010000
- }).
+-include("couch_db.hrl").
-record(stream,
- {
- pid,
- fd
+ {fd = 0,
+ written_pointers=[],
+ buffer_list = [],
+ buffer_len = 0,
+ max_buffer = 4096,
+ written_len = 0
}).
%%% Interface functions %%%
open(Fd) ->
- open(nil, Fd).
+ gen_server:start_link(couch_stream, Fd, []).
-open(nil, Fd) ->
- open({0,0}, Fd);
-open(State, Fd) ->
- {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []),
- {ok, #stream{pid = Pid, fd = Fd}}.
-
-close(#stream{pid = Pid, fd = _Fd}) ->
+close(Pid) ->
gen_server:call(Pid, close, infinity).
-get_state(#stream{pid = Pid, fd = _Fd}) ->
- gen_server:call(Pid, get_state, infinity).
-
-ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
- gen_server:call(Pid, {ensure_buffer, Bytes}).
-
-set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
- gen_server:call(Pid, {set_min_buffer, Bytes}).
-
-read(#stream{pid = _Pid, fd = Fd}, Sp, Num) ->
- read(Fd, Sp, Num);
-read(Fd, Sp, Num) ->
- {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []),
- Bin = list_to_binary(lists:reverse(RevBin)),
- {ok, Bin, Sp2}.
-
-copy_to_new_stream(Src, Sp, Len, DestFd) ->
+copy_to_new_stream(Fd, PosList, DestFd) ->
{ok, Dest} = open(DestFd),
- ok = set_min_buffer(Dest, 0),
- {ok, NewSp} = copy(Src, Sp, Len, Dest),
- close(Dest),
- {ok, NewSp}.
-
-copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) ->
- copy(Fd, Sp, Len, DestStream);
-copy(Fd, Sp, Len, DestStream) ->
- {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK,
- fun(Bin, AccPointer) ->
- {ok, NewPointer} = write(DestStream, Bin),
- {ok, if AccPointer == null -> NewPointer; true -> AccPointer end}
- end,
- null),
- {ok, NewSp}.
-
-foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) ->
- foldl(Fd, Sp, Num, Fun, Acc);
-foldl(Fd, Sp, Num, Fun, Acc) ->
- {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
-
-read_term(#stream{pid = _Pid, fd = Fd}, Sp) ->
- read_term(Fd, Sp);
-read_term(Fd, Sp) ->
- {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
- = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
- {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen),
- {ok, binary_to_term(Bin)}.
+ foldl(Fd, PosList,
+ fun(Bin, _) ->
+ ok = write(Dest, Bin)
+ end, ok),
+ close(Dest).
-write_term(Stream, Term) ->
- Bin = term_to_binary(Term),
- Size = size(Bin),
- Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>,
- write(Stream, Bin2).
-write(#stream{}, <<>>) ->
- {ok, {0,0}};
-write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
+% 09 UPGRADE CODE
+old_copy_to_new_stream(Fd, Pos, Len, DestFd) ->
+ {ok, Dest} = open(DestFd),
+ old_foldl(Fd, Pos, Len,
+ fun(Bin, _) ->
+ ok = write(Dest, Bin)
+ end, ok),
+ close(Dest).
+
+% 09 UPGRADE CODE
+old_foldl(_Fd, null, 0, _Fun, Acc) ->
+ Acc;
+old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)->
+ old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
+
+foldl(_Fd, [], _Fun, Acc) ->
+ Acc;
+foldl(Fd, [Pos|Rest], Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
+
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
gen_server:call(Pid, {write, Bin}, infinity).
-init({{Pos, BytesRemaining}, Fd}) ->
- {ok, #write_stream
- {fd = Fd,
- current_pos = Pos,
- bytes_remaining = BytesRemaining
- }}.
+init(Fd) ->
+ {ok, #stream{fd = Fd}}.
terminate(_Reason, _Stream) ->
ok.
-handle_call(get_state, _From, Stream) ->
- #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream,
- {reply, {Pos, BytesRemaining}, Stream};
-handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
- {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
-% set next_alloc if we need more room
-handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
- #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
- case BytesRemainingInCurrentBuffer < BufferSizeRequested of
- true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer;
- false -> NextAlloc = 0 % enough room in current segment
- end,
- {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
handle_call({write, Bin}, _From, Stream) ->
- % ensure init is called first so we can get a pointer to the begining of the binary
- {ok, Sp, Stream2} = write_data(Stream, Bin),
- {reply, {ok, Sp}, Stream2};
+ BinSize = iolist_size(Bin),
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer,
+ max_buffer = Max} = Stream,
+ if BinSize + BufferLen > Max ->
+ {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, Bin)),
+ {reply, ok, Stream#stream{
+ written_len=WrittenLen + BufferLen + BinSize,
+ written_pointers=[Pos|Written],
+ buffer_list=[],
+ buffer_len=0}};
+ true ->
+ {reply, ok, Stream#stream{
+ buffer_list=[Bin|Buffer],
+ buffer_len=BufferLen + BinSize}}
+ end;
handle_call(close, _From, Stream) ->
- #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream,
- {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}.
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer} = Stream,
+
+ case Buffer of
+ [] ->
+ Result = {Written, WrittenLen};
+ _ ->
+ {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)),
+ Result = {[Pos|Written], WrittenLen + BufferLen}
+ end,
+ {stop, normal, Result, Stream}.
handle_cast(_Msg, State) ->
{noreply,State}.
@@ -160,14 +138,27 @@ code_change(_OldVsn, State, _Extra) ->
handle_info(_Info, State) ->
{noreply, State}.
+
+
-%%% Internal function %%%
+% 09 UPGRADE CODE
+old_read_term(Fd, Sp) ->
+ {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
+ = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+ {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen),
+ {ok, binary_to_term(Bin)}.
-stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+old_read(Fd, Sp, Num) ->
+ {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []),
+ Bin = list_to_binary(lists:reverse(RevBin)),
+ {ok, Bin, Sp2}.
+
+% 09 UPGRADE CODE
+old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
{ok, Acc, Sp};
-stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
{ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
- = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+ = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
Sp = {NextPos, NextOffset},
% Check NextPos is past current Pos (this is always true in a stream)
% Guards against potential infinite loops caused by corruption.
@@ -175,86 +166,47 @@ stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
true -> ok;
false -> throw({error, stream_corruption})
end,
- stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
-stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+ old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
+old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
ReadAmount = lists:min([MaxChunk, Num, Offset]),
- {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount),
+ {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount),
Sp = {Pos + ReadAmount, Offset - ReadAmount},
- case Fun(Bin, Acc) of
- {ok, Acc2} ->
- stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2);
- {stop, Acc2} ->
- {ok, Acc2, Sp}
- end.
-
-write_data(Stream, <<>>) ->
- {ok, {0,0}, Stream};
-write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
- #write_stream {
- fd = Fd,
- current_pos = CurrentPos,
- next_alloc = NextAlloc,
- min_alloc = MinAlloc
- }= Stream,
-
- NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
- % no space in the current segment, must alloc a new segment
- {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
-
- case CurrentPos of
- 0 ->
- ok;
- _ ->
- ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>)
- end,
- Stream2 = Stream#write_stream{
- current_pos=NewPos,
- bytes_remaining=NewSize,
- next_alloc=0},
- write_data(Stream2, Bin);
-write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) ->
- BytesToWrite = lists:min([size(Bin), BytesRemaining]),
- {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
- ok = couch_file:pwrite(Fd, Pos, WriteBin),
- Stream2 = Stream#write_stream{
- bytes_remaining=BytesRemaining - BytesToWrite,
- current_pos=Pos + BytesToWrite
- },
- {ok, _, Stream3} = write_data(Stream2, Rest),
- {ok, {Pos, BytesRemaining}, Stream3}.
+ old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)).
%%% Tests %%%
-
-test(Term) ->
- {ok, Fd} = couch_file:open("foo", [write]),
- {ok, Stream} = open({0,0}, Fd),
- {ok, Pos} = write_term(Stream, Term),
- {ok, Pos2} = write_term(Stream, {Term, Term}),
- close(Stream),
+read_all(Fd, PosList) ->
+ iolist_to_binary(foldl(Fd, PosList,
+ fun(Bin, Acc) ->
+ [Bin, Acc]
+ end, [])).
+
+
+test() ->
+ {ok, Fd} = couch_file:open("foo", [create,overwrite]),
+ ok = couch_file:write_header(Fd, {howdy, howdy}),
+ Bin = <<"damienkatz">>,
+ {ok, Pos} = couch_file:append_binary(Fd, Bin),
+ {ok, Bin} = couch_file:pread_binary(Fd, Pos),
+ {ok, {howdy, howdy}} = couch_file:read_header(Fd),
+ ok = couch_file:write_header(Fd, {foo, foo}),
+ {ok, {foo, foo}} = couch_file:read_header(Fd),
+
+ {ok, Stream} = open(Fd),
+ ok = write(Stream, <<"food">>),
+ ok = write(Stream, <<"foob">>),
+ {PosList, 8} = close(Stream),
+ <<"foodfoob">> = read_all(Fd, PosList),
+ {ok, Stream2} = open(Fd),
+ OneBits = <<1:(8*10)>>,
+ ZeroBits = <<0:(8*10)>>,
+ ok = write(Stream2, OneBits),
+ ok = write(Stream2, ZeroBits),
+ {PosList2, 20} = close(Stream2),
+ AllBits = iolist_to_binary([OneBits,ZeroBits]),
+ AllBits = read_all(Fd, PosList2),
couch_file:close(Fd),
- {ok, Fd2} = couch_file:open("foo", [read, write]),
- {ok, Stream2} = open({0,0}, Fd2),
- {ok, Term1} = read_term(Fd2, Pos),
- io:format("Term1: ~w ~n",[Term1]),
- {ok, Term2} = read_term(Fd2, Pos2),
- io:format("Term2: ~w ~n",[Term2]),
- {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []),
- deep_read_test(Fd2, PointerList),
- close(Stream2),
- couch_file:close(Fd2).
-
-deep_read_test(_Fd, []) ->
- ok;
-deep_read_test(Fd, [Pointer | RestPointerList]) ->
- {ok, _Term} = read_term(Fd, Pointer),
- deep_read_test(Fd, RestPointerList).
-
-deep_write_test(_Stream, _Term, 0, PointerList) ->
- {ok, PointerList};
-deep_write_test(Stream, Term, N, PointerList) ->
- WriteList = lists:duplicate(random:uniform(N), Term),
- {ok, Pointer} = write_term(Stream, WriteList),
- deep_write_test(Stream, Term, N-1, [Pointer | PointerList]).
+ PosList2.
+
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index af4ea814..28679927 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -205,7 +205,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
if CommittedSeq >= Group#group.current_seq ->
% save the header
Header = {Group#group.sig, get_index_header_data(Group)},
- ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header),
+ ok = couch_file:write_header(Group#group.fd, Header),
{noreply, State#group_state{waiting_commit=false}};
true ->
% We can't commit the header because the database seq that's fully
@@ -261,7 +261,7 @@ handle_info({'EXIT', FromPid, reset},
handle_info({'EXIT', _FromPid, normal}, State) ->
{noreply, State};
-handle_info({'EXIT', FromPid, {{nocatch, Reason}, Trace}}, State) ->
+handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) ->
?LOG_DEBUG("Uncaught throw() in linked pid: ~p", [{FromPid, Reason}]),
{stop, Reason, State};
@@ -313,7 +313,9 @@ prepare_group({view, RootDir, DbName, GroupId}, ForceReset)->
if ForceReset ->
{ok, reset_file(Db, Fd, DbName, Group)};
true ->
- case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
+ % 09 UPGRADE CODE
+ ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
+ case (catch couch_file:read_header(Fd)) of
{ok, {Sig, HeaderInfo}} ->
% sigs match!
{ok, init_group(Db, Fd, Group, HeaderInfo)};
@@ -417,7 +419,7 @@ reset_group(#group{views=Views}=Group) ->
reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+ ok = couch_file:write_header(Fd, {Sig, nil}),
init_group(Db, Fd, reset_group(Group), nil).
delete_index_file(RootDir, DbName, GroupId) ->