author    | Damien F. Katz <damien@apache.org> | 2009-07-17 21:33:41 +0000
committer | Damien F. Katz <damien@apache.org> | 2009-07-17 21:33:41 +0000
commit    | 91bf33fdc69c2087707795b8822b0fa7617f8709 (patch)
tree      | f01ba98e03b77923d8256e9dd7cdca997ba06cf2
parent    | 627562eef0290bcf07659f40d7ba85c333978ce6 (diff)
Deterministic revids, MD5 checking of documents, and tracking of the rev at which an attachment is edited, to allow attachment-level replication.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@795232 13f79535-47bb-0310-9956-ffa450edef68
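
The headline change is that interactive edits no longer get a random integer revision id; `new_revid/1` in couch_db.erl derives the id from the edit itself. A minimal sketch of the idea, with illustrative module, function, and variable names (the real code works on the `#doc{}` record and falls back to a random id when old-style attachments lack an MD5):

```erlang
-module(revid_sketch).
-export([new_revid/4]).

%% The new rev id is an MD5 over everything that defines the edit: the
%% deletion flag, the parent revision, the body, and the attachment
%% signatures. Two replicas applying the same edit therefore compute the
%% same revision id, so replication no longer manufactures conflicts.
new_revid(Deleted, {OldStart, OldRevIds}, Body, AttSigs) ->
    OldRev = case OldRevIds of [] -> 0; [R | _] -> R end,
    erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body, AttSigs])).
```

In the patch, the attachment signatures are `{Name, Type, Md5}` triples; if any attachment predates MD5 tracking, the code still falls back to `couch_util:rand32()`.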
-rw-r--r-- | share/www/script/test/bulk_docs.js | 2
-rw-r--r-- | share/www/script/test/recreate_doc.js | 8
-rw-r--r-- | src/couchdb/couch_db.erl | 304
-rw-r--r-- | src/couchdb/couch_db.hrl | 18
-rw-r--r-- | src/couchdb/couch_db_update_notifier_sup.erl | 1
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 182
-rw-r--r-- | src/couchdb/couch_doc.erl | 128
-rw-r--r-- | src/couchdb/couch_file.erl | 25
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 101
-rw-r--r-- | src/couchdb/couch_key_tree.erl | 13
-rw-r--r-- | src/couchdb/couch_rep.erl | 32
-rw-r--r-- | src/couchdb/couch_stream.erl | 46
-rw-r--r-- | src/couchdb/couch_util.erl | 4
-rw-r--r-- | src/couchdb/couch_view.erl | 4
-rw-r--r-- | src/couchdb/couch_view_group.erl | 9
15 files changed, 522 insertions, 355 deletions
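
Before the per-file diffs, a note on the attachment change: the new `#att{}` record carries a `revpos`, the revision position at which the attachment data last changed, plus its MD5. `set_new_att_revpos/1` in couch_db.erl leaves already-written attachments alone and stamps fresh uploads with the next position. A rough sketch of that rule, using a stand-in record and illustrative arity (the real record is `#att{}` in couch_db.hrl):

```erlang
-module(att_revpos_sketch).
-export([set_new_att_revpos/2]).

-record(att, {name, type, data, len, md5 = <<>>, revpos = 0}).

%% Attachments whose data is already a {Fd, StreamPointer} pair were written
%% under an earlier revision and keep their revpos; anything else is new data
%% for this edit and gets the next revision position. A replicator can then
%% skip attachment bodies whose revpos it has already seen.
set_new_att_revpos(RevPos, Atts) ->
    [case A#att.data of
         {_Fd, _Sp} -> A;
         _ -> A#att{revpos = RevPos + 1}
     end || A <- Atts].
```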
diff --git a/share/www/script/test/bulk_docs.js b/share/www/script/test/bulk_docs.js index 88a4313e..972accf3 100644 --- a/share/www/script/test/bulk_docs.js +++ b/share/www/script/test/bulk_docs.js @@ -96,5 +96,5 @@ couchTests.bulk_docs = function(debug) { update = {"_id": newdoc._id, "_rev": newdoc._rev, "body": "blam"}; torem = {"_id": newdoc._id, "_rev": newdoc._rev, "_deleted": true}; results = db.bulkSave([update, torem]); - T(results[1].error == "conflict"); + T(results[0].error == "conflict" || results[1].error == "conflict"); }; diff --git a/share/www/script/test/recreate_doc.js b/share/www/script/test/recreate_doc.js index ca64265d..75e34618 100644 --- a/share/www/script/test/recreate_doc.js +++ b/share/www/script/test/recreate_doc.js @@ -29,11 +29,7 @@ couchTests.recreate_doc = function(debug) { T(db.save(doc).ok); doc = db.open("foo"); doc.a = "baz"; - try { - T(db.save(doc).ok); - } finally { - // And now, we can't even delete the document anymore :/ - T(db.deleteDoc(doc).rev != undefined); - } + T(db.save(doc).ok); + T(db.deleteDoc(doc).rev != undefined); } }; diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 9cb887ea..9dc1fce8 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -24,7 +24,7 @@ -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). -export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). --export([changes_since/5,read_doc/2]). +-export([changes_since/5,read_doc/2,new_revid/1]). -include("couch_db.hrl"). @@ -295,11 +295,11 @@ validate_doc_update(#db{name=DbName,user_ctx=Ctx}=Db, Doc, GetDiskDocFun) -> end. -prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc, - OldFullDocInfo, LeafRevsDict) -> - case PrevRevs of +prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc, + OldFullDocInfo, LeafRevsDict, AllowConflict) -> + case Revs of [PrevRev|_] -> - case dict:find({RevStart-1, PrevRev}, LeafRevsDict) of + case dict:find({RevStart, PrevRev}, LeafRevsDict) of {ok, {Deleted, DiskSp, DiskRevs}} -> case couch_doc:has_stubs(Doc) of true -> @@ -310,13 +310,21 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end, {validate_doc_update(Db, Doc, LoadDiskDoc), Doc} end; + error when AllowConflict -> + {validate_doc_update(Db, Doc, fun() -> nil end), Doc}; error -> {conflict, Doc} end; [] -> % new doc, and we have existing revs. 
+ % reuse existing deleted doc if OldFullDocInfo#full_doc_info.deleted -> % existing docs are deletions + #doc_info{revs=[#rev_info{rev={Pos, DelRevId}}|_]} = + couch_doc:to_doc_info(OldFullDocInfo), + Doc2 = Doc#doc{revs={Pos, [DelRevId]}}, + {validate_doc_update(Db, Doc2, fun() -> nil end), Doc2}; + AllowConflict -> {validate_doc_update(Db, Doc, fun() -> nil end), Doc}; true -> {conflict, Doc} @@ -325,49 +333,51 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc -prep_and_validate_updates(_Db, [], [], AccPrepped, AccFatalErrors) -> +prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped, + AccFatalErrors) -> {AccPrepped, AccFatalErrors}; -prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], AccPrepped, AccErrors) -> +prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], + AllowConflict, AccPrepped, AccErrors) -> [#doc{id=Id}|_]=DocBucket, % no existing revs are known, {PreppedBucket, AccErrors3} = lists:foldl( fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) -> case Revs of - {Pos, [NewRev,_OldRev|_]} -> - % old revs specified but none exist, a conflict - {AccBucket, [{{Id, {Pos, NewRev}}, conflict} | AccErrors2]}; - {Pos, [NewRev]} -> + {0, []} -> case validate_doc_update(Db, Doc, fun() -> nil end) of ok -> {[Doc | AccBucket], AccErrors2}; Error -> - {AccBucket, [{{Id, {Pos, NewRev}}, Error} | AccErrors2]} - end + {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]} + end; + _ -> + % old revs specified but none exist, a conflict + {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]} end end, {[], AccErrors}, DocBucket), - prep_and_validate_updates(Db, RestBuckets, RestLookups, + prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, [PreppedBucket | AccPrepped], AccErrors3); prep_and_validate_updates(Db, [DocBucket|RestBuckets], [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups], - AccPrepped, AccErrors) -> + AllowConflict, AccPrepped, AccErrors) -> Leafs = couch_key_tree:get_all_leafs(OldRevTree), LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} || {{Deleted, Sp, _Seq}, {Start, [RevId|_]}=Revs} <- Leafs]), {PreppedBucket, AccErrors3} = lists:foldl( fun(Doc, {Docs2Acc, AccErrors2}) -> case prep_and_validate_update(Db, Doc, OldFullDocInfo, - LeafRevsDict) of + LeafRevsDict, AllowConflict) of {ok, Doc2} -> {[Doc2 | Docs2Acc], AccErrors2}; - {Error, #doc{id=Id,revs={Pos, [NewRev|_]}}} -> + {Error, #doc{id=Id,revs=Revs}} -> % Record the error - {Docs2Acc, [{{Id, {Pos, NewRev}}, Error} |AccErrors2]} + {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]} end end, {[], AccErrors}, DocBucket), - prep_and_validate_updates(Db, RestBuckets, RestLookups, [PreppedBucket | AccPrepped], AccErrors3). + prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, [PreppedBucket | AccPrepped], AccErrors3). update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> @@ -408,32 +418,12 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI {ok, {Start, Path}} -> % our unflushed doc is a leaf node. Go back on the path % to find the previous rev that's on disk. 
- PrevRevResult = - case couch_doc:has_stubs(Doc) of - true -> - [_PrevRevFull | [PrevRevFull | _]=PrevPath] = Path, - case PrevRevFull of - {_RevId, ?REV_MISSING} -> - conflict; - {RevId, {IsDel, DiskSp, _Seq}} -> - DiskDoc = make_doc(Db, Id, IsDel, DiskSp, PrevPath), - Doc2 = couch_doc:merge_stubs(Doc, DiskDoc), - {ok, Doc2, fun() -> DiskDoc end} - end; - false -> - {ok, Doc, - fun() -> + LoadPrevRevFun = fun() -> make_first_doc_on_disk(Db,Id,Start-1, tl(Path)) - end} - end, - case PrevRevResult of - {ok, NewDoc, LoadPrevRevFun} -> - case validate_doc_update(Db, NewDoc, LoadPrevRevFun) of - ok -> - {[NewDoc | AccValidated], AccErrors2}; - Error -> - {AccValidated, [{NewDoc, Error} | AccErrors2]} - end; + end, + case validate_doc_update(Db, Doc, LoadPrevRevFun) of + ok -> + {[Doc | AccValidated], AccErrors2}; Error -> {AccValidated, [{Doc, Error} | AccErrors2]} end; @@ -444,9 +434,47 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI end end, {[], AccErrors}, Bucket), - prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3) + prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, + [ValidatedBucket | AccPrepped], AccErrors3) end. + + +new_revid(#doc{body=Body,revs={OldStart,OldRevs}, + atts=Atts,deleted=Deleted}) -> + case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M /= <<>>] of + Atts2 when length(Atts) /= length(Atts2) -> + % We must have old style non-md5 attachments + ?l2b(integer_to_list(couch_util:rand32())); + Atts2 -> + OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end, + erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2])) + end. + +new_revs([], OutBuckets, IdRevsAcc) -> + {lists:reverse(OutBuckets), IdRevsAcc}; +new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) -> + {NewBucket, IdRevsAcc3} = lists:mapfoldl( + fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)-> + NewRevId = new_revid(Doc), + {Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, + [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]} + end, IdRevsAcc, Bucket), + new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3). + +check_dup_atts([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 -> + throw({bad_request, <<"Duplicate attachments">>}); +check_dup_atts([_, _ | Rest]) -> + check_dup_atts(Rest); +check_dup_atts(_) -> + ok. + +sort_and_check_atts(#doc{atts=Atts}=Doc) -> + Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts), + check_dup_atts(Atts2), + Doc#doc{atts=Atts2}. + + update_docs(Db, Docs, Options, replicated_changes) -> couch_stats_collector:increment({couchdb, database_writes}), DocBuckets = group_alike_docs(Docs), @@ -467,70 +495,74 @@ update_docs(Db, Docs, Options, replicated_changes) -> DocErrors = [], DocBuckets3 = DocBuckets end, - {ok, []} = write_and_commit(Db, DocBuckets3, [merge_conflicts | Options]), + DocBuckets4 = [[doc_flush_atts(sort_and_check_atts(Doc), Db#db.fd) + || Doc <- Bucket] || Bucket <- DocBuckets3], + {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]), {ok, DocErrors}; update_docs(Db, Docs, Options, interactive_edit) -> couch_stats_collector:increment({couchdb, database_writes}), AllOrNothing = lists:member(all_or_nothing, Options), % go ahead and generate the new revision ids for the documents. 
- Docs2 = lists:map( - fun(#doc{id=Id,revs={Start, RevIds}}=Doc) -> + % separate out the NonRep documents from the rest of the documents + {Docs2, NonRepDocs} = lists:foldl( + fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) -> case Id of <<?LOCAL_DOC_PREFIX, _/binary>> -> - Rev = case RevIds of [] -> 0; [Rev0|_] -> list_to_integer(?b2l(Rev0)) end, - Doc#doc{revs={Start, [?l2b(integer_to_list(Rev + 1))]}}; - _ -> - Doc#doc{revs={Start+1, [?l2b(integer_to_list(couch_util:rand32())) | RevIds]}} + {DocsAcc, [Doc | NonRepDocsAcc]}; + Id-> + {[Doc | DocsAcc], NonRepDocsAcc} end - end, Docs), + end, {[], []}, Docs), + DocBuckets = group_alike_docs(Docs2), case (Db#db.validate_doc_funs /= []) orelse lists:any( fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true; - (#doc{attachments=Atts}) -> + (#doc{atts=Atts}) -> Atts /= [] - end, Docs) of + end, Docs2) of true -> % lookup the doc by id and get the most recent Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], ExistingDocInfos = get_full_doc_infos(Db, Ids), - {DocBucketsPrepped, Failures} = - case AllOrNothing of - true -> - prep_and_validate_replicated_updates(Db, DocBuckets, - ExistingDocInfos, [], []); - false -> - prep_and_validate_updates(Db, DocBuckets, ExistingDocInfos, [], []) - end, + {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db, + DocBuckets, ExistingDocInfos, AllOrNothing, [], []), % strip out any empty buckets DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped]; false -> - Failures = [], + PreCommitFailures = [], DocBuckets2 = DocBuckets end, - if (AllOrNothing) and (Failures /= []) -> - {aborted, Failures}; + if (AllOrNothing) and (PreCommitFailures /= []) -> + {aborted, lists:map( + fun({{Id,{Pos, [RevId|_]}}, Error}) -> + {{Id, {Pos, RevId}}, Error}; + ({{Id,{0, []}}, Error}) -> + {{Id, {0, <<>>}}, Error} + end, PreCommitFailures)}; true -> Options2 = if AllOrNothing -> [merge_conflicts]; true -> [] end ++ Options, - {ok, CommitFailures} = write_and_commit(Db, DocBuckets2, Options2), - FailDict = dict:from_list(CommitFailures ++ Failures), - % the output for each is either {ok, NewRev} or Error + DocBuckets3 = [[ + doc_flush_atts(set_new_att_revpos( + sort_and_check_atts(Doc)), Db#db.fd) + || Doc <- B] || B <- DocBuckets2], + {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []), + + {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2), + + ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures), {ok, lists:map( - fun(#doc{id=Id,revs={Pos, [NewRevId|_]}}) -> - case dict:find({Id, {Pos, NewRevId}}, FailDict) of - {ok, Error} -> - Error; - error -> - {ok, {Pos, NewRevId}} - end - end, Docs2)} + fun(#doc{id=Id,revs={Pos, RevIds}}) -> + {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict), + Result + end, Docs)} end. % Returns the first available document on disk. Input list is a full rev path @@ -545,78 +577,81 @@ make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp, _Seq}} |_]=DocPath) -> write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, - Options) -> - % flush unwritten binaries to disk. 
- DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], - case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of - {ok, Conflicts} -> {ok, Conflicts}; + NonRepDocs, Options) -> + case gen_server:call(UpdatePid, + {update_docs, DocBuckets, NonRepDocs, Options}, infinity) of + {ok, Results} -> {ok, Results}; retry -> % This can happen if the db file we wrote to was swapped out by % compaction. Retry by reopening the db and writing to the current file {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx), - DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], + DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], % We only retry once close(Db2), - case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of - {ok, Conflicts} -> {ok, Conflicts}; + case gen_server:call(UpdatePid, {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of + {ok, Results} -> {ok, Results}; retry -> throw({update_error, compaction_retry}) end end. +set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) -> + Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) -> + % already commited to disk, do not set new rev + Att; + (Att) -> + Att#att{revpos=RevPos+1} + end, Atts)}. + -doc_flush_binaries(Doc, Fd) -> - NewAttachments = lists:map( - fun({Key, {Type, BinValue}}) -> - NewBinValue = flush_binary(Fd, BinValue), - {Key, {Type, NewBinValue}} - end, Doc#doc.attachments), - Doc#doc{attachments = NewAttachments}. +doc_flush_atts(Doc, Fd) -> + Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}. -flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd -> - % already written to our file, nothing to write - {Fd, StreamPointer, Len}; +check_md5(_NewSig, <<>>) -> ok; +check_md5(Sig1, Sig2) when Sig1 == Sig2 -> ok; +check_md5(_, _) -> throw(data_corruption). -flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) -> - {NewStreamData, Len} = - couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd), - {Fd, NewStreamData, Len}; +flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd -> + % already written to our file, nothing to write + Att; -flush_binary(Fd, {OtherFd, StreamPointer, Len}) -> - {NewStreamData, Len} = +flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) -> + {NewStreamData, Len, Md5} = couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd), - {Fd, NewStreamData, Len}; + check_md5(Md5, InMd5), + Att#att{data={Fd, NewStreamData}, md5=Md5, len=Len}; -flush_binary(Fd, Bin) when is_binary(Bin) -> - with_stream(Fd, fun(OutputStream) -> - couch_stream:write(OutputStream, Bin) +flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) -> + with_stream(Fd, Att, fun(OutputStream) -> + couch_stream:write(OutputStream, Data) end); -flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) -> - with_stream(Fd, fun(OutputStream) -> - % StreamFun(MaxChunkSize, WriterFun) must call WriterFun +flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) -> + with_stream(Fd, Att, fun(OutputStream) -> + % Fun(MaxChunkSize, WriterFun) must call WriterFun % once for each chunk of the attachment, - StreamFun(4096, + Fun(4096, % WriterFun({Length, Binary}, State) % WriterFun({0, _Footers}, State) % Called with Length == 0 on the last time. % WriterFun returns NewState. 
fun({0, _Footers}, _) -> ok; - ({_Length, Bin}, _) -> - couch_stream:write(OutputStream, Bin) + ({_Length, Chunk}, _) -> + couch_stream:write(OutputStream, Chunk) end, ok) end); -flush_binary(Fd, {Fun, Len}) when is_function(Fun) -> - with_stream(Fd, fun(OutputStream) -> +flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) -> + with_stream(Fd, Att, fun(OutputStream) -> write_streamed_attachment(OutputStream, Fun, Len) end). -with_stream(Fd, Fun) -> +with_stream(Fd, #att{md5=InMd5}=Att, Fun) -> {ok, OutputStream} = couch_stream:open(Fd), Fun(OutputStream), - {StreamInfo, Len} = couch_stream:close(OutputStream), - {Fd, StreamInfo, Len}. + {StreamInfo, Len, Md5} = couch_stream:close(OutputStream), + check_md5(Md5, InMd5), + Att#att{data={Fd,StreamInfo},len=Len,md5=Md5}. write_streamed_attachment(_Stream, _F, 0) -> @@ -832,11 +867,15 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre true -> [{local_seq, Seq}] end. -read_doc(Fd, Pos) when is_integer(Pos) -> - couch_file:pread_term(Fd, Pos); -read_doc(Fd, OldStyleStreamPointer) -> +read_doc(#db{fd=Fd}, OldStreamPointer) when is_tuple(OldStreamPointer) -> + % 09 UPGRADE CODE + couch_stream:old_read_term(Fd, OldStreamPointer); +read_doc(#db{header=#db_header{disk_version=Version},fd=Fd}, Pos) + when Version == 3 -> % 09 UPGRADE CODE - couch_stream:old_read_term(Fd, OldStyleStreamPointer). + couch_file:pread_term(Fd, Pos); +read_doc(#db{fd=Fd}, Pos) -> + couch_file:pread_term_md5(Fd, Pos). doc_to_tree(#doc{revs={Start, RevIds}}=Doc) -> @@ -850,21 +889,36 @@ doc_to_tree_simple(Doc, [RevId | Rest]) -> [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}]. -make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) -> - {BodyData, BinValues} = +make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) -> + {BodyData, Atts} = case Bp of nil -> {[], []}; _ -> - {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp), + {ok, {BodyData0, Atts0}} = read_doc(Db, Bp), {BodyData0, - [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]} + lists:map( + fun({Name,Type,Sp,Len,RevPos,Md5}) -> + #att{name=Name, + type=Type, + len=Len, + md5=Md5, + revpos=RevPos, + data={Fd,Sp}}; + ({Name,{Type,Sp,Len}}) -> + #att{name=Name, + type=Type, + len=Len, + md5= <<>>, + revpos=0, + data={Fd,Sp}} + end, Atts0)} end, #doc{ id = Id, revs = RevisionPath, body = BodyData, - attachments = BinValues, + atts = Atts, deleted = Deleted }. diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index c1f97144..4eda42e6 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -79,11 +79,7 @@ % the json body object. body = {[]}, - % each attachment contains: - % {data, Type, <<binary>>} - % or: - % {pointer, Type, {FileHandle, StreamPointer, Length}} - attachments = [], + atts = [], % attachments deleted = false, @@ -93,6 +89,16 @@ }). +-record(att, + { + name, + type, + len, + md5= <<>>, + revpos=0, + data + }). + -record(user_ctx, {name=null, @@ -109,7 +115,7 @@ % if the disk revision is incremented, then new upgrade logic will need to be % added to couch_db_updater:init_db. --define(LATEST_DISK_VERSION, 3). +-define(LATEST_DISK_VERSION, 4). -record(db_header, {disk_version = ?LATEST_DISK_VERSION, diff --git a/src/couchdb/couch_db_update_notifier_sup.erl b/src/couchdb/couch_db_update_notifier_sup.erl index 290a041a..4d730fc7 100644 --- a/src/couchdb/couch_db_update_notifier_sup.erl +++ b/src/couchdb/couch_db_update_notifier_sup.erl @@ -29,7 +29,6 @@ start_link() -> couch_db_update_notifier_sup, []). 
init([]) -> - Self = self(), ok = couch_config:register( fun("update_notification", Key, Value) -> reload_config(Key, Value) end ), diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index dacf2515..fd1d340f 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -44,12 +44,15 @@ terminate(Reason, _Srv) -> handle_call(get_db, _From, Db) -> {reply, {ok, Db}, Db}; -handle_call({update_docs, DocActions, Options}, _From, Db) -> - try update_docs_int(Db, DocActions, Options) of - {ok, Conflicts, Db2} -> +handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) -> + try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of + {ok, Failures, Db2} -> ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), - couch_db_update_notifier:notify({updated, Db2#db.name}), - {reply, {ok, Conflicts}, Db2} + if Db2#db.update_seq /= Db#db.update_seq -> + couch_db_update_notifier:notify({updated, Db2#db.name}); + true -> ok + end, + {reply, {ok, Failures}, Db2} catch throw: retry -> {reply, retry, Db} @@ -289,6 +292,7 @@ init_db(DbName, Filepath, Fd, Header0) -> case element(2, Header1) of 1 -> Header1#db_header{unused = 0}; % 0.9 2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10 + 3 -> Header1; % post 0.9 and pre 0.10 ?LATEST_DISK_VERSION -> Header1; _ -> throw({database_disk_version_error, "Incorrect disk header version"}) end, @@ -364,31 +368,39 @@ refresh_validate_doc_funs(Db) -> flush_trees(_Db, [], AccFlushedTrees) -> {ok, lists:reverse(AccFlushedTrees)}; -flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> +flush_trees(#db{fd=Fd,header=Header}=Db, + [InfoUnflushed | RestUnflushed], AccFlushed) -> #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed, Flushed = couch_key_tree:map( fun(_Rev, Value) -> case Value of - #doc{attachments=Atts,deleted=IsDeleted}=Doc -> + #doc{atts=Atts,deleted=IsDeleted}=Doc -> % this node value is actually an unwritten document summary, % write to disk. - % make sure the Fd in the written bins is the same Fd we are. - Bins = + % make sure the Fd in the written bins is the same Fd we are + % and convert bins, removing the FD. + % All bins should have been written to disk already. + DiskAtts = case Atts of [] -> []; - [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd -> - % convert bins, removing the FD. - % All bins should have been flushed to disk already. - [{BinName, {BinType, BinSp, BinLen}} - || {BinName, {BinType, {_Fd, BinSp, BinLen}}} + [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd -> + [{N,T,P,L,R,M} + || #att{name=N,type=T,data={_,P},md5=M,revpos=R,len=L} <- Atts]; _ -> % BinFd must not equal our Fd. This can happen when a database % is being switched out during a compaction - ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []), + ?LOG_DEBUG("File where the attachments are written has" + " changed. 
Possibly retrying.", []), throw(retry) end, - {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}), + {ok, NewSummaryPointer} = + case Header#db_header.disk_version < 4 of + true -> + couch_file:append_term(Fd, {Doc#doc.body, DiskAtts}); + false -> + couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts}) + end, {IsDeleted, NewSummaryPointer, UpdateSeq}; _ -> Value @@ -403,14 +415,40 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq} = OldDocInfo, {NewRevTree, NewConflicts} = lists:foldl( - fun(#doc{revs={Pos,[Rev|_]}}=NewDoc, {AccTree, AccConflicts2}) -> - case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of - {_NewTree, conflicts} - when (not OldDeleted) and (not MergeConflicts) -> - {AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]}; - {NewTree, _} -> + fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) -> + if not MergeConflicts -> + case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of + {_NewTree, conflicts} when (not OldDeleted) -> + {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}; + {NewTree, no_conflicts} when AccTree == NewTree -> + % the tree didn't change at all + % meaning we are saving a rev that's already + % been editted again. + if (Pos == 1) and OldDeleted -> + % this means we are recreating a brand new document + % into a state that already existed before. + % put the rev into a subsequent edit of the deletion + #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} = + couch_doc:to_doc_info(OldDocInfo), + NewRevId = couch_db:new_revid( + NewDoc#doc{revs={OldPos, [OldRev]}}), + NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}}, + {NewTree2, _} = couch_key_tree:merge(AccTree, + [couch_db:doc_to_tree(NewDoc2)]), + % we changed the rev id, this tells the caller we did. + {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}} + | AccConflicts2]}; + true -> + {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]} + end; + {NewTree, _} -> + {NewTree, AccConflicts2} + end; + true -> + {NewTree, _} = couch_key_tree:merge(AccTree, + [couch_db:doc_to_tree(NewDoc)]), {NewTree, AccConflicts2} - end + end end, {OldTree, AccConflicts}, NewDocs), if NewRevTree == OldTree -> @@ -444,26 +482,13 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) -> [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} || #full_doc_info{rev_tree=Tree}=Info <- DocInfos]. - -update_docs_int(Db, DocsList, Options) -> +update_docs_int(Db, DocsList, NonRepDocs, Options) -> #db{ fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, update_seq = LastSeq } = Db, - % separate out the NonRep documents from the rest of the documents - {DocsList2, NonRepDocs} = lists:foldl( - fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) -> - case Id of - <<?LOCAL_DOC_PREFIX, _/binary>> -> - {DocsListAcc, [Doc | NonRepDocsAcc]}; - Id-> - {[Docs | DocsListAcc], NonRepDocsAcc} - end - end, {[], []}, DocsList), - - Ids = [Id || [#doc{id=Id}|_] <- DocsList2], - + Ids = [Id || [#doc{id=Id}|_] <- DocsList], % lookup up the old documents, if they exist. OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), OldDocInfos = lists:zipwith( @@ -477,7 +502,7 @@ update_docs_int(Db, DocsList, Options) -> % Merge the new docs into the revision trees. 
{ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees( lists:member(merge_conflicts, Options), - DocsList2, OldDocInfos, [], [], [], LastSeq), + DocsList, OldDocInfos, [], [], [], LastSeq), NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0), @@ -518,33 +543,43 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> Ids = [Id || #doc{id=Id} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( - fun(#doc{id=Id,deleted=Delete,revs={0,[RevStr]},body=Body}, OldDocLookup) -> - NewRev = list_to_integer(?b2l(RevStr)), + fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) -> + case PrevRevs of + [RevStr|_] -> + PrevRev = list_to_integer(?b2l(RevStr)); + [] -> + PrevRev = 0 + end, OldRev = case OldDocLookup of {ok, {_, {OldRev0, _}}} -> OldRev0; not_found -> 0 end, - case OldRev + 1 == NewRev of + case OldRev == PrevRev of true -> case Delete of - false -> {update, {Id, {NewRev, Body}}}; - true -> {remove, Id} + false -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}}; + true -> {remove, Id, PrevRevs} end; false -> - {conflict, {Id, {0, RevStr}}} + {conflict, {Id, {0, PrevRevs}}} end - end, Docs, OldDocLookups), - BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], - BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries], - Conflicts = [{conflict, IdRev} || {conflict, IdRev} <- BtreeEntries], + BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries], + BtreeIdsUpdate = [{Id, {NewRev, Body}} || {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries], + Results = + [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}} + || {remove, Id, PrevRevs} <- BtreeEntries] ++ + [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}} + || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries] ++ + [{IdRevs, conflict} + || {conflict, IdRevs} <- BtreeEntries], {ok, Btree2} = couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - {ok, Conflicts, Db#db{local_docs_btree = Btree2}}. + {ok, Results, Db#db{local_docs_btree = Btree2}}. commit_data(Db) -> @@ -594,43 +629,42 @@ commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) -> end. -copy_doc_attachments(SrcFd, SrcSp, DestFd) -> - {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp), +copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) -> + {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp), % copy the bin values NewBinInfos = lists:map( fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null -> % 09 UPGRADE CODE - {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd), - {Name, {Type, NewBinSp, Len}}; + {NewBinSp, Len, Md5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd), + {Name, Type, NewBinSp, Len, Pos, Md5}; ({Name, {Type, BinSp, Len}}) -> - {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - {Name, {Type, NewBinSp, Len}} + % 09 UPGRADE CODE + {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), + {Name, Type, NewBinSp, Len, Pos, Md5}; + ({Name, Type, BinSp, Len, RevPos, Md5}) -> + {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), + {Name, Type, NewBinSp, Len, RevPos, Md5} end, BinInfos), {BodyData, NewBinInfos}. 
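
The `copy_doc_attachments` hunk above shows the on-disk attachment entry growing from `{Name, {Type, Sp, Len}}` to `{Name, Type, Sp, Len, RevPos, Md5}`. A small sketch of that shape change (names illustrative; in the patch the MD5 comes back from copying the attachment stream, not from a caller-supplied value):

```erlang
-module(att_disk_tuple_sketch).
-export([upgrade/3]).

%% Old 0.9-style entries carry no revpos or md5; when copied forward they are
%% stamped with the revision position of the leaf being copied and the md5
%% reported for the data. Already-upgraded entries pass through unchanged.
upgrade(Pos, Md5, {Name, {Type, Sp, Len}}) ->
    {Name, Type, Sp, Len, Pos, Md5};
upgrade(_Pos, _Md5, {_Name, _Type, _Sp, _Len, _RevPos, _Md5} = Entry) ->
    Entry.
```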
-copy_rev_tree_attachments(_SrcFd, _DestFd, []) -> - []; -copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) -> - % root nner node, only copy info/data from leaf nodes - [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]), - [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)]; -copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) -> - % This is a leaf node, copy it over - DocBody = copy_doc_attachments(SrcFd, Sp, DestFd), - [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)]; -copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) -> - % inner node, only copy info/data from leaf nodes - [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)]. - - -copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) -> +copy_rev_tree_attachments(SrcDb, DestFd, Tree) -> + couch_key_tree:map( + fun(Rev, {IsDel, Sp, Seq}, leaf) -> + DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd), + {IsDel, DocBody, Seq}; + (_, _, branch) -> + ?REV_MISSING + end, Tree). + + +copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) -> Ids = [Id || #doc_info{id=Id} <- InfoBySeq], LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), % write out the attachments NewFullDocInfos0 = lists:map( fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> - Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)} + Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)} end, LookupResults), % write out the docs % we do this in 2 stages so the docs are written out contiguously, making @@ -639,7 +673,7 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) -> fun(#full_doc_info{rev_tree=RevTree}=Info) -> Info#full_doc_info{rev_tree=couch_key_tree:map_leafs( fun(_Key, {IsDel, DocBody, Seq}) -> - {ok, Pos} = couch_file:append_term(DestFd, DocBody), + {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody), {IsDel, Pos, Seq} end, RevTree)} end, NewFullDocInfos0), diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index 4c23155e..72b56d53 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -13,7 +13,7 @@ -module(couch_doc). -export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,rev_to_strs/1]). --export([bin_foldl/3,bin_size/1,bin_to_binary/1,get_validate_doc_fun/1]). +-export([att_foldl/3,get_validate_doc_fun/1]). -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). -export([validate_docid/1]). @@ -23,7 +23,7 @@ to_json_rev(0, []) -> []; to_json_rev(Start, [FirstRevId|_]) -> - [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",FirstRevId])}]. + [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",revid_to_str(FirstRevId)])}]. to_json_body(true, _Body) -> [{<<"_deleted">>, true}]; @@ -35,12 +35,18 @@ to_json_revisions(Options, Start, RevIds) -> false -> []; true -> [{<<"_revisions">>, {[{<<"start">>, Start}, - {<<"ids">>, RevIds}]}}] + {<<"ids">>, [revid_to_str(R) ||R <- RevIds]}]}}] end. -rev_to_str({Pos, RevId}) -> - ?l2b([integer_to_list(Pos),"-",RevId]). +revid_to_str(RevId) when size(RevId) == 16 -> + ?l2b(couch_util:to_hex(RevId)); +revid_to_str(RevId) -> + RevId. +rev_to_str({Pos, RevId}) -> + ?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]). 
+ + rev_to_strs([]) -> []; rev_to_strs([{Pos, RevId}| Rest]) -> @@ -66,17 +72,12 @@ to_json_meta(Meta) -> to_json_attachment_stubs(Attachments) -> BinProps = lists:map( - fun({Name, {Type, {_RcvFun, Length}}}) -> - {Name, {[ - {<<"stub">>, true}, - {<<"content_type">>, Type}, - {<<"length">>, Length} - ]}}; - ({Name, {Type, BinValue}}) -> + fun(#att{name=Name,type=Type,len=Length,revpos=Pos}) -> {Name, {[ {<<"stub">>, true}, {<<"content_type">>, Type}, - {<<"length">>, bin_size(BinValue)} + {<<"length">>, Length}, + {<<"revpos">>, Pos} ]}} end, Attachments), @@ -85,24 +86,26 @@ to_json_attachment_stubs(Attachments) -> _ -> [{<<"_attachments">>, {BinProps}}] end. -to_json_attachments(Attachments) -> - BinProps = lists:map( - fun({Name, {Type, {RcvFun, Length}}}) -> - Data = read_streamed_attachment(RcvFun, Length, _Acc = []), - {Name, {[ - {<<"content_type">>, Type}, +to_json_attachments(Atts) -> + AttProps = lists:map( + fun(#att{data=Fun,len=Len}=Att) when is_function(Fun) -> + Data = read_streamed_attachment(Fun, Len, _Acc = []), + {Att#att.name, {[ + {<<"content_type">>, Att#att.type}, + {<<"revpos">>, Att#att.revpos}, {<<"data">>, couch_util:encodeBase64(Data)} ]}}; - ({Name, {Type, BinValue}}) -> - {Name, {[ - {<<"content_type">>, Type}, - {<<"data">>, couch_util:encodeBase64(bin_to_binary(BinValue))} + (Att) -> + {Att#att.name, {[ + {<<"content_type">>, Att#att.type}, + {<<"revpos">>, Att#att.revpos}, + {<<"data">>, couch_util:encodeBase64(att_to_iolist(Att))} ]}} end, - Attachments), - case BinProps of + Atts), + case AttProps of [] -> []; - _ -> [{<<"_attachments">>, {BinProps}}] + _ -> [{<<"_attachments">>, {AttProps}}] end. to_json_attachments(Attachments, Options) -> @@ -120,7 +123,7 @@ to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds}, ++ to_json_body(Del, Body) ++ to_json_revisions(Options, Start, RevIds) ++ to_json_meta(Meta) - ++ to_json_attachments(Doc#doc.attachments, Options) + ++ to_json_attachments(Doc#doc.atts, Options) }. from_json_obj({Props}) -> @@ -129,12 +132,24 @@ from_json_obj({Props}) -> from_json_obj(_Other) -> throw({bad_request, "Document must be a JSON object"}). +parse_revid(RevId) when size(RevId) == 32 -> + RevInt = erlang:list_to_integer(?b2l(RevId), 16), + <<RevInt:128>>; +parse_revid(RevId) when length(RevId) == 32 -> + RevInt = erlang:list_to_integer(RevId, 16), + <<RevInt:128>>; +parse_revid(RevId) when is_binary(RevId) -> + RevId; +parse_revid(RevId) when is_list(RevId) -> + ?l2b(RevId). 
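
With binary MD5 revision ids, couch_doc gains `revid_to_str/1` and `parse_revid/1` to move between the 16-byte internal form and the 32-character hex the client sees after the `Pos-` prefix. A sketch of that round trip with hypothetical helper names (the patch itself uses `couch_util:to_hex/1` for the outbound direction):

```erlang
-module(revid_str_sketch).
-export([to_hex/1, from_hex/1]).

%% 16-byte rev id -> 32 lowercase hex characters, and back again.
to_hex(<<RevId:16/binary>>) ->
    list_to_binary([io_lib:format("~2.16.0b", [B]) || <<B>> <= RevId]).

from_hex(Hex) when byte_size(Hex) =:= 32 ->
    RevInt = list_to_integer(binary_to_list(Hex), 16),
    <<RevInt:128>>.
```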
+ + parse_rev(Rev) when is_binary(Rev) -> parse_rev(?b2l(Rev)); parse_rev(Rev) when is_list(Rev) -> SplitRev = lists:splitwith(fun($-) -> false; (_) -> true end, Rev), case SplitRev of - {Pos, [$- | RevId]} -> {list_to_integer(Pos), ?l2b(RevId)}; + {Pos, [$- | RevId]} -> {list_to_integer(Pos), parse_revid(RevId)}; _Else -> throw({bad_request, <<"Invalid rev format">>}) end; parse_rev(_BadRev) -> @@ -176,20 +191,23 @@ transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) -> transfer_fields(Rest,Doc); transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> - Bins = lists:flatmap(fun({Name, {BinProps}}) -> + Atts = lists:map(fun({Name, {BinProps}}) -> case proplists:get_value(<<"stub">>, BinProps) of true -> Type = proplists:get_value(<<"content_type">>, BinProps), Length = proplists:get_value(<<"length">>, BinProps), - [{Name, {stub, Type, Length}}]; + RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), + #att{name=Name, data=stub, type=Type, len=Length, revpos=RevPos}; _ -> Value = proplists:get_value(<<"data">>, BinProps), Type = proplists:get_value(<<"content_type">>, BinProps, ?DEFAULT_ATTACHMENT_CONTENT_TYPE), - [{Name, {Type, couch_util:decodeBase64(Value)}}] + RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), + Bin = couch_util:decodeBase64(Value), + #att{name=Name, data=Bin, type=Type, len=size(Bin), revpos=RevPos} end end, JsonBins), - transfer_fields(Rest, Doc#doc{attachments=Bins}); + transfer_fields(Rest, Doc#doc{atts=Atts}); transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) -> RevIds = proplists:get_value(<<"ids">>, Props), @@ -203,7 +221,8 @@ transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) -> end, [throw({doc_validation, "RevId isn't a string"}) || RevId <- RevIds, not is_binary(RevId)], - transfer_fields(Rest, Doc#doc{revs={Start, RevIds}}); + RevIds2 = [parse_revid(RevId) || RevId <- RevIds], + transfer_fields(Rest, Doc#doc{revs={Start, RevIds2}}); transfer_fields([{<<"_deleted">>, B} | Rest], Doc) when (B==true) or (B==false) -> transfer_fields(Rest, Doc#doc{deleted=B}); @@ -253,23 +272,20 @@ to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) -> -bin_foldl(Bin, Fun, Acc) when is_binary(Bin) -> +att_foldl(#att{data=Bin}, Fun, Acc) when is_binary(Bin) -> Fun(Bin, Acc); -bin_foldl({Fd, Sp, Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null -> +att_foldl(#att{data={Fd,Sp},len=Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null -> % 09 UPGRADE CODE couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc); -bin_foldl({Fd, Sp, _Len}, Fun, Acc) -> - couch_stream:foldl(Fd, Sp, Fun, Acc). - -bin_size(Bin) when is_binary(Bin) -> - size(Bin); -bin_size({_Fd, _Sp, Len}) -> - Len. +att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) -> + couch_stream:foldl(Fd, Sp, Md5, Fun, Acc). -bin_to_binary(Bin) when is_binary(Bin) -> +att_to_iolist(#att{data=Bin}) when is_binary(Bin) -> Bin; -bin_to_binary({Fd, Sp, _Len}) -> - lists:reverse(couch_stream:foldl(Fd, Sp, fun(Bin,Acc) -> [Bin|Acc] end, [])). +att_to_iolist(#att{data=Iolist}) when is_list(Iolist) -> + Iolist; +att_to_iolist(#att{data={Fd,Sp},md5=Md5}) -> + lists:reverse(couch_stream:foldl(Fd, Sp, Md5, fun(Bin,Acc) -> [Bin|Acc] end, [])). get_validate_doc_fun(#doc{body={Props}}) -> Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), @@ -284,24 +300,24 @@ get_validate_doc_fun(#doc{body={Props}}) -> end. 
-has_stubs(#doc{attachments=Bins}) -> - has_stubs(Bins); +has_stubs(#doc{atts=Atts}) -> + has_stubs(Atts); has_stubs([]) -> false; -has_stubs([{_Name, {stub, _, _}}|_]) -> +has_stubs([#att{data=stub}|_]) -> true; -has_stubs([_Bin|Rest]) -> +has_stubs([_Att|Rest]) -> has_stubs(Rest). -merge_stubs(#doc{attachments=MemBins}=StubsDoc, #doc{attachments=DiskBins}) -> - BinDict = dict:from_list(DiskBins), +merge_stubs(#doc{atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) -> + BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]), MergedBins = lists:map( - fun({Name, {stub, _, _}}) -> - {Name, dict:fetch(Name, BinDict)}; - ({Name, Value}) -> - {Name, Value} + fun(#att{name=Name, data=stub}) -> + dict:fetch(Name, BinDict); + (Att) -> + Att end, MemBins), - StubsDoc#doc{attachments= MergedBins}. + StubsDoc#doc{atts= MergedBins}. read_streamed_attachment(_RcvFun, 0, Acc) -> list_to_binary(lists:reverse(Acc)); diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index aec632fb..4c450163 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -25,6 +25,8 @@ -export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]). -export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]). -export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]). +-export([append_term_md5/2, pread_iolist_md5/2, pread_binary_md5/2]). +-export([pread_term_md5/2]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). %%---------------------------------------------------------------------- @@ -67,6 +69,9 @@ open(Filepath, Options) -> append_term(Fd, Term) -> append_binary(Fd, term_to_binary(Term)). + +append_term_md5(Fd, Term) -> + append_binary_md5(Fd, term_to_binary(Term)). %%---------------------------------------------------------------------- @@ -80,6 +85,11 @@ append_term(Fd, Term) -> append_binary(Fd, Bin) -> Size = iolist_size(Bin), gen_server:call(Fd, {append_bin, [<<Size:32/integer>>, Bin]}, infinity). + +append_binary_md5(Fd, Bin) -> + Size = iolist_size(Bin), + gen_server:call(Fd, {append_bin, + [<<Size:32/integer>>, erlang:md5(Bin), Bin]}, infinity). %%---------------------------------------------------------------------- @@ -94,6 +104,10 @@ pread_term(Fd, Pos) -> {ok, Bin} = pread_binary(Fd, Pos), {ok, binary_to_term(Bin)}. +pread_term_md5(Fd, Pos) -> + {ok, Bin} = pread_binary_md5(Fd, Pos), + {ok, binary_to_term(Bin)}. + %%---------------------------------------------------------------------- %% Purpose: Reads a binrary from a file that was written with append_binary @@ -111,6 +125,17 @@ pread_iolist(Fd, Pos) -> <<Len:32/integer>> = iolist_to_binary(LenIolist), {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len), {ok, Iolist}. + +pread_binary_md5(Fd, Pos) -> + {ok, L} = pread_iolist_md5(Fd, Pos), + {ok, iolist_to_binary(L)}. + +pread_iolist_md5(Fd, Pos) -> + {ok, LenIolist, NextPos} =read_raw_iolist(Fd, Pos, 20), + <<Len:32/integer, Md5/binary>> = iolist_to_binary(LenIolist), + {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len), + Md5 = erlang:md5(Iolist), + {ok, Iolist}. 
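
The couch_file additions above frame each term written with `append_term_md5/2` as a 4-byte length plus a 16-byte MD5 ahead of the body, and `pread_iolist_md5/2` recomputes and pattern-matches the digest on read. An in-memory sketch of just the framing and the check (the real code reads the prefix and the body as separate preads, and the data may span file blocks):

```erlang
-module(md5_frame_sketch).
-export([frame/1, unframe/1]).

%% Layout: <<Size:32, Md5:16/binary, Body/binary>>. The read side recomputes
%% the MD5 and asserts it with a pattern match, so corruption fails loudly
%% instead of returning bad data.
frame(Body) when is_binary(Body) ->
    <<(byte_size(Body)):32/integer, (erlang:md5(Body))/binary, Body/binary>>.

unframe(<<Size:32/integer, Md5:16/binary, Rest/binary>>) ->
    <<Body:Size/binary, _/binary>> = Rest,
    Md5 = erlang:md5(Body),
    {ok, Body}.
```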
read_raw_iolist(Fd, Pos, Len) -> BlockOffset = Pos rem ?SIZE_BLOCK, diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 594b6455..0ec1cbda 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -579,13 +579,15 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) -> [] -> Doc = couch_doc_open(Db, DocId, Rev, Options), DiskEtag = couch_httpd:doc_etag(Doc), - couch_httpd:etag_respond(Req, DiskEtag, fun() -> - Headers = case Doc#doc.meta of - [] -> [{"Etag", DiskEtag}]; % output etag only when we have no meta - _ -> [] - end, - send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)) - end); + case Doc#doc.meta of + [] -> + % output etag only when we have no meta + couch_httpd:etag_respond(Req, DiskEtag, fun() -> + send_json(Req, 200, [{"Etag", DiskEtag}], couch_doc:to_json_obj(Doc, Options)) + end); + _ -> + send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options)) + end; _ -> {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options), {ok, Resp} = start_json_response(Req, 200), @@ -626,14 +628,23 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> Rev = couch_doc:parse_rev(list_to_binary(proplists:get_value("_rev", Form))), {ok, [{ok, Doc}]} = couch_db:open_doc_revs(Db, DocId, [Rev], []), - NewAttachments = [ - {validate_attachment_name(Name), {list_to_binary(ContentType), Content}} || + UpdatedAtts = [ + #att{name=validate_attachment_name(Name), + type=list_to_binary(ContentType), + data=Content} || {Name, {ContentType, _}, Content} <- proplists:get_all_values("_attachments", Form) ], - #doc{attachments=Attachments} = Doc, + #doc{atts=OldAtts} = Doc, + OldAtts2 = lists:flatmap( + fun(#att{name=OldName}=Att) -> + case [1 || A <- UpdatedAtts, A#att.name == OldName] of + [] -> [Att]; % the attachment wasn't in the UpdatedAtts, return it + _ -> [] % the attachment was in the UpdatedAtts, drop it + end + end, OldAtts), NewDoc = Doc#doc{ - attachments = Attachments ++ NewAttachments + atts = UpdatedAtts ++ OldAtts2 }, {ok, NewRev} = couch_db:update_doc(Db, NewDoc, []), @@ -765,13 +776,12 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> options=Options } = parse_doc_query(Req), #doc{ - attachments=Attachments + atts=Atts } = Doc = couch_doc_open(Db, DocId, Rev, Options), - - case proplists:get_value(FileName, Attachments) of - undefined -> + case [A || A <- Atts, A#att.name == FileName] of + [] -> throw({not_found, "Document is missing attachment"}); - {Type, Bin} -> + [#att{type=Type}=Att] -> Etag = couch_httpd:doc_etag(Doc), couch_httpd:etag_respond(Req, Etag, fun() -> {ok, Resp} = start_chunked_response(Req, 200, [ @@ -784,7 +794,7 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> % open to discussion. % {"Content-Length", integer_to_list(couch_doc:bin_size(Bin))} ]), - couch_doc:bin_foldl(Bin, + couch_doc:att_foldl(Att, fun(BinSegment, _) -> send_chunk(Resp, BinSegment) end,[]), send_chunk(Resp, "") end) @@ -798,31 +808,36 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts) lists:map(fun binary_to_list/1, FileNameParts),"/")), - NewAttachment = case Method of + NewAtt = case Method of 'DELETE' -> []; _ -> - % see couch_db:doc_flush_binaries for usage of this structure - [{FileName, { - case couch_httpd:header_value(Req,"Content-Type") of - undefined -> - % We could throw an error here or guess by the FileName. - % Currently, just giving it a default. 
- <<"application/octet-stream">>; - CType -> - list_to_binary(CType) - end, - case couch_httpd:header_value(Req,"Content-Length") of - undefined -> - {fun(MaxChunkSize, ChunkFun, InitState) -> - couch_httpd:recv_chunked(Req, MaxChunkSize, - ChunkFun, InitState) - end, undefined}; - Length -> - {fun() -> couch_httpd:recv(Req, 0) end, - list_to_integer(Length)} - end - }}] + [#att{ + name=FileName, + type = case couch_httpd:header_value(Req,"Content-Type") of + undefined -> + % We could throw an error here or guess by the FileName. + % Currently, just giving it a default. + <<"application/octet-stream">>; + CType -> + list_to_binary(CType) + end, + data = case couch_httpd:header_value(Req,"Content-Length") of + undefined -> + fun(MaxChunkSize, ChunkFun, InitState) -> + couch_httpd:recv_chunked(Req, MaxChunkSize, + ChunkFun, InitState) + end; + Length -> + fun() -> couch_httpd:recv(Req, 0) end + end, + len = case couch_httpd:header_value(Req,"Content-Length") of + undefined -> + undefined; + Length -> + list_to_integer(Length) + end + }] end, Doc = case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of @@ -835,9 +850,9 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts) end end, - #doc{attachments=Attachments} = Doc, + #doc{atts=Atts} = Doc, DocEdited = Doc#doc{ - attachments = NewAttachment ++ proplists:delete(FileName, Attachments) + atts = NewAtt ++ [A || A <- Atts, A#att.name /= FileName] }, {ok, UpdatedRev} = couch_db:update_doc(Db, DocEdited, []), #db{name=DbName} = Db, @@ -941,9 +956,9 @@ parse_copy_destination_header(Req) -> end. validate_attachment_names(Doc) -> - lists:foreach(fun({Name, _}) -> + lists:foreach(fun(#att{name=Name}) -> validate_attachment_name(Name) - end, Doc#doc.attachments). + end, Doc#doc.atts). validate_attachment_name(Name) when is_list(Name) -> validate_attachment_name(list_to_binary(Name)); diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl index 647a3455..830820f3 100644 --- a/src/couchdb/couch_key_tree.erl +++ b/src/couchdb/couch_key_tree.erl @@ -278,13 +278,20 @@ count_leafs_simple([{_Key, _Value, SubTree} | RestTree]) -> map(_Fun, []) -> []; map(Fun, [{Pos, Tree}|Rest]) -> - [NewTree] = map_simple(Fun, Pos, [Tree]), - [{Pos, NewTree} | map(Fun, Rest)]. + case erlang:fun_info(Fun, arity) of + {arity, 2} -> + [NewTree] = map_simple(fun(A,B,_C) -> Fun(A,B) end, Pos, [Tree]), + [{Pos, NewTree} | map(Fun, Rest)]; + {arity, 3} -> + [NewTree] = map_simple(Fun, Pos, [Tree]), + [{Pos, NewTree} | map(Fun, Rest)] + end. map_simple(_Fun, _Pos, []) -> []; map_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) -> - Value2 = Fun({Pos, Key}, Value), + Value2 = Fun({Pos, Key}, Value, + if SubTree == [] -> leaf; true -> branch end), [{Key, Value2, map_simple(Fun, Pos + 1, SubTree)} | map_simple(Fun, Pos, RestTree)]. diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 7aec3b5d..3cb90347 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -312,8 +312,6 @@ terminate(normal, State) -> terminate(Reason, State) -> ?LOG_ERROR("replicator terminating with reason ~p", [Reason]), #state{ - context = Context, - current_seq = Seq, listeners = Listeners, source = Source, target = Target, @@ -390,25 +388,26 @@ attachment_loop(ReqId, Conn) -> exit(normal) end. 
-attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type, Length}}) -> +att_stub_converter(DbS, Id, Rev, + #att{name=Name,data=stub,type=Type,len=Length}=Att) -> #http_db{uri=DbUrl, headers=Headers} = DbS, {Pos, [RevId|_]} = Rev, Url = lists:flatten([DbUrl, url_encode(Id), "/", url_encode(?b2l(Name)), "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]), ?LOG_DEBUG("Attachment URL ~s", [Url]), - {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name, + {ok, RcvFun} = make_att_stub_receiver(Url, Headers, Name, Type, Length), - {Name, {Type, {RcvFun, Length}}}. + Att#att{name=Name,type=Type,data=RcvFun,len=Length}. -make_attachment_stub_receiver(Url, Headers, Name, Type, Length) -> - make_attachment_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000). +make_att_stub_receiver(Url, Headers, Name, Type, Length) -> + make_att_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000). -make_attachment_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) -> +make_att_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) -> ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s", [Url]), exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])}); -make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) -> +make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) -> %% start the process that receives attachment data from ibrowse #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port), @@ -425,7 +424,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) "seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]), catch ibrowse:stop_worker_process(Conn), timer:sleep(Pause), - make_attachment_stub_receiver(Url, Headers, Name, Type, Length, + make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries-1, 2*Pause) end, @@ -437,7 +436,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) {'EXIT', Pid, _Reason} -> catch ibrowse:stop_worker_process(Conn), timer:sleep(Pause), - make_attachment_stub_receiver(Url, Headers, Name, Type, Length, + make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries-1, 2*Pause) end, @@ -447,7 +446,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) receive {'EXIT', Pid, attachment_request_failed} -> catch ibrowse:stop_worker_process(Conn), - make_attachment_stub_receiver(Url, Headers, Name, Type, Length, + make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries-1, Pause); {Pid, {status, StreamStatus, StreamHeaders}} -> ?LOG_DEBUG("streaming attachment Status ~p Headers ~p", @@ -476,7 +475,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) RedirectUrl = mochiweb_headers:get_value("Location", mochiweb_headers:make(StreamHeaders)), catch ibrowse:stop_worker_process(Conn), - make_attachment_stub_receiver(RedirectUrl, Headers, Name, Type, + make_att_stub_receiver(RedirectUrl, Headers, Name, Type, Length, Retries - 1, Pause); ResponseCode >= 400, ResponseCode < 500 -> % an error... log and fail @@ -491,7 +490,7 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) Pid ! {self(), fail}, catch ibrowse:stop_worker_process(Conn), timer:sleep(Pause), - make_attachment_stub_receiver(Url, Headers, Name, Type, Length, + make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries - 1, 2*Pause) end end. 
@@ -772,10 +771,9 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0, fun({[{<<"missing">>, Rev}]}) -> {{not_found, missing}, couch_doc:parse_rev(Rev)}; ({[{<<"ok">>, JsonDoc}]}) -> - #doc{id=Id, revs=Rev, attachments=Attach} = Doc = + #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(JsonDoc), - Attach2 = [attachment_stub_converter(DbS,Id,Rev,A) || A <- Attach], - {ok, Doc#doc{attachments=Attach2}} + {ok, Doc#doc{atts=[att_stub_converter(DbS,Id,Rev,A) || A <- Atts]}} end, JsonResults), {ok, Results}; open_doc_revs(Db, DocId, Revs, Options) -> diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index 34dc5a07..65cf7126 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -24,7 +24,8 @@ -define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data --export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]). +-export([open/1, close/1, write/2, foldl/4, foldl/5, + old_foldl/5,old_copy_to_new_stream/4]). -export([copy_to_new_stream/3,old_read_term/2]). -export([init/1, terminate/2, handle_call/3]). -export([handle_cast/2,code_change/3,handle_info/2]). @@ -37,7 +38,8 @@ buffer_list = [], buffer_len = 0, max_buffer = 4096, - written_len = 0 + written_len = 0, + md5 }). @@ -79,6 +81,23 @@ foldl(Fd, [Pos|Rest], Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Fun, Fun(Bin, Acc)). +foldl(Fd, PosList, <<>>, Fun, Acc) -> + foldl(Fd, PosList, Fun, Acc); +foldl(Fd, PosList, Md5, Fun, Acc) -> + foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc). + + +foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> + Md5 = erlang:md5_final(Md5Acc), + Acc; +foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> + {ok, Bin} = couch_file:pread_iolist(Fd, Pos), + Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, Bin)), + Fun(Bin, Acc); +foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> + {ok, Bin} = couch_file:pread_iolist(Fd, Pos), + foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)). + write(_Pid, <<>>) -> ok; write(Pid, Bin) -> @@ -86,7 +105,7 @@ write(Pid, Bin) -> init(Fd) -> - {ok, #stream{fd = Fd}}. + {ok, #stream{fd=Fd, md5=erlang:md5_init()}}. terminate(_Reason, _Stream) -> ok. 
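
The couch_stream changes thread an MD5 context through the writer state: `erlang:md5_init/0` at open, `erlang:md5_update/2` on every flushed buffer, `erlang:md5_final/1` at close, so the digest comes out alongside the written length without a second pass over the data. A standalone sketch of that accumulation over a list of chunks:

```erlang
-module(stream_md5_sketch).
-export([digest_chunks/1]).

%% Fold the chunks once, tracking both total length and the rolling MD5
%% context; this mirrors how the stream hashes attachment data as it is
%% buffered and flushed, rather than re-reading it afterwards.
digest_chunks(Chunks) ->
    {Len, Ctx} = lists:foldl(
        fun(Chunk, {L, C}) ->
            {L + iolist_size(Chunk), erlang:md5_update(C, Chunk)}
        end,
        {0, erlang:md5_init()},
        Chunks),
    {Len, erlang:md5_final(Ctx)}.
```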
@@ -99,14 +118,18 @@ handle_call({write, Bin}, _From, Stream) -> written_pointers = Written, buffer_len = BufferLen, buffer_list = Buffer, - max_buffer = Max} = Stream, + max_buffer = Max, + md5 = Md5} = Stream, if BinSize + BufferLen > Max -> - {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, [Bin])), + WriteBin = lists:reverse(Buffer, [Bin]), + Md5_2 = erlang:md5_update(Md5, WriteBin), + {ok, Pos} = couch_file:append_binary(Fd, WriteBin), {reply, ok, Stream#stream{ written_len=WrittenLen + BufferLen + BinSize, written_pointers=[Pos|Written], buffer_list=[], - buffer_len=0}}; + buffer_len=0, + md5=Md5_2}}; true -> {reply, ok, Stream#stream{ buffer_list=[Bin|Buffer], @@ -118,14 +141,17 @@ handle_call(close, _From, Stream) -> written_len = WrittenLen, written_pointers = Written, buffer_len = BufferLen, - buffer_list = Buffer} = Stream, + buffer_list = Buffer, + md5 = Md5} = Stream, case Buffer of [] -> - Result = {lists:reverse(Written), WrittenLen}; + Result = {lists:reverse(Written), WrittenLen, erlang:md5_final(Md5)}; _ -> - {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)), - Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen} + WriteBin = lists:reverse(Buffer), + Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin)), + {ok, Pos} = couch_file:append_binary(Fd, WriteBin), + Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen, Md5Final} end, {stop, normal, Result, Stream}. diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl index 436dee38..817572bb 100644 --- a/src/couchdb/couch_util.erl +++ b/src/couchdb/couch_util.erl @@ -226,8 +226,8 @@ should_flush(MemThreshHold) -> %% Take 3 bytes a time (3 x 8 = 24 bits), and make 4 characters out of %% them (4 x 6 = 24 bits). %% -encodeBase64(Bs) when list(Bs) -> - encodeBase64(list_to_binary(Bs), <<>>); +encodeBase64(Bs) when is_list(Bs) -> + encodeBase64(iolist_to_binary(Bs), <<>>); encodeBase64(Bs) -> encodeBase64(Bs, <<>>). diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 8842c44b..a0819ef5 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -78,7 +78,7 @@ cleanup_index_files(Db) -> {ok, DesignDocs} = couch_db:get_design_docs(Db), % make unique list of group sigs - Sigs = lists:map(fun(#doc{id = GroupId} = DDoc) -> + Sigs = lists:map(fun(#doc{id = GroupId}) -> {ok, Info} = get_group_info(Db, GroupId), ?b2l(proplists:get_value(signature, Info)) end, [DD||DD <- DesignDocs, DD#doc.deleted == false]), @@ -100,7 +100,7 @@ cleanup_index_files(Db) -> list_index_files(Db) -> % call server to fetch the index files RootDir = couch_config:get("couchdb", "view_index_dir"), - Files = filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*"). + filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*"). get_row_count(#view{btree=Bt}) -> diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index c4b495b0..57c3ad21 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -55,15 +55,6 @@ request_group_info(Pid) -> throw(Error) end. -request_index_files(Pid) -> - case gen_server:call(Pid, request_index_files) of - {ok, Filelist} -> - {ok, Filelist}; - Error -> - throw(Error) - end. - - % from template start_link(InitArgs) -> case gen_server:start_link(couch_view_group, |