summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/couch_db.erl304
-rw-r--r--src/couchdb/couch_db.hrl18
-rw-r--r--src/couchdb/couch_db_update_notifier_sup.erl1
-rw-r--r--src/couchdb/couch_db_updater.erl182
-rw-r--r--src/couchdb/couch_doc.erl128
-rw-r--r--src/couchdb/couch_file.erl25
-rw-r--r--src/couchdb/couch_httpd_db.erl101
-rw-r--r--src/couchdb/couch_key_tree.erl13
-rw-r--r--src/couchdb/couch_rep.erl32
-rw-r--r--src/couchdb/couch_stream.erl46
-rw-r--r--src/couchdb/couch_util.erl4
-rw-r--r--src/couchdb/couch_view.erl4
-rw-r--r--src/couchdb/couch_view_group.erl9
13 files changed, 519 insertions, 348 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 9cb887ea..9dc1fce8 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -24,7 +24,7 @@
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/5,read_doc/2]).
+-export([changes_since/5,read_doc/2,new_revid/1]).
-include("couch_db.hrl").
@@ -295,11 +295,11 @@ validate_doc_update(#db{name=DbName,user_ctx=Ctx}=Db, Doc, GetDiskDocFun) ->
end.
-prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc,
- OldFullDocInfo, LeafRevsDict) ->
- case PrevRevs of
+prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
+ OldFullDocInfo, LeafRevsDict, AllowConflict) ->
+ case Revs of
[PrevRev|_] ->
- case dict:find({RevStart-1, PrevRev}, LeafRevsDict) of
+ case dict:find({RevStart, PrevRev}, LeafRevsDict) of
{ok, {Deleted, DiskSp, DiskRevs}} ->
case couch_doc:has_stubs(Doc) of
true ->
@@ -310,13 +310,21 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc
LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
{validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
end;
+ error when AllowConflict ->
+ {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
error ->
{conflict, Doc}
end;
[] ->
% new doc, and we have existing revs.
+ % reuse existing deleted doc
if OldFullDocInfo#full_doc_info.deleted ->
% existing docs are deletions
+ #doc_info{revs=[#rev_info{rev={Pos, DelRevId}}|_]} =
+ couch_doc:to_doc_info(OldFullDocInfo),
+ Doc2 = Doc#doc{revs={Pos, [DelRevId]}},
+ {validate_doc_update(Db, Doc2, fun() -> nil end), Doc2};
+ AllowConflict ->
{validate_doc_update(Db, Doc, fun() -> nil end), Doc};
true ->
{conflict, Doc}
@@ -325,49 +333,51 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc
-prep_and_validate_updates(_Db, [], [], AccPrepped, AccFatalErrors) ->
+prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
+ AccFatalErrors) ->
{AccPrepped, AccFatalErrors};
-prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], AccPrepped, AccErrors) ->
+prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
+ AllowConflict, AccPrepped, AccErrors) ->
[#doc{id=Id}|_]=DocBucket,
% no existing revs are known,
{PreppedBucket, AccErrors3} = lists:foldl(
fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
case Revs of
- {Pos, [NewRev,_OldRev|_]} ->
- % old revs specified but none exist, a conflict
- {AccBucket, [{{Id, {Pos, NewRev}}, conflict} | AccErrors2]};
- {Pos, [NewRev]} ->
+ {0, []} ->
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
{[Doc | AccBucket], AccErrors2};
Error ->
- {AccBucket, [{{Id, {Pos, NewRev}}, Error} | AccErrors2]}
- end
+ {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+ end;
+ _ ->
+ % old revs specified but none exist, a conflict
+ {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
- prep_and_validate_updates(Db, RestBuckets, RestLookups,
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
[{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
- AccPrepped, AccErrors) ->
+ AllowConflict, AccPrepped, AccErrors) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} ||
{{Deleted, Sp, _Seq}, {Start, [RevId|_]}=Revs} <- Leafs]),
{PreppedBucket, AccErrors3} = lists:foldl(
fun(Doc, {Docs2Acc, AccErrors2}) ->
case prep_and_validate_update(Db, Doc, OldFullDocInfo,
- LeafRevsDict) of
+ LeafRevsDict, AllowConflict) of
{ok, Doc2} ->
{[Doc2 | Docs2Acc], AccErrors2};
- {Error, #doc{id=Id,revs={Pos, [NewRev|_]}}} ->
+ {Error, #doc{id=Id,revs=Revs}} ->
% Record the error
- {Docs2Acc, [{{Id, {Pos, NewRev}}, Error} |AccErrors2]}
+ {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
- prep_and_validate_updates(Db, RestBuckets, RestLookups, [PreppedBucket | AccPrepped], AccErrors3).
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, [PreppedBucket | AccPrepped], AccErrors3).
update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
@@ -408,32 +418,12 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
{ok, {Start, Path}} ->
% our unflushed doc is a leaf node. Go back on the path
% to find the previous rev that's on disk.
- PrevRevResult =
- case couch_doc:has_stubs(Doc) of
- true ->
- [_PrevRevFull | [PrevRevFull | _]=PrevPath] = Path,
- case PrevRevFull of
- {_RevId, ?REV_MISSING} ->
- conflict;
- {RevId, {IsDel, DiskSp, _Seq}} ->
- DiskDoc = make_doc(Db, Id, IsDel, DiskSp, PrevPath),
- Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
- {ok, Doc2, fun() -> DiskDoc end}
- end;
- false ->
- {ok, Doc,
- fun() ->
+ LoadPrevRevFun = fun() ->
make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
- end}
- end,
- case PrevRevResult of
- {ok, NewDoc, LoadPrevRevFun} ->
- case validate_doc_update(Db, NewDoc, LoadPrevRevFun) of
- ok ->
- {[NewDoc | AccValidated], AccErrors2};
- Error ->
- {AccValidated, [{NewDoc, Error} | AccErrors2]}
- end;
+ end,
+ case validate_doc_update(Db, Doc, LoadPrevRevFun) of
+ ok ->
+ {[Doc | AccValidated], AccErrors2};
Error ->
{AccValidated, [{Doc, Error} | AccErrors2]}
end;
@@ -444,9 +434,47 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
end
end,
{[], AccErrors}, Bucket),
- prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3)
+ prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
+ [ValidatedBucket | AccPrepped], AccErrors3)
end.
+
+
+new_revid(#doc{body=Body,revs={OldStart,OldRevs},
+ atts=Atts,deleted=Deleted}) ->
+ case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M /= <<>>] of
+ Atts2 when length(Atts) /= length(Atts2) ->
+ % We must have old style non-md5 attachments
+ ?l2b(integer_to_list(couch_util:rand32()));
+ Atts2 ->
+ OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
+ erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
+ end.
+
+new_revs([], OutBuckets, IdRevsAcc) ->
+ {lists:reverse(OutBuckets), IdRevsAcc};
+new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
+ {NewBucket, IdRevsAcc3} = lists:mapfoldl(
+ fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+ NewRevId = new_revid(Doc),
+ {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
+ [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+ end, IdRevsAcc, Bucket),
+ new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
+
+check_dup_atts([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
+ throw({bad_request, <<"Duplicate attachments">>});
+check_dup_atts([_, _ | Rest]) ->
+ check_dup_atts(Rest);
+check_dup_atts(_) ->
+ ok.
+
+sort_and_check_atts(#doc{atts=Atts}=Doc) ->
+ Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
+ check_dup_atts(Atts2),
+ Doc#doc{atts=Atts2}.
+
+
update_docs(Db, Docs, Options, replicated_changes) ->
couch_stats_collector:increment({couchdb, database_writes}),
DocBuckets = group_alike_docs(Docs),
@@ -467,70 +495,74 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- {ok, []} = write_and_commit(Db, DocBuckets3, [merge_conflicts | Options]),
+ DocBuckets4 = [[doc_flush_atts(sort_and_check_atts(Doc), Db#db.fd)
+ || Doc <- Bucket] || Bucket <- DocBuckets3],
+ {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
update_docs(Db, Docs, Options, interactive_edit) ->
couch_stats_collector:increment({couchdb, database_writes}),
AllOrNothing = lists:member(all_or_nothing, Options),
% go ahead and generate the new revision ids for the documents.
- Docs2 = lists:map(
- fun(#doc{id=Id,revs={Start, RevIds}}=Doc) ->
+ % separate out the NonRep documents from the rest of the documents
+ {Docs2, NonRepDocs} = lists:foldl(
+ fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
- Rev = case RevIds of [] -> 0; [Rev0|_] -> list_to_integer(?b2l(Rev0)) end,
- Doc#doc{revs={Start, [?l2b(integer_to_list(Rev + 1))]}};
- _ ->
- Doc#doc{revs={Start+1, [?l2b(integer_to_list(couch_util:rand32())) | RevIds]}}
+ {DocsAcc, [Doc | NonRepDocsAcc]};
+ Id->
+ {[Doc | DocsAcc], NonRepDocsAcc}
end
- end, Docs),
+ end, {[], []}, Docs),
+
DocBuckets = group_alike_docs(Docs2),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
true;
- (#doc{attachments=Atts}) ->
+ (#doc{atts=Atts}) ->
Atts /= []
- end, Docs) of
+ end, Docs2) of
true ->
% lookup the doc by id and get the most recent
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
ExistingDocInfos = get_full_doc_infos(Db, Ids),
- {DocBucketsPrepped, Failures} =
- case AllOrNothing of
- true ->
- prep_and_validate_replicated_updates(Db, DocBuckets,
- ExistingDocInfos, [], []);
- false ->
- prep_and_validate_updates(Db, DocBuckets, ExistingDocInfos, [], [])
- end,
+ {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
+ DocBuckets, ExistingDocInfos, AllOrNothing, [], []),
% strip out any empty buckets
DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
false ->
- Failures = [],
+ PreCommitFailures = [],
DocBuckets2 = DocBuckets
end,
- if (AllOrNothing) and (Failures /= []) ->
- {aborted, Failures};
+ if (AllOrNothing) and (PreCommitFailures /= []) ->
+ {aborted, lists:map(
+ fun({{Id,{Pos, [RevId|_]}}, Error}) ->
+ {{Id, {Pos, RevId}}, Error};
+ ({{Id,{0, []}}, Error}) ->
+ {{Id, {0, <<>>}}, Error}
+ end, PreCommitFailures)};
true ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
- {ok, CommitFailures} = write_and_commit(Db, DocBuckets2, Options2),
- FailDict = dict:from_list(CommitFailures ++ Failures),
- % the output for each is either {ok, NewRev} or Error
+ DocBuckets3 = [[
+ doc_flush_atts(set_new_att_revpos(
+ sort_and_check_atts(Doc)), Db#db.fd)
+ || Doc <- B] || B <- DocBuckets2],
+ {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
+
+ {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
+
+ ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
{ok, lists:map(
- fun(#doc{id=Id,revs={Pos, [NewRevId|_]}}) ->
- case dict:find({Id, {Pos, NewRevId}}, FailDict) of
- {ok, Error} ->
- Error;
- error ->
- {ok, {Pos, NewRevId}}
- end
- end, Docs2)}
+ fun(#doc{id=Id,revs={Pos, RevIds}}) ->
+ {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
+ Result
+ end, Docs)}
end.
% Returns the first available document on disk. Input list is a full rev path
@@ -545,78 +577,81 @@ make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp, _Seq}} |_]=DocPath) ->
write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
- Options) ->
- % flush unwritten binaries to disk.
- DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
- case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of
- {ok, Conflicts} -> {ok, Conflicts};
+ NonRepDocs, Options) ->
+ case gen_server:call(UpdatePid,
+ {update_docs, DocBuckets, NonRepDocs, Options}, infinity) of
+ {ok, Results} -> {ok, Results};
retry ->
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
- DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
- case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of
- {ok, Conflicts} -> {ok, Conflicts};
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of
+ {ok, Results} -> {ok, Results};
retry -> throw({update_error, compaction_retry})
end
end.
+set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
+ Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
+ % already commited to disk, do not set new rev
+ Att;
+ (Att) ->
+ Att#att{revpos=RevPos+1}
+ end, Atts)}.
+
-doc_flush_binaries(Doc, Fd) ->
- NewAttachments = lists:map(
- fun({Key, {Type, BinValue}}) ->
- NewBinValue = flush_binary(Fd, BinValue),
- {Key, {Type, NewBinValue}}
- end, Doc#doc.attachments),
- Doc#doc{attachments = NewAttachments}.
+doc_flush_atts(Doc, Fd) ->
+ Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
-flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd ->
- % already written to our file, nothing to write
- {Fd, StreamPointer, Len};
+check_md5(_NewSig, <<>>) -> ok;
+check_md5(Sig1, Sig2) when Sig1 == Sig2 -> ok;
+check_md5(_, _) -> throw(data_corruption).
-flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
- {NewStreamData, Len} =
- couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
- {Fd, NewStreamData, Len};
+flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
+ % already written to our file, nothing to write
+ Att;
-flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
- {NewStreamData, Len} =
+flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) ->
+ {NewStreamData, Len, Md5} =
couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
- {Fd, NewStreamData, Len};
+ check_md5(Md5, InMd5),
+ Att#att{data={Fd, NewStreamData}, md5=Md5, len=Len};
-flush_binary(Fd, Bin) when is_binary(Bin) ->
- with_stream(Fd, fun(OutputStream) ->
- couch_stream:write(OutputStream, Bin)
+flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ couch_stream:write(OutputStream, Data)
end);
-flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
- with_stream(Fd, fun(OutputStream) ->
- % StreamFun(MaxChunkSize, WriterFun) must call WriterFun
+flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ % Fun(MaxChunkSize, WriterFun) must call WriterFun
% once for each chunk of the attachment,
- StreamFun(4096,
+ Fun(4096,
% WriterFun({Length, Binary}, State)
% WriterFun({0, _Footers}, State)
% Called with Length == 0 on the last time.
% WriterFun returns NewState.
fun({0, _Footers}, _) ->
ok;
- ({_Length, Bin}, _) ->
- couch_stream:write(OutputStream, Bin)
+ ({_Length, Chunk}, _) ->
+ couch_stream:write(OutputStream, Chunk)
end, ok)
end);
-flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
- with_stream(Fd, fun(OutputStream) ->
+flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
write_streamed_attachment(OutputStream, Fun, Len)
end).
-with_stream(Fd, Fun) ->
+with_stream(Fd, #att{md5=InMd5}=Att, Fun) ->
{ok, OutputStream} = couch_stream:open(Fd),
Fun(OutputStream),
- {StreamInfo, Len} = couch_stream:close(OutputStream),
- {Fd, StreamInfo, Len}.
+ {StreamInfo, Len, Md5} = couch_stream:close(OutputStream),
+ check_md5(Md5, InMd5),
+ Att#att{data={Fd,StreamInfo},len=Len,md5=Md5}.
write_streamed_attachment(_Stream, _F, 0) ->
@@ -832,11 +867,15 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
-read_doc(Fd, Pos) when is_integer(Pos) ->
- couch_file:pread_term(Fd, Pos);
-read_doc(Fd, OldStyleStreamPointer) ->
+read_doc(#db{fd=Fd}, OldStreamPointer) when is_tuple(OldStreamPointer) ->
+ % 09 UPGRADE CODE
+ couch_stream:old_read_term(Fd, OldStreamPointer);
+read_doc(#db{header=#db_header{disk_version=Version},fd=Fd}, Pos)
+ when Version == 3 ->
% 09 UPGRADE CODE
- couch_stream:old_read_term(Fd, OldStyleStreamPointer).
+ couch_file:pread_term(Fd, Pos);
+read_doc(#db{fd=Fd}, Pos) ->
+ couch_file:pread_term_md5(Fd, Pos).
doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
@@ -850,21 +889,36 @@ doc_to_tree_simple(Doc, [RevId | Rest]) ->
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
-make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
- {BodyData, BinValues} =
+make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+ {BodyData, Atts} =
case Bp of
nil ->
{[], []};
_ ->
- {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
+ {ok, {BodyData0, Atts0}} = read_doc(Db, Bp),
{BodyData0,
- [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
+ lists:map(
+ fun({Name,Type,Sp,Len,RevPos,Md5}) ->
+ #att{name=Name,
+ type=Type,
+ len=Len,
+ md5=Md5,
+ revpos=RevPos,
+ data={Fd,Sp}};
+ ({Name,{Type,Sp,Len}}) ->
+ #att{name=Name,
+ type=Type,
+ len=Len,
+ md5= <<>>,
+ revpos=0,
+ data={Fd,Sp}}
+ end, Atts0)}
end,
#doc{
id = Id,
revs = RevisionPath,
body = BodyData,
- attachments = BinValues,
+ atts = Atts,
deleted = Deleted
}.
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index c1f97144..4eda42e6 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -79,11 +79,7 @@
% the json body object.
body = {[]},
- % each attachment contains:
- % {data, Type, <<binary>>}
- % or:
- % {pointer, Type, {FileHandle, StreamPointer, Length}}
- attachments = [],
+ atts = [], % attachments
deleted = false,
@@ -93,6 +89,16 @@
}).
+-record(att,
+ {
+ name,
+ type,
+ len,
+ md5= <<>>,
+ revpos=0,
+ data
+ }).
+
-record(user_ctx,
{name=null,
@@ -109,7 +115,7 @@
% if the disk revision is incremented, then new upgrade logic will need to be
% added to couch_db_updater:init_db.
--define(LATEST_DISK_VERSION, 3).
+-define(LATEST_DISK_VERSION, 4).
-record(db_header,
{disk_version = ?LATEST_DISK_VERSION,
diff --git a/src/couchdb/couch_db_update_notifier_sup.erl b/src/couchdb/couch_db_update_notifier_sup.erl
index 290a041a..4d730fc7 100644
--- a/src/couchdb/couch_db_update_notifier_sup.erl
+++ b/src/couchdb/couch_db_update_notifier_sup.erl
@@ -29,7 +29,6 @@ start_link() ->
couch_db_update_notifier_sup, []).
init([]) ->
- Self = self(),
ok = couch_config:register(
fun("update_notification", Key, Value) -> reload_config(Key, Value) end
),
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index dacf2515..fd1d340f 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -44,12 +44,15 @@ terminate(Reason, _Srv) ->
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
-handle_call({update_docs, DocActions, Options}, _From, Db) ->
- try update_docs_int(Db, DocActions, Options) of
- {ok, Conflicts, Db2} ->
+handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) ->
+ try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of
+ {ok, Failures, Db2} ->
ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
- couch_db_update_notifier:notify({updated, Db2#db.name}),
- {reply, {ok, Conflicts}, Db2}
+ if Db2#db.update_seq /= Db#db.update_seq ->
+ couch_db_update_notifier:notify({updated, Db2#db.name});
+ true -> ok
+ end,
+ {reply, {ok, Failures}, Db2}
catch
throw: retry ->
{reply, retry, Db}
@@ -289,6 +292,7 @@ init_db(DbName, Filepath, Fd, Header0) ->
case element(2, Header1) of
1 -> Header1#db_header{unused = 0}; % 0.9
2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
+ 3 -> Header1; % post 0.9 and pre 0.10
?LATEST_DISK_VERSION -> Header1;
_ -> throw({database_disk_version_error, "Incorrect disk header version"})
end,
@@ -364,31 +368,39 @@ refresh_validate_doc_funs(Db) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
+flush_trees(#db{fd=Fd,header=Header}=Db,
+ [InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
Flushed = couch_key_tree:map(
fun(_Rev, Value) ->
case Value of
- #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
+ #doc{atts=Atts,deleted=IsDeleted}=Doc ->
% this node value is actually an unwritten document summary,
% write to disk.
- % make sure the Fd in the written bins is the same Fd we are.
- Bins =
+ % make sure the Fd in the written bins is the same Fd we are
+ % and convert bins, removing the FD.
+ % All bins should have been written to disk already.
+ DiskAtts =
case Atts of
[] -> [];
- [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
- % convert bins, removing the FD.
- % All bins should have been flushed to disk already.
- [{BinName, {BinType, BinSp, BinLen}}
- || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+ [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
+ [{N,T,P,L,R,M}
+ || #att{name=N,type=T,data={_,P},md5=M,revpos=R,len=L}
<- Atts];
_ ->
% BinFd must not equal our Fd. This can happen when a database
% is being switched out during a compaction
- ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
+ ?LOG_DEBUG("File where the attachments are written has"
+ " changed. Possibly retrying.", []),
throw(retry)
end,
- {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
+ {ok, NewSummaryPointer} =
+ case Header#db_header.disk_version < 4 of
+ true ->
+ couch_file:append_term(Fd, {Doc#doc.body, DiskAtts});
+ false ->
+ couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts})
+ end,
{IsDeleted, NewSummaryPointer, UpdateSeq};
_ ->
Value
@@ -403,14 +415,40 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
{NewRevTree, NewConflicts} = lists:foldl(
- fun(#doc{revs={Pos,[Rev|_]}}=NewDoc, {AccTree, AccConflicts2}) ->
- case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
- {_NewTree, conflicts}
- when (not OldDeleted) and (not MergeConflicts) ->
- {AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]};
- {NewTree, _} ->
+ fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) ->
+ if not MergeConflicts ->
+ case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
+ {_NewTree, conflicts} when (not OldDeleted) ->
+ {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]};
+ {NewTree, no_conflicts} when AccTree == NewTree ->
+ % the tree didn't change at all
+ % meaning we are saving a rev that's already
+ % been editted again.
+ if (Pos == 1) and OldDeleted ->
+ % this means we are recreating a brand new document
+ % into a state that already existed before.
+ % put the rev into a subsequent edit of the deletion
+ #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
+ couch_doc:to_doc_info(OldDocInfo),
+ NewRevId = couch_db:new_revid(
+ NewDoc#doc{revs={OldPos, [OldRev]}}),
+ NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
+ {NewTree2, _} = couch_key_tree:merge(AccTree,
+ [couch_db:doc_to_tree(NewDoc2)]),
+ % we changed the rev id, this tells the caller we did.
+ {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}}
+ | AccConflicts2]};
+ true ->
+ {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}
+ end;
+ {NewTree, _} ->
+ {NewTree, AccConflicts2}
+ end;
+ true ->
+ {NewTree, _} = couch_key_tree:merge(AccTree,
+ [couch_db:doc_to_tree(NewDoc)]),
{NewTree, AccConflicts2}
- end
+ end
end,
{OldTree, AccConflicts}, NewDocs),
if NewRevTree == OldTree ->
@@ -444,26 +482,13 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
[Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
#full_doc_info{rev_tree=Tree}=Info <- DocInfos].
-
-update_docs_int(Db, DocsList, Options) ->
+update_docs_int(Db, DocsList, NonRepDocs, Options) ->
#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq
} = Db,
- % separate out the NonRep documents from the rest of the documents
- {DocsList2, NonRepDocs} = lists:foldl(
- fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
- case Id of
- <<?LOCAL_DOC_PREFIX, _/binary>> ->
- {DocsListAcc, [Doc | NonRepDocsAcc]};
- Id->
- {[Docs | DocsListAcc], NonRepDocsAcc}
- end
- end, {[], []}, DocsList),
-
- Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
-
+ Ids = [Id || [#doc{id=Id}|_] <- DocsList],
% lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
@@ -477,7 +502,7 @@ update_docs_int(Db, DocsList, Options) ->
% Merge the new docs into the revision trees.
{ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees(
lists:member(merge_conflicts, Options),
- DocsList2, OldDocInfos, [], [], [], LastSeq),
+ DocsList, OldDocInfos, [], [], [], LastSeq),
NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
@@ -518,33 +543,43 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
Ids = [Id || #doc{id=Id} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun(#doc{id=Id,deleted=Delete,revs={0,[RevStr]},body=Body}, OldDocLookup) ->
- NewRev = list_to_integer(?b2l(RevStr)),
+ fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) ->
+ case PrevRevs of
+ [RevStr|_] ->
+ PrevRev = list_to_integer(?b2l(RevStr));
+ [] ->
+ PrevRev = 0
+ end,
OldRev =
case OldDocLookup of
{ok, {_, {OldRev0, _}}} -> OldRev0;
not_found -> 0
end,
- case OldRev + 1 == NewRev of
+ case OldRev == PrevRev of
true ->
case Delete of
- false -> {update, {Id, {NewRev, Body}}};
- true -> {remove, Id}
+ false -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}};
+ true -> {remove, Id, PrevRevs}
end;
false ->
- {conflict, {Id, {0, RevStr}}}
+ {conflict, {Id, {0, PrevRevs}}}
end
-
end, Docs, OldDocLookups),
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
- Conflicts = [{conflict, IdRev} || {conflict, IdRev} <- BtreeEntries],
+ BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries],
+ BtreeIdsUpdate = [{Id, {NewRev, Body}} || {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries],
+ Results =
+ [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}}
+ || {remove, Id, PrevRevs} <- BtreeEntries] ++
+ [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}}
+ || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries] ++
+ [{IdRevs, conflict}
+ || {conflict, IdRevs} <- BtreeEntries],
{ok, Btree2} =
couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
- {ok, Conflicts, Db#db{local_docs_btree = Btree2}}.
+ {ok, Results, Db#db{local_docs_btree = Btree2}}.
commit_data(Db) ->
@@ -594,43 +629,42 @@ commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
end.
-copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
- {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
+copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp),
% copy the bin values
NewBinInfos = lists:map(
fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
% 09 UPGRADE CODE
- {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
- {Name, {Type, NewBinSp, Len}};
+ {NewBinSp, Len, Md5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, Type, NewBinSp, Len, Pos, Md5};
({Name, {Type, BinSp, Len}}) ->
- {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- {Name, {Type, NewBinSp, Len}}
+ % 09 UPGRADE CODE
+ {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, Type, NewBinSp, Len, Pos, Md5};
+ ({Name, Type, BinSp, Len, RevPos, Md5}) ->
+ {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, Type, NewBinSp, Len, RevPos, Md5}
end, BinInfos),
{BodyData, NewBinInfos}.
-copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
- [];
-copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
- % root nner node, only copy info/data from leaf nodes
- [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
- [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
-copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
- % This is a leaf node, copy it over
- DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
- [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
-copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
- % inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].
-
-
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
+copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
+ couch_key_tree:map(
+ fun(Rev, {IsDel, Sp, Seq}, leaf) ->
+ DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd),
+ {IsDel, DocBody, Seq};
+ (_, _, branch) ->
+ ?REV_MISSING
+ end, Tree).
+
+
+copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
% write out the attachments
NewFullDocInfos0 = lists:map(
fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
+ Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)}
end, LookupResults),
% write out the docs
% we do this in 2 stages so the docs are written out contiguously, making
@@ -639,7 +673,7 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
fun(#full_doc_info{rev_tree=RevTree}=Info) ->
Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
fun(_Key, {IsDel, DocBody, Seq}) ->
- {ok, Pos} = couch_file:append_term(DestFd, DocBody),
+ {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
{IsDel, Pos, Seq}
end, RevTree)}
end, NewFullDocInfos0),
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 4c23155e..72b56d53 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -13,7 +13,7 @@
-module(couch_doc).
-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,rev_to_strs/1]).
--export([bin_foldl/3,bin_size/1,bin_to_binary/1,get_validate_doc_fun/1]).
+-export([att_foldl/3,get_validate_doc_fun/1]).
-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
-export([validate_docid/1]).
@@ -23,7 +23,7 @@
to_json_rev(0, []) ->
[];
to_json_rev(Start, [FirstRevId|_]) ->
- [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",FirstRevId])}].
+ [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",revid_to_str(FirstRevId)])}].
to_json_body(true, _Body) ->
[{<<"_deleted">>, true}];
@@ -35,12 +35,18 @@ to_json_revisions(Options, Start, RevIds) ->
false -> [];
true ->
[{<<"_revisions">>, {[{<<"start">>, Start},
- {<<"ids">>, RevIds}]}}]
+ {<<"ids">>, [revid_to_str(R) ||R <- RevIds]}]}}]
end.
-rev_to_str({Pos, RevId}) ->
- ?l2b([integer_to_list(Pos),"-",RevId]).
+revid_to_str(RevId) when size(RevId) == 16 ->
+ ?l2b(couch_util:to_hex(RevId));
+revid_to_str(RevId) ->
+ RevId.
+rev_to_str({Pos, RevId}) ->
+ ?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]).
+
+
rev_to_strs([]) ->
[];
rev_to_strs([{Pos, RevId}| Rest]) ->
@@ -66,17 +72,12 @@ to_json_meta(Meta) ->
to_json_attachment_stubs(Attachments) ->
BinProps = lists:map(
- fun({Name, {Type, {_RcvFun, Length}}}) ->
- {Name, {[
- {<<"stub">>, true},
- {<<"content_type">>, Type},
- {<<"length">>, Length}
- ]}};
- ({Name, {Type, BinValue}}) ->
+ fun(#att{name=Name,type=Type,len=Length,revpos=Pos}) ->
{Name, {[
{<<"stub">>, true},
{<<"content_type">>, Type},
- {<<"length">>, bin_size(BinValue)}
+ {<<"length">>, Length},
+ {<<"revpos">>, Pos}
]}}
end,
Attachments),
@@ -85,24 +86,26 @@ to_json_attachment_stubs(Attachments) ->
_ -> [{<<"_attachments">>, {BinProps}}]
end.
-to_json_attachments(Attachments) ->
- BinProps = lists:map(
- fun({Name, {Type, {RcvFun, Length}}}) ->
- Data = read_streamed_attachment(RcvFun, Length, _Acc = []),
- {Name, {[
- {<<"content_type">>, Type},
+to_json_attachments(Atts) ->
+ AttProps = lists:map(
+ fun(#att{data=Fun,len=Len}=Att) when is_function(Fun) ->
+ Data = read_streamed_attachment(Fun, Len, _Acc = []),
+ {Att#att.name, {[
+ {<<"content_type">>, Att#att.type},
+ {<<"revpos">>, Att#att.revpos},
{<<"data">>, couch_util:encodeBase64(Data)}
]}};
- ({Name, {Type, BinValue}}) ->
- {Name, {[
- {<<"content_type">>, Type},
- {<<"data">>, couch_util:encodeBase64(bin_to_binary(BinValue))}
+ (Att) ->
+ {Att#att.name, {[
+ {<<"content_type">>, Att#att.type},
+ {<<"revpos">>, Att#att.revpos},
+ {<<"data">>, couch_util:encodeBase64(att_to_iolist(Att))}
]}}
end,
- Attachments),
- case BinProps of
+ Atts),
+ case AttProps of
[] -> [];
- _ -> [{<<"_attachments">>, {BinProps}}]
+ _ -> [{<<"_attachments">>, {AttProps}}]
end.
to_json_attachments(Attachments, Options) ->
@@ -120,7 +123,7 @@ to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
++ to_json_body(Del, Body)
++ to_json_revisions(Options, Start, RevIds)
++ to_json_meta(Meta)
- ++ to_json_attachments(Doc#doc.attachments, Options)
+ ++ to_json_attachments(Doc#doc.atts, Options)
}.
from_json_obj({Props}) ->
@@ -129,12 +132,24 @@ from_json_obj({Props}) ->
from_json_obj(_Other) ->
throw({bad_request, "Document must be a JSON object"}).
+parse_revid(RevId) when size(RevId) == 32 ->
+ RevInt = erlang:list_to_integer(?b2l(RevId), 16),
+ <<RevInt:128>>;
+parse_revid(RevId) when length(RevId) == 32 ->
+ RevInt = erlang:list_to_integer(RevId, 16),
+ <<RevInt:128>>;
+parse_revid(RevId) when is_binary(RevId) ->
+ RevId;
+parse_revid(RevId) when is_list(RevId) ->
+ ?l2b(RevId).
+
+
parse_rev(Rev) when is_binary(Rev) ->
parse_rev(?b2l(Rev));
parse_rev(Rev) when is_list(Rev) ->
SplitRev = lists:splitwith(fun($-) -> false; (_) -> true end, Rev),
case SplitRev of
- {Pos, [$- | RevId]} -> {list_to_integer(Pos), ?l2b(RevId)};
+ {Pos, [$- | RevId]} -> {list_to_integer(Pos), parse_revid(RevId)};
_Else -> throw({bad_request, <<"Invalid rev format">>})
end;
parse_rev(_BadRev) ->
@@ -176,20 +191,23 @@ transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) ->
transfer_fields(Rest,Doc);
transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
- Bins = lists:flatmap(fun({Name, {BinProps}}) ->
+ Atts = lists:map(fun({Name, {BinProps}}) ->
case proplists:get_value(<<"stub">>, BinProps) of
true ->
Type = proplists:get_value(<<"content_type">>, BinProps),
Length = proplists:get_value(<<"length">>, BinProps),
- [{Name, {stub, Type, Length}}];
+ RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
+ #att{name=Name, data=stub, type=Type, len=Length, revpos=RevPos};
_ ->
Value = proplists:get_value(<<"data">>, BinProps),
Type = proplists:get_value(<<"content_type">>, BinProps,
?DEFAULT_ATTACHMENT_CONTENT_TYPE),
- [{Name, {Type, couch_util:decodeBase64(Value)}}]
+ RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
+ Bin = couch_util:decodeBase64(Value),
+ #att{name=Name, data=Bin, type=Type, len=size(Bin), revpos=RevPos}
end
end, JsonBins),
- transfer_fields(Rest, Doc#doc{attachments=Bins});
+ transfer_fields(Rest, Doc#doc{atts=Atts});
transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) ->
RevIds = proplists:get_value(<<"ids">>, Props),
@@ -203,7 +221,8 @@ transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) ->
end,
[throw({doc_validation, "RevId isn't a string"}) ||
RevId <- RevIds, not is_binary(RevId)],
- transfer_fields(Rest, Doc#doc{revs={Start, RevIds}});
+ RevIds2 = [parse_revid(RevId) || RevId <- RevIds],
+ transfer_fields(Rest, Doc#doc{revs={Start, RevIds2}});
transfer_fields([{<<"_deleted">>, B} | Rest], Doc) when (B==true) or (B==false) ->
transfer_fields(Rest, Doc#doc{deleted=B});
@@ -253,23 +272,20 @@ to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) ->
-bin_foldl(Bin, Fun, Acc) when is_binary(Bin) ->
+att_foldl(#att{data=Bin}, Fun, Acc) when is_binary(Bin) ->
Fun(Bin, Acc);
-bin_foldl({Fd, Sp, Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null ->
+att_foldl(#att{data={Fd,Sp},len=Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null ->
% 09 UPGRADE CODE
couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc);
-bin_foldl({Fd, Sp, _Len}, Fun, Acc) ->
- couch_stream:foldl(Fd, Sp, Fun, Acc).
-
-bin_size(Bin) when is_binary(Bin) ->
- size(Bin);
-bin_size({_Fd, _Sp, Len}) ->
- Len.
+att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
+ couch_stream:foldl(Fd, Sp, Md5, Fun, Acc).
-bin_to_binary(Bin) when is_binary(Bin) ->
+att_to_iolist(#att{data=Bin}) when is_binary(Bin) ->
Bin;
-bin_to_binary({Fd, Sp, _Len}) ->
- lists:reverse(couch_stream:foldl(Fd, Sp, fun(Bin,Acc) -> [Bin|Acc] end, [])).
+att_to_iolist(#att{data=Iolist}) when is_list(Iolist) ->
+ Iolist;
+att_to_iolist(#att{data={Fd,Sp},md5=Md5}) ->
+ lists:reverse(couch_stream:foldl(Fd, Sp, Md5, fun(Bin,Acc) -> [Bin|Acc] end, [])).
get_validate_doc_fun(#doc{body={Props}}) ->
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
@@ -284,24 +300,24 @@ get_validate_doc_fun(#doc{body={Props}}) ->
end.
-has_stubs(#doc{attachments=Bins}) ->
- has_stubs(Bins);
+has_stubs(#doc{atts=Atts}) ->
+ has_stubs(Atts);
has_stubs([]) ->
false;
-has_stubs([{_Name, {stub, _, _}}|_]) ->
+has_stubs([#att{data=stub}|_]) ->
true;
-has_stubs([_Bin|Rest]) ->
+has_stubs([_Att|Rest]) ->
has_stubs(Rest).
-merge_stubs(#doc{attachments=MemBins}=StubsDoc, #doc{attachments=DiskBins}) ->
- BinDict = dict:from_list(DiskBins),
+merge_stubs(#doc{atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
+ BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]),
MergedBins = lists:map(
- fun({Name, {stub, _, _}}) ->
- {Name, dict:fetch(Name, BinDict)};
- ({Name, Value}) ->
- {Name, Value}
+ fun(#att{name=Name, data=stub}) ->
+ dict:fetch(Name, BinDict);
+ (Att) ->
+ Att
end, MemBins),
- StubsDoc#doc{attachments= MergedBins}.
+ StubsDoc#doc{atts= MergedBins}.
read_streamed_attachment(_RcvFun, 0, Acc) ->
list_to_binary(lists:reverse(Acc));
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index aec632fb..4c450163 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -25,6 +25,8 @@
-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
+-export([append_term_md5/2, pread_iolist_md5/2, pread_binary_md5/2]).
+-export([pread_term_md5/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
%%----------------------------------------------------------------------
@@ -67,6 +69,9 @@ open(Filepath, Options) ->
append_term(Fd, Term) ->
append_binary(Fd, term_to_binary(Term)).
+
+append_term_md5(Fd, Term) ->
+ append_binary_md5(Fd, term_to_binary(Term)).
%%----------------------------------------------------------------------
@@ -80,6 +85,11 @@ append_term(Fd, Term) ->
append_binary(Fd, Bin) ->
Size = iolist_size(Bin),
gen_server:call(Fd, {append_bin, [<<Size:32/integer>>, Bin]}, infinity).
+
+append_binary_md5(Fd, Bin) ->
+ Size = iolist_size(Bin),
+ gen_server:call(Fd, {append_bin,
+ [<<Size:32/integer>>, erlang:md5(Bin), Bin]}, infinity).
%%----------------------------------------------------------------------
@@ -94,6 +104,10 @@ pread_term(Fd, Pos) ->
{ok, Bin} = pread_binary(Fd, Pos),
{ok, binary_to_term(Bin)}.
+pread_term_md5(Fd, Pos) ->
+ {ok, Bin} = pread_binary_md5(Fd, Pos),
+ {ok, binary_to_term(Bin)}.
+
%%----------------------------------------------------------------------
%% Purpose: Reads a binrary from a file that was written with append_binary
@@ -111,6 +125,17 @@ pread_iolist(Fd, Pos) ->
<<Len:32/integer>> = iolist_to_binary(LenIolist),
{ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len),
{ok, Iolist}.
+
+pread_binary_md5(Fd, Pos) ->
+ {ok, L} = pread_iolist_md5(Fd, Pos),
+ {ok, iolist_to_binary(L)}.
+
+pread_iolist_md5(Fd, Pos) ->
+ {ok, LenIolist, NextPos} =read_raw_iolist(Fd, Pos, 20),
+ <<Len:32/integer, Md5/binary>> = iolist_to_binary(LenIolist),
+ {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len),
+ Md5 = erlang:md5(Iolist),
+ {ok, Iolist}.
read_raw_iolist(Fd, Pos, Len) ->
BlockOffset = Pos rem ?SIZE_BLOCK,
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 594b6455..0ec1cbda 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -579,13 +579,15 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
[] ->
Doc = couch_doc_open(Db, DocId, Rev, Options),
DiskEtag = couch_httpd:doc_etag(Doc),
- couch_httpd:etag_respond(Req, DiskEtag, fun() ->
- Headers = case Doc#doc.meta of
- [] -> [{"Etag", DiskEtag}]; % output etag only when we have no meta
- _ -> []
- end,
- send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
- end);
+ case Doc#doc.meta of
+ [] ->
+ % output etag only when we have no meta
+ couch_httpd:etag_respond(Req, DiskEtag, fun() ->
+ send_json(Req, 200, [{"Etag", DiskEtag}], couch_doc:to_json_obj(Doc, Options))
+ end);
+ _ ->
+ send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options))
+ end;
_ ->
{ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options),
{ok, Resp} = start_json_response(Req, 200),
@@ -626,14 +628,23 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
Rev = couch_doc:parse_rev(list_to_binary(proplists:get_value("_rev", Form))),
{ok, [{ok, Doc}]} = couch_db:open_doc_revs(Db, DocId, [Rev], []),
- NewAttachments = [
- {validate_attachment_name(Name), {list_to_binary(ContentType), Content}} ||
+ UpdatedAtts = [
+ #att{name=validate_attachment_name(Name),
+ type=list_to_binary(ContentType),
+ data=Content} ||
{Name, {ContentType, _}, Content} <-
proplists:get_all_values("_attachments", Form)
],
- #doc{attachments=Attachments} = Doc,
+ #doc{atts=OldAtts} = Doc,
+ OldAtts2 = lists:flatmap(
+ fun(#att{name=OldName}=Att) ->
+ case [1 || A <- UpdatedAtts, A#att.name == OldName] of
+ [] -> [Att]; % the attachment wasn't in the UpdatedAtts, return it
+ _ -> [] % the attachment was in the UpdatedAtts, drop it
+ end
+ end, OldAtts),
NewDoc = Doc#doc{
- attachments = Attachments ++ NewAttachments
+ atts = UpdatedAtts ++ OldAtts2
},
{ok, NewRev} = couch_db:update_doc(Db, NewDoc, []),
@@ -765,13 +776,12 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) ->
options=Options
} = parse_doc_query(Req),
#doc{
- attachments=Attachments
+ atts=Atts
} = Doc = couch_doc_open(Db, DocId, Rev, Options),
-
- case proplists:get_value(FileName, Attachments) of
- undefined ->
+ case [A || A <- Atts, A#att.name == FileName] of
+ [] ->
throw({not_found, "Document is missing attachment"});
- {Type, Bin} ->
+ [#att{type=Type}=Att] ->
Etag = couch_httpd:doc_etag(Doc),
couch_httpd:etag_respond(Req, Etag, fun() ->
{ok, Resp} = start_chunked_response(Req, 200, [
@@ -784,7 +794,7 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) ->
% open to discussion.
% {"Content-Length", integer_to_list(couch_doc:bin_size(Bin))}
]),
- couch_doc:bin_foldl(Bin,
+ couch_doc:att_foldl(Att,
fun(BinSegment, _) -> send_chunk(Resp, BinSegment) end,[]),
send_chunk(Resp, "")
end)
@@ -798,31 +808,36 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
lists:map(fun binary_to_list/1,
FileNameParts),"/")),
- NewAttachment = case Method of
+ NewAtt = case Method of
'DELETE' ->
[];
_ ->
- % see couch_db:doc_flush_binaries for usage of this structure
- [{FileName, {
- case couch_httpd:header_value(Req,"Content-Type") of
- undefined ->
- % We could throw an error here or guess by the FileName.
- % Currently, just giving it a default.
- <<"application/octet-stream">>;
- CType ->
- list_to_binary(CType)
- end,
- case couch_httpd:header_value(Req,"Content-Length") of
- undefined ->
- {fun(MaxChunkSize, ChunkFun, InitState) ->
- couch_httpd:recv_chunked(Req, MaxChunkSize,
- ChunkFun, InitState)
- end, undefined};
- Length ->
- {fun() -> couch_httpd:recv(Req, 0) end,
- list_to_integer(Length)}
- end
- }}]
+ [#att{
+ name=FileName,
+ type = case couch_httpd:header_value(Req,"Content-Type") of
+ undefined ->
+ % We could throw an error here or guess by the FileName.
+ % Currently, just giving it a default.
+ <<"application/octet-stream">>;
+ CType ->
+ list_to_binary(CType)
+ end,
+ data = case couch_httpd:header_value(Req,"Content-Length") of
+ undefined ->
+ fun(MaxChunkSize, ChunkFun, InitState) ->
+ couch_httpd:recv_chunked(Req, MaxChunkSize,
+ ChunkFun, InitState)
+ end;
+ Length ->
+ fun() -> couch_httpd:recv(Req, 0) end
+ end,
+ len = case couch_httpd:header_value(Req,"Content-Length") of
+ undefined ->
+ undefined;
+ Length ->
+ list_to_integer(Length)
+ end
+ }]
end,
Doc = case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
@@ -835,9 +850,9 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
end
end,
- #doc{attachments=Attachments} = Doc,
+ #doc{atts=Atts} = Doc,
DocEdited = Doc#doc{
- attachments = NewAttachment ++ proplists:delete(FileName, Attachments)
+ atts = NewAtt ++ [A || A <- Atts, A#att.name /= FileName]
},
{ok, UpdatedRev} = couch_db:update_doc(Db, DocEdited, []),
#db{name=DbName} = Db,
@@ -941,9 +956,9 @@ parse_copy_destination_header(Req) ->
end.
validate_attachment_names(Doc) ->
- lists:foreach(fun({Name, _}) ->
+ lists:foreach(fun(#att{name=Name}) ->
validate_attachment_name(Name)
- end, Doc#doc.attachments).
+ end, Doc#doc.atts).
validate_attachment_name(Name) when is_list(Name) ->
validate_attachment_name(list_to_binary(Name));
diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl
index 647a3455..830820f3 100644
--- a/src/couchdb/couch_key_tree.erl
+++ b/src/couchdb/couch_key_tree.erl
@@ -278,13 +278,20 @@ count_leafs_simple([{_Key, _Value, SubTree} | RestTree]) ->
map(_Fun, []) ->
[];
map(Fun, [{Pos, Tree}|Rest]) ->
- [NewTree] = map_simple(Fun, Pos, [Tree]),
- [{Pos, NewTree} | map(Fun, Rest)].
+ case erlang:fun_info(Fun, arity) of
+ {arity, 2} ->
+ [NewTree] = map_simple(fun(A,B,_C) -> Fun(A,B) end, Pos, [Tree]),
+ [{Pos, NewTree} | map(Fun, Rest)];
+ {arity, 3} ->
+ [NewTree] = map_simple(Fun, Pos, [Tree]),
+ [{Pos, NewTree} | map(Fun, Rest)]
+ end.
map_simple(_Fun, _Pos, []) ->
[];
map_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) ->
- Value2 = Fun({Pos, Key}, Value),
+ Value2 = Fun({Pos, Key}, Value,
+ if SubTree == [] -> leaf; true -> branch end),
[{Key, Value2, map_simple(Fun, Pos + 1, SubTree)} | map_simple(Fun, Pos, RestTree)].
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 7aec3b5d..3cb90347 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -312,8 +312,6 @@ terminate(normal, State) ->
terminate(Reason, State) ->
?LOG_ERROR("replicator terminating with reason ~p", [Reason]),
#state{
- context = Context,
- current_seq = Seq,
listeners = Listeners,
source = Source,
target = Target,
@@ -390,25 +388,26 @@ attachment_loop(ReqId, Conn) ->
exit(normal)
end.
-attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type, Length}}) ->
+att_stub_converter(DbS, Id, Rev,
+ #att{name=Name,data=stub,type=Type,len=Length}=Att) ->
#http_db{uri=DbUrl, headers=Headers} = DbS,
{Pos, [RevId|_]} = Rev,
Url = lists:flatten([DbUrl, url_encode(Id), "/", url_encode(?b2l(Name)),
"?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
?LOG_DEBUG("Attachment URL ~s", [Url]),
- {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name,
+ {ok, RcvFun} = make_att_stub_receiver(Url, Headers, Name,
Type, Length),
- {Name, {Type, {RcvFun, Length}}}.
+ Att#att{name=Name,type=Type,data=RcvFun,len=Length}.
-make_attachment_stub_receiver(Url, Headers, Name, Type, Length) ->
- make_attachment_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000).
+make_att_stub_receiver(Url, Headers, Name, Type, Length) ->
+ make_att_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000).
-make_attachment_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) ->
+make_att_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) ->
?LOG_ERROR("streaming attachment request failed after 10 retries: ~s",
[Url]),
exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])});
-make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) ->
+make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) ->
%% start the process that receives attachment data from ibrowse
#url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
{ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
@@ -425,7 +424,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause)
"seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]),
catch ibrowse:stop_worker_process(Conn),
timer:sleep(Pause),
- make_attachment_stub_receiver(Url, Headers, Name, Type, Length,
+ make_att_stub_receiver(Url, Headers, Name, Type, Length,
Retries-1, 2*Pause)
end,
@@ -437,7 +436,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause)
{'EXIT', Pid, _Reason} ->
catch ibrowse:stop_worker_process(Conn),
timer:sleep(Pause),
- make_attachment_stub_receiver(Url, Headers, Name, Type, Length,
+ make_att_stub_receiver(Url, Headers, Name, Type, Length,
Retries-1, 2*Pause)
end,
@@ -447,7 +446,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause)
receive
{'EXIT', Pid, attachment_request_failed} ->
catch ibrowse:stop_worker_process(Conn),
- make_attachment_stub_receiver(Url, Headers, Name, Type, Length,
+ make_att_stub_receiver(Url, Headers, Name, Type, Length,
Retries-1, Pause);
{Pid, {status, StreamStatus, StreamHeaders}} ->
?LOG_DEBUG("streaming attachment Status ~p Headers ~p",
@@ -476,7 +475,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause)
RedirectUrl = mochiweb_headers:get_value("Location",
mochiweb_headers:make(StreamHeaders)),
catch ibrowse:stop_worker_process(Conn),
- make_attachment_stub_receiver(RedirectUrl, Headers, Name, Type,
+ make_att_stub_receiver(RedirectUrl, Headers, Name, Type,
Length, Retries - 1, Pause);
ResponseCode >= 400, ResponseCode < 500 ->
% an error... log and fail
@@ -491,7 +490,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause)
Pid ! {self(), fail},
catch ibrowse:stop_worker_process(Conn),
timer:sleep(Pause),
- make_attachment_stub_receiver(Url, Headers, Name, Type, Length,
+ make_att_stub_receiver(Url, Headers, Name, Type, Length,
Retries - 1, 2*Pause)
end
end.
@@ -772,10 +771,9 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0,
fun({[{<<"missing">>, Rev}]}) ->
{{not_found, missing}, couch_doc:parse_rev(Rev)};
({[{<<"ok">>, JsonDoc}]}) ->
- #doc{id=Id, revs=Rev, attachments=Attach} = Doc =
+ #doc{id=Id, revs=Rev, atts=Atts} = Doc =
couch_doc:from_json_obj(JsonDoc),
- Attach2 = [attachment_stub_converter(DbS,Id,Rev,A) || A <- Attach],
- {ok, Doc#doc{attachments=Attach2}}
+ {ok, Doc#doc{atts=[att_stub_converter(DbS,Id,Rev,A) || A <- Atts]}}
end, JsonResults),
{ok, Results};
open_doc_revs(Db, DocId, Revs, Options) ->
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index 34dc5a07..65cf7126 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -24,7 +24,8 @@
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
--export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]).
+-export([open/1, close/1, write/2, foldl/4, foldl/5,
+ old_foldl/5,old_copy_to_new_stream/4]).
-export([copy_to_new_stream/3,old_read_term/2]).
-export([init/1, terminate/2, handle_call/3]).
-export([handle_cast/2,code_change/3,handle_info/2]).
@@ -37,7 +38,8 @@
buffer_list = [],
buffer_len = 0,
max_buffer = 4096,
- written_len = 0
+ written_len = 0,
+ md5
}).
@@ -79,6 +81,23 @@ foldl(Fd, [Pos|Rest], Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
+foldl(Fd, PosList, <<>>, Fun, Acc) ->
+ foldl(Fd, PosList, Fun, Acc);
+foldl(Fd, PosList, Md5, Fun, Acc) ->
+ foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc).
+
+
+foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
+ Md5 = erlang:md5_final(Md5Acc),
+ Acc;
+foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, Bin)),
+ Fun(Bin, Acc);
+foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
+
write(_Pid, <<>>) ->
ok;
write(Pid, Bin) ->
@@ -86,7 +105,7 @@ write(Pid, Bin) ->
init(Fd) ->
- {ok, #stream{fd = Fd}}.
+ {ok, #stream{fd=Fd, md5=erlang:md5_init()}}.
terminate(_Reason, _Stream) ->
ok.
@@ -99,14 +118,18 @@ handle_call({write, Bin}, _From, Stream) ->
written_pointers = Written,
buffer_len = BufferLen,
buffer_list = Buffer,
- max_buffer = Max} = Stream,
+ max_buffer = Max,
+ md5 = Md5} = Stream,
if BinSize + BufferLen > Max ->
- {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, [Bin])),
+ WriteBin = lists:reverse(Buffer, [Bin]),
+ Md5_2 = erlang:md5_update(Md5, WriteBin),
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
{reply, ok, Stream#stream{
written_len=WrittenLen + BufferLen + BinSize,
written_pointers=[Pos|Written],
buffer_list=[],
- buffer_len=0}};
+ buffer_len=0,
+ md5=Md5_2}};
true ->
{reply, ok, Stream#stream{
buffer_list=[Bin|Buffer],
@@ -118,14 +141,17 @@ handle_call(close, _From, Stream) ->
written_len = WrittenLen,
written_pointers = Written,
buffer_len = BufferLen,
- buffer_list = Buffer} = Stream,
+ buffer_list = Buffer,
+ md5 = Md5} = Stream,
case Buffer of
[] ->
- Result = {lists:reverse(Written), WrittenLen};
+ Result = {lists:reverse(Written), WrittenLen, erlang:md5_final(Md5)};
_ ->
- {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)),
- Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen}
+ WriteBin = lists:reverse(Buffer),
+ Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin)),
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
+ Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen, Md5Final}
end,
{stop, normal, Result, Stream}.
diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl
index 436dee38..817572bb 100644
--- a/src/couchdb/couch_util.erl
+++ b/src/couchdb/couch_util.erl
@@ -226,8 +226,8 @@ should_flush(MemThreshHold) ->
%% Take 3 bytes a time (3 x 8 = 24 bits), and make 4 characters out of
%% them (4 x 6 = 24 bits).
%%
-encodeBase64(Bs) when list(Bs) ->
- encodeBase64(list_to_binary(Bs), <<>>);
+encodeBase64(Bs) when is_list(Bs) ->
+ encodeBase64(iolist_to_binary(Bs), <<>>);
encodeBase64(Bs) ->
encodeBase64(Bs, <<>>).
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index 8842c44b..a0819ef5 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -78,7 +78,7 @@ cleanup_index_files(Db) ->
{ok, DesignDocs} = couch_db:get_design_docs(Db),
% make unique list of group sigs
- Sigs = lists:map(fun(#doc{id = GroupId} = DDoc) ->
+ Sigs = lists:map(fun(#doc{id = GroupId}) ->
{ok, Info} = get_group_info(Db, GroupId),
?b2l(proplists:get_value(signature, Info))
end, [DD||DD <- DesignDocs, DD#doc.deleted == false]),
@@ -100,7 +100,7 @@ cleanup_index_files(Db) ->
list_index_files(Db) ->
% call server to fetch the index files
RootDir = couch_config:get("couchdb", "view_index_dir"),
- Files = filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*").
+ filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*").
get_row_count(#view{btree=Bt}) ->
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index c4b495b0..57c3ad21 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -55,15 +55,6 @@ request_group_info(Pid) ->
throw(Error)
end.
-request_index_files(Pid) ->
- case gen_server:call(Pid, request_index_files) of
- {ok, Filelist} ->
- {ok, Filelist};
- Error ->
- throw(Error)
- end.
-
-
% from template
start_link(InitArgs) ->
case gen_server:start_link(couch_view_group,