Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--	src/couchdb/couch_db.erl	370
1 file changed, 284 insertions(+), 86 deletions(-)
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 79e00ff8..4e945ad4 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -13,18 +13,21 @@
 -module(couch_db).
 -behaviour(gen_server).
 
--export([open/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
+-export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
 -export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
 -export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
 -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
--export([set_revs_limit/2,get_revs_limit/1,register_update_notifier/3]).
+-export([set_revs_limit/2,get_revs_limit/1]).
 -export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
 -export([enum_docs/4,enum_docs_since/5]).
 -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
 -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
+-export([start_link/3,open_doc_int/3,ensure_full_commit/1]).
+-export([set_security/2,get_security/1]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
 -export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]).
+-export([check_is_admin/1, check_is_reader/1]).
+-export([reopen/1]).
 
 -include("couch_db.hrl").
 
@@ -63,9 +66,39 @@ open_db_file(Filepath, Options) ->
 create(DbName, Options) ->
     couch_server:create(DbName, Options).
 
-open(DbName, Options) ->
+% this is for opening a database for internal purposes like the replicator
+% or the view indexer. it never throws a reader error.
+open_int(DbName, Options) ->
     couch_server:open(DbName, Options).
 
+% this should be called anytime an http request opens the database.
+% it ensures that the http userCtx is a valid reader
+open(DbName, Options) ->
+    case couch_server:open(DbName, Options) of
+    {ok, Db} ->
+        try
+            check_is_reader(Db),
+            {ok, Db}
+        catch
+        throw:Error ->
+            close(Db),
+            throw(Error)
+        end;
+    Else -> Else
+    end.
+
+reopen(#db{main_pid = Pid, fd_ref_counter = OldRefCntr, user_ctx = UserCtx}) ->
+    {ok, #db{fd_ref_counter = NewRefCntr} = NewDb} =
+        gen_server:call(Pid, get_db, infinity),
+    case NewRefCntr =:= OldRefCntr of
+    true ->
+        ok;
+    false ->
+        couch_ref_counter:add(NewRefCntr),
+        couch_ref_counter:drop(OldRefCntr)
+    end,
+    {ok, NewDb#db{user_ctx = UserCtx}}.
+
 ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
     ok = gen_server:call(UpdatePid, full_commit, infinity),
     {ok, StartTime}.
@@ -73,9 +106,8 @@ ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
 close(#db{fd_ref_counter=RefCntr}) ->
     couch_ref_counter:drop(RefCntr).
 
-open_ref_counted(MainPid, UserCtx) ->
-    {ok, Db} = gen_server:call(MainPid, {open_ref_count, self()}),
-    {ok, Db#db{user_ctx=UserCtx}}.
+open_ref_counted(MainPid, OpenedPid) ->
+    gen_server:call(MainPid, {open_ref_count, OpenedPid}).
 
 is_idle(MainPid) ->
     gen_server:call(MainPid, is_idle).
@@ -83,11 +115,8 @@ is_idle(MainPid) ->
 monitor(#db{main_pid=MainPid}) ->
     erlang:monitor(process, MainPid).
 
-register_update_notifier(#db{main_pid=Pid}, Seq, Fun) ->
-    gen_server:call(Pid, {register_update_notifier, Seq, Fun}).
-
 start_compact(#db{update_pid=Pid}) ->
-    gen_server:cast(Pid, start_compact).
+    gen_server:call(Pid, start_compact).
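The hunks above split database opening into two entry points. As a rough sketch (not part of the patch; module and function names below are hypothetical, and the {user_ctx, #user_ctx{}} open option is assumed to behave as elsewhere in CouchDB), HTTP-facing code opens with the request's user context so check_is_reader/1 runs, while internal consumers such as the replicator and the view indexer use open_int/2 and skip the reader check:

    -module(open_example).
    -export([handle_request/2, internal_task/1]).

    % HTTP layer: open/2 runs check_is_reader/1 against the request's
    % user context and throws {unauthorized, _} before anything is read.
    handle_request(DbName, UserCtx) ->
        {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
        try
            couch_db:get_db_info(Db)
        after
            couch_db:close(Db)
        end.

    % Internal task (replicator, view indexer): open_int/2 never throws
    % a reader error, so system processes keep working on restricted dbs.
    internal_task(DbName) ->
        {ok, Db} = couch_db:open_int(DbName, []),
        Seq = couch_db:get_update_seq(Db),
        couch_db:close(Db),
        Seq.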
 
 delete_doc(Db, Id, Revisions) ->
     DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
@@ -98,23 +127,52 @@ open_doc(Db, IdOrDocInfo) ->
     open_doc(Db, IdOrDocInfo, []).
 
 open_doc(Db, Id, Options) ->
-    couch_stats_collector:increment({couchdb, database_reads}),
+    increment_stat(Db, {couchdb, database_reads}),
     case open_doc_int(Db, Id, Options) of
     {ok, #doc{deleted=true}=Doc} ->
         case lists:member(deleted, Options) of
         true ->
-            {ok, Doc};
+            apply_open_options({ok, Doc},Options);
         false ->
             {not_found, deleted}
         end;
     Else ->
-        Else
+        apply_open_options(Else,Options)
     end.
+
+apply_open_options({ok, Doc},Options) ->
+    apply_open_options2(Doc,Options);
+apply_open_options(Else,_Options) ->
+    Else.
+
+apply_open_options2(Doc,[]) ->
+    {ok, Doc};
+apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
+        [{atts_since, PossibleAncestors}|Rest]) ->
+    RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
+    apply_open_options2(Doc#doc{atts=[A#att{data=
+        if AttPos>RevPos -> Data; true -> stub end}
+        || #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest);
+apply_open_options2(Doc,[_|Rest]) ->
+    apply_open_options2(Doc,Rest).
+
+
+find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
+    0;
+find_ancestor_rev_pos(_DocRevs, []) ->
+    0;
+find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
+    case lists:member({RevPos, RevId}, AttsSinceRevs) of
+    true ->
+        RevPos;
+    false ->
+        find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
+    end.
 
 open_doc_revs(Db, Id, Revs, Options) ->
-    couch_stats_collector:increment({couchdb, database_reads}),
-    [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
-    Result.
+    increment_stat(Db, {couchdb, database_reads}),
+    [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
+    {ok, [apply_open_options(Result, Options) || Result <- Results]}.
 
 % Each returned result is a list of tuples:
 % {Id, MissingRevs, PossibleAncestors}
@@ -135,9 +193,9 @@ find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
             % Find the revs that are possible parents of this rev
             PossibleAncestors = lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
-                % this leaf is a "possible ancenstor" of the missing 
+                % this leaf is a "possible ancenstor" of the missing
                 % revs if this LeafPos lessthan any of the missing revs
-                case lists:any(fun({MissingPos, _}) -> 
+                case lists:any(fun({MissingPos, _}) ->
                     LeafPos < MissingPos end, MissingRevs) of
                 true ->
                     [{LeafPos, LeafRevId} | Acc];
@@ -194,7 +252,8 @@ get_db_info(Db) ->
         update_seq=SeqNum,
         name=Name,
         fulldocinfo_by_id_btree=FullDocBtree,
-        instance_start_time=StartTime} = Db,
+        instance_start_time=StartTime,
+        committed_update_seq=CommittedUpdateSeq} = Db,
     {ok, Size} = couch_file:bytes(Fd),
     {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree),
     InfoList = [
@@ -206,7 +265,8 @@ get_db_info(Db) ->
         {compact_running, Compactor/=nil},
         {disk_size, Size},
         {instance_start_time, StartTime},
-        {disk_format_version, DiskVersion}
+        {disk_format_version, DiskVersion},
+        {committed_update_seq, CommittedUpdateSeq}
     ],
     {ok, InfoList}.
 
@@ -221,22 +281,88 @@ get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) ->
         [], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]),
     {ok, Docs}.
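To pin down the atts_since semantics introduced above: find_ancestor_rev_pos/2 walks the doc's rev path from newest to oldest and returns the position of the newest rev the client already has, or 0 if none match; apply_open_options2/2 then replaces the data of every attachment whose revpos is at or below that position with stub. A standalone sketch (the helper is copied from the patch purely for illustration, since the real one is module-internal):

    -module(atts_since_example).
    -export([demo/0]).

    % Copy of the patch's find_ancestor_rev_pos/2, for illustration only.
    find_ancestor_rev_pos({_, []}, _AttsSinceRevs) -> 0;
    find_ancestor_rev_pos(_DocRevs, []) -> 0;
    find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
        case lists:member({RevPos, RevId}, AttsSinceRevs) of
            true -> RevPos;
            false -> find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
        end.

    demo() ->
        % Doc at rev 3-c with rev path [c, b, a]; the client already has
        % rev 2-b, so attachments with revpos =< 2 are sent as stubs.
        2 = find_ancestor_rev_pos({3, [<<"c">>, <<"b">>, <<"a">>]},
                                  [{2, <<"b">>}]),
        % No shared ancestor: every attachment keeps its full data.
        0 = find_ancestor_rev_pos({3, [<<"c">>, <<"b">>, <<"a">>]},
                                  [{5, <<"x">>}]),
        ok.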
 
-check_is_admin(#db{admins=Admins, user_ctx=#user_ctx{name=Name,roles=Roles}}) ->
-    DbAdmins = [<<"_admin">> | Admins],
-    case DbAdmins -- [Name | Roles] of
-    DbAdmins -> % same list, not an admin
-        throw({unauthorized, <<"You are not a db or server admin.">>});
+check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) ->
+    {Admins} = get_admins(Db),
+    AdminRoles = [<<"_admin">> | couch_util:get_value(<<"roles">>, Admins, [])],
+    AdminNames = couch_util:get_value(<<"names">>, Admins,[]),
+    case AdminRoles -- Roles of
+    AdminRoles -> % same list, not an admin role
+        case AdminNames -- [Name] of
+        AdminNames -> % same names, not an admin
+            throw({unauthorized, <<"You are not a db or server admin.">>});
+        _ ->
+            ok
+        end;
     _ ->
         ok
     end.
 
-get_admins(#db{admins=Admins}) ->
-    Admins.
+check_is_reader(#db{user_ctx=#user_ctx{name=Name,roles=Roles}=UserCtx}=Db) ->
+    case (catch check_is_admin(Db)) of
+    ok -> ok;
+    _ ->
+        {Readers} = get_readers(Db),
+        ReaderRoles = couch_util:get_value(<<"roles">>, Readers,[]),
+        WithAdminRoles = [<<"_admin">> | ReaderRoles],
+        ReaderNames = couch_util:get_value(<<"names">>, Readers,[]),
+        case ReaderRoles ++ ReaderNames of
+        [] -> ok; % no readers == public access
+        _Else ->
+            case WithAdminRoles -- Roles of
+            WithAdminRoles -> % same list, not an reader role
+                case ReaderNames -- [Name] of
+                ReaderNames -> % same names, not a reader
+                    ?LOG_DEBUG("Not a reader: UserCtx ~p vs Names ~p Roles ~p",
+                        [UserCtx, ReaderNames, WithAdminRoles]),
+                    throw({unauthorized, <<"You are not authorized to access this db.">>});
+                _ ->
+                    ok
+                end;
+            _ ->
+                ok
+            end
+        end
+    end.
+
+get_admins(#db{security=SecProps}) ->
+    couch_util:get_value(<<"admins">>, SecProps, {[]}).
+
+get_readers(#db{security=SecProps}) ->
+    couch_util:get_value(<<"readers">>, SecProps, {[]}).
 
-set_admins(#db{update_pid=Pid}=Db, Admins) when is_list(Admins) ->
+get_security(#db{security=SecProps}) ->
+    {SecProps}.
+
+set_security(#db{update_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
     check_is_admin(Db),
-    gen_server:call(Pid, {set_admins, Admins}, infinity).
+    ok = validate_security_object(NewSecProps),
+    ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity),
+    {ok, _} = ensure_full_commit(Db),
+    ok;
+set_security(_, _) ->
+    throw(bad_request).
+
+validate_security_object(SecProps) ->
+    Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
+    Readers = couch_util:get_value(<<"readers">>, SecProps, {[]}),
+    ok = validate_names_and_roles(Admins),
+    ok = validate_names_and_roles(Readers),
+    ok.
+
+% validate user input
+validate_names_and_roles({Props}) when is_list(Props) ->
+    case couch_util:get_value(<<"names">>,Props,[]) of
+    Ns when is_list(Ns) ->
+            [throw("names must be a JSON list of strings") ||N <- Ns, not is_binary(N)],
+            Ns;
+    _ -> throw("names must be a JSON list of strings")
+    end,
+    case couch_util:get_value(<<"roles">>,Props,[]) of
+    Rs when is_list(Rs) ->
+            [throw("roles must be a JSON list of strings") ||R <- Rs, not is_binary(R)],
+            Rs;
+    _ -> throw("roles must be a JSON list of strings")
+    end,
+    ok.
 
 get_revs_limit(#db{revs_limit=Limit}) ->
     Limit.
@@ -257,8 +383,14 @@ update_doc(Db, Doc, Options, UpdateType) ->
     case update_docs(Db, [Doc], Options, UpdateType) of
     {ok, [{ok, NewRev}]} ->
         {ok, NewRev};
+    {ok, [{{_Id, _Rev}, Error}]} ->
+        throw(Error);
     {ok, [Error]} ->
-        throw(Error)
+        throw(Error);
+    {ok, []} ->
+        % replication success
+        {Pos, [RevId | _]} = Doc#doc.revs,
+        {ok, {Pos, RevId}}
     end.
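For reference, set_security/2 and validate_security_object/1 above expect nested {Props} tuples, the EJSON form of {"admins": {...}, "readers": {...}}. A hedged sketch (user and role names invented; Db must carry an admin user_ctx). Per check_is_reader/1, empty names and roles under readers mean the database is publicly readable:

    % Sketch: building and applying a security object.
    set_example_security(Db) ->
        SecObj = {[
            {<<"admins">>, {[
                {<<"names">>, [<<"alice">>]},
                {<<"roles">>, [<<"ops">>]}
            ]}},
            {<<"readers">>, {[
                {<<"names">>, [<<"bob">>]},
                {<<"roles">>, [<<"staff">>]}
            ]}}
        ]},
        % set_security/2 throws {unauthorized, _} for non-admins and a
        % validation error if names/roles are not lists of binaries.
        ok = couch_db:set_security(Db, SecObj).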
 
 update_docs(Db, Docs) ->
@@ -285,18 +417,8 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
         group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
     end.
 
-
-validate_doc_update(#db{user_ctx=UserCtx, admins=Admins},
-        #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
-    UserNames = [UserCtx#user_ctx.name | UserCtx#user_ctx.roles],
-    % if the user is a server admin or db admin, allow the save
-    case length(UserNames -- [<<"_admin">> | Admins]) =:= length(UserNames) of
-    true ->
-        % not an admin
-        {unauthorized, <<"You are not a server or database admin.">>};
-    false ->
-        ok
-    end;
+validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
+    catch check_is_admin(Db);
 validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
     ok;
 validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
@@ -304,7 +426,8 @@ validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
 validate_doc_update(Db, Doc, GetDiskDocFun) ->
     DiskDoc = GetDiskDocFun(),
     JsonCtx = couch_util:json_user_ctx(Db),
-    try [case Fun(Doc, DiskDoc, JsonCtx) of
+    SecObj = get_security(Db),
+    try [case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
         ok -> ok;
         Error -> throw(Error)
         end || Fun <- Db#db.validate_doc_funs],
@@ -352,15 +475,15 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
 
 prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped, AccFatalErrors) ->
     {AccPrepped, AccFatalErrors};
-prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], 
+prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
         AllowConflict, AccPrepped, AccErrors) ->
     [#doc{id=Id}|_]=DocBucket,
     % no existing revs are known,
     {PreppedBucket, AccErrors3} = lists:foldl(
-        fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) -> 
+        fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
             case couch_doc:has_stubs(Doc) of
             true ->
-                couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+                couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
             false -> ok
             end,
             case Revs of
@@ -398,12 +521,12 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
             end
         end,
         {[], AccErrors}, DocBucket),
-    prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, 
+    prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
         [PreppedBucket | AccPrepped], AccErrors3).
 
-update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
-    update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, interactive_edit).
+update_docs(Db, Docs, Options) ->
+    update_docs(Db, Docs, Options, interactive_edit).
 
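The validate_doc_update changes above mean every fun in #db.validate_doc_funs is now applied to four arguments, the new fourth being the security object. In practice these funs are compiled from design documents; the hand-written fun below (e.g. bound in a shell) is only a sketch of the new calling convention, where any non-ok return is thrown:

    % Hypothetical 4-arity validation fun: only admins named in the
    % security object may delete documents.
    ValidateFun = fun
        (#doc{deleted=true}, _DiskDoc, {UserProps}, {SecProps}) ->
            Name = couch_util:get_value(<<"name">>, UserProps),
            {Admins} = couch_util:get_value(<<"admins">>, SecProps, {[]}),
            AdminNames = couch_util:get_value(<<"names">>, Admins, []),
            case lists:member(Name, AdminNames) of
                true -> ok;
                false -> {forbidden, <<"only admins may delete">>}
            end;
        (_NewDoc, _DiskDoc, _UserCtx, _SecObj) ->
            ok
    end.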
 prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
@@ -414,10 +537,10 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
     case OldInfo of
     not_found ->
         {ValidatedBucket, AccErrors3} = lists:foldl(
-            fun(Doc, {AccPrepped2, AccErrors2}) -> 
+            fun(Doc, {AccPrepped2, AccErrors2}) ->
                 case couch_doc:has_stubs(Doc) of
                 true ->
-                    couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+                    couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
                 false -> ok
                 end,
                 case validate_doc_update(Db, Doc, fun() -> nil end) of
@@ -432,7 +555,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
     {ok, #full_doc_info{rev_tree=OldTree}} ->
         NewRevTree = lists:foldl(
             fun(NewDoc, AccTree) ->
-                {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]),
+                {NewTree, _} = couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc)),
                 NewTree
             end,
             OldTree, Bucket),
@@ -487,7 +610,7 @@ new_revid(#doc{body=Body,revs={OldStart,OldRevs},
         ?l2b(integer_to_list(couch_util:rand32()));
     Atts2 ->
         OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
-        erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
+        couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
     end.
 
 new_revs([], OutBuckets, IdRevsAcc) ->
@@ -515,7 +638,7 @@ check_dup_atts2(_) ->
 
 update_docs(Db, Docs, Options, replicated_changes) ->
-    couch_stats_collector:increment({couchdb, database_writes}),
+    increment_stat(Db, {couchdb, database_writes}),
     DocBuckets = group_alike_docs(Docs),
 
     case (Db#db.validate_doc_funs /= []) orelse
@@ -541,7 +664,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
     {ok, DocErrors};
 
 update_docs(Db, Docs, Options, interactive_edit) ->
-    couch_stats_collector:increment({couchdb, database_writes}),
+    increment_stat(Db, {couchdb, database_writes}),
     AllOrNothing = lists:member(all_or_nothing, Options),
     % go ahead and generate the new revision ids for the documents.
     % separate out the NonRep documents from the rest of the documents
@@ -645,7 +768,7 @@ collect_results(UpdatePid, MRef, ResultsAcc) ->
         exit(Reason)
     end.
 
-write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
+write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets,
         NonRepDocs, Options0) ->
     Options = set_commit_option(Options0),
     MergeConflicts = lists:member(merge_conflicts, Options),
@@ -658,7 +781,7 @@ write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
     retry ->
         % This can happen if the db file we wrote to was swapped out by
         % compaction. Retry by reopening the db and writing to the current file
-        {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
+        {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
         DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
         % We only retry once
         close(Db2),
@@ -693,18 +816,19 @@ flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
     % already written to our file, nothing to write
     Att;
 
-flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) ->
-    {NewStreamData, Len, Md5} =
+flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
+        disk_len=InDiskLen} = Att) ->
+    {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
         couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
-    check_md5(Md5, InMd5),
-    Att#att{data={Fd, NewStreamData}, md5=Md5, len=Len};
+    check_md5(IdentityMd5, InMd5),
+    Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};
 
 flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
     with_stream(Fd, Att, fun(OutputStream) ->
         couch_stream:write(OutputStream, Data)
     end);
 
-flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) ->
+flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
     with_stream(Fd, Att, fun(OutputStream) ->
         % Fun(MaxChunkSize, WriterFun) must call WriterFun
         % once for each chunk of the attachment,
@@ -726,9 +850,9 @@ flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) ->
         end, ok)
     end);
 
-flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) ->
+flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
     with_stream(Fd, Att, fun(OutputStream) ->
-        write_streamed_attachment(OutputStream, Fun, Len)
+        write_streamed_attachment(OutputStream, Fun, AttLen)
     end).
 
 % From RFC 2616 3.6.1 - Chunked Transfer Coding
@@ -741,8 +865,17 @@ flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) ->
 % is present in the request, but there is no Content-MD5
 % trailer, we're free to ignore this inconsistency and
 % pretend that no Content-MD5 exists.
-with_stream(Fd, #att{md5=InMd5}=Att, Fun) ->
-    {ok, OutputStream} = couch_stream:open(Fd),
+with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
+    {ok, OutputStream} = case (Enc =:= identity) andalso
+        couch_util:compressible_att_type(Type) of
+    true ->
+        CompLevel = list_to_integer(
+            couch_config:get("attachments", "compression_level", "0")
+        ),
+        couch_stream:open(Fd, gzip, [{compression_level, CompLevel}]);
+    _ ->
+        couch_stream:open(Fd)
+    end,
     ReqMd5 = case Fun(OutputStream) of
         {md5, FooterMd5} ->
             case InMd5 of
@@ -752,9 +885,36 @@ with_stream(Fd, #att{md5=InMd5}=Att, Fun) ->
         _ ->
             InMd5
     end,
-    {StreamInfo, Len, Md5} = couch_stream:close(OutputStream),
-    check_md5(Md5, ReqMd5),
-    Att#att{data={Fd,StreamInfo},len=Len,md5=Md5}.
+    {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
+        couch_stream:close(OutputStream),
+    check_md5(IdentityMd5, ReqMd5),
+    {AttLen, DiskLen, NewEnc} = case Enc of
+    identity ->
+        case {Md5, IdentityMd5} of
+        {Same, Same} ->
+            {Len, IdentityLen, identity};
+        _ ->
+            {Len, IdentityLen, gzip}
+        end;
+    gzip ->
+        case {Att#att.att_len, Att#att.disk_len} of
+        {AL, DL} when AL =:= undefined orelse DL =:= undefined ->
+            % Compressed attachment uploaded through the standalone API.
+            {Len, Len, gzip};
+        {AL, DL} ->
+            % This case is used for efficient push-replication, where a
+            % compressed attachment is located in the body of multipart
+            % content-type request.
+            {AL, DL, gzip}
+        end
+    end,
+    Att#att{
+        data={Fd,StreamInfo},
+        att_len=AttLen,
+        disk_len=DiskLen,
+        md5=Md5,
+        encoding=NewEnc
+    }.
 
 
 write_streamed_attachment(_Stream, _F, 0) ->
@@ -779,17 +939,18 @@ changes_since(Db, Style, StartSeq, Fun, Acc) ->
 
 changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
     Wrapper = fun(DocInfo, _Offset, Acc2) ->
             #doc_info{revs=Revs} = DocInfo,
+            DocInfo2 =
             case Style of
             main_only ->
-                Infos = [DocInfo];
+                DocInfo;
             all_docs ->
-                % make each rev it's own doc info
-                Infos = [DocInfo#doc_info{revs=[RevInfo]} ||
-                        #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]
+                % remove revs before the seq
+                DocInfo#doc_info{revs=[RevInfo ||
+                        #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]}
             end,
-            Fun(Infos, Acc2)
+            Fun(DocInfo2, Acc2)
         end,
-    {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, 
+    {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
         Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
     {ok, AccOut}.
 
@@ -816,11 +977,17 @@ init({DbName, Filepath, Fd, Options}) ->
     {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
     {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db),
     couch_ref_counter:add(RefCntr),
-    couch_stats_collector:track_process_count({couchdb, open_databases}),
+    case lists:member(sys_db, Options) of
+    true ->
+        ok;
+    false ->
+        couch_stats_collector:track_process_count({couchdb, open_databases})
+    end,
+    process_flag(trap_exit, true),
     {ok, Db}.
 
-terminate(Reason, _Db) ->
-    couch_util:terminate_linked(Reason),
+terminate(_Reason, Db) ->
+    couch_util:shutdown_sync(Db#db.update_pid),
     ok.
 
 handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) ->
@@ -839,7 +1006,9 @@ handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) ->
         couch_ref_counter:add(NewRefCntr),
         couch_ref_counter:drop(OldRefCntr)
     end,
-    {reply, ok, NewDb}.
+    {reply, ok, NewDb};
+handle_call(get_db, _From, Db) ->
+    {reply, {ok, Db}, Db}.
 
 
 handle_cast(Msg, Db) ->
@@ -848,7 +1017,11 @@ handle_cast(Msg, Db) ->
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
-
+
+handle_info({'EXIT', _Pid, normal}, Db) ->
+    {noreply, Db};
+handle_info({'EXIT', _Pid, Reason}, Server) ->
+    {stop, Reason, Server};
 handle_info(Msg, Db) ->
     ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
     exit({error, Msg}).
@@ -983,17 +1156,39 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
     {ok, {BodyData0, Atts0}} = read_doc(Db, Bp),
     {BodyData0,
         lists:map(
-            fun({Name,Type,Sp,Len,RevPos,Md5}) ->
+            fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
                 #att{name=Name,
                     type=Type,
-                    len=Len,
+                    att_len=AttLen,
+                    disk_len=DiskLen,
+                    md5=Md5,
+                    revpos=RevPos,
+                    data={Fd,Sp},
+                    encoding=
+                    case Enc of
+                    true ->
+                        % 0110 UPGRADE CODE
+                        gzip;
+                    false ->
+                        % 0110 UPGRADE CODE
+                        identity;
+                    _ ->
+                        Enc
+                    end
+                };
+            ({Name,Type,Sp,AttLen,RevPos,Md5}) ->
+                #att{name=Name,
+                    type=Type,
+                    att_len=AttLen,
+                    disk_len=AttLen,
                     md5=Md5,
                     revpos=RevPos,
                     data={Fd,Sp}};
-            ({Name,{Type,Sp,Len}}) ->
+            ({Name,{Type,Sp,AttLen}}) ->
                 #att{name=Name,
                     type=Type,
-                    len=Len,
+                    att_len=AttLen,
+                    disk_len=AttLen,
                     md5= <<>>,
                     revpos=0,
                     data={Fd,Sp}}
@@ -1008,4 +1203,7 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
     }.
 
-
+increment_stat(#db{is_sys_db = true}, _Stat) ->
+    ok;
+increment_stat(#db{}, Stat) ->
+    couch_stats_collector:increment(Stat).
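Finally, a sketch of consuming the reworked changes_since/5, whose callback now receives a single #doc_info{} per change; with the all_docs style, revs at or before the start seq are stripped from the record rather than fanned out into one info per rev. The collector below is hypothetical and assumes -include("couch_db.hrl") for the record definitions:

    % Collect {Seq, Id} for every change after Since.
    collect_changes(Db, Since) ->
        Fun = fun(#doc_info{id=Id, high_seq=Seq}, Acc) ->
            {ok, [{Seq, Id} | Acc]}
        end,
        {ok, Acc} = couch_db:changes_since(Db, all_docs, Since, Fun, []),
        lists:reverse(Acc).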
