summary refs log tree commit diff
path: root/src/couchdb/couch_db.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r-- src/couchdb/couch_db.erl 304
1 files changed, 179 insertions, 125 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 9cb887ea..9dc1fce8 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -24,7 +24,7 @@
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/5,read_doc/2]).
+-export([changes_since/5,read_doc/2,new_revid/1]).
-include("couch_db.hrl").
@@ -295,11 +295,11 @@ validate_doc_update(#db{name=DbName,user_ctx=Ctx}=Db, Doc, GetDiskDocFun) ->
end.
-prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc,
- OldFullDocInfo, LeafRevsDict) ->
- case PrevRevs of
+prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
+ OldFullDocInfo, LeafRevsDict, AllowConflict) ->
+ case Revs of
[PrevRev|_] ->
- case dict:find({RevStart-1, PrevRev}, LeafRevsDict) of
+ case dict:find({RevStart, PrevRev}, LeafRevsDict) of
{ok, {Deleted, DiskSp, DiskRevs}} ->
case couch_doc:has_stubs(Doc) of
true ->
@@ -310,13 +310,21 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc
LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
{validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
end;
+ error when AllowConflict ->
+ {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
error ->
{conflict, Doc}
end;
[] ->
% new doc, and we have existing revs.
+ % reuse existing deleted doc
if OldFullDocInfo#full_doc_info.deleted ->
% existing docs are deletions
+ #doc_info{revs=[#rev_info{rev={Pos, DelRevId}}|_]} =
+ couch_doc:to_doc_info(OldFullDocInfo),
+ Doc2 = Doc#doc{revs={Pos, [DelRevId]}},
+ {validate_doc_update(Db, Doc2, fun() -> nil end), Doc2};
+ AllowConflict ->
{validate_doc_update(Db, Doc, fun() -> nil end), Doc};
true ->
{conflict, Doc}
@@ -325,49 +333,51 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc
-prep_and_validate_updates(_Db, [], [], AccPrepped, AccFatalErrors) ->
+prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
+ AccFatalErrors) ->
{AccPrepped, AccFatalErrors};
-prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], AccPrepped, AccErrors) ->
+prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
+ AllowConflict, AccPrepped, AccErrors) ->
[#doc{id=Id}|_]=DocBucket,
% no existing revs are known,
{PreppedBucket, AccErrors3} = lists:foldl(
fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
case Revs of
- {Pos, [NewRev,_OldRev|_]} ->
- % old revs specified but none exist, a conflict
- {AccBucket, [{{Id, {Pos, NewRev}}, conflict} | AccErrors2]};
- {Pos, [NewRev]} ->
+ {0, []} ->
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
{[Doc | AccBucket], AccErrors2};
Error ->
- {AccBucket, [{{Id, {Pos, NewRev}}, Error} | AccErrors2]}
- end
+ {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+ end;
+ _ ->
+ % old revs specified but none exist, a conflict
+ {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
- prep_and_validate_updates(Db, RestBuckets, RestLookups,
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
[{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
- AccPrepped, AccErrors) ->
+ AllowConflict, AccPrepped, AccErrors) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} ||
{{Deleted, Sp, _Seq}, {Start, [RevId|_]}=Revs} <- Leafs]),
{PreppedBucket, AccErrors3} = lists:foldl(
fun(Doc, {Docs2Acc, AccErrors2}) ->
case prep_and_validate_update(Db, Doc, OldFullDocInfo,
- LeafRevsDict) of
+ LeafRevsDict, AllowConflict) of
{ok, Doc2} ->
{[Doc2 | Docs2Acc], AccErrors2};
- {Error, #doc{id=Id,revs={Pos, [NewRev|_]}}} ->
+ {Error, #doc{id=Id,revs=Revs}} ->
% Record the error
- {Docs2Acc, [{{Id, {Pos, NewRev}}, Error} |AccErrors2]}
+ {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
- prep_and_validate_updates(Db, RestBuckets, RestLookups, [PreppedBucket | AccPrepped], AccErrors3).
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, [PreppedBucket | AccPrepped], AccErrors3).
update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
@@ -408,32 +418,12 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
{ok, {Start, Path}} ->
% our unflushed doc is a leaf node. Go back on the path
% to find the previous rev that's on disk.
- PrevRevResult =
- case couch_doc:has_stubs(Doc) of
- true ->
- [_PrevRevFull | [PrevRevFull | _]=PrevPath] = Path,
- case PrevRevFull of
- {_RevId, ?REV_MISSING} ->
- conflict;
- {RevId, {IsDel, DiskSp, _Seq}} ->
- DiskDoc = make_doc(Db, Id, IsDel, DiskSp, PrevPath),
- Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
- {ok, Doc2, fun() -> DiskDoc end}
- end;
- false ->
- {ok, Doc,
- fun() ->
+ LoadPrevRevFun = fun() ->
make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
- end}
- end,
- case PrevRevResult of
- {ok, NewDoc, LoadPrevRevFun} ->
- case validate_doc_update(Db, NewDoc, LoadPrevRevFun) of
- ok ->
- {[NewDoc | AccValidated], AccErrors2};
- Error ->
- {AccValidated, [{NewDoc, Error} | AccErrors2]}
- end;
+ end,
+ case validate_doc_update(Db, Doc, LoadPrevRevFun) of
+ ok ->
+ {[Doc | AccValidated], AccErrors2};
Error ->
{AccValidated, [{Doc, Error} | AccErrors2]}
end;
@@ -444,9 +434,47 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
end
end,
{[], AccErrors}, Bucket),
- prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3)
+ prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
+ [ValidatedBucket | AccPrepped], AccErrors3)
end.
+
+
+new_revid(#doc{body=Body,revs={OldStart,OldRevs},
+ atts=Atts,deleted=Deleted}) ->
+ case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M /= <<>>] of
+ Atts2 when length(Atts) /= length(Atts2) ->
+ % We must have old style non-md5 attachments
+ ?l2b(integer_to_list(couch_util:rand32()));
+ Atts2 ->
+ OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
+ erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
+ end.
+
+new_revs([], OutBuckets, IdRevsAcc) ->
+ {lists:reverse(OutBuckets), IdRevsAcc};
+new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
+ {NewBucket, IdRevsAcc3} = lists:mapfoldl(
+ fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+ NewRevId = new_revid(Doc),
+ {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
+ [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+ end, IdRevsAcc, Bucket),
+ new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
+
+check_dup_atts([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
+ throw({bad_request, <<"Duplicate attachments">>});
+check_dup_atts([_, _ | Rest]) ->
+ check_dup_atts(Rest);
+check_dup_atts(_) ->
+ ok.
+
+sort_and_check_atts(#doc{atts=Atts}=Doc) ->
+ Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
+ check_dup_atts(Atts2),
+ Doc#doc{atts=Atts2}.
+
+
update_docs(Db, Docs, Options, replicated_changes) ->
couch_stats_collector:increment({couchdb, database_writes}),
DocBuckets = group_alike_docs(Docs),
@@ -467,70 +495,74 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- {ok, []} = write_and_commit(Db, DocBuckets3, [merge_conflicts | Options]),
+ DocBuckets4 = [[doc_flush_atts(sort_and_check_atts(Doc), Db#db.fd)
+ || Doc <- Bucket] || Bucket <- DocBuckets3],
+ {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
update_docs(Db, Docs, Options, interactive_edit) ->
couch_stats_collector:increment({couchdb, database_writes}),
AllOrNothing = lists:member(all_or_nothing, Options),
% go ahead and generate the new revision ids for the documents.
- Docs2 = lists:map(
- fun(#doc{id=Id,revs={Start, RevIds}}=Doc) ->
+ % separate out the NonRep documents from the rest of the documents
+ {Docs2, NonRepDocs} = lists:foldl(
+ fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
- Rev = case RevIds of [] -> 0; [Rev0|_] -> list_to_integer(?b2l(Rev0)) end,
- Doc#doc{revs={Start, [?l2b(integer_to_list(Rev + 1))]}};
- _ ->
- Doc#doc{revs={Start+1, [?l2b(integer_to_list(couch_util:rand32())) | RevIds]}}
+ {DocsAcc, [Doc | NonRepDocsAcc]};
+ Id->
+ {[Doc | DocsAcc], NonRepDocsAcc}
end
- end, Docs),
+ end, {[], []}, Docs),
+
DocBuckets = group_alike_docs(Docs2),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
true;
- (#doc{attachments=Atts}) ->
+ (#doc{atts=Atts}) ->
Atts /= []
- end, Docs) of
+ end, Docs2) of
true ->
% lookup the doc by id and get the most recent
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
ExistingDocInfos = get_full_doc_infos(Db, Ids),
- {DocBucketsPrepped, Failures} =
- case AllOrNothing of
- true ->
- prep_and_validate_replicated_updates(Db, DocBuckets,
- ExistingDocInfos, [], []);
- false ->
- prep_and_validate_updates(Db, DocBuckets, ExistingDocInfos, [], [])
- end,
+ {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
+ DocBuckets, ExistingDocInfos, AllOrNothing, [], []),
% strip out any empty buckets
DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
false ->
- Failures = [],
+ PreCommitFailures = [],
DocBuckets2 = DocBuckets
end,
- if (AllOrNothing) and (Failures /= []) ->
- {aborted, Failures};
+ if (AllOrNothing) and (PreCommitFailures /= []) ->
+ {aborted, lists:map(
+ fun({{Id,{Pos, [RevId|_]}}, Error}) ->
+ {{Id, {Pos, RevId}}, Error};
+ ({{Id,{0, []}}, Error}) ->
+ {{Id, {0, <<>>}}, Error}
+ end, PreCommitFailures)};
true ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
- {ok, CommitFailures} = write_and_commit(Db, DocBuckets2, Options2),
- FailDict = dict:from_list(CommitFailures ++ Failures),
- % the output for each is either {ok, NewRev} or Error
+ DocBuckets3 = [[
+ doc_flush_atts(set_new_att_revpos(
+ sort_and_check_atts(Doc)), Db#db.fd)
+ || Doc <- B] || B <- DocBuckets2],
+ {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
+
+ {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
+
+ ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
{ok, lists:map(
- fun(#doc{id=Id,revs={Pos, [NewRevId|_]}}) ->
- case dict:find({Id, {Pos, NewRevId}}, FailDict) of
- {ok, Error} ->
- Error;
- error ->
- {ok, {Pos, NewRevId}}
- end
- end, Docs2)}
+ fun(#doc{id=Id,revs={Pos, RevIds}}) ->
+ {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
+ Result
+ end, Docs)}
end.
% Returns the first available document on disk. Input list is a full rev path
@@ -545,78 +577,81 @@ make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp, _Seq}} |_]=DocPath) ->
write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
- Options) ->
- % flush unwritten binaries to disk.
- DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
- case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of
- {ok, Conflicts} -> {ok, Conflicts};
+ NonRepDocs, Options) ->
+ case gen_server:call(UpdatePid,
+ {update_docs, DocBuckets, NonRepDocs, Options}, infinity) of
+ {ok, Results} -> {ok, Results};
retry ->
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
- DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
- case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of
- {ok, Conflicts} -> {ok, Conflicts};
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of
+ {ok, Results} -> {ok, Results};
retry -> throw({update_error, compaction_retry})
end
end.
+set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
+ Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
+ % already committed to disk, do not set new rev
+ Att;
+ (Att) ->
+ Att#att{revpos=RevPos+1}
+ end, Atts)}.
+
-doc_flush_binaries(Doc, Fd) ->
- NewAttachments = lists:map(
- fun({Key, {Type, BinValue}}) ->
- NewBinValue = flush_binary(Fd, BinValue),
- {Key, {Type, NewBinValue}}
- end, Doc#doc.attachments),
- Doc#doc{attachments = NewAttachments}.
+doc_flush_atts(Doc, Fd) ->
+ Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
-flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd ->
- % already written to our file, nothing to write
- {Fd, StreamPointer, Len};
+check_md5(_NewSig, <<>>) -> ok;
+check_md5(Sig1, Sig2) when Sig1 == Sig2 -> ok;
+check_md5(_, _) -> throw(data_corruption).
-flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
- {NewStreamData, Len} =
- couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
- {Fd, NewStreamData, Len};
+flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
+ % already written to our file, nothing to write
+ Att;
-flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
- {NewStreamData, Len} =
+flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) ->
+ {NewStreamData, Len, Md5} =
couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
- {Fd, NewStreamData, Len};
+ check_md5(Md5, InMd5),
+ Att#att{data={Fd, NewStreamData}, md5=Md5, len=Len};
-flush_binary(Fd, Bin) when is_binary(Bin) ->
- with_stream(Fd, fun(OutputStream) ->
- couch_stream:write(OutputStream, Bin)
+flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ couch_stream:write(OutputStream, Data)
end);
-flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
- with_stream(Fd, fun(OutputStream) ->
- % StreamFun(MaxChunkSize, WriterFun) must call WriterFun
+flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ % Fun(MaxChunkSize, WriterFun) must call WriterFun
% once for each chunk of the attachment,
- StreamFun(4096,
+ Fun(4096,
% WriterFun({Length, Binary}, State)
% WriterFun({0, _Footers}, State)
% Called with Length == 0 on the last time.
% WriterFun returns NewState.
fun({0, _Footers}, _) ->
ok;
- ({_Length, Bin}, _) ->
- couch_stream:write(OutputStream, Bin)
+ ({_Length, Chunk}, _) ->
+ couch_stream:write(OutputStream, Chunk)
end, ok)
end);
-flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
- with_stream(Fd, fun(OutputStream) ->
+flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
write_streamed_attachment(OutputStream, Fun, Len)
end).
-with_stream(Fd, Fun) ->
+with_stream(Fd, #att{md5=InMd5}=Att, Fun) ->
{ok, OutputStream} = couch_stream:open(Fd),
Fun(OutputStream),
- {StreamInfo, Len} = couch_stream:close(OutputStream),
- {Fd, StreamInfo, Len}.
+ {StreamInfo, Len, Md5} = couch_stream:close(OutputStream),
+ check_md5(Md5, InMd5),
+ Att#att{data={Fd,StreamInfo},len=Len,md5=Md5}.
write_streamed_attachment(_Stream, _F, 0) ->
@@ -832,11 +867,15 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
-read_doc(Fd, Pos) when is_integer(Pos) ->
- couch_file:pread_term(Fd, Pos);
-read_doc(Fd, OldStyleStreamPointer) ->
+read_doc(#db{fd=Fd}, OldStreamPointer) when is_tuple(OldStreamPointer) ->
+ % 09 UPGRADE CODE
+ couch_stream:old_read_term(Fd, OldStreamPointer);
+read_doc(#db{header=#db_header{disk_version=Version},fd=Fd}, Pos)
+ when Version == 3 ->
% 09 UPGRADE CODE
- couch_stream:old_read_term(Fd, OldStyleStreamPointer).
+ couch_file:pread_term(Fd, Pos);
+read_doc(#db{fd=Fd}, Pos) ->
+ couch_file:pread_term_md5(Fd, Pos).
doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
@@ -850,21 +889,36 @@ doc_to_tree_simple(Doc, [RevId | Rest]) ->
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
-make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
- {BodyData, BinValues} =
+make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+ {BodyData, Atts} =
case Bp of
nil ->
{[], []};
_ ->
- {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
+ {ok, {BodyData0, Atts0}} = read_doc(Db, Bp),
{BodyData0,
- [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
+ lists:map(
+ fun({Name,Type,Sp,Len,RevPos,Md5}) ->
+ #att{name=Name,
+ type=Type,
+ len=Len,
+ md5=Md5,
+ revpos=RevPos,
+ data={Fd,Sp}};
+ ({Name,{Type,Sp,Len}}) ->
+ #att{name=Name,
+ type=Type,
+ len=Len,
+ md5= <<>>,
+ revpos=0,
+ data={Fd,Sp}}
+ end, Atts0)}
end,
#doc{
id = Id,
revs = RevisionPath,
body = BodyData,
- attachments = BinValues,
+ atts = Atts,
deleted = Deleted
}.