From 64481d0117baba9fce06384addff168912c83546 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Fri, 5 Mar 2010 16:27:00 +0000 Subject: efficient attachment replication. Patch by Filipe Manana. Closes COUCHDB-639 git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@919469 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db.erl | 23 ++- src/couchdb/couch_doc.erl | 91 ++++++---- src/couchdb/couch_httpd_db.erl | 7 +- src/couchdb/couch_rep_att.erl | 6 +- src/couchdb/couch_rep_reader.erl | 7 +- src/couchdb/couch_rep_writer.erl | 82 ++++++++- test/etap/113-replication-attachment-comp.t | 264 ++++++++++++++++++++++++++++ test/etap/Makefile.am | 1 + 8 files changed, 430 insertions(+), 51 deletions(-) create mode 100644 test/etap/113-replication-attachment-comp.t diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 4fca1346..e3c7eaf9 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -340,7 +340,11 @@ update_doc(Db, Doc, Options, UpdateType) -> {ok, [{ok, NewRev}]} -> {ok, NewRev}; {ok, [Error]} -> - throw(Error) + throw(Error); + {ok, []} -> + % replication success + {Pos, [RevId | _]} = Doc#doc.revs, + {ok, {Pos, RevId}} end. update_docs(Db, Docs) -> @@ -814,8 +818,9 @@ flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) -> % is present in the request, but there is no Content-MD5 % trailer, we're free to ignore this inconsistency and % pretend that no Content-MD5 exists. -with_stream(Fd, #att{md5=InMd5,type=Type}=Att, Fun) -> - {ok, OutputStream} = case couch_util:compressible_att_type(Type) of +with_stream(Fd, #att{md5=InMd5,type=Type,comp=AlreadyComp}=Att, Fun) -> + {ok, OutputStream} = case (not AlreadyComp) andalso + couch_util:compressible_att_type(Type) of true -> CompLevel = list_to_integer( couch_config:get("attachments", "compression_level", "0") @@ -836,12 +841,18 @@ with_stream(Fd, #att{md5=InMd5,type=Type}=Att, Fun) -> {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} = couch_stream:close(OutputStream), check_md5(IdentityMd5, ReqMd5), + {AttLen, DiskLen} = case AlreadyComp of + true -> + {Att#att.att_len, Att#att.disk_len}; + _ -> + {Len, IdentityLen} + end, Att#att{ data={Fd,StreamInfo}, - att_len=Len, - disk_len=IdentityLen, + att_len=AttLen, + disk_len=DiskLen, md5=Md5, - comp=(IdentityMd5 =/= Md5) + comp=(AlreadyComp orelse (IdentityMd5 =/= Md5)) }. diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index f5c47bf7..71d21dda 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -17,7 +17,7 @@ -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). -export([validate_docid/1]). -export([doc_from_multi_part_stream/2]). --export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]). +-export([doc_to_multi_part_stream/6, len_doc_to_multi_part_stream/5]). -include("couch_db.hrl"). @@ -73,21 +73,26 @@ to_json_meta(Meta) -> end, Meta). to_json_attachments(Attachments, Options) -> - case lists:member(attachments, Options) of + RevPos = case lists:member(attachments, Options) of true -> % return all the binaries - to_json_attachments(Attachments, 0, lists:member(follows, Options)); + 0; false -> % note the default is [], because this sorts higher than all numbers. % and will return all the binaries. - RevPos = proplists:get_value(atts_after_revpos, Options, []), - to_json_attachments(Attachments, RevPos, lists:member(follows, Options)) - end. 
+ proplists:get_value(atts_after_revpos, Options, []) + end, + to_json_attachments( + Attachments, + RevPos, + lists:member(follows, Options), + lists:member(att_gzip_length, Options) + ). -to_json_attachments([], _RevPosIncludeAfter, _DataToFollow) -> +to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowGzipLen) -> []; -to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) -> +to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowGzipLen) -> AttProps = lists:map( - fun(#att{disk_len=DiskLen}=Att) -> + fun(#att{disk_len=DiskLen, att_len=AttLen, comp=Comp}=Att) -> {Att#att.name, {[ {<<"content_type">>, Att#att.type}, {<<"revpos">>, Att#att.revpos} @@ -96,7 +101,7 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) -> if DataToFollow -> [{<<"length">>, DiskLen}, {<<"follows">>, true}]; true -> - AttData = case Att#att.comp of + AttData = case Comp of true -> zlib:gunzip(att_to_bin(Att)); _ -> @@ -106,7 +111,13 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) -> end; true -> [{<<"length">>, DiskLen}, {<<"stub">>, true}] - end + end ++ + case {ShowGzipLen, Comp} of + {true, true} -> + [{<<"gzip_length">>, AttLen}]; + _ -> + [] + end }} end, Atts), [{<<"_attachments">>, {AttProps}}]. @@ -190,19 +201,19 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> case proplists:get_value(<<"stub">>, BinProps) of true -> Type = proplists:get_value(<<"content_type">>, BinProps), - Length = proplists:get_value(<<"length">>, BinProps), RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), - #att{name=Name, data=stub, type=Type, att_len=Length, - disk_len=Length, revpos=RevPos}; + {AttLen, DiskLen, Comp} = att_lengths(BinProps), + #att{name=Name, data=stub, type=Type, att_len=AttLen, + disk_len=DiskLen, comp=Comp, revpos=RevPos}; _ -> Type = proplists:get_value(<<"content_type">>, BinProps, ?DEFAULT_ATTACHMENT_CONTENT_TYPE), RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), case proplists:get_value(<<"follows">>, BinProps) of true -> - Len = proplists:get_value(<<"length">>, BinProps), - #att{name=Name, data=follows, type=Type, - att_len=Len, disk_len=Len, revpos=RevPos}; + {AttLen, DiskLen, Comp} = att_lengths(BinProps), + #att{name=Name, data=follows, type=Type, comp=Comp, + att_len=AttLen, disk_len=DiskLen, revpos=RevPos}; _ -> Value = proplists:get_value(<<"data">>, BinProps), Bin = base64:decode(Value), @@ -250,6 +261,16 @@ transfer_fields([{<<"_",Name/binary>>, _} | _], _) -> transfer_fields([Field | Rest], #doc{body=Fields}=Doc) -> transfer_fields(Rest, Doc#doc{body=[Field|Fields]}). +att_lengths(BinProps) -> + DiskLen = proplists:get_value(<<"length">>, BinProps), + GzipLen = proplists:get_value(<<"gzip_length">>, BinProps), + case GzipLen of + undefined -> + {DiskLen, DiskLen, false}; + _ -> + {GzipLen, DiskLen, true} + end. + to_doc_info(FullDocInfo) -> {DocInfo, _Path} = to_doc_info_path(FullDocInfo), DocInfo. @@ -355,18 +376,24 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0-> ResultAcc = Fun(Bin, Acc), fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc). 
-len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) -> +len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, + SendGzipAtts) -> 2 + % "--" size(Boundary) + 36 + % "\r\ncontent-type: application/json\r\n\r\n" iolist_size(JsonBytes) + 4 + % "\r\n--" size(Boundary) + - + lists:foldl(fun(#att{revpos=RevPos,disk_len=DiskLen}, AccAttsSize) -> + + lists:foldl(fun(#att{revpos=RevPos} = Att, AccAttsSize) -> if RevPos > AttsSinceRevPos -> AccAttsSize + 4 + % "\r\n\r\n" - DiskLen + + case SendGzipAtts of + true -> + Att#att.att_len; + _ -> + Att#att.disk_len + end + 4 + % "\r\n--" size(Boundary); true -> @@ -374,29 +401,33 @@ len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) -> end end, 0, Atts) + 2. % "--" - -doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos,WriteFun) -> + +doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, WriteFun, + SendGzipAtts) -> WriteFun([<<"--", Boundary/binary, "\r\ncontent-type: application/json\r\n\r\n">>, JsonBytes, <<"\r\n--", Boundary/binary>>]), - atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos). + atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts). -atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos) -> +atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendGzipAtts) -> WriteFun(<<"--">>); atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun, - AttsSinceRevPos) when RevPos > AttsSinceRevPos -> + AttsSinceRevPos, SendGzipAtts) when RevPos > AttsSinceRevPos -> WriteFun(<<"\r\n\r\n">>), - AttFun = case Att#att.comp of - true -> + AttFun = case {Att#att.comp, SendGzipAtts} of + {true, false} -> fun att_foldl_unzip/3; _ -> + % receiver knows that the attachment is compressed by checking that the + % "gzip_length" field is present in the corresponding JSON attachment + % object found within the JSON doc fun att_foldl/3 end, AttFun(Att, fun(Data, ok) -> WriteFun(Data) end, ok), WriteFun(<<"\r\n--", Boundary/binary>>), - atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos); -atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos) -> - atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos). + atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts); +atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts) -> + atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts). 
doc_from_multi_part_stream(ContentType, DataFun) -> diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 1028e857..d7fe3310 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -742,13 +742,13 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) -> JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [follows|Options])), AttsSinceRevPos = proplists:get_value(atts_after_revpos, Options, 0), Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts, - AttsSinceRevPos), + AttsSinceRevPos,false), CType = {<<"Content-Type">>, <<"multipart/related; boundary=\"", Boundary/binary, "\"">>}, {ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len), couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts, AttsSinceRevPos, - fun(Data) -> couch_httpd:send(Resp, Data) end) + fun(Data) -> couch_httpd:send(Resp, Data) end, false) end; false -> send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options)) @@ -1045,6 +1045,9 @@ parse_doc_query(Req) -> Args#doc_query_args{update_type=replicated_changes}; {"new_edits", "true"} -> Args#doc_query_args{update_type=interactive_edit}; + {"att_gzip_length", "true"} -> + Options = [att_gzip_length | Args#doc_query_args.options], + Args#doc_query_args{options=Options}; _Else -> % unknown key value pair, ignore. Args end diff --git a/src/couchdb/couch_rep_att.erl b/src/couchdb/couch_rep_att.erl index 5f1b57e9..3527df00 100644 --- a/src/couchdb/couch_rep_att.erl +++ b/src/couchdb/couch_rep_att.erl @@ -79,11 +79,7 @@ receive_data(Ref, ReqId, ContentEncoding) -> throw({attachment_request_failed, Err}); {ibrowse_async_response, ReqId, Data} -> % ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]), - if ContentEncoding =:= "gzip" -> - zlib:gunzip(Data); - true -> - Data - end; + Data; {ibrowse_async_response_end, ReqId} -> ?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]), throw({attachment_request_failed, premature_end}) diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index 4f03008f..bd807eab 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -227,7 +227,7 @@ update_sequence_lists(Seq, State) -> open_doc_revs(#http_db{} = DbS, DocId, Revs) -> %% all this logic just splits up revision lists that are too long for %% MochiWeb into multiple requests - BaseQS = [{revs,true}, {latest,true}], + BaseQS = [{revs,true}, {latest,true}, {att_gzip_length,true}], BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS}, BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs= @@ -250,7 +250,10 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) -> open_doc(#http_db{} = DbS, DocId) -> % get latest rev of the doc - Req = DbS#http_db{resource=url_encode(DocId)}, + Req = DbS#http_db{ + resource=url_encode(DocId), + qs=[{att_gzip_length, true}] + }, case couch_rep_httpc:request(Req) of {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} -> []; diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index 269b9799..cf01c576 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -51,8 +51,27 @@ writer_loop(Parent, Reader, Target) -> writer_loop(Parent, Reader, Target) end. 
-write_docs(#http_db{headers = Headers} = Db, Docs) -> - JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], +write_docs(#http_db{} = Db, Docs) -> + {DocsAtts, DocsNoAtts} = lists:partition( + fun(#doc{atts=[]}) -> false; (_) -> true end, + Docs + ), + ErrorsJson0 = write_bulk_docs(Db, DocsNoAtts), + ErrorsJson = lists:foldl( + fun(Doc, Acc) -> write_multi_part_doc(Db, Doc) ++ Acc end, + ErrorsJson0, + DocsAtts + ), + {ok, ErrorsJson}; +write_docs(Db, Docs) -> + couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes). + +write_bulk_docs(_Db, []) -> + []; +write_bulk_docs(#http_db{headers = Headers} = Db, Docs) -> + JsonDocs = [ + couch_doc:to_json_obj(Doc, [revs, att_gzip_length]) || Doc <- Docs + ], Request = Db#http_db{ resource = "_bulk_docs", method = post, @@ -65,10 +84,61 @@ write_docs(#http_db{headers = Headers} = Db, Docs) -> List when is_list(List) -> List end, - ErrorsList = [write_docs_1(V) || V <- ErrorsJson], - {ok, ErrorsList}; -write_docs(Db, Docs) -> - couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes). + [write_docs_1(V) || V <- ErrorsJson]. + +write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> + JsonBytes = ?JSON_ENCODE( + couch_doc:to_json_obj( + Doc, + [follows, att_gzip_length, {atts_after_revpos, 0}] + ) + ), + Boundary = couch_uuids:random(), + Len = couch_doc:len_doc_to_multi_part_stream( + Boundary, JsonBytes, Atts, 0, true + ), + {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000), + _StreamerPid = spawn_link( + fun() -> + couch_doc:doc_to_multi_part_stream( + Boundary, + JsonBytes, + Atts, + 0, + fun(Data) -> couch_work_queue:queue(DataQueue, Data) end, + true + ), + couch_work_queue:close(DataQueue) + end + ), + BodyFun = fun(Acc) -> + case couch_work_queue:dequeue(DataQueue) of + closed -> + eof; + {ok, Data} -> + {ok, iolist_to_binary(lists:reverse(Data)), Acc} + end + end, + Request = Db#http_db{ + resource = couch_util:url_encode(Doc#doc.id), + method = put, + qs = [{new_edits, false}], + body = {BodyFun, ok}, + headers = [ + {"x-couch-full-commit", "false"}, + {"Content-Type", + "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""}, + {"Content-Length", Len} | Headers + ] + }, + case couch_rep_httpc:request(Request) of + {[{<<"error">>, Error}, {<<"reason">>, Reason}]} -> + {Pos, [RevId | _]} = Doc#doc.revs, + ErrId = couch_util:to_existing_atom(Error), + [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}]; + _ -> + [] + end. write_docs_1({Props}) -> Id = proplists:get_value(<<"id">>, Props), diff --git a/test/etap/113-replication-attachment-comp.t b/test/etap/113-replication-attachment-comp.t new file mode 100644 index 00000000..d9039694 --- /dev/null +++ b/test/etap/113-replication-attachment-comp.t @@ -0,0 +1,264 @@ +#!/usr/bin/env escript +%% -*- erlang -*- + +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-record(user_ctx, { + name = null, + roles = [], + handler +}). + +default_config() -> + test_util:build_file("etc/couchdb/default_dev.ini"). 
+ +test_db_a_name() -> + <<"couch_test_rep_att_comp_a">>. + +test_db_b_name() -> + <<"couch_test_rep_att_comp_b">>. + +main(_) -> + test_util:init_code_path(), + etap:plan(28), + case (catch test()) of + ok -> + etap:end_tests(); + Other -> + etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), + etap:bail(Other) + end, + ok. + +test() -> + couch_server_sup:start_link([default_config()]), + put(addr, couch_config:get("httpd", "bind_address", "127.0.0.1")), + put(port, couch_config:get("httpd", "port", "5984")), + application:start(inets), + ibrowse:start(), + timer:sleep(1000), + + % + % test pull replication + % + + delete_db(test_db_a_name()), + delete_db(test_db_b_name()), + create_db(test_db_a_name()), + create_db(test_db_b_name()), + + % enable compression + couch_config:set("attachments", "compression_level", "8"), + couch_config:set("attachments", "compressible_types", "text/*"), + + % store doc with text attachment in DB A + put_text_att(test_db_a_name()), + + % disable attachment compression + couch_config:set("attachments", "compression_level", "0"), + + % do pull replication + do_pull_replication(test_db_a_name(), test_db_b_name()), + + % verify that DB B has the attachment stored in compressed form + check_att_is_compressed(test_db_b_name()), + check_server_can_decompress_att(test_db_b_name()), + check_att_stubs(test_db_a_name(), test_db_b_name()), + + % + % test push replication + % + + delete_db(test_db_a_name()), + delete_db(test_db_b_name()), + create_db(test_db_a_name()), + create_db(test_db_b_name()), + + % enable compression + couch_config:set("attachments", "compression_level", "8"), + couch_config:set("attachments", "compressible_types", "text/*"), + + % store doc with text attachment in DB A + put_text_att(test_db_a_name()), + + % disable attachment compression + couch_config:set("attachments", "compression_level", "0"), + + % do push replication + do_push_replication(test_db_a_name(), test_db_b_name()), + + % verify that DB B has the attachment stored in compressed form + check_att_is_compressed(test_db_b_name()), + check_server_can_decompress_att(test_db_b_name()), + check_att_stubs(test_db_a_name(), test_db_b_name()), + + timer:sleep(3000), % to avoid mochiweb socket closed exceptions + delete_db(test_db_a_name()), + delete_db(test_db_b_name()), + couch_server_sup:stop(), + ok. + +put_text_att(DbName) -> + {ok, {{_, Code, _}, _Headers, _Body}} = http:request( + put, + {db_url(DbName) ++ "/testdoc1/readme.txt", [], + "text/plain", test_text_data()}, + [], + [{sync, true}]), + etap:is(Code, 201, "Created text attachment"), + ok. + +do_pull_replication(SourceDbName, TargetDbName) -> + RepObj = {[ + {<<"source">>, list_to_binary(db_url(SourceDbName))}, + {<<"target">>, TargetDbName} + ]}, + {ok, {{_, Code, _}, _Headers, Body}} = http:request( + post, + {rep_url(), [], + "application/json", list_to_binary(couch_util:json_encode(RepObj))}, + [], + [{sync, true}]), + etap:is(Code, 200, "Pull replication successfully triggered"), + Json = couch_util:json_decode(Body), + RepOk = couch_util:get_nested_json_value(Json, [<<"ok">>]), + etap:is(RepOk, true, "Pull replication completed with success"), + ok. 
+ +do_push_replication(SourceDbName, TargetDbName) -> + RepObj = {[ + {<<"source">>, SourceDbName}, + {<<"target">>, list_to_binary(db_url(TargetDbName))} + ]}, + {ok, {{_, Code, _}, _Headers, Body}} = http:request( + post, + {rep_url(), [], + "application/json", list_to_binary(couch_util:json_encode(RepObj))}, + [], + [{sync, true}]), + etap:is(Code, 200, "Push replication successfully triggered"), + Json = couch_util:json_decode(Body), + RepOk = couch_util:get_nested_json_value(Json, [<<"ok">>]), + etap:is(RepOk, true, "Push replication completed with success"), + ok. + +check_att_is_compressed(DbName) -> + {ok, {{_, Code, _}, Headers, Body}} = http:request( + get, + {db_url(DbName) ++ "/testdoc1/readme.txt", + [{"Accept-Encoding", "gzip"}]}, + [], + [{sync, true}]), + etap:is(Code, 200, "HTTP response code for the attachment request is 200"), + Gziped = lists:member({"content-encoding", "gzip"}, Headers), + etap:is(Gziped, true, "The attachment was received in compressed form"), + Uncompressed = binary_to_list(zlib:gunzip(list_to_binary(Body))), + etap:is( + Uncompressed, + test_text_data(), + "The attachment content is valid after decompression at the client side" + ), + ok. + +check_server_can_decompress_att(DbName) -> + {ok, {{_, Code, _}, Headers, Body}} = http:request( + get, + {db_url(DbName) ++ "/testdoc1/readme.txt", []}, + [], + [{sync, true}]), + etap:is(Code, 200, "HTTP response code for the attachment request is 200"), + Gziped = lists:member({"content-encoding", "gzip"}, Headers), + etap:is( + Gziped, false, "The attachment was not received in compressed form" + ), + etap:is( + Body, + test_text_data(), + "The attachment content is valid after server decompression" + ), + ok. + +check_att_stubs(SourceDbName, TargetDbName) -> + {ok, {{_, Code1, _}, _Headers1, Body1}} = http:request( + get, + {db_url(SourceDbName) ++ "/testdoc1?att_gzip_length=true", []}, + [], + [{sync, true}]), + etap:is( + Code1, + 200, + "HTTP response code is 200 for the source DB doc request" + ), + Json1 = couch_util:json_decode(Body1), + SourceAttStub = couch_util:get_nested_json_value( + Json1, + [<<"_attachments">>, <<"readme.txt">>] + ), + {ok, {{_, Code2, _}, _Headers2, Body2}} = http:request( + get, + {db_url(TargetDbName) ++ "/testdoc1?att_gzip_length=true", []}, + [], + [{sync, true}]), + etap:is( + Code2, + 200, + "HTTP response code is 200 for the target DB doc request" + ), + Json2 = couch_util:json_decode(Body2), + TargetAttStub = couch_util:get_nested_json_value( + Json2, + [<<"_attachments">>, <<"readme.txt">>] + ), + IdenticalStubs = (SourceAttStub =:= TargetAttStub), + etap:is(IdenticalStubs, true, "Attachment stubs are identical"), + TargetAttStubLength = couch_util:get_nested_json_value( + TargetAttStub, + [<<"length">>] + ), + TargetAttStubGzipLength = couch_util:get_nested_json_value( + TargetAttStub, + [<<"gzip_length">>] + ), + GzipLengthDefined = is_integer(TargetAttStubGzipLength), + etap:is( + GzipLengthDefined, + true, + "Stubs have the gzip_length field properly defined" + ), + GzipLengthSmaller = (TargetAttStubGzipLength < TargetAttStubLength), + etap:is( + GzipLengthSmaller, + true, + "Stubs have the gzip_length field smaller than their length field" + ), + ok. + +admin_user_ctx() -> + {user_ctx, #user_ctx{roles=[<<"_admin">>]}}. + +create_db(DbName) -> + {ok, _} = couch_db:create(DbName, [admin_user_ctx()]). + +delete_db(DbName) -> + couch_server:delete(DbName, [admin_user_ctx()]). 
+ +db_url(DbName) -> + "http://" ++ get(addr) ++ ":" ++ get(port) ++ "/" ++ + binary_to_list(DbName). + +rep_url() -> + "http://" ++ get(addr) ++ ":" ++ get(port) ++ "/_replicate". + +test_text_data() -> + {ok, Data} = file:read_file(test_util:source_file("README")), + binary_to_list(Data). diff --git a/test/etap/Makefile.am b/test/etap/Makefile.am index 6bbf45ba..bdab95aa 100644 --- a/test/etap/Makefile.am +++ b/test/etap/Makefile.am @@ -58,6 +58,7 @@ EXTRA_DIST = \ 110-replication-httpc.t \ 111-replication-changes-feed.t \ 112-replication-missing-revs.t \ + 113-replication-attachment-comp.t \ 120-stats-collect.t \ 121-stats-aggregates.cfg \ 121-stats-aggregates.ini \ -- cgit v1.2.3
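
Editor's note (not part of the patch): the core protocol change above is that attachment stubs may now carry a "gzip_length" field when the document is fetched with ?att_gzip_length=true (see the couch_httpd_db.erl and couch_rep_reader.erl hunks), and couch_doc:att_lengths/1 uses that field to decide whether the attachment is already stored compressed. The escript below is a minimal standalone sketch that mirrors the att_lengths/1 logic from the patch so the stub handling can be exercised without a running CouchDB; the sample proplists and byte sizes are illustrative only.

#!/usr/bin/env escript
%% Sketch of the stub-length logic introduced in couch_doc:att_lengths/1.
%% A stub that carries a "gzip_length" field is taken to be stored gzipped:
%% att_len becomes the compressed (on-disk) size, disk_len the identity size,
%% and comp is set to true so the receiver writes the data through unchanged.

att_lengths(BinProps) ->
    DiskLen = proplists:get_value(<<"length">>, BinProps),
    GzipLen = proplists:get_value(<<"gzip_length">>, BinProps),
    case GzipLen of
        undefined ->
            {DiskLen, DiskLen, false};   % plain attachment stub
        _ ->
            {GzipLen, DiskLen, true}     % attachment stored compressed
    end.

main(_) ->
    %% Illustrative stubs, as they would appear in the "_attachments" object
    %% of GET /db/doc?att_gzip_length=true (byte sizes are made up).
    Compressed = [{<<"length">>, 1024}, {<<"gzip_length">>, 312},
                  {<<"stub">>, true}],
    Plain      = [{<<"length">>, 2048}, {<<"stub">>, true}],
    io:format("compressed stub -> ~p~n", [att_lengths(Compressed)]),
    io:format("plain stub      -> ~p~n", [att_lengths(Plain)]).

On the sending side, couch_rep_writer:write_multi_part_doc/2 streams such attachments verbatim (gzipped) in a multipart/related PUT with ?new_edits=false, which is why the receiver only needs the presence of "gzip_length" to know the body part is compressed.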