summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--src/couchdb/couch_db.erl179
1 files changed, 124 insertions, 55 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index e1b36f42..6441e2e1 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -14,14 +14,14 @@
-behaviour(gen_server).
-export([open/2,close/1,create/2,start_compact/1,get_db_info/1]).
--export([open_ref_counted/2,num_refs/1,monitor/1]).
--export([save_docs/3,update_doc/3,update_docs/2,update_docs/3,delete_doc/3]).
+-export([open_ref_counted/3,num_refs/1,monitor/1]).
+-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
--export([get_missing_revs/2,name/1]).
+-export([get_missing_revs/2,name/1,doc_to_tree/1]).
-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3]).
+-export([start_link/3,make_doc/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
@@ -33,19 +33,7 @@ start_link(DbName, Filepath, Options) ->
{ok, Fd} ->
StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
unlink(Fd),
- case StartResult of
- {ok, _} ->
- % We successfully opened the db, delete old storage files if around
- case file:delete(Filepath ++ ".old") of
- ok ->
- ?LOG_INFO("Deleted old storage file ~s~s", [Filepath, ".old"]);
- {error, enoent} ->
- ok % normal result
- end,
- StartResult;
- Error ->
- Error
- end;
+ StartResult;
Else ->
Else
end.
@@ -79,8 +67,9 @@ open(DbName, Options) ->
close(#db{fd=Fd}) ->
couch_file:drop_ref(Fd).
-open_ref_counted(MainPid, OpeningPid) ->
- gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}).
+open_ref_counted(MainPid, OpeningPid, UserCred) ->
+ {ok, Db} = gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}),
+ {ok, Db#db{user_ctx=UserCred}}.
num_refs(MainPid) ->
gen_server:call(MainPid, num_refs).
@@ -213,39 +202,92 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
% add to new bucket
group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
end.
-
-prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocInfo, LeafRevsDict) ->
+validate_doc_update(#db{validate_doc_funs=[]}, Doc, _GetDiskDocFun) ->
+ Doc;
+validate_doc_update(_Db, #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) ->
+ Doc;
+validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}=Doc, _GetDiskDocFun) ->
+ Doc;
+validate_doc_update(#db{name=DbName,user_ctx={CtxProps}}=Db, Doc, GetDiskDocFun) ->
+ DiskDoc = GetDiskDocFun(),
+ [case Fun(Doc, DiskDoc, {[{<<"db">>, DbName} | CtxProps]}) of
+ ok -> ok;
+ Error -> throw(Error)
+ end || Fun <- Db#db.validate_doc_funs],
+ Doc.
+
+
+prep_and_validate_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc,
+ OldFullDocInfo, LeafRevsDict) ->
case PrevRevs of
[PrevRev|_] ->
case dict:find(PrevRev, LeafRevsDict) of
{ok, {Deleted, Sp, DiskRevs}} ->
- case couch_doc:has_stubs(Doc) of
+ Doc2 = Doc#doc{revs=[NewRev|DiskRevs]},
+ case couch_doc:has_stubs(Doc2) of
true ->
DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs),
- Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
- Doc2#doc{revs=[NewRev|DiskRevs]};
+ Doc3 = couch_doc:merge_stubs(Doc2, DiskDoc),
+ validate_doc_update(Db, Doc3, fun() -> DiskDoc end);
false ->
- Doc#doc{revs=[NewRev|DiskRevs]}
+ LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,Sp,DiskRevs) end,
+ validate_doc_update(Db, Doc2, LoadDiskDoc)
end;
error ->
throw(conflict)
end;
[] ->
- % new doc, and we have existing revs.
- OldDocInfo = couch_doc:to_doc_info(OldFullDocInfo),
- if OldDocInfo#doc_info.deleted ->
- % existing doc is a deleton
- % allow this new doc to be a later revision.
- {_Deleted, _Sp, Revs} = dict:fetch(OldDocInfo#doc_info.rev, LeafRevsDict),
- Doc#doc{revs=[NewRev|Revs]};
+ % new doc, and we have existing revs.
+ if OldFullDocInfo#full_doc_info.deleted ->
+ % existing docs are deletions
+ validate_doc_update(Db, Doc, nil);
true ->
throw(conflict)
end
end.
update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
- % go ahead and generate the new revision ids for the documents.
+ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, true).
+
+update_docs(Db, Docs, Options, false) ->
+ DocBuckets = group_alike_docs(Docs),
+ Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+
+ ExistingDocs = get_full_doc_infos(Db, Ids),
+
+ DocBuckets2 = lists:zipwith(
+ fun(Bucket, not_found) ->
+ [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket];
+ (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}}) ->
+ NewTree = lists:foldl(
+ fun(Doc, RevTreeAcc) ->
+ couch_key_tree:merge(RevTreeAcc, doc_to_tree(Doc))
+ end,
+ OldRevTree, Bucket),
+ Leafs = couch_key_tree:get_all_leafs_full(NewTree),
+ LeafRevsFullDict = dict:from_list( [{Rev, FullPath} || [{Rev, _}|_]=FullPath <- Leafs]),
+ lists:flatmap(
+ fun(#doc{revs=[Rev|_]}=Doc) ->
+ case dict:find(Rev, LeafRevsFullDict) of
+ {ok, [{Rev, #doc{id=Id}}|_]=Path} ->
+ % our unflushed doc is a leaf node. Go back on the path
+ % to find the previous rev that's on disk.
+ LoadPrevRev = fun() ->
+ make_first_doc_on_disk(Db, Id, Path)
+ end,
+ [validate_doc_update(Db, Doc, LoadPrevRev)];
+ _ ->
+ % this doc isn't a leaf or is already exists in the tree. ignore
+ []
+ end
+ end, Bucket)
+ end,
+ DocBuckets, ExistingDocs),
+ write_and_commit(Db, DocBuckets2, Options);
+
+update_docs(Db, Docs, Options, true) ->
+ % go ahead and generate the new revision ids for the documents.
Docs2 = lists:map(
fun(#doc{id=Id,revs=Revs}=Doc) ->
case Id of
@@ -256,7 +298,6 @@ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
Doc#doc{revs=[list_to_binary(integer_to_list(couch_util:rand32())) | Revs]}
end
end, Docs),
- NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
DocBuckets = group_alike_docs(Docs2),
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
@@ -266,39 +307,51 @@ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
DocBuckets2 = lists:zipwith(
fun(Bucket, not_found) ->
- % no existing revs, make sure no old revision is specified.
+ % no existing revs on disk, make sure no old revs specified.
[throw(conflict) || #doc{revs=[_NewRev, _OldRev | _]} <- Bucket],
- Bucket;
+ [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket];
(Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([{Rev, {Deleted, Sp, Revs}} || {Rev, {Deleted, Sp}, Revs} <- Leafs]),
- [prepare_doc_for_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket]
+ [prep_and_validate_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket]
end,
DocBuckets, ExistingDocs),
- % flush unwritten binaries to disk.
- DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
+ ok = write_and_commit(Db, DocBuckets2, [new_edits | Options]),
+ {ok, [NewRev ||#doc{revs=[NewRev|_]} <- Docs2]}.
+
+
+% Returns the first available document on disk. Input list is a full rev path
+% for the doc.
+make_first_doc_on_disk(_Db, _Id, []) ->
+ nil;
+make_first_doc_on_disk(Db, Id, [{_Rev, ?REV_MISSING}|RestPath]) ->
+ make_first_doc_on_disk(Db, Id, RestPath);
+make_first_doc_on_disk(Db, Id, [{_Rev, {IsDel, Sp}} |_]=DocPath) ->
+ Revs = [Rev || {Rev, _} <- DocPath],
+ make_doc(Db, Id, IsDel, Sp, Revs).
- case gen_server:call(UpdatePid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of
- ok -> {ok, NewRevs};
+
+write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets, Options) ->
+
+ % flush unwritten binaries to disk.
+ DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of
+ ok -> ok;
retry ->
- {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
- DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
+ % This can happen if the db file we wrote to was swapped out by
+ % compaction. Retry writing to the current file
+ {ok, Db2} = open_ref_counted(Db#db.main_pid, self(), {[]}),
+ DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
- case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of
- ok -> {ok, NewRevs};
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of
+ ok -> ok;
Else -> throw(Else)
end;
Else->
throw(Else)
end.
-save_docs(#db{update_pid=UpdatePid, fd=Fd}, Docs, Options) ->
- % flush unwritten binaries to disk.
- DocBuckets = group_alike_docs(Docs),
- DocBuckets2 = [[doc_flush_binaries(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets],
- ok = gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity).
-
doc_flush_binaries(Doc, Fd) ->
% calc size of binaries to write out
@@ -509,14 +562,30 @@ doc_meta_info(DocInfo, RevTree, Options) ->
end
end.
-make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
+
+doc_to_tree(Doc) ->
+ doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
+
+doc_to_tree(Doc, [RevId]) ->
+ [{RevId, Doc, []}];
+doc_to_tree(Doc, [RevId | Rest]) ->
+ [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
+
+make_doc(Db, FullDocInfo) ->
+ {#doc_info{id=Id,deleted=Deleted,summary_pointer=Sp}, RevPath}
+ = couch_doc:to_doc_info_path(FullDocInfo),
+ make_doc(Db, Id, Deleted, Sp, RevPath).
+
+make_doc(#db{fd=Fd}=Db, Id, Deleted, BodySp, RevisionPath) ->
{BodyData, BinValues} =
- case SummaryPointer of
+ case BodySp of
nil ->
{[], []};
_ ->
- {ok, {BodyData0, BinValues0}} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
- {BodyData0, [{Name, {Type, {Db#db.fd, Sp, Len}}} || {Name, {Type, Sp, Len}} <- BinValues0]}
+ {ok, {BodyData0, BinValues0}} =
+ couch_stream:read_term( Db#db.summary_stream, BodySp),
+ {BodyData0,
+ [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
end,
#doc{
id = Id,