summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2010-03-05 16:27:00 +0000
committerAdam Kocoloski <kocolosk@apache.org>2010-03-05 16:27:00 +0000
commit64481d0117baba9fce06384addff168912c83546 (patch)
treee945c4dae6663f4c359e179a50baf88d0ad71ef3
parent52c9cec5c6715139cf06a99be9779e2f677bceae (diff)
efficient attachment replication. Patch by Filipe Manana. Closes COUCHDB-639
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@919469 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_db.erl23
-rw-r--r--src/couchdb/couch_doc.erl91
-rw-r--r--src/couchdb/couch_httpd_db.erl7
-rw-r--r--src/couchdb/couch_rep_att.erl6
-rw-r--r--src/couchdb/couch_rep_reader.erl7
-rw-r--r--src/couchdb/couch_rep_writer.erl82
-rw-r--r--test/etap/113-replication-attachment-comp.t264
-rw-r--r--test/etap/Makefile.am1
8 files changed, 430 insertions, 51 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 4fca1346..e3c7eaf9 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -340,7 +340,11 @@ update_doc(Db, Doc, Options, UpdateType) ->
{ok, [{ok, NewRev}]} ->
{ok, NewRev};
{ok, [Error]} ->
- throw(Error)
+ throw(Error);
+ {ok, []} ->
+ % replication success
+ {Pos, [RevId | _]} = Doc#doc.revs,
+ {ok, {Pos, RevId}}
end.
update_docs(Db, Docs) ->
@@ -814,8 +818,9 @@ flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
% is present in the request, but there is no Content-MD5
% trailer, we're free to ignore this inconsistency and
% pretend that no Content-MD5 exists.
-with_stream(Fd, #att{md5=InMd5,type=Type}=Att, Fun) ->
- {ok, OutputStream} = case couch_util:compressible_att_type(Type) of
+with_stream(Fd, #att{md5=InMd5,type=Type,comp=AlreadyComp}=Att, Fun) ->
+ {ok, OutputStream} = case (not AlreadyComp) andalso
+ couch_util:compressible_att_type(Type) of
true ->
CompLevel = list_to_integer(
couch_config:get("attachments", "compression_level", "0")
@@ -836,12 +841,18 @@ with_stream(Fd, #att{md5=InMd5,type=Type}=Att, Fun) ->
{StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
couch_stream:close(OutputStream),
check_md5(IdentityMd5, ReqMd5),
+ {AttLen, DiskLen} = case AlreadyComp of
+ true ->
+ {Att#att.att_len, Att#att.disk_len};
+ _ ->
+ {Len, IdentityLen}
+ end,
Att#att{
data={Fd,StreamInfo},
- att_len=Len,
- disk_len=IdentityLen,
+ att_len=AttLen,
+ disk_len=DiskLen,
md5=Md5,
- comp=(IdentityMd5 =/= Md5)
+ comp=(AlreadyComp orelse (IdentityMd5 =/= Md5))
}.
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index f5c47bf7..71d21dda 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -17,7 +17,7 @@
-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
-export([validate_docid/1]).
-export([doc_from_multi_part_stream/2]).
--export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
+-export([doc_to_multi_part_stream/6, len_doc_to_multi_part_stream/5]).
-include("couch_db.hrl").
@@ -73,21 +73,26 @@ to_json_meta(Meta) ->
end, Meta).
to_json_attachments(Attachments, Options) ->
- case lists:member(attachments, Options) of
+ RevPos = case lists:member(attachments, Options) of
true -> % return all the binaries
- to_json_attachments(Attachments, 0, lists:member(follows, Options));
+ 0;
false ->
% note the default is [], because this sorts higher than all numbers.
% and will return all the binaries.
- RevPos = proplists:get_value(atts_after_revpos, Options, []),
- to_json_attachments(Attachments, RevPos, lists:member(follows, Options))
- end.
+ proplists:get_value(atts_after_revpos, Options, [])
+ end,
+ to_json_attachments(
+ Attachments,
+ RevPos,
+ lists:member(follows, Options),
+ lists:member(att_gzip_length, Options)
+ ).
-to_json_attachments([], _RevPosIncludeAfter, _DataToFollow) ->
+to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowGzipLen) ->
[];
-to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) ->
+to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowGzipLen) ->
AttProps = lists:map(
- fun(#att{disk_len=DiskLen}=Att) ->
+ fun(#att{disk_len=DiskLen, att_len=AttLen, comp=Comp}=Att) ->
{Att#att.name, {[
{<<"content_type">>, Att#att.type},
{<<"revpos">>, Att#att.revpos}
@@ -96,7 +101,7 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) ->
if DataToFollow ->
[{<<"length">>, DiskLen}, {<<"follows">>, true}];
true ->
- AttData = case Att#att.comp of
+ AttData = case Comp of
true ->
zlib:gunzip(att_to_bin(Att));
_ ->
@@ -106,7 +111,13 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) ->
end;
true ->
[{<<"length">>, DiskLen}, {<<"stub">>, true}]
- end
+ end ++
+ case {ShowGzipLen, Comp} of
+ {true, true} ->
+ [{<<"gzip_length">>, AttLen}];
+ _ ->
+ []
+ end
}}
end, Atts),
[{<<"_attachments">>, {AttProps}}].
@@ -190,19 +201,19 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
case proplists:get_value(<<"stub">>, BinProps) of
true ->
Type = proplists:get_value(<<"content_type">>, BinProps),
- Length = proplists:get_value(<<"length">>, BinProps),
RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
- #att{name=Name, data=stub, type=Type, att_len=Length,
- disk_len=Length, revpos=RevPos};
+ {AttLen, DiskLen, Comp} = att_lengths(BinProps),
+ #att{name=Name, data=stub, type=Type, att_len=AttLen,
+ disk_len=DiskLen, comp=Comp, revpos=RevPos};
_ ->
Type = proplists:get_value(<<"content_type">>, BinProps,
?DEFAULT_ATTACHMENT_CONTENT_TYPE),
RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
case proplists:get_value(<<"follows">>, BinProps) of
true ->
- Len = proplists:get_value(<<"length">>, BinProps),
- #att{name=Name, data=follows, type=Type,
- att_len=Len, disk_len=Len, revpos=RevPos};
+ {AttLen, DiskLen, Comp} = att_lengths(BinProps),
+ #att{name=Name, data=follows, type=Type, comp=Comp,
+ att_len=AttLen, disk_len=DiskLen, revpos=RevPos};
_ ->
Value = proplists:get_value(<<"data">>, BinProps),
Bin = base64:decode(Value),
@@ -250,6 +261,16 @@ transfer_fields([{<<"_",Name/binary>>, _} | _], _) ->
transfer_fields([Field | Rest], #doc{body=Fields}=Doc) ->
transfer_fields(Rest, Doc#doc{body=[Field|Fields]}).
+att_lengths(BinProps) ->
+ DiskLen = proplists:get_value(<<"length">>, BinProps),
+ GzipLen = proplists:get_value(<<"gzip_length">>, BinProps),
+ case GzipLen of
+ undefined ->
+ {DiskLen, DiskLen, false};
+ _ ->
+ {GzipLen, DiskLen, true}
+ end.
+
to_doc_info(FullDocInfo) ->
{DocInfo, _Path} = to_doc_info_path(FullDocInfo),
DocInfo.
@@ -355,18 +376,24 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
ResultAcc = Fun(Bin, Acc),
fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
-len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) ->
+len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos,
+ SendGzipAtts) ->
2 + % "--"
size(Boundary) +
36 + % "\r\ncontent-type: application/json\r\n\r\n"
iolist_size(JsonBytes) +
4 + % "\r\n--"
size(Boundary) +
- + lists:foldl(fun(#att{revpos=RevPos,disk_len=DiskLen}, AccAttsSize) ->
+ + lists:foldl(fun(#att{revpos=RevPos} = Att, AccAttsSize) ->
if RevPos > AttsSinceRevPos ->
AccAttsSize +
4 + % "\r\n\r\n"
- DiskLen +
+ case SendGzipAtts of
+ true ->
+ Att#att.att_len;
+ _ ->
+ Att#att.disk_len
+ end +
4 + % "\r\n--"
size(Boundary);
true ->
@@ -374,29 +401,33 @@ len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) ->
end
end, 0, Atts) +
2. % "--"
-
-doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos,WriteFun) ->
+
+doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, WriteFun,
+ SendGzipAtts) ->
WriteFun([<<"--", Boundary/binary,
"\r\ncontent-type: application/json\r\n\r\n">>,
JsonBytes, <<"\r\n--", Boundary/binary>>]),
- atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos).
+ atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts).
-atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos) ->
+atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendGzipAtts) ->
WriteFun(<<"--">>);
atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun,
- AttsSinceRevPos) when RevPos > AttsSinceRevPos ->
+ AttsSinceRevPos, SendGzipAtts) when RevPos > AttsSinceRevPos ->
WriteFun(<<"\r\n\r\n">>),
- AttFun = case Att#att.comp of
- true ->
+ AttFun = case {Att#att.comp, SendGzipAtts} of
+ {true, false} ->
fun att_foldl_unzip/3;
_ ->
+ % receiver knows that the attachment is compressed by checking that the
+ % "gzip_length" field is present in the corresponding JSON attachment
+ % object found within the JSON doc
fun att_foldl/3
end,
AttFun(Att, fun(Data, ok) -> WriteFun(Data) end, ok),
WriteFun(<<"\r\n--", Boundary/binary>>),
- atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos);
-atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos) ->
- atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos).
+ atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts);
+atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts) ->
+ atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts).
doc_from_multi_part_stream(ContentType, DataFun) ->
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 1028e857..d7fe3310 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -742,13 +742,13 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) ->
JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [follows|Options])),
AttsSinceRevPos = proplists:get_value(atts_after_revpos, Options, 0),
Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
- AttsSinceRevPos),
+ AttsSinceRevPos,false),
CType = {<<"Content-Type">>,
<<"multipart/related; boundary=\"", Boundary/binary, "\"">>},
{ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len),
couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
AttsSinceRevPos,
- fun(Data) -> couch_httpd:send(Resp, Data) end)
+ fun(Data) -> couch_httpd:send(Resp, Data) end, false)
end;
false ->
send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options))
@@ -1045,6 +1045,9 @@ parse_doc_query(Req) ->
Args#doc_query_args{update_type=replicated_changes};
{"new_edits", "true"} ->
Args#doc_query_args{update_type=interactive_edit};
+ {"att_gzip_length", "true"} ->
+ Options = [att_gzip_length | Args#doc_query_args.options],
+ Args#doc_query_args{options=Options};
_Else -> % unknown key value pair, ignore.
Args
end
diff --git a/src/couchdb/couch_rep_att.erl b/src/couchdb/couch_rep_att.erl
index 5f1b57e9..3527df00 100644
--- a/src/couchdb/couch_rep_att.erl
+++ b/src/couchdb/couch_rep_att.erl
@@ -79,11 +79,7 @@ receive_data(Ref, ReqId, ContentEncoding) ->
throw({attachment_request_failed, Err});
{ibrowse_async_response, ReqId, Data} ->
% ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]),
- if ContentEncoding =:= "gzip" ->
- zlib:gunzip(Data);
- true ->
- Data
- end;
+ Data;
{ibrowse_async_response_end, ReqId} ->
?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]),
throw({attachment_request_failed, premature_end})
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index 4f03008f..bd807eab 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -227,7 +227,7 @@ update_sequence_lists(Seq, State) ->
open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
%% all this logic just splits up revision lists that are too long for
%% MochiWeb into multiple requests
- BaseQS = [{revs,true}, {latest,true}],
+ BaseQS = [{revs,true}, {latest,true}, {att_gzip_length,true}],
BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS},
BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs=
@@ -250,7 +250,10 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
open_doc(#http_db{} = DbS, DocId) ->
% get latest rev of the doc
- Req = DbS#http_db{resource=url_encode(DocId)},
+ Req = DbS#http_db{
+ resource=url_encode(DocId),
+ qs=[{att_gzip_length, true}]
+ },
case couch_rep_httpc:request(Req) of
{[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} ->
[];
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 269b9799..cf01c576 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -51,8 +51,27 @@ writer_loop(Parent, Reader, Target) ->
writer_loop(Parent, Reader, Target)
end.
-write_docs(#http_db{headers = Headers} = Db, Docs) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+write_docs(#http_db{} = Db, Docs) ->
+ {DocsAtts, DocsNoAtts} = lists:partition(
+ fun(#doc{atts=[]}) -> false; (_) -> true end,
+ Docs
+ ),
+ ErrorsJson0 = write_bulk_docs(Db, DocsNoAtts),
+ ErrorsJson = lists:foldl(
+ fun(Doc, Acc) -> write_multi_part_doc(Db, Doc) ++ Acc end,
+ ErrorsJson0,
+ DocsAtts
+ ),
+ {ok, ErrorsJson};
+write_docs(Db, Docs) ->
+ couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).
+
+write_bulk_docs(_Db, []) ->
+ [];
+write_bulk_docs(#http_db{headers = Headers} = Db, Docs) ->
+ JsonDocs = [
+ couch_doc:to_json_obj(Doc, [revs, att_gzip_length]) || Doc <- Docs
+ ],
Request = Db#http_db{
resource = "_bulk_docs",
method = post,
@@ -65,10 +84,61 @@ write_docs(#http_db{headers = Headers} = Db, Docs) ->
List when is_list(List) ->
List
end,
- ErrorsList = [write_docs_1(V) || V <- ErrorsJson],
- {ok, ErrorsList};
-write_docs(Db, Docs) ->
- couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).
+ [write_docs_1(V) || V <- ErrorsJson].
+
+write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
+ JsonBytes = ?JSON_ENCODE(
+ couch_doc:to_json_obj(
+ Doc,
+ [follows, att_gzip_length, {atts_after_revpos, 0}]
+ )
+ ),
+ Boundary = couch_uuids:random(),
+ Len = couch_doc:len_doc_to_multi_part_stream(
+ Boundary, JsonBytes, Atts, 0, true
+ ),
+ {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
+ _StreamerPid = spawn_link(
+ fun() ->
+ couch_doc:doc_to_multi_part_stream(
+ Boundary,
+ JsonBytes,
+ Atts,
+ 0,
+ fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
+ true
+ ),
+ couch_work_queue:close(DataQueue)
+ end
+ ),
+ BodyFun = fun(Acc) ->
+ case couch_work_queue:dequeue(DataQueue) of
+ closed ->
+ eof;
+ {ok, Data} ->
+ {ok, iolist_to_binary(lists:reverse(Data)), Acc}
+ end
+ end,
+ Request = Db#http_db{
+ resource = couch_util:url_encode(Doc#doc.id),
+ method = put,
+ qs = [{new_edits, false}],
+ body = {BodyFun, ok},
+ headers = [
+ {"x-couch-full-commit", "false"},
+ {"Content-Type",
+ "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
+ {"Content-Length", Len} | Headers
+ ]
+ },
+ case couch_rep_httpc:request(Request) of
+ {[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
+ {Pos, [RevId | _]} = Doc#doc.revs,
+ ErrId = couch_util:to_existing_atom(Error),
+ [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}];
+ _ ->
+ []
+ end.
write_docs_1({Props}) ->
Id = proplists:get_value(<<"id">>, Props),
diff --git a/test/etap/113-replication-attachment-comp.t b/test/etap/113-replication-attachment-comp.t
new file mode 100644
index 00000000..d9039694
--- /dev/null
+++ b/test/etap/113-replication-attachment-comp.t
@@ -0,0 +1,264 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-record(user_ctx, {
+ name = null,
+ roles = [],
+ handler
+}).
+
+default_config() ->
+ test_util:build_file("etc/couchdb/default_dev.ini").
+
+test_db_a_name() ->
+ <<"couch_test_rep_att_comp_a">>.
+
+test_db_b_name() ->
+ <<"couch_test_rep_att_comp_b">>.
+
+main(_) ->
+ test_util:init_code_path(),
+ etap:plan(28),
+ case (catch test()) of
+ ok ->
+ etap:end_tests();
+ Other ->
+ etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+ etap:bail(Other)
+ end,
+ ok.
+
+test() ->
+ couch_server_sup:start_link([default_config()]),
+ put(addr, couch_config:get("httpd", "bind_address", "127.0.0.1")),
+ put(port, couch_config:get("httpd", "port", "5984")),
+ application:start(inets),
+ ibrowse:start(),
+ timer:sleep(1000),
+
+ %
+ % test pull replication
+ %
+
+ delete_db(test_db_a_name()),
+ delete_db(test_db_b_name()),
+ create_db(test_db_a_name()),
+ create_db(test_db_b_name()),
+
+ % enable compression
+ couch_config:set("attachments", "compression_level", "8"),
+ couch_config:set("attachments", "compressible_types", "text/*"),
+
+ % store doc with text attachment in DB A
+ put_text_att(test_db_a_name()),
+
+ % disable attachment compression
+ couch_config:set("attachments", "compression_level", "0"),
+
+ % do pull replication
+ do_pull_replication(test_db_a_name(), test_db_b_name()),
+
+ % verify that DB B has the attachment stored in compressed form
+ check_att_is_compressed(test_db_b_name()),
+ check_server_can_decompress_att(test_db_b_name()),
+ check_att_stubs(test_db_a_name(), test_db_b_name()),
+
+ %
+ % test push replication
+ %
+
+ delete_db(test_db_a_name()),
+ delete_db(test_db_b_name()),
+ create_db(test_db_a_name()),
+ create_db(test_db_b_name()),
+
+ % enable compression
+ couch_config:set("attachments", "compression_level", "8"),
+ couch_config:set("attachments", "compressible_types", "text/*"),
+
+ % store doc with text attachment in DB A
+ put_text_att(test_db_a_name()),
+
+ % disable attachment compression
+ couch_config:set("attachments", "compression_level", "0"),
+
+ % do push replication
+ do_push_replication(test_db_a_name(), test_db_b_name()),
+
+ % verify that DB B has the attachment stored in compressed form
+ check_att_is_compressed(test_db_b_name()),
+ check_server_can_decompress_att(test_db_b_name()),
+ check_att_stubs(test_db_a_name(), test_db_b_name()),
+
+ timer:sleep(3000), % to avoid mochiweb socket closed exceptions
+ delete_db(test_db_a_name()),
+ delete_db(test_db_b_name()),
+ couch_server_sup:stop(),
+ ok.
+
+put_text_att(DbName) ->
+ {ok, {{_, Code, _}, _Headers, _Body}} = http:request(
+ put,
+ {db_url(DbName) ++ "/testdoc1/readme.txt", [],
+ "text/plain", test_text_data()},
+ [],
+ [{sync, true}]),
+ etap:is(Code, 201, "Created text attachment"),
+ ok.
+
+do_pull_replication(SourceDbName, TargetDbName) ->
+ RepObj = {[
+ {<<"source">>, list_to_binary(db_url(SourceDbName))},
+ {<<"target">>, TargetDbName}
+ ]},
+ {ok, {{_, Code, _}, _Headers, Body}} = http:request(
+ post,
+ {rep_url(), [],
+ "application/json", list_to_binary(couch_util:json_encode(RepObj))},
+ [],
+ [{sync, true}]),
+ etap:is(Code, 200, "Pull replication successfully triggered"),
+ Json = couch_util:json_decode(Body),
+ RepOk = couch_util:get_nested_json_value(Json, [<<"ok">>]),
+ etap:is(RepOk, true, "Pull replication completed with success"),
+ ok.
+
+do_push_replication(SourceDbName, TargetDbName) ->
+ RepObj = {[
+ {<<"source">>, SourceDbName},
+ {<<"target">>, list_to_binary(db_url(TargetDbName))}
+ ]},
+ {ok, {{_, Code, _}, _Headers, Body}} = http:request(
+ post,
+ {rep_url(), [],
+ "application/json", list_to_binary(couch_util:json_encode(RepObj))},
+ [],
+ [{sync, true}]),
+ etap:is(Code, 200, "Push replication successfully triggered"),
+ Json = couch_util:json_decode(Body),
+ RepOk = couch_util:get_nested_json_value(Json, [<<"ok">>]),
+ etap:is(RepOk, true, "Push replication completed with success"),
+ ok.
+
+check_att_is_compressed(DbName) ->
+ {ok, {{_, Code, _}, Headers, Body}} = http:request(
+ get,
+ {db_url(DbName) ++ "/testdoc1/readme.txt",
+ [{"Accept-Encoding", "gzip"}]},
+ [],
+ [{sync, true}]),
+ etap:is(Code, 200, "HTTP response code for the attachment request is 200"),
+ Gziped = lists:member({"content-encoding", "gzip"}, Headers),
+ etap:is(Gziped, true, "The attachment was received in compressed form"),
+ Uncompressed = binary_to_list(zlib:gunzip(list_to_binary(Body))),
+ etap:is(
+ Uncompressed,
+ test_text_data(),
+ "The attachment content is valid after decompression at the client side"
+ ),
+ ok.
+
+check_server_can_decompress_att(DbName) ->
+ {ok, {{_, Code, _}, Headers, Body}} = http:request(
+ get,
+ {db_url(DbName) ++ "/testdoc1/readme.txt", []},
+ [],
+ [{sync, true}]),
+ etap:is(Code, 200, "HTTP response code for the attachment request is 200"),
+ Gziped = lists:member({"content-encoding", "gzip"}, Headers),
+ etap:is(
+ Gziped, false, "The attachment was not received in compressed form"
+ ),
+ etap:is(
+ Body,
+ test_text_data(),
+ "The attachment content is valid after server decompression"
+ ),
+ ok.
+
+check_att_stubs(SourceDbName, TargetDbName) ->
+ {ok, {{_, Code1, _}, _Headers1, Body1}} = http:request(
+ get,
+ {db_url(SourceDbName) ++ "/testdoc1?att_gzip_length=true", []},
+ [],
+ [{sync, true}]),
+ etap:is(
+ Code1,
+ 200,
+ "HTTP response code is 200 for the source DB doc request"
+ ),
+ Json1 = couch_util:json_decode(Body1),
+ SourceAttStub = couch_util:get_nested_json_value(
+ Json1,
+ [<<"_attachments">>, <<"readme.txt">>]
+ ),
+ {ok, {{_, Code2, _}, _Headers2, Body2}} = http:request(
+ get,
+ {db_url(TargetDbName) ++ "/testdoc1?att_gzip_length=true", []},
+ [],
+ [{sync, true}]),
+ etap:is(
+ Code2,
+ 200,
+ "HTTP response code is 200 for the target DB doc request"
+ ),
+ Json2 = couch_util:json_decode(Body2),
+ TargetAttStub = couch_util:get_nested_json_value(
+ Json2,
+ [<<"_attachments">>, <<"readme.txt">>]
+ ),
+ IdenticalStubs = (SourceAttStub =:= TargetAttStub),
+ etap:is(IdenticalStubs, true, "Attachment stubs are identical"),
+ TargetAttStubLength = couch_util:get_nested_json_value(
+ TargetAttStub,
+ [<<"length">>]
+ ),
+ TargetAttStubGzipLength = couch_util:get_nested_json_value(
+ TargetAttStub,
+ [<<"gzip_length">>]
+ ),
+ GzipLengthDefined = is_integer(TargetAttStubGzipLength),
+ etap:is(
+ GzipLengthDefined,
+ true,
+ "Stubs have the gzip_length field properly defined"
+ ),
+ GzipLengthSmaller = (TargetAttStubGzipLength < TargetAttStubLength),
+ etap:is(
+ GzipLengthSmaller,
+ true,
+ "Stubs have the gzip_length field smaller than their length field"
+ ),
+ ok.
+
+admin_user_ctx() ->
+ {user_ctx, #user_ctx{roles=[<<"_admin">>]}}.
+
+create_db(DbName) ->
+ {ok, _} = couch_db:create(DbName, [admin_user_ctx()]).
+
+delete_db(DbName) ->
+ couch_server:delete(DbName, [admin_user_ctx()]).
+
+db_url(DbName) ->
+ "http://" ++ get(addr) ++ ":" ++ get(port) ++ "/" ++
+ binary_to_list(DbName).
+
+rep_url() ->
+ "http://" ++ get(addr) ++ ":" ++ get(port) ++ "/_replicate".
+
+test_text_data() ->
+ {ok, Data} = file:read_file(test_util:source_file("README")),
+ binary_to_list(Data).
diff --git a/test/etap/Makefile.am b/test/etap/Makefile.am
index 6bbf45ba..bdab95aa 100644
--- a/test/etap/Makefile.am
+++ b/test/etap/Makefile.am
@@ -58,6 +58,7 @@ EXTRA_DIST = \
110-replication-httpc.t \
111-replication-changes-feed.t \
112-replication-missing-revs.t \
+ 113-replication-attachment-comp.t \
120-stats-collect.t \
121-stats-aggregates.cfg \
121-stats-aggregates.ini \