diff options
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r-- | src/couchdb/couch_db.erl | 1217 |
1 files changed, 0 insertions, 1217 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl deleted file mode 100644 index 85f7e291..00000000 --- a/src/couchdb/couch_db.erl +++ /dev/null @@ -1,1217 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_db). --behaviour(gen_server). - --export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]). --export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]). --export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). --export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). --export([set_revs_limit/2,get_revs_limit/1]). --export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]). --export([enum_docs/4,enum_docs_since/5]). --export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). --export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). --export([start_link/3,open_doc_int/3,ensure_full_commit/1]). --export([set_security/2,get_security/1]). --export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). --export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]). --export([check_is_admin/1, check_is_reader/1]). --export([reopen/1]). - --include("couch_db.hrl"). - - -start_link(DbName, Filepath, Options) -> - case open_db_file(Filepath, Options) of - {ok, Fd} -> - StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []), - unlink(Fd), - StartResult; - Else -> - Else - end. - -open_db_file(Filepath, Options) -> - case couch_file:open(Filepath, Options) of - {ok, Fd} -> - {ok, Fd}; - {error, enoent} -> - % couldn't find file. is there a compact version? This can happen if - % crashed during the file switch. - case couch_file:open(Filepath ++ ".compact") of - {ok, Fd} -> - ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]), - ok = file:rename(Filepath ++ ".compact", Filepath), - ok = couch_file:sync(Fd), - {ok, Fd}; - {error, enoent} -> - {not_found, no_db_file} - end; - Error -> - Error - end. - - -create(DbName, Options) -> - couch_server:create(DbName, Options). - -% this is for opening a database for internal purposes like the replicator -% or the view indexer. it never throws a reader error. -open_int(DbName, Options) -> - couch_server:open(DbName, Options). - -% this should be called anytime an http request opens the database. -% it ensures that the http userCtx is a valid reader -open(DbName, Options) -> - case couch_server:open(DbName, Options) of - {ok, Db} -> - try - check_is_reader(Db), - {ok, Db} - catch - throw:Error -> - close(Db), - throw(Error) - end; - Else -> Else - end. - -reopen(#db{main_pid = Pid, fd_ref_counter = OldRefCntr, user_ctx = UserCtx}) -> - {ok, #db{fd_ref_counter = NewRefCntr} = NewDb} = - gen_server:call(Pid, get_db, infinity), - case NewRefCntr =:= OldRefCntr of - true -> - ok; - false -> - couch_ref_counter:add(NewRefCntr), - catch couch_ref_counter:drop(OldRefCntr) - end, - {ok, NewDb#db{user_ctx = UserCtx}}. - -ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) -> - ok = gen_server:call(UpdatePid, full_commit, infinity), - {ok, StartTime}. - -close(#db{fd_ref_counter=RefCntr}) -> - couch_ref_counter:drop(RefCntr). - -open_ref_counted(MainPid, OpenedPid) -> - gen_server:call(MainPid, {open_ref_count, OpenedPid}). - -is_idle(MainPid) -> - gen_server:call(MainPid, is_idle). - -monitor(#db{main_pid=MainPid}) -> - erlang:monitor(process, MainPid). - -start_compact(#db{update_pid=Pid}) -> - gen_server:call(Pid, start_compact). - -delete_doc(Db, Id, Revisions) -> - DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions], - {ok, [Result]} = update_docs(Db, DeletedDocs, []), - {ok, Result}. - -open_doc(Db, IdOrDocInfo) -> - open_doc(Db, IdOrDocInfo, []). - -open_doc(Db, Id, Options) -> - increment_stat(Db, {couchdb, database_reads}), - case open_doc_int(Db, Id, Options) of - {ok, #doc{deleted=true}=Doc} -> - case lists:member(deleted, Options) of - true -> - apply_open_options({ok, Doc},Options); - false -> - {not_found, deleted} - end; - Else -> - apply_open_options(Else,Options) - end. - -apply_open_options({ok, Doc},Options) -> - apply_open_options2(Doc,Options); -apply_open_options(Else,_Options) -> - Else. - -apply_open_options2(Doc,[]) -> - {ok, Doc}; -apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc, - [{atts_since, PossibleAncestors}|Rest]) -> - RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors), - apply_open_options2(Doc#doc{atts=[A#att{data= - if AttPos>RevPos -> Data; true -> stub end} - || #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest); -apply_open_options2(Doc,[_|Rest]) -> - apply_open_options2(Doc,Rest). - - -find_ancestor_rev_pos({_, []}, _AttsSinceRevs) -> - 0; -find_ancestor_rev_pos(_DocRevs, []) -> - 0; -find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) -> - case lists:member({RevPos, RevId}, AttsSinceRevs) of - true -> - RevPos; - false -> - find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs) - end. - -open_doc_revs(Db, Id, Revs, Options) -> - increment_stat(Db, {couchdb, database_reads}), - [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options), - {ok, [apply_open_options(Result, Options) || Result <- Results]}. - -% Each returned result is a list of tuples: -% {Id, MissingRevs, PossibleAncestors} -% if no revs are missing, it's omitted from the results. -get_missing_revs(Db, IdRevsList) -> - Results = get_full_doc_infos(Db, [Id1 || {Id1, _Revs} <- IdRevsList]), - {ok, find_missing(IdRevsList, Results)}. - -find_missing([], []) -> - []; -find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) -> - case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of - [] -> - find_missing(RestIdRevs, RestLookupInfo); - MissingRevs -> - #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), - LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], - % Find the revs that are possible parents of this rev - PossibleAncestors = - lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> - % this leaf is a "possible ancenstor" of the missing - % revs if this LeafPos lessthan any of the missing revs - case lists:any(fun({MissingPos, _}) -> - LeafPos < MissingPos end, MissingRevs) of - true -> - [{LeafPos, LeafRevId} | Acc]; - false -> - Acc - end - end, [], LeafRevs), - [{Id, MissingRevs, PossibleAncestors} | - find_missing(RestIdRevs, RestLookupInfo)] - end; -find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) -> - [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)]. - -get_doc_info(Db, Id) -> - case get_full_doc_info(Db, Id) of - {ok, DocInfo} -> - {ok, couch_doc:to_doc_info(DocInfo)}; - Else -> - Else - end. - -% returns {ok, DocInfo} or not_found -get_full_doc_info(Db, Id) -> - [Result] = get_full_doc_infos(Db, [Id]), - Result. - -get_full_doc_infos(Db, Ids) -> - couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids). - -increment_update_seq(#db{update_pid=UpdatePid}) -> - gen_server:call(UpdatePid, increment_update_seq). - -purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> - gen_server:call(UpdatePid, {purge_docs, IdsRevs}). - -get_committed_update_seq(#db{committed_update_seq=Seq}) -> - Seq. - -get_update_seq(#db{update_seq=Seq})-> - Seq. - -get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})-> - PurgeSeq. - -get_last_purged(#db{header=#db_header{purged_docs=nil}}) -> - {ok, []}; -get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) -> - couch_file:pread_term(Fd, PurgedPointer). - -get_db_info(Db) -> - #db{fd=Fd, - header=#db_header{disk_version=DiskVersion}, - compactor_pid=Compactor, - update_seq=SeqNum, - name=Name, - fulldocinfo_by_id_btree=FullDocBtree, - instance_start_time=StartTime, - committed_update_seq=CommittedUpdateSeq} = Db, - {ok, Size} = couch_file:bytes(Fd), - {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree), - InfoList = [ - {db_name, Name}, - {doc_count, Count}, - {doc_del_count, DelCount}, - {update_seq, SeqNum}, - {purge_seq, couch_db:get_purge_seq(Db)}, - {compact_running, Compactor/=nil}, - {disk_size, Size}, - {instance_start_time, StartTime}, - {disk_format_version, DiskVersion}, - {committed_update_seq, CommittedUpdateSeq} - ], - {ok, InfoList}. - -get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) -> - {ok,_, Docs} = couch_btree:fold(Btree, - fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) -> - {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []), - {ok, [Doc | AccDocs]}; - (_, _Reds, AccDocs) -> - {stop, AccDocs} - end, - [], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]), - {ok, Docs}. - -check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) -> - {Admins} = get_admins(Db), - AdminRoles = [<<"_admin">> | couch_util:get_value(<<"roles">>, Admins, [])], - AdminNames = couch_util:get_value(<<"names">>, Admins,[]), - case AdminRoles -- Roles of - AdminRoles -> % same list, not an admin role - case AdminNames -- [Name] of - AdminNames -> % same names, not an admin - throw({unauthorized, <<"You are not a db or server admin.">>}); - _ -> - ok - end; - _ -> - ok - end. - -check_is_reader(#db{user_ctx=#user_ctx{name=Name,roles=Roles}=UserCtx}=Db) -> - case (catch check_is_admin(Db)) of - ok -> ok; - _ -> - {Readers} = get_readers(Db), - ReaderRoles = couch_util:get_value(<<"roles">>, Readers,[]), - WithAdminRoles = [<<"_admin">> | ReaderRoles], - ReaderNames = couch_util:get_value(<<"names">>, Readers,[]), - case ReaderRoles ++ ReaderNames of - [] -> ok; % no readers == public access - _Else -> - case WithAdminRoles -- Roles of - WithAdminRoles -> % same list, not an reader role - case ReaderNames -- [Name] of - ReaderNames -> % same names, not a reader - ?LOG_DEBUG("Not a reader: UserCtx ~p vs Names ~p Roles ~p",[UserCtx, ReaderNames, WithAdminRoles]), - throw({unauthorized, <<"You are not authorized to access this db.">>}); - _ -> - ok - end; - _ -> - ok - end - end - end. - -get_admins(#db{security=SecProps}) -> - couch_util:get_value(<<"admins">>, SecProps, {[]}). - -get_readers(#db{security=SecProps}) -> - couch_util:get_value(<<"readers">>, SecProps, {[]}). - -get_security(#db{security=SecProps}) -> - {SecProps}. - -set_security(#db{update_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> - check_is_admin(Db), - ok = validate_security_object(NewSecProps), - ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity), - {ok, _} = ensure_full_commit(Db), - ok; -set_security(_, _) -> - throw(bad_request). - -validate_security_object(SecProps) -> - Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}), - Readers = couch_util:get_value(<<"readers">>, SecProps, {[]}), - ok = validate_names_and_roles(Admins), - ok = validate_names_and_roles(Readers), - ok. - -% validate user input -validate_names_and_roles({Props}) when is_list(Props) -> - case couch_util:get_value(<<"names">>,Props,[]) of - Ns when is_list(Ns) -> - [throw("names must be a JSON list of strings") ||N <- Ns, not is_binary(N)], - Ns; - _ -> throw("names must be a JSON list of strings") - end, - case couch_util:get_value(<<"roles">>,Props,[]) of - Rs when is_list(Rs) -> - [throw("roles must be a JSON list of strings") ||R <- Rs, not is_binary(R)], - Rs; - _ -> throw("roles must be a JSON list of strings") - end, - ok. - -get_revs_limit(#db{revs_limit=Limit}) -> - Limit. - -set_revs_limit(#db{update_pid=Pid}=Db, Limit) when Limit > 0 -> - check_is_admin(Db), - gen_server:call(Pid, {set_revs_limit, Limit}, infinity); -set_revs_limit(_Db, _Limit) -> - throw(invalid_revs_limit). - -name(#db{name=Name}) -> - Name. - -update_doc(Db, Doc, Options) -> - update_doc(Db, Doc, Options, interactive_edit). - -update_doc(Db, Doc, Options, UpdateType) -> - case update_docs(Db, [Doc], Options, UpdateType) of - {ok, [{ok, NewRev}]} -> - {ok, NewRev}; - {ok, [{{_Id, _Rev}, Error}]} -> - throw(Error); - {ok, [Error]} -> - throw(Error); - {ok, []} -> - % replication success - {Pos, [RevId | _]} = Doc#doc.revs, - {ok, {Pos, RevId}} - end. - -update_docs(Db, Docs) -> - update_docs(Db, Docs, []). - -% group_alike_docs groups the sorted documents into sublist buckets, by id. -% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]] -group_alike_docs(Docs) -> - Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs), - group_alike_docs(Sorted, []). - -group_alike_docs([], Buckets) -> - lists:reverse(Buckets); -group_alike_docs([Doc|Rest], []) -> - group_alike_docs(Rest, [[Doc]]); -group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) -> - [#doc{id=BucketId}|_] = Bucket, - case Doc#doc.id == BucketId of - true -> - % add to existing bucket - group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]); - false -> - % add to new bucket - group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]]) - end. - -validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) -> - catch check_is_admin(Db); -validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) -> - ok; -validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) -> - ok; -validate_doc_update(Db, Doc, GetDiskDocFun) -> - DiskDoc = GetDiskDocFun(), - JsonCtx = couch_util:json_user_ctx(Db), - SecObj = get_security(Db), - try [case Fun(Doc, DiskDoc, JsonCtx, SecObj) of - ok -> ok; - Error -> throw(Error) - end || Fun <- Db#db.validate_doc_funs], - ok - catch - throw:Error -> - Error - end. - - -prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc, - OldFullDocInfo, LeafRevsDict, AllowConflict) -> - case Revs of - [PrevRev|_] -> - case dict:find({RevStart, PrevRev}, LeafRevsDict) of - {ok, {Deleted, DiskSp, DiskRevs}} -> - case couch_doc:has_stubs(Doc) of - true -> - DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs), - Doc2 = couch_doc:merge_stubs(Doc, DiskDoc), - {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2}; - false -> - LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end, - {validate_doc_update(Db, Doc, LoadDiskDoc), Doc} - end; - error when AllowConflict -> - couch_doc:merge_stubs(Doc, #doc{}), % will generate error if - % there are stubs - {validate_doc_update(Db, Doc, fun() -> nil end), Doc}; - error -> - {conflict, Doc} - end; - [] -> - % new doc, and we have existing revs. - % reuse existing deleted doc - if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict -> - {validate_doc_update(Db, Doc, fun() -> nil end), Doc}; - true -> - {conflict, Doc} - end - end. - - - -prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped, - AccFatalErrors) -> - {AccPrepped, AccFatalErrors}; -prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], - AllowConflict, AccPrepped, AccErrors) -> - [#doc{id=Id}|_]=DocBucket, - % no existing revs are known, - {PreppedBucket, AccErrors3} = lists:foldl( - fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) -> - case couch_doc:has_stubs(Doc) of - true -> - couch_doc:merge_stubs(Doc, #doc{}); % will throw exception - false -> ok - end, - case Revs of - {0, []} -> - case validate_doc_update(Db, Doc, fun() -> nil end) of - ok -> - {[Doc | AccBucket], AccErrors2}; - Error -> - {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]} - end; - _ -> - % old revs specified but none exist, a conflict - {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]} - end - end, - {[], AccErrors}, DocBucket), - - prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, - [PreppedBucket | AccPrepped], AccErrors3); -prep_and_validate_updates(Db, [DocBucket|RestBuckets], - [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups], - AllowConflict, AccPrepped, AccErrors) -> - Leafs = couch_key_tree:get_all_leafs(OldRevTree), - LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} || - {{Deleted, Sp, _Seq}, {Start, [RevId|_]}=Revs} <- Leafs]), - {PreppedBucket, AccErrors3} = lists:foldl( - fun(Doc, {Docs2Acc, AccErrors2}) -> - case prep_and_validate_update(Db, Doc, OldFullDocInfo, - LeafRevsDict, AllowConflict) of - {ok, Doc2} -> - {[Doc2 | Docs2Acc], AccErrors2}; - {Error, #doc{id=Id,revs=Revs}} -> - % Record the error - {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]} - end - end, - {[], AccErrors}, DocBucket), - prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, - [PreppedBucket | AccPrepped], AccErrors3). - - -update_docs(Db, Docs, Options) -> - update_docs(Db, Docs, Options, interactive_edit). - - -prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) -> - Errors2 = [{{Id, {Pos, Rev}}, Error} || - {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors], - {lists:reverse(AccPrepped), lists:reverse(Errors2)}; -prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) -> - case OldInfo of - not_found -> - {ValidatedBucket, AccErrors3} = lists:foldl( - fun(Doc, {AccPrepped2, AccErrors2}) -> - case couch_doc:has_stubs(Doc) of - true -> - couch_doc:merge_stubs(Doc, #doc{}); % will throw exception - false -> ok - end, - case validate_doc_update(Db, Doc, fun() -> nil end) of - ok -> - {[Doc | AccPrepped2], AccErrors2}; - Error -> - {AccPrepped2, [{Doc, Error} | AccErrors2]} - end - end, - {[], AccErrors}, Bucket), - prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3); - {ok, #full_doc_info{rev_tree=OldTree}} -> - NewRevTree = lists:foldl( - fun(NewDoc, AccTree) -> - {NewTree, _} = couch_key_tree:merge(AccTree, - couch_db:doc_to_tree(NewDoc), Db#db.revs_limit), - NewTree - end, - OldTree, Bucket), - Leafs = couch_key_tree:get_all_leafs_full(NewRevTree), - LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]), - {ValidatedBucket, AccErrors3} = - lists:foldl( - fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) -> - case dict:find({Pos, RevId}, LeafRevsFullDict) of - {ok, {Start, Path}} -> - % our unflushed doc is a leaf node. Go back on the path - % to find the previous rev that's on disk. - - LoadPrevRevFun = fun() -> - make_first_doc_on_disk(Db,Id,Start-1, tl(Path)) - end, - - case couch_doc:has_stubs(Doc) of - true -> - DiskDoc = LoadPrevRevFun(), - Doc2 = couch_doc:merge_stubs(Doc, DiskDoc), - GetDiskDocFun = fun() -> DiskDoc end; - false -> - Doc2 = Doc, - GetDiskDocFun = LoadPrevRevFun - end, - - case validate_doc_update(Db, Doc2, GetDiskDocFun) of - ok -> - {[Doc2 | AccValidated], AccErrors2}; - Error -> - {AccValidated, [{Doc, Error} | AccErrors2]} - end; - _ -> - % this doc isn't a leaf or already exists in the tree. - % ignore but consider it a success. - {AccValidated, AccErrors2} - end - end, - {[], AccErrors}, Bucket), - prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, - [ValidatedBucket | AccPrepped], AccErrors3) - end. - - - -new_revid(#doc{body=Body,revs={OldStart,OldRevs}, - atts=Atts,deleted=Deleted}) -> - case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M =/= <<>>] of - Atts2 when length(Atts) =/= length(Atts2) -> - % We must have old style non-md5 attachments - ?l2b(integer_to_list(couch_util:rand32())); - Atts2 -> - OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end, - couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2])) - end. - -new_revs([], OutBuckets, IdRevsAcc) -> - {lists:reverse(OutBuckets), IdRevsAcc}; -new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) -> - {NewBucket, IdRevsAcc3} = lists:mapfoldl( - fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)-> - NewRevId = new_revid(Doc), - {Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, - [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]} - end, IdRevsAcc, Bucket), - new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3). - -check_dup_atts(#doc{atts=Atts}=Doc) -> - Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts), - check_dup_atts2(Atts2), - Doc. - -check_dup_atts2([#att{name=N}, #att{name=N} | _]) -> - throw({bad_request, <<"Duplicate attachments">>}); -check_dup_atts2([_ | Rest]) -> - check_dup_atts2(Rest); -check_dup_atts2(_) -> - ok. - - -update_docs(Db, Docs, Options, replicated_changes) -> - increment_stat(Db, {couchdb, database_writes}), - DocBuckets = group_alike_docs(Docs), - - case (Db#db.validate_doc_funs /= []) orelse - lists:any( - fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true; - (#doc{atts=Atts}) -> - Atts /= [] - end, Docs) of - true -> - Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], - ExistingDocs = get_full_doc_infos(Db, Ids), - - {DocBuckets2, DocErrors} = - prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []), - DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets - false -> - DocErrors = [], - DocBuckets3 = DocBuckets - end, - DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd) - || Doc <- Bucket] || Bucket <- DocBuckets3], - {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]), - {ok, DocErrors}; - -update_docs(Db, Docs, Options, interactive_edit) -> - increment_stat(Db, {couchdb, database_writes}), - AllOrNothing = lists:member(all_or_nothing, Options), - % go ahead and generate the new revision ids for the documents. - % separate out the NonRep documents from the rest of the documents - {Docs2, NonRepDocs} = lists:foldl( - fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) -> - case Id of - <<?LOCAL_DOC_PREFIX, _/binary>> -> - {DocsAcc, [Doc | NonRepDocsAcc]}; - Id-> - {[Doc | DocsAcc], NonRepDocsAcc} - end - end, {[], []}, Docs), - - DocBuckets = group_alike_docs(Docs2), - - case (Db#db.validate_doc_funs /= []) orelse - lists:any( - fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> - true; - (#doc{atts=Atts}) -> - Atts /= [] - end, Docs2) of - true -> - % lookup the doc by id and get the most recent - Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], - ExistingDocInfos = get_full_doc_infos(Db, Ids), - - {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db, - DocBuckets, ExistingDocInfos, AllOrNothing, [], []), - - % strip out any empty buckets - DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped]; - false -> - PreCommitFailures = [], - DocBuckets2 = DocBuckets - end, - - if (AllOrNothing) and (PreCommitFailures /= []) -> - {aborted, lists:map( - fun({{Id,{Pos, [RevId|_]}}, Error}) -> - {{Id, {Pos, RevId}}, Error}; - ({{Id,{0, []}}, Error}) -> - {{Id, {0, <<>>}}, Error} - end, PreCommitFailures)}; - true -> - Options2 = if AllOrNothing -> [merge_conflicts]; - true -> [] end ++ Options, - DocBuckets3 = [[ - doc_flush_atts(set_new_att_revpos( - check_dup_atts(Doc)), Db#db.fd) - || Doc <- B] || B <- DocBuckets2], - {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []), - - {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2), - - ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures), - {ok, lists:map( - fun(#doc{id=Id,revs={Pos, RevIds}}) -> - {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict), - Result - end, Docs)} - end. - -% Returns the first available document on disk. Input list is a full rev path -% for the doc. -make_first_doc_on_disk(_Db, _Id, _Pos, []) -> - nil; -make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) -> - make_first_doc_on_disk(Db, Id, Pos-1, RestPath); -make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) -> - make_first_doc_on_disk(Db, Id, Pos - 1, RestPath); -make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp, _Seq}} |_]=DocPath) -> - Revs = [Rev || {Rev, _} <- DocPath], - make_doc(Db, Id, IsDel, Sp, {Pos, Revs}). - -set_commit_option(Options) -> - CommitSettings = { - [true || O <- Options, O==full_commit orelse O==delay_commit], - couch_config:get("couchdb", "delayed_commits", "false") - }, - case CommitSettings of - {[true], _} -> - Options; % user requested explicit commit setting, do not change it - {_, "true"} -> - Options; % delayed commits are enabled, do nothing - {_, "false"} -> - [full_commit|Options]; - {_, Else} -> - ?LOG_ERROR("[couchdb] delayed_commits setting must be true/false, not ~p", - [Else]), - [full_commit|Options] - end. - -collect_results(UpdatePid, MRef, ResultsAcc) -> - receive - {result, UpdatePid, Result} -> - collect_results(UpdatePid, MRef, [Result | ResultsAcc]); - {done, UpdatePid} -> - {ok, ResultsAcc}; - {retry, UpdatePid} -> - retry; - {'DOWN', MRef, _, _, Reason} -> - exit(Reason) - end. - -write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets, - NonRepDocs, Options0) -> - Options = set_commit_option(Options0), - MergeConflicts = lists:member(merge_conflicts, Options), - FullCommit = lists:member(full_commit, Options), - MRef = erlang:monitor(process, UpdatePid), - try - UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit}, - case collect_results(UpdatePid, MRef, []) of - {ok, Results} -> {ok, Results}; - retry -> - % This can happen if the db file we wrote to was swapped out by - % compaction. Retry by reopening the db and writing to the current file - {ok, Db2} = open_ref_counted(Db#db.main_pid, self()), - DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], - % We only retry once - close(Db2), - UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit}, - case collect_results(UpdatePid, MRef, []) of - {ok, Results} -> {ok, Results}; - retry -> throw({update_error, compaction_retry}) - end - end - after - erlang:demonitor(MRef, [flush]) - end. - - -set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) -> - Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) -> - % already commited to disk, do not set new rev - Att; - (Att) -> - Att#att{revpos=RevPos+1} - end, Atts)}. - - -doc_flush_atts(Doc, Fd) -> - Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}. - -check_md5(_NewSig, <<>>) -> ok; -check_md5(Sig, Sig) -> ok; -check_md5(_, _) -> throw(md5_mismatch). - -flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd -> - % already written to our file, nothing to write - Att; - -flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5, - disk_len=InDiskLen} = Att) -> - {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} = - couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd), - check_md5(IdentityMd5, InMd5), - Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen}; - -flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) -> - with_stream(Fd, Att, fun(OutputStream) -> - couch_stream:write(OutputStream, Data) - end); - -flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) -> - with_stream(Fd, Att, fun(OutputStream) -> - % Fun(MaxChunkSize, WriterFun) must call WriterFun - % once for each chunk of the attachment, - Fun(4096, - % WriterFun({Length, Binary}, State) - % WriterFun({0, _Footers}, State) - % Called with Length == 0 on the last time. - % WriterFun returns NewState. - fun({0, Footers}, _) -> - F = mochiweb_headers:from_binary(Footers), - case mochiweb_headers:get_value("Content-MD5", F) of - undefined -> - ok; - Md5 -> - {md5, base64:decode(Md5)} - end; - ({_Length, Chunk}, _) -> - couch_stream:write(OutputStream, Chunk) - end, ok) - end); - -flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) -> - with_stream(Fd, Att, fun(OutputStream) -> - write_streamed_attachment(OutputStream, Fun, AttLen) - end). - -% From RFC 2616 3.6.1 - Chunked Transfer Coding -% -% In other words, the origin server is willing to accept -% the possibility that the trailer fields might be silently -% discarded along the path to the client. -% -% I take this to mean that if "Trailers: Content-MD5\r\n" -% is present in the request, but there is no Content-MD5 -% trailer, we're free to ignore this inconsistency and -% pretend that no Content-MD5 exists. -with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) -> - {ok, OutputStream} = case (Enc =:= identity) andalso - couch_util:compressible_att_type(Type) of - true -> - CompLevel = list_to_integer( - couch_config:get("attachments", "compression_level", "0") - ), - couch_stream:open(Fd, gzip, [{compression_level, CompLevel}]); - _ -> - couch_stream:open(Fd) - end, - ReqMd5 = case Fun(OutputStream) of - {md5, FooterMd5} -> - case InMd5 of - md5_in_footer -> FooterMd5; - _ -> InMd5 - end; - _ -> - InMd5 - end, - {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} = - couch_stream:close(OutputStream), - check_md5(IdentityMd5, ReqMd5), - {AttLen, DiskLen, NewEnc} = case Enc of - identity -> - case {Md5, IdentityMd5} of - {Same, Same} -> - {Len, IdentityLen, identity}; - _ -> - {Len, IdentityLen, gzip} - end; - gzip -> - case {Att#att.att_len, Att#att.disk_len} of - {AL, DL} when AL =:= undefined orelse DL =:= undefined -> - % Compressed attachment uploaded through the standalone API. - {Len, Len, gzip}; - {AL, DL} -> - % This case is used for efficient push-replication, where a - % compressed attachment is located in the body of multipart - % content-type request. - {AL, DL, gzip} - end - end, - Att#att{ - data={Fd,StreamInfo}, - att_len=AttLen, - disk_len=DiskLen, - md5=Md5, - encoding=NewEnc - }. - - -write_streamed_attachment(_Stream, _F, 0) -> - ok; -write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 -> - Bin = read_next_chunk(F, LenLeft), - ok = couch_stream:write(Stream, Bin), - write_streamed_attachment(Stream, F, LenLeft - size(Bin)). - -read_next_chunk(F, _) when is_function(F, 0) -> - F(); -read_next_chunk(F, LenLeft) when is_function(F, 1) -> - F(lists:min([LenLeft, 16#2000])). - -enum_docs_since_reduce_to_count(Reds) -> - couch_btree:final_reduce( - fun couch_db_updater:btree_by_seq_reduce/2, Reds). - -enum_docs_reduce_to_count(Reds) -> - {Count, _DelCount} = couch_btree:final_reduce( - fun couch_db_updater:btree_by_id_reduce/2, Reds), - Count. - -changes_since(Db, Style, StartSeq, Fun, Acc) -> - changes_since(Db, Style, StartSeq, Fun, [], Acc). - -changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> - Wrapper = fun(DocInfo, _Offset, Acc2) -> - #doc_info{revs=Revs} = DocInfo, - DocInfo2 = - case Style of - main_only -> - DocInfo; - all_docs -> - % remove revs before the seq - DocInfo#doc_info{revs=[RevInfo || - #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]} - end, - Fun(DocInfo2, Acc2) - end, - {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, - Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options), - {ok, AccOut}. - -count_changes_since(Db, SinceSeq) -> - {ok, Changes} = - couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree, - fun(_SeqStart, PartialReds, 0) -> - {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree, PartialReds)} - end, - 0, [{start_key, SinceSeq + 1}]), - Changes. - -enum_docs_since(Db, SinceSeq, InFun, Acc, Options) -> - {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]), - {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}. - -enum_docs(Db, InFun, InAcc, Options) -> - {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.fulldocinfo_by_id_btree, InFun, InAcc, Options), - {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. - -% server functions - -init({DbName, Filepath, Fd, Options}) -> - {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []), - {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db), - couch_ref_counter:add(RefCntr), - case lists:member(sys_db, Options) of - true -> - ok; - false -> - couch_stats_collector:track_process_count({couchdb, open_databases}) - end, - process_flag(trap_exit, true), - {ok, Db}. - -terminate(_Reason, Db) -> - couch_util:shutdown_sync(Db#db.update_pid), - ok. - -handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) -> - ok = couch_ref_counter:add(RefCntr, OpenerPid), - {reply, {ok, Db}, Db}; -handle_call(is_idle, _From, #db{fd_ref_counter=RefCntr, compactor_pid=Compact, - waiting_delayed_commit=Delay}=Db) -> - % Idle means no referrers. Unless in the middle of a compaction file switch, - % there are always at least 2 referrers, couch_db_updater and us. - {reply, (Delay == nil) andalso (Compact == nil) andalso (couch_ref_counter:count(RefCntr) == 2), Db}; -handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) -> - #db{fd_ref_counter=NewRefCntr}=NewDb, - case NewRefCntr =:= OldRefCntr of - true -> ok; - false -> - couch_ref_counter:add(NewRefCntr), - couch_ref_counter:drop(OldRefCntr) - end, - {reply, ok, NewDb}; -handle_call(get_db, _From, Db) -> - {reply, {ok, Db}, Db}. - - -handle_cast(Msg, Db) -> - ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]), - exit({error, Msg}). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handle_info({'EXIT', _Pid, normal}, Db) -> - {noreply, Db}; -handle_info({'EXIT', _Pid, Reason}, Server) -> - {stop, Reason, Server}; -handle_info(Msg, Db) -> - ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]), - exit({error, Msg}). - - -%%% Internal function %%% -open_doc_revs_int(Db, IdRevs, Options) -> - Ids = [Id || {Id, _Revs} <- IdRevs], - LookupResults = get_full_doc_infos(Db, Ids), - lists:zipwith( - fun({Id, Revs}, Lookup) -> - case Lookup of - {ok, #full_doc_info{rev_tree=RevTree}} -> - {FoundRevs, MissingRevs} = - case Revs of - all -> - {couch_key_tree:get_all_leafs(RevTree), []}; - _ -> - case lists:member(latest, Options) of - true -> - couch_key_tree:get_key_leafs(RevTree, Revs); - false -> - couch_key_tree:get(RevTree, Revs) - end - end, - FoundResults = - lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) -> - case Value of - ?REV_MISSING -> - % we have the rev in our list but know nothing about it - {{not_found, missing}, {Pos, Rev}}; - {IsDeleted, SummaryPtr, _UpdateSeq} -> - {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)} - end - end, FoundRevs), - Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs], - {ok, Results}; - not_found when Revs == all -> - {ok, []}; - not_found -> - {ok, [{{not_found, missing}, Rev} || Rev <- Revs]} - end - end, - IdRevs, LookupResults). - -open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) -> - case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of - [{ok, {_, {Rev, BodyData}}}] -> - {ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}}; - [not_found] -> - {not_found, missing} - end; -open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) -> - #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo, - Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}), - {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}; -open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) -> - #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} = - DocInfo = couch_doc:to_doc_info(FullDocInfo), - {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]), - Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath), - {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}; -open_doc_int(Db, Id, Options) -> - case get_full_doc_info(Db, Id) of - {ok, FullDocInfo} -> - open_doc_int(Db, FullDocInfo, Options); - not_found -> - {not_found, missing} - end. - -doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) -> - case lists:member(revs_info, Options) of - false -> []; - true -> - {[{Pos, RevPath}],[]} = - couch_key_tree:get_full_key_paths(RevTree, [Rev]), - - [{revs_info, Pos, lists:map( - fun({Rev1, {true, _Sp, _UpdateSeq}}) -> - {Rev1, deleted}; - ({Rev1, {false, _Sp, _UpdateSeq}}) -> - {Rev1, available}; - ({Rev1, ?REV_MISSING}) -> - {Rev1, missing} - end, RevPath)}] - end ++ - case lists:member(conflicts, Options) of - false -> []; - true -> - case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of - [] -> []; - ConflictRevs -> [{conflicts, ConflictRevs}] - end - end ++ - case lists:member(deleted_conflicts, Options) of - false -> []; - true -> - case [Rev1 || #rev_info{rev=Rev1,deleted=true} <- RestInfo] of - [] -> []; - DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}] - end - end ++ - case lists:member(local_seq, Options) of - false -> []; - true -> [{local_seq, Seq}] - end. - -read_doc(#db{fd=Fd}, OldStreamPointer) when is_tuple(OldStreamPointer) -> - % 09 UPGRADE CODE - couch_stream:old_read_term(Fd, OldStreamPointer); -read_doc(#db{fd=Fd}, Pos) -> - couch_file:pread_term(Fd, Pos). - - -doc_to_tree(#doc{revs={Start, RevIds}}=Doc) -> - [Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)), - {Start - length(RevIds) + 1, Tree}. - - -doc_to_tree_simple(Doc, [RevId]) -> - [{RevId, Doc, []}]; -doc_to_tree_simple(Doc, [RevId | Rest]) -> - [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}]. - - -make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) -> - {BodyData, Atts} = - case Bp of - nil -> - {[], []}; - _ -> - {ok, {BodyData0, Atts0}} = read_doc(Db, Bp), - {BodyData0, - lists:map( - fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) -> - #att{name=Name, - type=Type, - att_len=AttLen, - disk_len=DiskLen, - md5=Md5, - revpos=RevPos, - data={Fd,Sp}, - encoding= - case Enc of - true -> - % 0110 UPGRADE CODE - gzip; - false -> - % 0110 UPGRADE CODE - identity; - _ -> - Enc - end - }; - ({Name,Type,Sp,AttLen,RevPos,Md5}) -> - #att{name=Name, - type=Type, - att_len=AttLen, - disk_len=AttLen, - md5=Md5, - revpos=RevPos, - data={Fd,Sp}}; - ({Name,{Type,Sp,AttLen}}) -> - #att{name=Name, - type=Type, - att_len=AttLen, - disk_len=AttLen, - md5= <<>>, - revpos=0, - data={Fd,Sp}} - end, Atts0)} - end, - #doc{ - id = Id, - revs = RevisionPath, - body = BodyData, - atts = Atts, - deleted = Deleted - }. - - -increment_stat(#db{is_sys_db = true}, _Stat) -> - ok; -increment_stat(#db{}, Stat) -> - couch_stats_collector:increment(Stat). |