summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--src/couchdb/couch_db.erl370
1 files changed, 284 insertions, 86 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 79e00ff8..4e945ad4 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -13,18 +13,21 @@
-module(couch_db).
-behaviour(gen_server).
--export([open/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
+-export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
-export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
--export([set_revs_limit/2,get_revs_limit/1,register_update_notifier/3]).
+-export([set_revs_limit/2,get_revs_limit/1]).
-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
-export([enum_docs/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
+-export([start_link/3,open_doc_int/3,ensure_full_commit/1]).
+-export([set_security/2,get_security/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]).
+-export([check_is_admin/1, check_is_reader/1]).
+-export([reopen/1]).
-include("couch_db.hrl").
@@ -63,9 +66,39 @@ open_db_file(Filepath, Options) ->
create(DbName, Options) ->
couch_server:create(DbName, Options).
-open(DbName, Options) ->
+% this is for opening a database for internal purposes like the replicator
+% or the view indexer. it never throws a reader error.
+open_int(DbName, Options) ->
couch_server:open(DbName, Options).
+% this should be called anytime an http request opens the database.
+% it ensures that the http userCtx is a valid reader
+open(DbName, Options) ->
+ case couch_server:open(DbName, Options) of
+ {ok, Db} ->
+ try
+ check_is_reader(Db),
+ {ok, Db}
+ catch
+ throw:Error ->
+ close(Db),
+ throw(Error)
+ end;
+ Else -> Else
+ end.
+
+reopen(#db{main_pid = Pid, fd_ref_counter = OldRefCntr, user_ctx = UserCtx}) ->
+ {ok, #db{fd_ref_counter = NewRefCntr} = NewDb} =
+ gen_server:call(Pid, get_db, infinity),
+ case NewRefCntr =:= OldRefCntr of
+ true ->
+ ok;
+ false ->
+ couch_ref_counter:add(NewRefCntr),
+ couch_ref_counter:drop(OldRefCntr)
+ end,
+ {ok, NewDb#db{user_ctx = UserCtx}}.
+
ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
ok = gen_server:call(UpdatePid, full_commit, infinity),
{ok, StartTime}.
@@ -73,9 +106,8 @@ ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
close(#db{fd_ref_counter=RefCntr}) ->
couch_ref_counter:drop(RefCntr).
-open_ref_counted(MainPid, UserCtx) ->
- {ok, Db} = gen_server:call(MainPid, {open_ref_count, self()}),
- {ok, Db#db{user_ctx=UserCtx}}.
+open_ref_counted(MainPid, OpenedPid) ->
+ gen_server:call(MainPid, {open_ref_count, OpenedPid}).
is_idle(MainPid) ->
gen_server:call(MainPid, is_idle).
@@ -83,11 +115,8 @@ is_idle(MainPid) ->
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
-register_update_notifier(#db{main_pid=Pid}, Seq, Fun) ->
- gen_server:call(Pid, {register_update_notifier, Seq, Fun}).
-
start_compact(#db{update_pid=Pid}) ->
- gen_server:cast(Pid, start_compact).
+ gen_server:call(Pid, start_compact).
delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
@@ -98,23 +127,52 @@ open_doc(Db, IdOrDocInfo) ->
open_doc(Db, IdOrDocInfo, []).
open_doc(Db, Id, Options) ->
- couch_stats_collector:increment({couchdb, database_reads}),
+ increment_stat(Db, {couchdb, database_reads}),
case open_doc_int(Db, Id, Options) of
{ok, #doc{deleted=true}=Doc} ->
case lists:member(deleted, Options) of
true ->
- {ok, Doc};
+ apply_open_options({ok, Doc},Options);
false ->
{not_found, deleted}
end;
Else ->
- Else
+ apply_open_options(Else,Options)
+ end.
+
+apply_open_options({ok, Doc},Options) ->
+ apply_open_options2(Doc,Options);
+apply_open_options(Else,_Options) ->
+ Else.
+
+apply_open_options2(Doc,[]) ->
+ {ok, Doc};
+apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
+ [{atts_since, PossibleAncestors}|Rest]) ->
+ RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
+ apply_open_options2(Doc#doc{atts=[A#att{data=
+ if AttPos>RevPos -> Data; true -> stub end}
+ || #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest);
+apply_open_options2(Doc,[_|Rest]) ->
+ apply_open_options2(Doc,Rest).
+
+
+find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
+ 0;
+find_ancestor_rev_pos(_DocRevs, []) ->
+ 0;
+find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
+ case lists:member({RevPos, RevId}, AttsSinceRevs) of
+ true ->
+ RevPos;
+ false ->
+ find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
end.
open_doc_revs(Db, Id, Revs, Options) ->
- couch_stats_collector:increment({couchdb, database_reads}),
- [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
- Result.
+ increment_stat(Db, {couchdb, database_reads}),
+ [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
+ {ok, [apply_open_options(Result, Options) || Result <- Results]}.
% Each returned result is a list of tuples:
% {Id, MissingRevs, PossibleAncestors}
@@ -135,9 +193,9 @@ find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
% Find the revs that are possible parents of this rev
PossibleAncestors =
lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
- % this leaf is a "possible ancenstor" of the missing
+ % this leaf is a "possible ancenstor" of the missing
% revs if this LeafPos lessthan any of the missing revs
- case lists:any(fun({MissingPos, _}) ->
+ case lists:any(fun({MissingPos, _}) ->
LeafPos < MissingPos end, MissingRevs) of
true ->
[{LeafPos, LeafRevId} | Acc];
@@ -194,7 +252,8 @@ get_db_info(Db) ->
update_seq=SeqNum,
name=Name,
fulldocinfo_by_id_btree=FullDocBtree,
- instance_start_time=StartTime} = Db,
+ instance_start_time=StartTime,
+ committed_update_seq=CommittedUpdateSeq} = Db,
{ok, Size} = couch_file:bytes(Fd),
{ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree),
InfoList = [
@@ -206,7 +265,8 @@ get_db_info(Db) ->
{compact_running, Compactor/=nil},
{disk_size, Size},
{instance_start_time, StartTime},
- {disk_format_version, DiskVersion}
+ {disk_format_version, DiskVersion},
+ {committed_update_seq, CommittedUpdateSeq}
],
{ok, InfoList}.
@@ -221,22 +281,88 @@ get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) ->
[], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]),
{ok, Docs}.
-check_is_admin(#db{admins=Admins, user_ctx=#user_ctx{name=Name,roles=Roles}}) ->
- DbAdmins = [<<"_admin">> | Admins],
- case DbAdmins -- [Name | Roles] of
- DbAdmins -> % same list, not an admin
- throw({unauthorized, <<"You are not a db or server admin.">>});
+check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) ->
+ {Admins} = get_admins(Db),
+ AdminRoles = [<<"_admin">> | couch_util:get_value(<<"roles">>, Admins, [])],
+ AdminNames = couch_util:get_value(<<"names">>, Admins,[]),
+ case AdminRoles -- Roles of
+ AdminRoles -> % same list, not an admin role
+ case AdminNames -- [Name] of
+ AdminNames -> % same names, not an admin
+ throw({unauthorized, <<"You are not a db or server admin.">>});
+ _ ->
+ ok
+ end;
_ ->
ok
end.
-get_admins(#db{admins=Admins}) ->
- Admins.
+check_is_reader(#db{user_ctx=#user_ctx{name=Name,roles=Roles}=UserCtx}=Db) ->
+ case (catch check_is_admin(Db)) of
+ ok -> ok;
+ _ ->
+ {Readers} = get_readers(Db),
+ ReaderRoles = couch_util:get_value(<<"roles">>, Readers,[]),
+ WithAdminRoles = [<<"_admin">> | ReaderRoles],
+ ReaderNames = couch_util:get_value(<<"names">>, Readers,[]),
+ case ReaderRoles ++ ReaderNames of
+ [] -> ok; % no readers == public access
+ _Else ->
+ case WithAdminRoles -- Roles of
+ WithAdminRoles -> % same list, not an reader role
+ case ReaderNames -- [Name] of
+ ReaderNames -> % same names, not a reader
+ ?LOG_DEBUG("Not a reader: UserCtx ~p vs Names ~p Roles ~p",[UserCtx, ReaderNames, WithAdminRoles]),
+ throw({unauthorized, <<"You are not authorized to access this db.">>});
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end
+ end
+ end.
+
+get_admins(#db{security=SecProps}) ->
+ couch_util:get_value(<<"admins">>, SecProps, {[]}).
+
+get_readers(#db{security=SecProps}) ->
+ couch_util:get_value(<<"readers">>, SecProps, {[]}).
-set_admins(#db{update_pid=Pid}=Db, Admins) when is_list(Admins) ->
+get_security(#db{security=SecProps}) ->
+ {SecProps}.
+
+set_security(#db{update_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
check_is_admin(Db),
- gen_server:call(Pid, {set_admins, Admins}, infinity).
+ ok = validate_security_object(NewSecProps),
+ ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity),
+ {ok, _} = ensure_full_commit(Db),
+ ok;
+set_security(_, _) ->
+ throw(bad_request).
+
+validate_security_object(SecProps) ->
+ Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
+ Readers = couch_util:get_value(<<"readers">>, SecProps, {[]}),
+ ok = validate_names_and_roles(Admins),
+ ok = validate_names_and_roles(Readers),
+ ok.
+% validate user input
+validate_names_and_roles({Props}) when is_list(Props) ->
+ case couch_util:get_value(<<"names">>,Props,[]) of
+ Ns when is_list(Ns) ->
+ [throw("names must be a JSON list of strings") ||N <- Ns, not is_binary(N)],
+ Ns;
+ _ -> throw("names must be a JSON list of strings")
+ end,
+ case couch_util:get_value(<<"roles">>,Props,[]) of
+ Rs when is_list(Rs) ->
+ [throw("roles must be a JSON list of strings") ||R <- Rs, not is_binary(R)],
+ Rs;
+ _ -> throw("roles must be a JSON list of strings")
+ end,
+ ok.
get_revs_limit(#db{revs_limit=Limit}) ->
Limit.
@@ -257,8 +383,14 @@ update_doc(Db, Doc, Options, UpdateType) ->
case update_docs(Db, [Doc], Options, UpdateType) of
{ok, [{ok, NewRev}]} ->
{ok, NewRev};
+ {ok, [{{_Id, _Rev}, Error}]} ->
+ throw(Error);
{ok, [Error]} ->
- throw(Error)
+ throw(Error);
+ {ok, []} ->
+ % replication success
+ {Pos, [RevId | _]} = Doc#doc.revs,
+ {ok, {Pos, RevId}}
end.
update_docs(Db, Docs) ->
@@ -285,18 +417,8 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
end.
-
-validate_doc_update(#db{user_ctx=UserCtx, admins=Admins},
- #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
- UserNames = [UserCtx#user_ctx.name | UserCtx#user_ctx.roles],
- % if the user is a server admin or db admin, allow the save
- case length(UserNames -- [<<"_admin">> | Admins]) =:= length(UserNames) of
- true ->
- % not an admin
- {unauthorized, <<"You are not a server or database admin.">>};
- false ->
- ok
- end;
+validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
+ catch check_is_admin(Db);
validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
ok;
validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
@@ -304,7 +426,8 @@ validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
validate_doc_update(Db, Doc, GetDiskDocFun) ->
DiskDoc = GetDiskDocFun(),
JsonCtx = couch_util:json_user_ctx(Db),
- try [case Fun(Doc, DiskDoc, JsonCtx) of
+ SecObj = get_security(Db),
+ try [case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
ok -> ok;
Error -> throw(Error)
end || Fun <- Db#db.validate_doc_funs],
@@ -352,15 +475,15 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
AccFatalErrors) ->
{AccPrepped, AccFatalErrors};
-prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
+prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
[#doc{id=Id}|_]=DocBucket,
% no existing revs are known,
{PreppedBucket, AccErrors3} = lists:foldl(
- fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+ fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
- couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
false -> ok
end,
case Revs of
@@ -398,12 +521,12 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
end
end,
{[], AccErrors}, DocBucket),
- prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3).
-update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
- update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, interactive_edit).
+update_docs(Db, Docs, Options) ->
+ update_docs(Db, Docs, Options, interactive_edit).
prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
@@ -414,10 +537,10 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
case OldInfo of
not_found ->
{ValidatedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {AccPrepped2, AccErrors2}) ->
+ fun(Doc, {AccPrepped2, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
- couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
false -> ok
end,
case validate_doc_update(Db, Doc, fun() -> nil end) of
@@ -432,7 +555,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
{ok, #full_doc_info{rev_tree=OldTree}} ->
NewRevTree = lists:foldl(
fun(NewDoc, AccTree) ->
- {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]),
+ {NewTree, _} = couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc)),
NewTree
end,
OldTree, Bucket),
@@ -487,7 +610,7 @@ new_revid(#doc{body=Body,revs={OldStart,OldRevs},
?l2b(integer_to_list(couch_util:rand32()));
Atts2 ->
OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
- erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
+ couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
end.
new_revs([], OutBuckets, IdRevsAcc) ->
@@ -515,7 +638,7 @@ check_dup_atts2(_) ->
update_docs(Db, Docs, Options, replicated_changes) ->
- couch_stats_collector:increment({couchdb, database_writes}),
+ increment_stat(Db, {couchdb, database_writes}),
DocBuckets = group_alike_docs(Docs),
case (Db#db.validate_doc_funs /= []) orelse
@@ -541,7 +664,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
{ok, DocErrors};
update_docs(Db, Docs, Options, interactive_edit) ->
- couch_stats_collector:increment({couchdb, database_writes}),
+ increment_stat(Db, {couchdb, database_writes}),
AllOrNothing = lists:member(all_or_nothing, Options),
% go ahead and generate the new revision ids for the documents.
% separate out the NonRep documents from the rest of the documents
@@ -645,7 +768,7 @@ collect_results(UpdatePid, MRef, ResultsAcc) ->
exit(Reason)
end.
-write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
+write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets,
NonRepDocs, Options0) ->
Options = set_commit_option(Options0),
MergeConflicts = lists:member(merge_conflicts, Options),
@@ -658,7 +781,7 @@ write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
retry ->
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
- {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
+ {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
@@ -693,18 +816,19 @@ flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
% already written to our file, nothing to write
Att;
-flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) ->
- {NewStreamData, Len, Md5} =
+flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
+ disk_len=InDiskLen} = Att) ->
+ {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
- check_md5(Md5, InMd5),
- Att#att{data={Fd, NewStreamData}, md5=Md5, len=Len};
+ check_md5(IdentityMd5, InMd5),
+ Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};
flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
with_stream(Fd, Att, fun(OutputStream) ->
couch_stream:write(OutputStream, Data)
end);
-flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) ->
+flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
with_stream(Fd, Att, fun(OutputStream) ->
% Fun(MaxChunkSize, WriterFun) must call WriterFun
% once for each chunk of the attachment,
@@ -726,9 +850,9 @@ flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) ->
end, ok)
end);
-flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) ->
+flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
with_stream(Fd, Att, fun(OutputStream) ->
- write_streamed_attachment(OutputStream, Fun, Len)
+ write_streamed_attachment(OutputStream, Fun, AttLen)
end).
% From RFC 2616 3.6.1 - Chunked Transfer Coding
@@ -741,8 +865,17 @@ flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) ->
% is present in the request, but there is no Content-MD5
% trailer, we're free to ignore this inconsistency and
% pretend that no Content-MD5 exists.
-with_stream(Fd, #att{md5=InMd5}=Att, Fun) ->
- {ok, OutputStream} = couch_stream:open(Fd),
+with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
+ {ok, OutputStream} = case (Enc =:= identity) andalso
+ couch_util:compressible_att_type(Type) of
+ true ->
+ CompLevel = list_to_integer(
+ couch_config:get("attachments", "compression_level", "0")
+ ),
+ couch_stream:open(Fd, gzip, [{compression_level, CompLevel}]);
+ _ ->
+ couch_stream:open(Fd)
+ end,
ReqMd5 = case Fun(OutputStream) of
{md5, FooterMd5} ->
case InMd5 of
@@ -752,9 +885,36 @@ with_stream(Fd, #att{md5=InMd5}=Att, Fun) ->
_ ->
InMd5
end,
- {StreamInfo, Len, Md5} = couch_stream:close(OutputStream),
- check_md5(Md5, ReqMd5),
- Att#att{data={Fd,StreamInfo},len=Len,md5=Md5}.
+ {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
+ couch_stream:close(OutputStream),
+ check_md5(IdentityMd5, ReqMd5),
+ {AttLen, DiskLen, NewEnc} = case Enc of
+ identity ->
+ case {Md5, IdentityMd5} of
+ {Same, Same} ->
+ {Len, IdentityLen, identity};
+ _ ->
+ {Len, IdentityLen, gzip}
+ end;
+ gzip ->
+ case {Att#att.att_len, Att#att.disk_len} of
+ {AL, DL} when AL =:= undefined orelse DL =:= undefined ->
+ % Compressed attachment uploaded through the standalone API.
+ {Len, Len, gzip};
+ {AL, DL} ->
+ % This case is used for efficient push-replication, where a
+ % compressed attachment is located in the body of multipart
+ % content-type request.
+ {AL, DL, gzip}
+ end
+ end,
+ Att#att{
+ data={Fd,StreamInfo},
+ att_len=AttLen,
+ disk_len=DiskLen,
+ md5=Md5,
+ encoding=NewEnc
+ }.
write_streamed_attachment(_Stream, _F, 0) ->
@@ -779,17 +939,18 @@ changes_since(Db, Style, StartSeq, Fun, Acc) ->
changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
Wrapper = fun(DocInfo, _Offset, Acc2) ->
#doc_info{revs=Revs} = DocInfo,
+ DocInfo2 =
case Style of
main_only ->
- Infos = [DocInfo];
+ DocInfo;
all_docs ->
- % make each rev it's own doc info
- Infos = [DocInfo#doc_info{revs=[RevInfo]} ||
- #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]
+ % remove revs before the seq
+ DocInfo#doc_info{revs=[RevInfo ||
+ #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]}
end,
- Fun(Infos, Acc2)
+ Fun(DocInfo2, Acc2)
end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
+ {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
{ok, AccOut}.
@@ -816,11 +977,17 @@ init({DbName, Filepath, Fd, Options}) ->
{ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
{ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db),
couch_ref_counter:add(RefCntr),
- couch_stats_collector:track_process_count({couchdb, open_databases}),
+ case lists:member(sys_db, Options) of
+ true ->
+ ok;
+ false ->
+ couch_stats_collector:track_process_count({couchdb, open_databases})
+ end,
+ process_flag(trap_exit, true),
{ok, Db}.
-terminate(Reason, _Db) ->
- couch_util:terminate_linked(Reason),
+terminate(_Reason, Db) ->
+ couch_util:shutdown_sync(Db#db.update_pid),
ok.
handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) ->
@@ -839,7 +1006,9 @@ handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) ->
couch_ref_counter:add(NewRefCntr),
couch_ref_counter:drop(OldRefCntr)
end,
- {reply, ok, NewDb}.
+ {reply, ok, NewDb};
+handle_call(get_db, _From, Db) ->
+ {reply, {ok, Db}, Db}.
handle_cast(Msg, Db) ->
@@ -848,7 +1017,11 @@ handle_cast(Msg, Db) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
+
+handle_info({'EXIT', _Pid, normal}, Db) ->
+ {noreply, Db};
+handle_info({'EXIT', _Pid, Reason}, Server) ->
+ {stop, Reason, Server};
handle_info(Msg, Db) ->
?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
exit({error, Msg}).
@@ -983,17 +1156,39 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
{ok, {BodyData0, Atts0}} = read_doc(Db, Bp),
{BodyData0,
lists:map(
- fun({Name,Type,Sp,Len,RevPos,Md5}) ->
+ fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
#att{name=Name,
type=Type,
- len=Len,
+ att_len=AttLen,
+ disk_len=DiskLen,
+ md5=Md5,
+ revpos=RevPos,
+ data={Fd,Sp},
+ encoding=
+ case Enc of
+ true ->
+ % 0110 UPGRADE CODE
+ gzip;
+ false ->
+ % 0110 UPGRADE CODE
+ identity;
+ _ ->
+ Enc
+ end
+ };
+ ({Name,Type,Sp,AttLen,RevPos,Md5}) ->
+ #att{name=Name,
+ type=Type,
+ att_len=AttLen,
+ disk_len=AttLen,
md5=Md5,
revpos=RevPos,
data={Fd,Sp}};
- ({Name,{Type,Sp,Len}}) ->
+ ({Name,{Type,Sp,AttLen}}) ->
#att{name=Name,
type=Type,
- len=Len,
+ att_len=AttLen,
+ disk_len=AttLen,
md5= <<>>,
revpos=0,
data={Fd,Sp}}
@@ -1008,4 +1203,7 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
}.
-
+increment_stat(#db{is_sys_db = true}, _Stat) ->
+ ok;
+increment_stat(#db{}, Stat) ->
+ couch_stats_collector:increment(Stat).