author     Damien F. Katz <damien@apache.org>  2008-11-11 19:45:50 +0000
committer  Damien F. Katz <damien@apache.org>  2008-11-11 19:45:50 +0000
commit     9044fc0234ed65056f087a86c7c117922f2a2c75 (patch)
tree       ea87133de1e3617c5d3d7366983b7f7039d6cffd /src/couchdb/couch_db.erl
parent     538f455f2e2842d5caa33ed300d28b3cd52599b3 (diff)
Check in of initial validation and authorization work. This work is incomplete, as there is not yet any way of restricting who can update the design docs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@713132 13f79535-47bb-0310-9956-ffa450edef68
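The validation hook this commit introduces calls each fun in #db.validate_doc_funs with the incoming doc, the doc currently on disk (or nil), and the user context extended with the db name (see validate_doc_update/3 in the diff below). A minimal sketch of a fun matching that contract, assuming the #doc record from couch_db.hrl; the {forbidden, Msg} error shape and the <<"name">> context field are illustrative only:

    ValidateFun =
        fun(#doc{deleted=true}, _DiskDoc, {CtxProps}) ->
                % reject deletions from anonymous users (assumed policy)
                case proplists:get_value(<<"name">>, CtxProps) of
                undefined -> {forbidden, <<"only named users may delete docs">>};
                _Name -> ok
                end;
           (_NewDoc, _DiskDoc, _Ctx) ->
                ok
        end.

Any non-ok return is thrown by validate_doc_update/3, aborting the update.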
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--  src/couchdb/couch_db.erl | 179
1 file changed, 124 insertions(+), 55 deletions(-)
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index e1b36f42..6441e2e1 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -14,14 +14,14 @@
-behaviour(gen_server).
-export([open/2,close/1,create/2,start_compact/1,get_db_info/1]).
--export([open_ref_counted/2,num_refs/1,monitor/1]).
--export([save_docs/3,update_doc/3,update_docs/2,update_docs/3,delete_doc/3]).
+-export([open_ref_counted/3,num_refs/1,monitor/1]).
+-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
--export([get_missing_revs/2,name/1]).
+-export([get_missing_revs/2,name/1,doc_to_tree/1]).
-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3]).
+-export([start_link/3,make_doc/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
@@ -33,19 +33,7 @@ start_link(DbName, Filepath, Options) ->
{ok, Fd} ->
StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
unlink(Fd),
- case StartResult of
- {ok, _} ->
- % We successfully opened the db, delete old storage files if around
- case file:delete(Filepath ++ ".old") of
- ok ->
- ?LOG_INFO("Deleted old storage file ~s~s", [Filepath, ".old"]);
- {error, enoent} ->
- ok % normal result
- end,
- StartResult;
- Error ->
- Error
- end;
+ StartResult;
Else ->
Else
end.
@@ -79,8 +67,9 @@ open(DbName, Options) ->
close(#db{fd=Fd}) ->
couch_file:drop_ref(Fd).
-open_ref_counted(MainPid, OpeningPid) ->
- gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}).
+open_ref_counted(MainPid, OpeningPid, UserCred) ->
+ {ok, Db} = gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}),
+ {ok, Db#db{user_ctx=UserCred}}.
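A hypothetical caller of the new arity; the {Proplist} context shape matches the empty {[]} passed in the write_and_commit retry path further down, and the <<"name">> field is made up:

    % open a ref-counted handle stamped with the requesting user's context,
    % which validate_doc_update/3 will later see
    {ok, Db} = couch_db:open_ref_counted(DbMainPid, self(),
        {[{<<"name">>, <<"bob">>}]})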
num_refs(MainPid) ->
gen_server:call(MainPid, num_refs).
@@ -213,39 +202,92 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
% add to new bucket
group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
end.
-
-prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocInfo, LeafRevsDict) ->
+validate_doc_update(#db{validate_doc_funs=[]}, Doc, _GetDiskDocFun) ->
+ Doc;
+validate_doc_update(_Db, #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) ->
+ Doc;
+validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}=Doc, _GetDiskDocFun) ->
+ Doc;
+validate_doc_update(#db{name=DbName,user_ctx={CtxProps}}=Db, Doc, GetDiskDocFun) ->
+ DiskDoc = GetDiskDocFun(),
+ [case Fun(Doc, DiskDoc, {[{<<"db">>, DbName} | CtxProps]}) of
+ ok -> ok;
+ Error -> throw(Error)
+ end || Fun <- Db#db.validate_doc_funs],
+ Doc.
+
+
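Reading the new clauses together, with assumed values:

    % With Db#db.validate_doc_funs = [ValidateFun] and
    %      Db#db.user_ctx          = {[{<<"name">>, <<"bob">>}]},
    % validate_doc_update(Db, Doc, fun() -> nil end) invokes
    %      ValidateFun(Doc, nil, {[{<<"db">>, DbName}, {<<"name">>, <<"bob">>}]})
    % and returns Doc when every fun answers ok, throwing the first other
    % result; _design/ docs, _local/ docs, and dbs with no validate funs
    % skip validation entirely.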
+prep_and_validate_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc,
+ OldFullDocInfo, LeafRevsDict) ->
case PrevRevs of
[PrevRev|_] ->
case dict:find(PrevRev, LeafRevsDict) of
{ok, {Deleted, Sp, DiskRevs}} ->
- case couch_doc:has_stubs(Doc) of
+ Doc2 = Doc#doc{revs=[NewRev|DiskRevs]},
+ case couch_doc:has_stubs(Doc2) of
true ->
DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs),
- Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
- Doc2#doc{revs=[NewRev|DiskRevs]};
+ Doc3 = couch_doc:merge_stubs(Doc2, DiskDoc),
+ validate_doc_update(Db, Doc3, fun() -> DiskDoc end);
false ->
- Doc#doc{revs=[NewRev|DiskRevs]}
+ LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,Sp,DiskRevs) end,
+ validate_doc_update(Db, Doc2, LoadDiskDoc)
end;
error ->
throw(conflict)
end;
[] ->
- % new doc, and we have existing revs.
- OldDocInfo = couch_doc:to_doc_info(OldFullDocInfo),
- if OldDocInfo#doc_info.deleted ->
- % existing doc is a deleton
- % allow this new doc to be a later revision.
- {_Deleted, _Sp, Revs} = dict:fetch(OldDocInfo#doc_info.rev, LeafRevsDict),
- Doc#doc{revs=[NewRev|Revs]};
+ % new doc, and we have existing revs.
+ if OldFullDocInfo#full_doc_info.deleted ->
+ % existing docs are deletions
+ validate_doc_update(Db, Doc, nil);
true ->
throw(conflict)
end
end.
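A worked example of the leaf lookup above (rev ids shortened): if LeafRevsDict maps <<"r2">> to {false, Sp, [<<"r2">>, <<"r1">>]} and the incoming doc carries revs=[<<"r3">>, <<"r2">>], the edit extends a known leaf, so the doc proceeds to validation as

    % Doc2 = Doc#doc{revs = [<<"r3">>, <<"r2">>, <<"r1">>]}

while a PrevRev absent from LeafRevsDict throws conflict.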
update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
- % go ahead and generate the new revision ids for the documents.
+ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, true).
+
+update_docs(Db, Docs, Options, false) ->
+ DocBuckets = group_alike_docs(Docs),
+ Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+
+ ExistingDocs = get_full_doc_infos(Db, Ids),
+
+ DocBuckets2 = lists:zipwith(
+ fun(Bucket, not_found) ->
+ [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket];
+ (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}}) ->
+ NewTree = lists:foldl(
+ fun(Doc, RevTreeAcc) ->
+ couch_key_tree:merge(RevTreeAcc, doc_to_tree(Doc))
+ end,
+ OldRevTree, Bucket),
+ Leafs = couch_key_tree:get_all_leafs_full(NewTree),
+ LeafRevsFullDict = dict:from_list( [{Rev, FullPath} || [{Rev, _}|_]=FullPath <- Leafs]),
+ lists:flatmap(
+ fun(#doc{revs=[Rev|_]}=Doc) ->
+ case dict:find(Rev, LeafRevsFullDict) of
+ {ok, [{Rev, #doc{id=Id}}|_]=Path} ->
+ % our unflushed doc is a leaf node. Go back on the path
+ % to find the previous rev that's on disk.
+ LoadPrevRev = fun() ->
+ make_first_doc_on_disk(Db, Id, Path)
+ end,
+ [validate_doc_update(Db, Doc, LoadPrevRev)];
+ _ ->
+ % this doc isn't a leaf or already exists in the tree. ignore
+ []
+ end
+ end, Bucket)
+ end,
+ DocBuckets, ExistingDocs),
+ write_and_commit(Db, DocBuckets2, Options);
+
+update_docs(Db, Docs, Options, true) ->
+ % go ahead and generate the new revision ids for the documents.
Docs2 = lists:map(
fun(#doc{id=Id,revs=Revs}=Doc) ->
case Id of
@@ -256,7 +298,6 @@ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
Doc#doc{revs=[list_to_binary(integer_to_list(couch_util:rand32())) | Revs]}
end
end, Docs),
- NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
DocBuckets = group_alike_docs(Docs2),
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
@@ -266,39 +307,51 @@ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
DocBuckets2 = lists:zipwith(
fun(Bucket, not_found) ->
- % no existing revs, make sure no old revision is specified.
+ % no existing revs on disk, make sure no old revs specified.
[throw(conflict) || #doc{revs=[_NewRev, _OldRev | _]} <- Bucket],
- Bucket;
+ [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket];
(Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([{Rev, {Deleted, Sp, Revs}} || {Rev, {Deleted, Sp}, Revs} <- Leafs]),
- [prepare_doc_for_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket]
+ [prep_and_validate_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket]
end,
DocBuckets, ExistingDocs),
- % flush unwritten binaries to disk.
- DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
+ ok = write_and_commit(Db, DocBuckets2, [new_edits | Options]),
+ {ok, [NewRev ||#doc{revs=[NewRev|_]} <- Docs2]}.
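Hypothetical callers of the two modes above (variable names invented):

    % interactive edits: fresh revision ids are generated and returned
    {ok, NewRevs} = couch_db:update_docs(Db, Docs, []),

    % replicated edits: docs arrive with full rev paths and are merged
    % into each doc's rev tree rather than rejected as conflicts
    ok = couch_db:update_docs(Db, ReplicatedDocs, [], false)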
+
+
+% Returns the first available document on disk. Input list is a full rev path
+% for the doc.
+make_first_doc_on_disk(_Db, _Id, []) ->
+ nil;
+make_first_doc_on_disk(Db, Id, [{_Rev, ?REV_MISSING}|RestPath]) ->
+ make_first_doc_on_disk(Db, Id, RestPath);
+make_first_doc_on_disk(Db, Id, [{_Rev, {IsDel, Sp}} |_]=DocPath) ->
+ Revs = [Rev || {Rev, _} <- DocPath],
+ make_doc(Db, Id, IsDel, Sp, Revs).
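For example, given the full path

    % [{R3, ?REV_MISSING}, {R2, {IsDel, Sp}}, {R1, ?REV_MISSING}]
    % the missing R3 is skipped and the doc is read from R2's summary
    % pointer with Revs = [R2, R1]; a path with nothing on disk yields nil.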
- case gen_server:call(UpdatePid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of
- ok -> {ok, NewRevs};
+
+write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets, Options) ->
+
+ % flush unwritten binaries to disk.
+ DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of
+ ok -> ok;
retry ->
- {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
- DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
+ % This can happen if the db file we wrote to was swapped out by
+ % compaction. Retry writing to the current file
+ {ok, Db2} = open_ref_counted(Db#db.main_pid, self(), {[]}),
+ DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
- case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of
- ok -> {ok, NewRevs};
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of
+ ok -> ok;
Else -> throw(Else)
end;
Else->
throw(Else)
end.
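write_and_commit/3 retries once when compaction has swapped the underlying file out mid-write, then throws; a hypothetical caller wanting an error tuple instead could wrap it:

    try
        ok = write_and_commit(Db, DocBuckets, Options)
    catch
        throw:Error ->
            {error, Error}
    end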
-save_docs(#db{update_pid=UpdatePid, fd=Fd}, Docs, Options) ->
- % flush unwritten binaries to disk.
- DocBuckets = group_alike_docs(Docs),
- DocBuckets2 = [[doc_flush_binaries(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets],
- ok = gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity).
-
doc_flush_binaries(Doc, Fd) ->
% calc size of binaries to write out
@@ -509,14 +562,30 @@ doc_meta_info(DocInfo, RevTree, Options) ->
end
end.
-make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
+
+doc_to_tree(Doc) ->
+ doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
+
+doc_to_tree(Doc, [RevId]) ->
+ [{RevId, Doc, []}];
+doc_to_tree(Doc, [RevId | Rest]) ->
+ [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
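A worked example (rev ids shortened): a doc whose revs are newest-first [<<"r3">>, <<"r2">>, <<"r1">>] becomes the single-branch tree

    % [{<<"r1">>, ?REV_MISSING,
    %     [{<<"r2">>, ?REV_MISSING,
    %         [{<<"r3">>, Doc, []}]}]}]

so only the newest revision carries the document body; couch_key_tree:merge/2 can then graft this path onto the stored rev tree, as the update_docs/4 hunk above does.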
+
+make_doc(Db, FullDocInfo) ->
+ {#doc_info{id=Id,deleted=Deleted,summary_pointer=Sp}, RevPath}
+ = couch_doc:to_doc_info_path(FullDocInfo),
+ make_doc(Db, Id, Deleted, Sp, RevPath).
+
+make_doc(#db{fd=Fd}=Db, Id, Deleted, BodySp, RevisionPath) ->
{BodyData, BinValues} =
- case SummaryPointer of
+ case BodySp of
nil ->
{[], []};
_ ->
- {ok, {BodyData0, BinValues0}} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
- {BodyData0, [{Name, {Type, {Db#db.fd, Sp, Len}}} || {Name, {Type, Sp, Len}} <- BinValues0]}
+ {ok, {BodyData0, BinValues0}} =
+ couch_stream:read_term( Db#db.summary_stream, BodySp),
+ {BodyData0,
+ [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
end,
#doc{
id = Id,