diff options
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r-- | src/couchdb/couch_db.erl | 179 |
1 files changed, 124 insertions, 55 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index e1b36f42..6441e2e1 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -14,14 +14,14 @@ -behaviour(gen_server). -export([open/2,close/1,create/2,start_compact/1,get_db_info/1]). --export([open_ref_counted/2,num_refs/1,monitor/1]). --export([save_docs/3,update_doc/3,update_docs/2,update_docs/3,delete_doc/3]). +-export([open_ref_counted/3,num_refs/1,monitor/1]). +-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). --export([get_missing_revs/2,name/1]). +-export([get_missing_revs/2,name/1,doc_to_tree/1]). -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). --export([start_link/3]). +-export([start_link/3,make_doc/2]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). @@ -33,19 +33,7 @@ start_link(DbName, Filepath, Options) -> {ok, Fd} -> StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []), unlink(Fd), - case StartResult of - {ok, _} -> - % We successfully opened the db, delete old storage files if around - case file:delete(Filepath ++ ".old") of - ok -> - ?LOG_INFO("Deleted old storage file ~s~s", [Filepath, ".old"]); - {error, enoent} -> - ok % normal result - end, - StartResult; - Error -> - Error - end; + StartResult; Else -> Else end. @@ -79,8 +67,9 @@ open(DbName, Options) -> close(#db{fd=Fd}) -> couch_file:drop_ref(Fd). -open_ref_counted(MainPid, OpeningPid) -> - gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}). +open_ref_counted(MainPid, OpeningPid, UserCred) -> + {ok, Db} = gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}), + {ok, Db#db{user_ctx=UserCred}}. num_refs(MainPid) -> gen_server:call(MainPid, num_refs). @@ -213,39 +202,92 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) -> % add to new bucket group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]]) end. - -prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocInfo, LeafRevsDict) -> +validate_doc_update(#db{validate_doc_funs=[]}, Doc, _GetDiskDocFun) -> + Doc; +validate_doc_update(_Db, #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) -> + Doc; +validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}=Doc, _GetDiskDocFun) -> + Doc; +validate_doc_update(#db{name=DbName,user_ctx={CtxProps}}=Db, Doc, GetDiskDocFun) -> + DiskDoc = GetDiskDocFun(), + [case Fun(Doc, DiskDoc, {[{<<"db">>, DbName} | CtxProps]}) of + ok -> ok; + Error -> throw(Error) + end || Fun <- Db#db.validate_doc_funs], + Doc. + + +prep_and_validate_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, + OldFullDocInfo, LeafRevsDict) -> case PrevRevs of [PrevRev|_] -> case dict:find(PrevRev, LeafRevsDict) of {ok, {Deleted, Sp, DiskRevs}} -> - case couch_doc:has_stubs(Doc) of + Doc2 = Doc#doc{revs=[NewRev|DiskRevs]}, + case couch_doc:has_stubs(Doc2) of true -> DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs), - Doc2 = couch_doc:merge_stubs(Doc, DiskDoc), - Doc2#doc{revs=[NewRev|DiskRevs]}; + Doc3 = couch_doc:merge_stubs(Doc2, DiskDoc), + validate_doc_update(Db, Doc3, fun() -> DiskDoc end); false -> - Doc#doc{revs=[NewRev|DiskRevs]} + LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,Sp,DiskRevs) end, + validate_doc_update(Db, Doc2, LoadDiskDoc) end; error -> throw(conflict) end; [] -> - % new doc, and we have existing revs. - OldDocInfo = couch_doc:to_doc_info(OldFullDocInfo), - if OldDocInfo#doc_info.deleted -> - % existing doc is a deleton - % allow this new doc to be a later revision. - {_Deleted, _Sp, Revs} = dict:fetch(OldDocInfo#doc_info.rev, LeafRevsDict), - Doc#doc{revs=[NewRev|Revs]}; + % new doc, and we have existing revs. + if OldFullDocInfo#full_doc_info.deleted -> + % existing docs are deletions + validate_doc_update(Db, Doc, nil); true -> throw(conflict) end end. update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> - % go ahead and generate the new revision ids for the documents. + update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, true). + +update_docs(Db, Docs, Options, false) -> + DocBuckets = group_alike_docs(Docs), + Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], + + ExistingDocs = get_full_doc_infos(Db, Ids), + + DocBuckets2 = lists:zipwith( + fun(Bucket, not_found) -> + [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket]; + (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}}) -> + NewTree = lists:foldl( + fun(Doc, RevTreeAcc) -> + couch_key_tree:merge(RevTreeAcc, doc_to_tree(Doc)) + end, + OldRevTree, Bucket), + Leafs = couch_key_tree:get_all_leafs_full(NewTree), + LeafRevsFullDict = dict:from_list( [{Rev, FullPath} || [{Rev, _}|_]=FullPath <- Leafs]), + lists:flatmap( + fun(#doc{revs=[Rev|_]}=Doc) -> + case dict:find(Rev, LeafRevsFullDict) of + {ok, [{Rev, #doc{id=Id}}|_]=Path} -> + % our unflushed doc is a leaf node. Go back on the path + % to find the previous rev that's on disk. + LoadPrevRev = fun() -> + make_first_doc_on_disk(Db, Id, Path) + end, + [validate_doc_update(Db, Doc, LoadPrevRev)]; + _ -> + % this doc isn't a leaf or is already exists in the tree. ignore + [] + end + end, Bucket) + end, + DocBuckets, ExistingDocs), + write_and_commit(Db, DocBuckets2, Options); + +update_docs(Db, Docs, Options, true) -> + % go ahead and generate the new revision ids for the documents. Docs2 = lists:map( fun(#doc{id=Id,revs=Revs}=Doc) -> case Id of @@ -256,7 +298,6 @@ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> Doc#doc{revs=[list_to_binary(integer_to_list(couch_util:rand32())) | Revs]} end end, Docs), - NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2], DocBuckets = group_alike_docs(Docs2), Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], @@ -266,39 +307,51 @@ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> DocBuckets2 = lists:zipwith( fun(Bucket, not_found) -> - % no existing revs, make sure no old revision is specified. + % no existing revs on disk, make sure no old revs specified. [throw(conflict) || #doc{revs=[_NewRev, _OldRev | _]} <- Bucket], - Bucket; + [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket]; (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) -> Leafs = couch_key_tree:get_all_leafs(OldRevTree), LeafRevsDict = dict:from_list([{Rev, {Deleted, Sp, Revs}} || {Rev, {Deleted, Sp}, Revs} <- Leafs]), - [prepare_doc_for_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket] + [prep_and_validate_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket] end, DocBuckets, ExistingDocs), - % flush unwritten binaries to disk. - DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2], + ok = write_and_commit(Db, DocBuckets2, [new_edits | Options]), + {ok, [NewRev ||#doc{revs=[NewRev|_]} <- Docs2]}. + + +% Returns the first available document on disk. Input list is a full rev path +% for the doc. +make_first_doc_on_disk(_Db, _Id, []) -> + nil; +make_first_doc_on_disk(Db, Id, [{_Rev, ?REV_MISSING}|RestPath]) -> + make_first_doc_on_disk(Db, Id, RestPath); +make_first_doc_on_disk(Db, Id, [{_Rev, {IsDel, Sp}} |_]=DocPath) -> + Revs = [Rev || {Rev, _} <- DocPath], + make_doc(Db, Id, IsDel, Sp, Revs). - case gen_server:call(UpdatePid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of - ok -> {ok, NewRevs}; + +write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets, Options) -> + + % flush unwritten binaries to disk. + DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], + case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of + ok -> ok; retry -> - {ok, Db2} = open_ref_counted(Db#db.main_pid, self()), - DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3], + % This can happen if the db file we wrote to was swapped out by + % compaction. Retry writing to the current file + {ok, Db2} = open_ref_counted(Db#db.main_pid, self(), {[]}), + DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], % We only retry once close(Db2), - case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of - ok -> {ok, NewRevs}; + case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of + ok -> ok; Else -> throw(Else) end; Else-> throw(Else) end. -save_docs(#db{update_pid=UpdatePid, fd=Fd}, Docs, Options) -> - % flush unwritten binaries to disk. - DocBuckets = group_alike_docs(Docs), - DocBuckets2 = [[doc_flush_binaries(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets], - ok = gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity). - doc_flush_binaries(Doc, Fd) -> % calc size of binaries to write out @@ -509,14 +562,30 @@ doc_meta_info(DocInfo, RevTree, Options) -> end end. -make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) -> + +doc_to_tree(Doc) -> + doc_to_tree(Doc, lists:reverse(Doc#doc.revs)). + +doc_to_tree(Doc, [RevId]) -> + [{RevId, Doc, []}]; +doc_to_tree(Doc, [RevId | Rest]) -> + [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}]. + +make_doc(Db, FullDocInfo) -> + {#doc_info{id=Id,deleted=Deleted,summary_pointer=Sp}, RevPath} + = couch_doc:to_doc_info_path(FullDocInfo), + make_doc(Db, Id, Deleted, Sp, RevPath). + +make_doc(#db{fd=Fd}=Db, Id, Deleted, BodySp, RevisionPath) -> {BodyData, BinValues} = - case SummaryPointer of + case BodySp of nil -> {[], []}; _ -> - {ok, {BodyData0, BinValues0}} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer), - {BodyData0, [{Name, {Type, {Db#db.fd, Sp, Len}}} || {Name, {Type, Sp, Len}} <- BinValues0]} + {ok, {BodyData0, BinValues0}} = + couch_stream:read_term( Db#db.summary_stream, BodySp), + {BodyData0, + [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]} end, #doc{ id = Id, |