-rw-r--r--  etc/couchdb/local_dev.ini        |   2
-rw-r--r--  share/www/script/test/basics.js  |   2
-rw-r--r--  share/www/script/test/compact.js |   3
-rw-r--r--  src/couchdb/couch_db.erl         | 114
-rw-r--r--  src/couchdb/couch_db.hrl         |  10
-rw-r--r--  src/couchdb/couch_db_updater.erl | 148
-rw-r--r--  src/couchdb/couch_doc.erl        |  18
-rw-r--r--  src/couchdb/couch_file.erl       | 421
-rw-r--r--  src/couchdb/couch_httpd_db.erl   |   7
-rw-r--r--  src/couchdb/couch_stream.erl     | 316
-rw-r--r--  src/couchdb/couch_view_group.erl |  10
11 files changed, 575 insertions(+), 476 deletions(-)
diff --git a/etc/couchdb/local_dev.ini b/etc/couchdb/local_dev.ini
index 84764e45..876295b1 100644
--- a/etc/couchdb/local_dev.ini
+++ b/etc/couchdb/local_dev.ini
@@ -12,7 +12,7 @@
;bind_address = 127.0.0.1
[log]
-level = info
+level = error
[update_notification]
;unique notifier name=/full/path/to/exe -with "cmd line arg"
diff --git a/share/www/script/test/basics.js b/share/www/script/test/basics.js
index 39500629..19a7e015 100644
--- a/share/www/script/test/basics.js
+++ b/share/www/script/test/basics.js
@@ -139,7 +139,7 @@ couchTests.basics = function(debug) {
// make sure we can still open the old rev of the deleted doc
T(db.open(existingDoc._id, {rev: existingDoc._rev}) != null);
-
+  console.log("db.info: " + db.info().update_seq);
// make sure restart works
T(db.ensureFullCommit().ok);
restartServer();
diff --git a/share/www/script/test/compact.js b/share/www/script/test/compact.js
index 51a57051..a3b55d85 100644
--- a/share/www/script/test/compact.js
+++ b/share/www/script/test/compact.js
@@ -15,7 +15,7 @@ couchTests.compact = function(debug) {
db.deleteDb();
db.createDb();
if (debug) debugger;
- var docs = makeDocs(0, 10);
+ var docs = makeDocs(0, 20);
db.bulkSave(docs);
var binAttDoc = {
@@ -35,6 +35,7 @@ couchTests.compact = function(debug) {
for(var i in docs) {
db.deleteDoc(docs[i]);
}
+ T(db.ensureFullCommit().ok);
var deletesize = db.info().disk_size;
T(deletesize > originalsize);
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 29dbbd38..d0a4e34c 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -24,7 +24,7 @@
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/5]).
+-export([changes_since/5,read_doc/2]).
-include("couch_db.hrl").
@@ -50,6 +50,7 @@ open_db_file(Filepath, Options) ->
{ok, Fd} ->
?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
ok = file:rename(Filepath ++ ".compact", Filepath),
+ ok = couch_file:sync(Fd),
{ok, Fd};
{error, enoent} ->
{not_found, no_db_file}
@@ -154,7 +155,7 @@ increment_update_seq(#db{update_pid=UpdatePid}) ->
purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
-get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) ->
+get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
get_update_seq(#db{update_seq=Seq})->
@@ -565,93 +566,55 @@ flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd ->
% already written to our file, nothing to write
{Fd, StreamPointer, Len};
+flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
+ {NewStreamData, Len} =
+ couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
+ {Fd, NewStreamData, Len};
+
flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
- with_stream(Fd, fun(OutputStream) ->
- % written to a different file (or a closed file
- % instance, which will cause an error)
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, {NewStreamPointer, Len}, _EndSp} =
- couch_stream:foldl(OtherFd, StreamPointer, Len,
- fun(Bin, {BeginPointer, SizeAcc}) ->
- {ok, Pointer} = couch_stream:write(OutputStream, Bin),
- case SizeAcc of
- 0 -> % this was the first write, record the pointer
- {ok, {Pointer, size(Bin)}};
- _ ->
- {ok, {BeginPointer, SizeAcc + size(Bin)}}
- end
- end,
- {{0,0}, 0}),
- {Fd, NewStreamPointer, Len}
- end);
+ {NewStreamData, Len} =
+ couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+ {Fd, NewStreamData, Len};
flush_binary(Fd, Bin) when is_binary(Bin) ->
- with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, size(Bin)),
- {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
- {Fd, StreamPointer, size(Bin)}
+ with_stream(Fd, fun(OutputStream) ->
+ couch_stream:write(OutputStream, Bin)
end);
flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
- % max_attachment_chunk_size control the max we buffer in memory
- MaxChunkSize = list_to_integer(couch_config:get("couchdb",
- "max_attachment_chunk_size","4294967296")),
with_stream(Fd, fun(OutputStream) ->
% StreamFun(MaxChunkSize, WriterFun) must call WriterFun
- % once for each chunk of the attachment.
- WriterFun = make_writer_fun(OutputStream),
- {ok, {TotalLength, NewStreamPointer}} =
- StreamFun(MaxChunkSize, WriterFun, {0, nil}),
- {Fd, NewStreamPointer, TotalLength}
+    % once for each chunk of the attachment.
+ StreamFun(4096,
+ % WriterFun({Length, Binary}, State)
+ % WriterFun({0, _Footers}, State)
+ % Called with Length == 0 on the last time.
+ % WriterFun returns NewState.
+ fun({0, _Footers}, _) ->
+ ok;
+ ({_Length, Bin}, _) ->
+ couch_stream:write(OutputStream, Bin)
+ end, ok)
end);
flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, StreamPointer} =
- write_streamed_attachment(OutputStream, Fun, Len, nil),
- {Fd, StreamPointer, Len}
+ write_streamed_attachment(OutputStream, Fun, Len)
end).
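
The writer contract above is easy to get wrong, so to spell it out: StreamFun must call WriterFun once per chunk with {Length, Binary} and one final time with {0, Footers}. A minimal conforming StreamFun, sketched over in-memory chunks (chunked_stream_fun is a hypothetical name, not part of this patch):

    % Sketch only: feeds a list of in-memory binaries through the
    % WriterFun protocol described above. Real callers pull chunks
    % off a socket; the {0, Footers} call marks the end of the data.
    chunked_stream_fun(Chunks) ->
        fun(_MaxChunkSize, WriterFun, State0) ->
            LastState = lists:foldl(
                fun(Chunk, State) ->
                    WriterFun({size(Chunk), Chunk}, State)
                end, State0, Chunks),
            WriterFun({0, []}, LastState)
        end.
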
with_stream(Fd, Fun) ->
{ok, OutputStream} = couch_stream:open(Fd),
- Result = Fun(OutputStream),
- couch_stream:close(OutputStream),
- Result.
+ Fun(OutputStream),
+ {StreamInfo, Len} = couch_stream:close(OutputStream),
+ {Fd, StreamInfo, Len}.
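
with_stream/2 now owns the whole stream lifecycle: the caller only issues writes, and the pointer list plus total length that get stored in the attachment info come back from couch_stream:close/1. A sketch of a standalone caller, equivalent to the is_binary clause above (flush_one_binary is a hypothetical name):

    % Sketch: one buffered write; {Fd, StreamInfo, Len} comes back
    % from with_stream via couch_stream:close/1.
    flush_one_binary(Fd, Bin) when is_binary(Bin) ->
        with_stream(Fd, fun(OutputStream) ->
            couch_stream:write(OutputStream, Bin)
        end).
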
-make_writer_fun(Stream) ->
- % WriterFun({Length, Binary}, State)
- % WriterFun({0, _Footers}, State)
- % Called with Length == 0 on the last time.
- % WriterFun returns NewState.
- fun
- ({0, _Footers}, {FinalLen, SpFin}) ->
- % last block, return the final tuple
- {ok, {FinalLen, SpFin}};
- ({Length, Bin}, {Total, nil}) ->
- % save StreamPointer
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, StreamPointer} = couch_stream:write(Stream, Bin),
- {Total+Length, StreamPointer};
- ({Length, Bin}, {Total, SpAcc}) ->
- % write the Bin to disk
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, _Sp} = couch_stream:write(Stream, Bin),
- {Total+Length, SpAcc}
- end.
-write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
- {ok, SpAcc};
-write_streamed_attachment(Stream, F, LenLeft, nil) ->
- Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, SpAcc} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc);
-write_streamed_attachment(Stream, F, LenLeft, SpAcc) ->
+write_streamed_attachment(_Stream, _F, 0) ->
+ ok;
+write_streamed_attachment(Stream, F, LenLeft) ->
Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, _} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc).
+    TruncatedBin = check_bin_length(LenLeft, Bin),
+    ok = couch_stream:write(Stream, TruncatedBin),
+    write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin)).
%% on rare occasions ibrowse seems to process a chunked response incorrectly
%% and include an extra "\r" in the last chunk. This code ensures that we
@@ -857,6 +820,12 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
+read_doc(Fd, Pos) when is_integer(Pos) ->
+ couch_file:pread_term(Fd, Pos);
+read_doc(Fd, OldStyleStreamPointer) ->
+ % 09 UPGRADE CODE
+ couch_stream:old_read_term(Fd, OldStyleStreamPointer).
+
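
read_doc/2 keys off the pointer shape: an integer is a file offset produced by couch_file:append_term/2 (the new format), anything else is a pre-0.10 stream pointer routed through the upgrade path. A sketch of the new-format round trip (the summary term here is illustrative):

    % Sketch: doc summaries are appended as plain terms and read
    % back by integer offset alone.
    {ok, Pos} = couch_file:append_term(Fd, {[{<<"k">>, 1}], []}),
    {ok, {[{<<"k">>, 1}], []}} = couch_db:read_doc(Fd, Pos).
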
doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
[Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
@@ -869,14 +838,13 @@ doc_to_tree_simple(Doc, [RevId | Rest]) ->
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
{BodyData, BinValues} =
case Bp of
nil ->
{[], []};
_ ->
- {ok, {BodyData0, BinValues0}} =
- couch_stream:read_term( Db#db.summary_stream, Bp),
+ {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
{BodyData0,
[{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
end,
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 0f7e344e..0d28b2fd 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -109,13 +109,12 @@
% if the disk revision is incremented, then new upgrade logic will need to be
% added to couch_db_updater:init_db.
--define(DISK_VERSION_0_9, 1).
--define(LATEST_DISK_VERSION, 2).
+-define(LATEST_DISK_VERSION, 3).
-record(db_header,
{disk_version = ?LATEST_DISK_VERSION,
update_seq = 0,
- summary_stream_state = nil,
+ unused = 0,
fulldocinfo_by_id_btree_state = nil,
docinfo_by_seq_btree_state = nil,
local_docs_btree_state = nil,
@@ -133,7 +132,7 @@
fd,
fd_ref_counter,
header = #db_header{},
- summary_stream,
+ committed_update_seq,
fulldocinfo_by_id_btree,
docinfo_by_seq_btree,
local_docs_btree,
@@ -145,7 +144,8 @@
admins_ptr = nil,
user_ctx = #user_ctx{},
waiting_delayed_commit = nil,
- revs_limit = 1000
+ revs_limit = 1000,
+ fsync_options = []
}).
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index b1cb9037..31ddbf8c 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -19,18 +19,18 @@
-include("couch_db.hrl").
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
init({MainPid, DbName, Filepath, Fd, Options}) ->
case lists:member(create, Options) of
true ->
% create a new header and writes it to the file
Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+ ok = couch_file:write_header(Fd, Header),
% delete any old compaction files that might be hanging around
file:delete(Filepath ++ ".compact");
false ->
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+ ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
+ {ok, Header} = couch_file:read_header(Fd)
end,
Db = init_db(DbName, Filepath, Fd, Header),
@@ -56,7 +56,7 @@ handle_call({update_docs, DocActions, Options}, _From, Db) ->
end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
+handle_call(full_commit, _From, Db) ->
{reply, ok, commit_data(Db)}; % commit the data and return ok
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
@@ -158,7 +158,7 @@ handle_cast(start_compact, Db) ->
end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+ {ok, NewHeader} = couch_file:read_header(NewFd),
#db{update_seq=NewSeq} = NewDb =
init_db(Db#db.name, Filepath, NewFd, NewHeader),
unlink(NewFd),
@@ -191,7 +191,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
end.
handle_info(delayed_commit, Db) ->
- {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}.
+ {noreply, commit_data(Db)}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -214,6 +214,7 @@ btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
[#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
{Rev, Seq, Bp} <- DeletedRevInfos]};
btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
+ % 09 UPGRADE CODE
% this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
% and individual seq nums for conflicts that are currently in the index,
% meaning the filtered _changes api will not work except for on main docs.
@@ -244,6 +245,7 @@ btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
(_RevId, ?REV_MISSING) ->
?REV_MISSING;
(_RevId, {IsDeleted, BodyPointer}) ->
+ % 09 UPGRADE CODE
% this is the 0.9.0 and earlier rev info record. It's missing the seq
% nums, which means couchdb will sometimes reexamine unchanged
% documents with the _changes API.
@@ -280,17 +282,27 @@ less_docid(nil, _) -> true; % nil - special key sorts before all
less_docid({}, _) -> false; % {} -> special key sorts after all
less_docid(A, B) -> A < B.
+
init_db(DbName, Filepath, Fd, Header0) ->
- case element(2, Header0) of
- ?DISK_VERSION_0_9 -> ok; % no problem, all records upgrade on the fly
- ?LATEST_DISK_VERSION -> ok;
+ Header1 = simple_upgrade_record(Header0, #db_header{}),
+ Header =
+ case element(2, Header1) of
+ 1 -> Header1#db_header{unused = 0}; % 0.9
+ 2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
+ ?LATEST_DISK_VERSION -> Header1;
_ -> throw({database_disk_version_error, "Incorrect disk header version"})
end,
- Header = simple_upgrade_record(Header0, #db_header{}),
- {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
- ok = couch_stream:set_min_buffer(SummaryStream, 10000),
Less = fun less_docid/2,
-
+
+ {ok, FsyncOptions} = couch_util:parse_term(
+ couch_config:get("couchdb", "fsync_options",
+ "[before_header, after_header, on_file_open]")),
+
+ case lists:member(on_file_open, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
{ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
[{split, fun(X) -> btree_by_id_split(X) end},
{join, fun(X,Y) -> btree_by_id_join(X,Y) end},
@@ -308,7 +320,6 @@ init_db(DbName, Filepath, Fd, Header0) ->
AdminsPtr ->
{ok, Admins} = couch_file:pread_term(Fd, AdminsPtr)
end,
-
% convert start time tuple to microsecs and store as a binary string
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
@@ -319,22 +330,22 @@ init_db(DbName, Filepath, Fd, Header0) ->
fd=Fd,
fd_ref_counter = RefCntr,
header=Header,
- summary_stream = SummaryStream,
fulldocinfo_by_id_btree = IdBtree,
docinfo_by_seq_btree = SeqBtree,
local_docs_btree = LocalDocsBtree,
+ committed_update_seq = Header#db_header.update_seq,
update_seq = Header#db_header.update_seq,
name = DbName,
filepath = Filepath,
admins = Admins,
admins_ptr = AdminsPtr,
instance_start_time = StartTime,
- revs_limit = Header#db_header.revs_limit
+ revs_limit = Header#db_header.revs_limit,
+ fsync_options = FsyncOptions
}.
-close_db(#db{ fd_ref_counter = RefCntr, summary_stream = SummaryStream}) ->
- couch_stream:close(SummaryStream),
+close_db(#db{fd_ref_counter = RefCntr}) ->
couch_ref_counter:drop(RefCntr).
@@ -387,7 +398,7 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
throw(retry)
end,
- {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+ {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
{IsDeleted, NewSummaryPointer, UpdateSeq};
_ ->
Value
@@ -549,18 +560,18 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
commit_data(Db) ->
commit_data(Db, false).
-
-commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
- Header2 = Header#db_header{
+db_to_header(Db, Header) ->
+ Header#db_header{
update_seq = Db#db.update_seq,
- summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
admins_ptr = Db#db.admins_ptr,
- revs_limit = Db#db.revs_limit
- },
- if Header == Header2 ->
+ revs_limit = Db#db.revs_limit}.
+
+commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
+ Header = db_to_header(Db, OldHeader),
+ if OldHeader == Header ->
Db;
Delay and (Db#db.waiting_delayed_commit == nil) ->
Db#db{waiting_delayed_commit=
@@ -575,43 +586,75 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
end;
true -> ok
end,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
- Db#db{waiting_delayed_commit=nil,header=Header2}
+ case lists:member(before_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
+ ok = couch_file:write_header(Fd, Header),
+
+ case lists:member(after_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
+ Db#db{waiting_delayed_commit=nil,
+ header=Header,
+ committed_update_seq=Db#db.update_seq}
end.
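
The sync points are now driven by the couchdb/fsync_options ini entry, parsed with couch_util:parse_term/1: init_db syncs on open when on_file_open is listed, and commit_data brackets the header write with before_header and after_header syncs. A sketch of how the default string parses (this mirrors the default used in init_db above):

    % Sketch: the ini value is an Erlang term in string form.
    {ok, [before_header, after_header, on_file_open]} =
        couch_util:parse_term("[before_header, after_header, on_file_open]").
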
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
- {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+
+copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
% copy the bin values
- NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
- {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
- {Name, {Type, NewBinSp, Len}}
+ NewBinInfos = lists:map(
+ fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
+ % 09 UPGRADE CODE
+ {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, {Type, NewBinSp, Len}};
+ ({Name, {Type, BinSp, Len}}) ->
+ {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, {Type, NewBinSp, Len}}
end, BinInfos),
- % now write the document summary
- {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
- Sp.
+ {BodyData, NewBinInfos}.
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
[];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
% root inner node, only copy info/data from leaf nodes
- [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
- [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
+ [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
+ [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
% This is a leaf node, copy it over
- NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
- [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+ DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
+ [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
% inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+ [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].
+
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+
+ % write out the attachments
NewFullDocInfos0 = lists:map(
fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+ Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
end, LookupResults),
- NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
+ % write out the docs
+ % we do this in 2 stages so the docs are written out contiguously, making
+ % view indexing and replication faster.
+ NewFullDocInfos1 = lists:map(
+ fun(#full_doc_info{rev_tree=RevTree}=Info) ->
+ Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
+ fun(_Key, {IsDel, DocBody, Seq}) ->
+ {ok, Pos} = couch_file:append_term(DestFd, DocBody),
+ {IsDel, Pos, Seq}
+ end, RevTree)}
+ end, NewFullDocInfos0),
+
+ NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
RemoveSeqs =
case Retry of
@@ -633,7 +676,9 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, Info
-copy_compact(Db, NewDb, Retry) ->
+copy_compact(Db, NewDb0, Retry) ->
+ FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
+ NewDb = NewDb0#db{fsync_options=FsyncOptions},
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
EnumBySeqFun =
fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
@@ -677,15 +722,14 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
{ok, Fd} ->
couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
Retry = true,
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+ {ok, Header} = couch_file:read_header(Fd);
{error, enoent} ->
couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
+ ok = couch_file:write_header(Fd, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
- unlink(Fd),
NewDb2 = copy_compact(Db, NewDb, Retry),
gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index f3a003e1..8018a1e4 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -252,13 +252,12 @@ to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) ->
bin_foldl(Bin, Fun, Acc) when is_binary(Bin) ->
- case Fun(Bin, Acc) of
- {ok, Acc2} -> {ok, Acc2};
- {done, Acc2} -> {ok, Acc2}
- end;
-bin_foldl({Fd, Sp, Len}, Fun, Acc) ->
- {ok, Acc2, _Sp2} = couch_stream:foldl(Fd, Sp, Len, Fun, Acc),
- {ok, Acc2}.
+ Fun(Bin, Acc);
+bin_foldl({Fd, Sp, Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null ->
+ % 09 UPGRADE CODE
+ couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc);
+bin_foldl({Fd, Sp, _Len}, Fun, Acc) ->
+ couch_stream:foldl(Fd, Sp, Fun, Acc).
bin_size(Bin) when is_binary(Bin) ->
size(Bin);
@@ -267,9 +266,8 @@ bin_size({_Fd, _Sp, Len}) ->
bin_to_binary(Bin) when is_binary(Bin) ->
Bin;
-bin_to_binary({Fd, Sp, Len}) ->
- {ok, Bin, _Sp2} = couch_stream:read(Fd, Sp, Len),
- Bin.
+bin_to_binary({Fd, Sp, _Len}) ->
+    iolist_to_binary(lists:reverse(
+        couch_stream:foldl(Fd, Sp, fun(Bin, Acc) -> [Bin|Acc] end, []))).
get_validate_doc_fun(#doc{body={Props}}) ->
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 430aa6b7..c773be98 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -15,10 +15,16 @@
-include("couch_db.hrl").
--define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+-define(SIZE_BLOCK, 4096).
+
+-record(file, {
+ fd,
+ tail_append_begin=0 % 09 UPGRADE CODE
+ }).
--export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
--export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
+-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
+-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
%%----------------------------------------------------------------------
@@ -52,39 +58,6 @@ open(Filepath, Options) ->
%%----------------------------------------------------------------------
-%% Args: Pos is the offset from the beginning of the file, Bytes is
-%% is the number of bytes to read.
-%% Returns: {ok, Binary} where Binary is a binary data from disk
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pread(Fd, Pos, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {pread, Pos, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
-%% Args: Pos is the offset from the beginning of the file, Bin is
-%% is the binary to write
-%% Returns: ok
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pwrite(Fd, Pos, Bin) ->
- gen_server:call(Fd, {pwrite, Pos, Bin}, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: To append a segment of zeros to the end of the file.
-%% Args: Bytes is the number of bytes to append to the file.
-%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
-%% the new segments.
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-expand(Fd, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {expand, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
%% Purpose: To append an Erlang term to the end of the file.
%% Args: Erlang term to serialize and append to the file.
%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
@@ -93,7 +66,7 @@ expand(Fd, Bytes) when Bytes > 0 ->
%%----------------------------------------------------------------------
append_term(Fd, Term) ->
- append_binary(Fd, term_to_binary(Term, [compressed])).
+ append_binary(Fd, term_to_binary(Term)).
%%----------------------------------------------------------------------
@@ -105,7 +78,8 @@ append_term(Fd, Term) ->
%%----------------------------------------------------------------------
append_binary(Fd, Bin) ->
- gen_server:call(Fd, {append_bin, Bin}, infinity).
+ Size = iolist_size(Bin),
+ gen_server:call(Fd, {append_bin, [<<Size:32/integer>>, Bin]}, infinity).
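
Appends are now framed: a 32-bit big-endian length is written ahead of the data, and make_blocks/2 (further down) interleaves the block prefixes, so a reader needs nothing but the starting offset. A round-trip sketch, assuming Fd is an open couch_file (the test at the bottom of couch_stream.erl does the same):

    % Sketch: append returns the offset; the length prefix makes
    % the read self-describing.
    {ok, Pos} = couch_file:append_binary(Fd, <<"damienkatz">>),
    {ok, <<"damienkatz">>} = couch_file:pread_binary(Fd, Pos).
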
%%----------------------------------------------------------------------
@@ -115,10 +89,12 @@ append_binary(Fd, Bin) ->
%% or {error, Reason}.
%%----------------------------------------------------------------------
+
pread_term(Fd, Pos) ->
{ok, Bin} = pread_binary(Fd, Pos),
{ok, binary_to_term(Bin)}.
+
%%----------------------------------------------------------------------
%% Purpose: Reads a binary from a file that was written with append_binary
%% Args: Pos, the offset into the file where the term is serialized.
@@ -127,8 +103,26 @@ pread_term(Fd, Pos) ->
%%----------------------------------------------------------------------
pread_binary(Fd, Pos) ->
- gen_server:call(Fd, {pread_bin, Pos}, infinity).
-
+ {ok, L} = pread_iolist(Fd, Pos),
+ {ok, iolist_to_binary(L)}.
+
+pread_iolist(Fd, Pos) ->
+    {ok, LenIolist, NextPos} = read_raw_iolist(Fd, Pos, 4),
+ <<Len:32/integer>> = iolist_to_binary(LenIolist),
+ {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len),
+ {ok, Iolist}.
+
+read_raw_iolist(Fd, Pos, Len) ->
+ BlockOffset = Pos rem ?SIZE_BLOCK,
+ TotalBytes = calculate_total_read_len(BlockOffset, Len),
+ {ok, <<RawBin:TotalBytes/binary>>, HasPrefixes} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity),
+ if HasPrefixes ->
+ {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes};
+ true ->
+ % 09 UPGRADE CODE
+ <<ReturnBin:Len/binary, _/binary>> = RawBin,
+ {ok, [ReturnBin], Pos + Len}
+ end.
%%----------------------------------------------------------------------
%% Purpose: The length of a file, in bytes.
@@ -167,35 +161,153 @@ close(Fd) ->
catch unlink(Fd),
Result.
+% 09 UPGRADE CODE
+old_pread(Fd, Pos, Len) ->
+ {ok, <<RawBin:Len/binary>>, false} = gen_server:call(Fd, {pread, Pos, Len}, infinity),
+ {ok, RawBin}.
-write_header(Fd, Prefix, Data) ->
- TermBin = term_to_binary(Data),
- % the size of all the bytes written to the header, including the md5 signature (16 bytes)
- FilledSize = size(Prefix) + size(TermBin) + 16,
- {TermBin2, FilledSize2} =
- case FilledSize > ?HEADER_SIZE of
+% 09 UPGRADE CODE
+upgrade_old_header(Fd, Sig) ->
+ gen_server:call(Fd, {upgrade_old_header, Sig}, infinity).
+
+
+read_header(Fd) ->
+ case gen_server:call(Fd, find_header, infinity) of
+ {ok, Bin} ->
+ {ok, binary_to_term(Bin)};
+ Else ->
+ Else
+ end.
+
+write_header(Fd, Data) ->
+ Bin = term_to_binary(Data),
+ Md5 = erlang:md5(Bin),
+ % now we assemble the final header binary and write to disk
+ FinalBin = <<Md5/binary, Bin/binary>>,
+ gen_server:call(Fd, {write_header, FinalBin}, infinity).
+
+
+
+
+init_status_error(ReturnPid, Ref, Error) ->
+ ReturnPid ! {Ref, self(), Error},
+ ignore.
+
+% server functions
+
+init({Filepath, Options, ReturnPid, Ref}) ->
+ case lists:member(create, Options) of
true ->
- % too big!
- {ok, Pos} = append_binary(Fd, TermBin),
- PtrBin = term_to_binary({pointer_to_header_data, Pos}),
- {PtrBin, size(Prefix) + size(PtrBin) + 16};
+ filelib:ensure_dir(Filepath),
+ case file:open(Filepath, [read, write, raw, binary]) of
+ {ok, Fd} ->
+ {ok, Length} = file:position(Fd, eof),
+ case Length > 0 of
+ true ->
+ % this means the file already exists and has data.
+    % FYI: We don't differentiate between empty files and non-existent
+ % files here.
+ case lists:member(overwrite, Options) of
+ true ->
+ {ok, 0} = file:position(Fd, 0),
+ ok = file:truncate(Fd),
+ ok = file:sync(Fd),
+ couch_stats_collector:track_process_count(
+ {couchdb, open_os_files}),
+ {ok, #file{fd=Fd}};
+ false ->
+ ok = file:close(Fd),
+ init_status_error(ReturnPid, Ref, file_exists)
+ end;
+ false ->
+ couch_stats_collector:track_process_count(
+ {couchdb, open_os_files}),
+ {ok, #file{fd=Fd}}
+ end;
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
+ end;
false ->
- {TermBin, FilledSize}
+ % open in read mode first, so we don't create the file if it doesn't exist.
+ case file:open(Filepath, [read, raw]) of
+ {ok, Fd_Read} ->
+ {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
+ ok = file:close(Fd_Read),
+ couch_stats_collector:track_process_count({couchdb, open_os_files}),
+ {ok, #file{fd=Fd}};
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
+ end
+ end.
+
+
+terminate(_Reason, _Fd) ->
+ ok.
+
+
+handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
+ {ok, Bin} = file:pread(Fd, Pos, Bytes),
+ {reply, {ok, Bin, Pos >= TailAppendBegin}, File};
+handle_call(bytes, _From, #file{fd=Fd}=File) ->
+ {reply, file:position(Fd, eof), File};
+handle_call(sync, _From, #file{fd=Fd}=File) ->
+ {reply, file:sync(Fd), File};
+handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, Pos),
+ {reply, file:truncate(Fd), File};
+handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
+ case file:pwrite(Fd, Pos, Blocks) of
+ ok ->
+ {reply, {ok, Pos}, File};
+ Error ->
+ {reply, Error, File}
+ end;
+handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ BinSize = size(Bin),
+ case Pos rem ?SIZE_BLOCK of
+ 0 ->
+ Padding = <<>>;
+ BlockOffset ->
+ Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
end,
- ok = sync(Fd),
- % pad out the header with zeros, then take the md5 hash
- PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
- Sig = erlang:md5([TermBin2, PadZeros]),
- % now we assemble the final header binary and write to disk
- WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
- ?HEADER_SIZE = size(WriteBin), % sanity check
- DblWriteBin = [WriteBin, WriteBin],
- ok = pwrite(Fd, 0, DblWriteBin),
- ok = sync(Fd).
+ FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])],
+ {reply, file:pwrite(Fd, Pos, FinalBin), File};
+
+
+handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
+ case (catch read_old_header(Fd, Prefix)) of
+ {ok, Header} ->
+ {ok, TailAppendBegin} = file:position(Fd, eof),
+ Bin = term_to_binary(Header),
+ Md5 = erlang:md5(Bin),
+ % now we assemble the final header binary and write to disk
+ FinalBin = <<Md5/binary, Bin/binary>>,
+ {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File),
+ ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin),
+ {reply, ok, File#file{tail_append_begin=TailAppendBegin}};
+ _Error ->
+ case (catch read_old_header(Fd, <<"upgraded">>)) of
+ {ok, TailAppendBegin} ->
+ {reply, ok, File#file{tail_append_begin = TailAppendBegin}};
+ _Error2 ->
+ {reply, ok, File}
+ end
+ end;
+
+handle_call(find_header, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+
+% 09 UPGRADE CODE
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
-read_header(Fd, Prefix) ->
- {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+% 09 UPGRADE CODE
+read_old_header(Fd, Prefix) ->
+ {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)),
<<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
Result =
% read the first header
@@ -238,6 +350,7 @@ read_header(Fd, Prefix) ->
Result
end.
+% 09 UPGRADE CODE
extract_header(Prefix, Bin) ->
SizeOfPrefix = size(Prefix),
SizeOfTermBin = ?HEADER_SIZE -
@@ -260,88 +373,35 @@ extract_header(Prefix, Bin) ->
_ ->
unknown_header_type
end.
+
-
-init_status_error(ReturnPid, Ref, Error) ->
- ReturnPid ! {Ref, self(), Error},
- ignore.
-
-% server functions
-
-init({Filepath, Options, ReturnPid, Ref}) ->
- case lists:member(create, Options) of
+% 09 UPGRADE CODE
+write_old_header(Fd, Prefix, Data) ->
+ TermBin = term_to_binary(Data),
+ % the size of all the bytes written to the header, including the md5 signature (16 bytes)
+ FilledSize = size(Prefix) + size(TermBin) + 16,
+ {TermBin2, FilledSize2} =
+ case FilledSize > ?HEADER_SIZE of
true ->
- filelib:ensure_dir(Filepath),
- case file:open(Filepath, [read, write, raw, binary]) of
- {ok, Fd} ->
- {ok, Length} = file:position(Fd, eof),
- case Length > 0 of
- true ->
- % this means the file already exists and has data.
-    % FYI: We don't differentiate between empty files and non-existent
- % files here.
- case lists:member(overwrite, Options) of
- true ->
- {ok, 0} = file:position(Fd, 0),
- ok = file:truncate(Fd),
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
- {ok, Fd};
- false ->
- ok = file:close(Fd),
- init_status_error(ReturnPid, Ref, file_exists)
- end;
- false ->
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
- {ok, Fd}
- end;
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end;
+ % too big!
+ {ok, Pos} = append_binary(Fd, TermBin),
+ PtrBin = term_to_binary({pointer_to_header_data, Pos}),
+ {PtrBin, size(Prefix) + size(PtrBin) + 16};
false ->
- % open in read mode first, so we don't create the file if it doesn't exist.
- case file:open(Filepath, [read, raw]) of
- {ok, Fd_Read} ->
- {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
- ok = file:close(Fd_Read),
- couch_stats_collector:track_process_count({couchdb, open_os_files}),
- {ok, Fd};
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end
- end.
-
-
-terminate(_Reason, _Fd) ->
- ok.
-
-
-handle_call({pread, Pos, Bytes}, _From, Fd) ->
- {reply, file:pread(Fd, Pos, Bytes), Fd};
-handle_call({pwrite, Pos, Bin}, _From, Fd) ->
- {reply, file:pwrite(Fd, Pos, Bin), Fd};
-handle_call({expand, Num}, _From, Fd) ->
- {ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
-handle_call(bytes, _From, Fd) ->
- {reply, file:position(Fd, eof), Fd};
-handle_call(sync, _From, Fd) ->
- {reply, file:sync(Fd), Fd};
-handle_call({truncate, Pos}, _From, Fd) ->
- {ok, Pos} = file:position(Fd, Pos),
- {reply, file:truncate(Fd), Fd};
-handle_call({append_bin, Bin}, _From, Fd) ->
- Len = size(Bin),
- Bin2 = <<Len:32, Bin/binary>>,
- {ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
-handle_call({pread_bin, Pos}, _From, Fd) ->
- {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4),
- {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
- {reply, {ok, Bin}, Fd}.
-
+ {TermBin, FilledSize}
+ end,
+ ok = file:sync(Fd),
+ % pad out the header with zeros, then take the md5 hash
+ PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
+ Sig = erlang:md5([TermBin2, PadZeros]),
+ % now we assemble the final header binary and write to disk
+ WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
+ ?HEADER_SIZE = size(WriteBin), % sanity check
+ DblWriteBin = [WriteBin, WriteBin],
+ ok = file:pwrite(Fd, 0, DblWriteBin),
+ ok = file:sync(Fd).
+
handle_cast(close, Fd) ->
{stop,normal,Fd}.
@@ -351,3 +411,82 @@ code_change(_OldVsn, State, _Extra) ->
handle_info({'EXIT', _, Reason}, Fd) ->
{stop, Reason, Fd}.
+
+
+find_header(_Fd, -1) ->
+ no_valid_header;
+find_header(Fd, Block) ->
+ case (catch load_header(Fd, Block)) of
+ {ok, Bin} ->
+ {ok, Bin};
+ _Error ->
+ find_header(Fd, Block -1)
+ end.
+
+load_header(Fd, Block) ->
+ {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1),
+ {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4),
+ TotalBytes = calculate_total_read_len(1, HeaderLen),
+ {ok, <<RawBin:TotalBytes/binary>>} =
+ file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes),
+ <<Md5Sig:16/binary, HeaderBin/binary>> =
+ iolist_to_binary(remove_block_prefixes(1, RawBin)),
+ Md5Sig = erlang:md5(HeaderBin),
+ {ok, HeaderBin}.
+
+calculate_total_read_len(0, FinalLen) ->
+ calculate_total_read_len(1, FinalLen) + 1;
+calculate_total_read_len(BlockOffset, FinalLen) ->
+ case ?SIZE_BLOCK - BlockOffset of
+ BlockLeft when BlockLeft >= FinalLen ->
+ FinalLen;
+ BlockLeft ->
+ FinalLen + ((FinalLen - BlockLeft) div (?SIZE_BLOCK -1)) +
+ if ((FinalLen - BlockLeft) rem (?SIZE_BLOCK -1)) == 0 -> 0;
+ true -> 1 end
+ end.
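
Worked through once, as an illustration: with ?SIZE_BLOCK = 4096, a 4-byte length read starting at block offset 4094 has only BlockLeft = 2 bytes before the next block boundary, so the total is 4 + ((4 - 2) div 4095) + 1 = 5 bytes; the extra byte is the prefix of the following block, which remove_block_prefixes/2 strips back out in read_raw_iolist.
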
+
+remove_block_prefixes(_BlockOffset, <<>>) ->
+ [];
+remove_block_prefixes(0, <<_BlockPrefix,Rest/binary>>) ->
+ remove_block_prefixes(1, Rest);
+remove_block_prefixes(BlockOffset, Bin) ->
+ BlockBytesAvailable = ?SIZE_BLOCK - BlockOffset,
+ case size(Bin) of
+ Size when Size > BlockBytesAvailable ->
+ <<DataBlock:BlockBytesAvailable/binary,Rest/binary>> = Bin,
+ [DataBlock | remove_block_prefixes(0, Rest)];
+ _Size ->
+ [Bin]
+ end.
+
+make_blocks(_BlockOffset, []) ->
+ [];
+make_blocks(0, IoList) ->
+ [<<0>> | make_blocks(1, IoList)];
+make_blocks(BlockOffset, IoList) ->
+ case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
+ {Begin, End} ->
+ [Begin | make_blocks(0, End)];
+ _Size ->
+ IoList
+ end.
+
+split_iolist(List, 0, BeginAcc) ->
+ {lists:reverse(BeginAcc), List};
+split_iolist([], SplitAt, _BeginAcc) ->
+ SplitAt;
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) when SplitAt > size(Bin) ->
+ split_iolist(Rest, SplitAt - size(Bin), [Bin | BeginAcc]);
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) ->
+ <<Begin:SplitAt/binary,End/binary>> = Bin,
+ split_iolist([End | Rest], 0, [Begin | BeginAcc]);
+split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
+ case split_iolist(Sublist, SplitAt, BeginAcc) of
+ {Begin, End} ->
+ {Begin, [End | Rest]};
+ Len ->
+ split_iolist(Rest, SplitAt - Len, [Sublist | BeginAcc])
+ end;
+split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
+ split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).
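
make_blocks/2 and remove_block_prefixes/2 are intended to be inverses for any starting offset. A property sketch (round_trip_check is a hypothetical helper, not part of this patch):

    % Sketch: framing then stripping at the same block offset
    % should return the original bytes.
    round_trip_check(BlockOffset, Data) when is_binary(Data) ->
        Framed = iolist_to_binary(make_blocks(BlockOffset, [Data])),
        Data =:= iolist_to_binary(
            remove_block_prefixes(BlockOffset, Framed)).
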
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 03cba11e..cb8c205f 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -723,12 +723,7 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) ->
% {"Content-Length", integer_to_list(couch_doc:bin_size(Bin))}
]),
couch_doc:bin_foldl(Bin,
- fun(BinSegment, []) ->
- send_chunk(Resp, BinSegment),
- {ok, []}
- end,
- []
- ),
+        fun(BinSegment, _) -> send_chunk(Resp, BinSegment) end, []),
send_chunk(Resp, "")
end;
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index d6f72696..bf9fd3c2 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -13,14 +13,6 @@
-module(couch_stream).
-behaviour(gen_server).
--export([test/1]).
--export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
--export([copy/4, copy_to_new_stream/4]).
--export([ensure_buffer/2, set_min_buffer/2]).
--export([init/1, terminate/2, handle_call/3]).
--export([handle_cast/2,code_change/3,handle_info/2]).
-
--include("couch_db.hrl").
-define(FILE_POINTER_BYTES, 8).
-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
@@ -32,125 +24,111 @@
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
+-export([test/0]).
+-export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]).
+-export([copy_to_new_stream/3,old_read_term/2]).
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
--record(write_stream,
- {fd = 0,
- current_pos = 0,
- bytes_remaining = 0,
- next_alloc = 0,
- min_alloc = 16#00010000
- }).
+-include("couch_db.hrl").
-record(stream,
- {
- pid,
- fd
+ {fd = 0,
+ written_pointers=[],
+ buffer_list = [],
+ buffer_len = 0,
+ max_buffer = 4096,
+ written_len = 0
}).
%%% Interface functions %%%
open(Fd) ->
- open(nil, Fd).
+ gen_server:start_link(couch_stream, Fd, []).
-open(nil, Fd) ->
- open({0,0}, Fd);
-open(State, Fd) ->
- {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []),
- {ok, #stream{pid = Pid, fd = Fd}}.
-
-close(#stream{pid = Pid, fd = _Fd}) ->
+close(Pid) ->
gen_server:call(Pid, close, infinity).
-get_state(#stream{pid = Pid, fd = _Fd}) ->
- gen_server:call(Pid, get_state, infinity).
-
-ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
- gen_server:call(Pid, {ensure_buffer, Bytes}).
-
-set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
- gen_server:call(Pid, {set_min_buffer, Bytes}).
-
-read(#stream{pid = _Pid, fd = Fd}, Sp, Num) ->
- read(Fd, Sp, Num);
-read(Fd, Sp, Num) ->
- {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []),
- Bin = list_to_binary(lists:reverse(RevBin)),
- {ok, Bin, Sp2}.
-
-copy_to_new_stream(Src, Sp, Len, DestFd) ->
+copy_to_new_stream(Fd, PosList, DestFd) ->
{ok, Dest} = open(DestFd),
- ok = set_min_buffer(Dest, 0),
- {ok, NewSp} = copy(Src, Sp, Len, Dest),
- close(Dest),
- {ok, NewSp}.
-
-copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) ->
- copy(Fd, Sp, Len, DestStream);
-copy(Fd, Sp, Len, DestStream) ->
- {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK,
- fun(Bin, AccPointer) ->
- {ok, NewPointer} = write(DestStream, Bin),
- {ok, if AccPointer == null -> NewPointer; true -> AccPointer end}
- end,
- null),
- {ok, NewSp}.
-
-foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) ->
- foldl(Fd, Sp, Num, Fun, Acc);
-foldl(Fd, Sp, Num, Fun, Acc) ->
- {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
-
-read_term(#stream{pid = _Pid, fd = Fd}, Sp) ->
- read_term(Fd, Sp);
-read_term(Fd, Sp) ->
- {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
- = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
- {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen),
- {ok, binary_to_term(Bin)}.
+ foldl(Fd, PosList,
+ fun(Bin, _) ->
+ ok = write(Dest, Bin)
+ end, ok),
+ close(Dest).
-write_term(Stream, Term) ->
- Bin = term_to_binary(Term),
- Size = size(Bin),
- Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>,
- write(Stream, Bin2).
-write(#stream{}, <<>>) ->
- {ok, {0,0}};
-write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
+% 09 UPGRADE CODE
+old_copy_to_new_stream(Fd, Pos, Len, DestFd) ->
+ {ok, Dest} = open(DestFd),
+ old_foldl(Fd, Pos, Len,
+ fun(Bin, _) ->
+ ok = write(Dest, Bin)
+ end, ok),
+ close(Dest).
+
+% 09 UPGRADE CODE
+old_foldl(_Fd, null, 0, _Fun, Acc) ->
+ Acc;
+old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)->
+ old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
+
+foldl(_Fd, [], _Fun, Acc) ->
+ Acc;
+foldl(Fd, [Pos|Rest], Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
+
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
gen_server:call(Pid, {write, Bin}, infinity).
-init({{Pos, BytesRemaining}, Fd}) ->
- {ok, #write_stream
- {fd = Fd,
- current_pos = Pos,
- bytes_remaining = BytesRemaining
- }}.
+init(Fd) ->
+ {ok, #stream{fd = Fd}}.
terminate(_Reason, _Stream) ->
ok.
-handle_call(get_state, _From, Stream) ->
- #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream,
- {reply, {Pos, BytesRemaining}, Stream};
-handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
- {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
-% set next_alloc if we need more room
-handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
- #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
- case BytesRemainingInCurrentBuffer < BufferSizeRequested of
- true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer;
- false -> NextAlloc = 0 % enough room in current segment
- end,
- {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
handle_call({write, Bin}, _From, Stream) ->
-    % ensure init is called first so we can get a pointer to the beginning of the binary
- {ok, Sp, Stream2} = write_data(Stream, Bin),
- {reply, {ok, Sp}, Stream2};
+ BinSize = iolist_size(Bin),
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer,
+ max_buffer = Max} = Stream,
+ if BinSize + BufferLen > Max ->
+ {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, Bin)),
+ {reply, ok, Stream#stream{
+ written_len=WrittenLen + BufferLen + BinSize,
+ written_pointers=[Pos|Written],
+ buffer_list=[],
+ buffer_len=0}};
+ true ->
+ {reply, ok, Stream#stream{
+ buffer_list=[Bin|Buffer],
+ buffer_len=BufferLen + BinSize}}
+ end;
handle_call(close, _From, Stream) ->
- #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream,
- {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}.
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer} = Stream,
+
+    case Buffer of
+    [] ->
+        Result = {lists:reverse(Written), WrittenLen};
+    _ ->
+        {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)),
+        Result = {lists:reverse([Pos|Written]), WrittenLen + BufferLen}
+    end,
+ {stop, normal, Result, Stream}.
handle_cast(_Msg, State) ->
{noreply,State}.
@@ -160,14 +138,27 @@ code_change(_OldVsn, State, _Extra) ->
handle_info(_Info, State) ->
{noreply, State}.
+
+
-%%% Internal function %%%
+% 09 UPGRADE CODE
+old_read_term(Fd, Sp) ->
+ {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
+ = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+ {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen),
+ {ok, binary_to_term(Bin)}.
-stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+old_read(Fd, Sp, Num) ->
+ {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []),
+ Bin = list_to_binary(lists:reverse(RevBin)),
+ {ok, Bin, Sp2}.
+
+% 09 UPGRADE CODE
+old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
{ok, Acc, Sp};
-stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
{ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
- = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+ = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
Sp = {NextPos, NextOffset},
% Check NextPos is past current Pos (this is always true in a stream)
% Guards against potential infinite loops caused by corruption.
@@ -175,86 +166,47 @@ stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
true -> ok;
false -> throw({error, stream_corruption})
end,
- stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
-stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+ old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
+old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
ReadAmount = lists:min([MaxChunk, Num, Offset]),
- {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount),
+ {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount),
Sp = {Pos + ReadAmount, Offset - ReadAmount},
- case Fun(Bin, Acc) of
- {ok, Acc2} ->
- stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2);
- {stop, Acc2} ->
- {ok, Acc2, Sp}
- end.
-
-write_data(Stream, <<>>) ->
- {ok, {0,0}, Stream};
-write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
- #write_stream {
- fd = Fd,
- current_pos = CurrentPos,
- next_alloc = NextAlloc,
- min_alloc = MinAlloc
- }= Stream,
-
- NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
- % no space in the current segment, must alloc a new segment
- {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
-
- case CurrentPos of
- 0 ->
- ok;
- _ ->
- ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>)
- end,
- Stream2 = Stream#write_stream{
- current_pos=NewPos,
- bytes_remaining=NewSize,
- next_alloc=0},
- write_data(Stream2, Bin);
-write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) ->
- BytesToWrite = lists:min([size(Bin), BytesRemaining]),
- {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
- ok = couch_file:pwrite(Fd, Pos, WriteBin),
- Stream2 = Stream#write_stream{
- bytes_remaining=BytesRemaining - BytesToWrite,
- current_pos=Pos + BytesToWrite
- },
- {ok, _, Stream3} = write_data(Stream2, Rest),
- {ok, {Pos, BytesRemaining}, Stream3}.
+ old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)).
%%% Tests %%%
-
-test(Term) ->
- {ok, Fd} = couch_file:open("foo", [write]),
- {ok, Stream} = open({0,0}, Fd),
- {ok, Pos} = write_term(Stream, Term),
- {ok, Pos2} = write_term(Stream, {Term, Term}),
- close(Stream),
+read_all(Fd, PosList) ->
+    iolist_to_binary(foldl(Fd, PosList,
+        fun(Bin, Acc) ->
+            [Acc, Bin]
+        end, [])).
+
+
+test() ->
+ {ok, Fd} = couch_file:open("foo", [create,overwrite]),
+ ok = couch_file:write_header(Fd, {howdy, howdy}),
+ Bin = <<"damienkatz">>,
+ {ok, Pos} = couch_file:append_binary(Fd, Bin),
+ {ok, Bin} = couch_file:pread_binary(Fd, Pos),
+ {ok, {howdy, howdy}} = couch_file:read_header(Fd),
+ ok = couch_file:write_header(Fd, {foo, foo}),
+ {ok, {foo, foo}} = couch_file:read_header(Fd),
+
+ {ok, Stream} = open(Fd),
+ ok = write(Stream, <<"food">>),
+ ok = write(Stream, <<"foob">>),
+ {PosList, 8} = close(Stream),
+ <<"foodfoob">> = read_all(Fd, PosList),
+ {ok, Stream2} = open(Fd),
+ OneBits = <<1:(8*10)>>,
+ ZeroBits = <<0:(8*10)>>,
+ ok = write(Stream2, OneBits),
+ ok = write(Stream2, ZeroBits),
+ {PosList2, 20} = close(Stream2),
+ AllBits = iolist_to_binary([OneBits,ZeroBits]),
+ AllBits = read_all(Fd, PosList2),
couch_file:close(Fd),
- {ok, Fd2} = couch_file:open("foo", [read, write]),
- {ok, Stream2} = open({0,0}, Fd2),
- {ok, Term1} = read_term(Fd2, Pos),
- io:format("Term1: ~w ~n",[Term1]),
- {ok, Term2} = read_term(Fd2, Pos2),
- io:format("Term2: ~w ~n",[Term2]),
- {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []),
- deep_read_test(Fd2, PointerList),
- close(Stream2),
- couch_file:close(Fd2).
-
-deep_read_test(_Fd, []) ->
- ok;
-deep_read_test(Fd, [Pointer | RestPointerList]) ->
- {ok, _Term} = read_term(Fd, Pointer),
- deep_read_test(Fd, RestPointerList).
-
-deep_write_test(_Stream, _Term, 0, PointerList) ->
- {ok, PointerList};
-deep_write_test(Stream, Term, N, PointerList) ->
- WriteList = lists:duplicate(random:uniform(N), Term),
- {ok, Pointer} = write_term(Stream, WriteList),
- deep_write_test(Stream, Term, N-1, [Pointer | PointerList]).
+ PosList2.
+
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index af4ea814..28679927 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -205,7 +205,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
if CommittedSeq >= Group#group.current_seq ->
% save the header
Header = {Group#group.sig, get_index_header_data(Group)},
- ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header),
+ ok = couch_file:write_header(Group#group.fd, Header),
{noreply, State#group_state{waiting_commit=false}};
true ->
% We can't commit the header because the database seq that's fully
@@ -261,7 +261,7 @@ handle_info({'EXIT', FromPid, reset},
handle_info({'EXIT', _FromPid, normal}, State) ->
{noreply, State};
-handle_info({'EXIT', FromPid, {{nocatch, Reason}, Trace}}, State) ->
+handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) ->
?LOG_DEBUG("Uncaught throw() in linked pid: ~p", [{FromPid, Reason}]),
{stop, Reason, State};
@@ -313,7 +313,9 @@ prepare_group({view, RootDir, DbName, GroupId}, ForceReset)->
if ForceReset ->
{ok, reset_file(Db, Fd, DbName, Group)};
true ->
- case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
+ % 09 UPGRADE CODE
+ ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
+ case (catch couch_file:read_header(Fd)) of
{ok, {Sig, HeaderInfo}} ->
% sigs match!
{ok, init_group(Db, Fd, Group, HeaderInfo)};
@@ -417,7 +419,7 @@ reset_group(#group{views=Views}=Group) ->
reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+ ok = couch_file:write_header(Fd, {Sig, nil}),
init_group(Db, Fd, reset_group(Group), nil).
delete_index_file(RootDir, DbName, GroupId) ->