diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_db.erl | 3 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 8 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 69 | ||||
-rw-r--r-- | src/couchdb/couch_file.erl | 23 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_misc_handlers.erl | 4 | ||||
-rw-r--r-- | src/couchdb/couch_key_tree.erl | 135 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 51 | ||||
-rw-r--r-- | src/couchdb/couch_rep_changes_feed.erl | 3 | ||||
-rw-r--r-- | src/couchdb/couch_rep_httpc.erl | 49 | ||||
-rw-r--r-- | src/couchdb/couch_rep_reader.erl | 8 | ||||
-rw-r--r-- | src/couchdb/couch_util.erl | 4 | ||||
-rw-r--r-- | src/couchdb/couch_view_compactor.erl | 14 | ||||
-rw-r--r-- | src/ibrowse/Makefile.am | 2 | ||||
-rw-r--r-- | src/ibrowse/ibrowse.app.in | 2 | ||||
-rw-r--r-- | src/ibrowse/ibrowse.erl | 2 | ||||
-rw-r--r-- | src/ibrowse/ibrowse_http_client.erl | 80 | ||||
-rw-r--r-- | src/ibrowse/ibrowse_lib.erl | 19 | ||||
-rw-r--r-- | src/ibrowse/ibrowse_test.erl | 93 |
18 files changed, 356 insertions, 213 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 964c4704..f005a2ea 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -555,7 +555,8 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI {ok, #full_doc_info{rev_tree=OldTree}} -> NewRevTree = lists:foldl( fun(NewDoc, AccTree) -> - {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]), + {NewTree, _} = couch_key_tree:merge(AccTree, + couch_db:doc_to_tree(NewDoc), Db#db.revs_limit), NewTree end, OldTree, Bucket), diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index a35745ef..d9a8697c 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -264,13 +264,7 @@ body = nil, options = [ {response_format,binary}, - {inactivity_timeout, 30000}, - {max_sessions, list_to_integer( - couch_config:get("replicator", "max_http_sessions", "10") - )}, - {max_pipeline_size, list_to_integer( - couch_config:get("replicator", "max_http_pipeline_size", "10") - )} + {inactivity_timeout, 30000} ], retries = 10, pause = 500, diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index e5c6019a..eb1a3edc 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -489,16 +489,17 @@ send_result(Client, Id, OriginalRevs, NewResult) -> % used to send a result to the client catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}). -merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> +merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq}; -merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], +merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) -> #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq} = OldDocInfo, NewRevTree = lists:foldl( fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) -> if not MergeConflicts -> - case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of + case couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc), + Limit) of {_NewTree, conflicts} when (not OldDeleted) -> send_result(Client, Id, {Pos-1,PrevRevs}, conflict), AccTree; @@ -529,7 +530,7 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], NewDoc#doc{revs={OldPos, [OldRev]}}), NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}}, {NewTree2, _} = couch_key_tree:merge(AccTree, - [couch_db:doc_to_tree(NewDoc2)]), + couch_db:doc_to_tree(NewDoc2), Limit), % we changed the rev id, this tells the caller we did send_result(Client, Id, {Pos-1,PrevRevs}, {ok, {OldPos + 1, NewRevId}}), @@ -543,15 +544,15 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], end; true -> {NewTree, _} = couch_key_tree:merge(AccTree, - [couch_db:doc_to_tree(NewDoc)]), + couch_db:doc_to_tree(NewDoc), Limit), NewTree end end, OldTree, NewDocs), if NewRevTree == OldTree -> % nothing changed - merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos, - AccRemoveSeqs, AccSeq); + merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo, + AccNewInfos, AccRemoveSeqs, AccSeq); true -> % we have updated the document, give it a new seq # NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, @@ -559,8 +560,8 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], 0 -> AccRemoveSeqs; _ -> [OldSeq | AccRemoveSeqs] end, - merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, - [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1) + merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo, + [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1) end. @@ -583,7 +584,8 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> #db{ fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, - update_seq = LastSeq + update_seq = LastSeq, + revs_limit = RevsLimit } = Db, Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList], % lookup up the old documents, if they exist. @@ -596,11 +598,9 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> end, Ids, OldDocLookups), % Merge the new docs into the revision trees. - {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees( + {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit, MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq), - NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0), - % All documents are now ready to write. {ok, Db2} = update_local_docs(Db, NonRepDocs), @@ -765,36 +765,24 @@ copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) -> end, BinInfos), {BodyData, NewBinInfos}. -copy_rev_tree_attachments(SrcDb, DestFd, Tree) -> - couch_key_tree:map( - fun(Rev, {IsDel, Sp, Seq}, leaf) -> - DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd), - {IsDel, DocBody, Seq}; - (_, _, branch) -> - ?REV_MISSING - end, Tree). - - -copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) -> +copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq0, Retry) -> + % COUCHDB-968, make sure we prune duplicates during compaction + InfoBySeq = lists:usort(fun(#doc_info{id=A}, #doc_info{id=B}) -> A =< B end, + InfoBySeq0), Ids = [Id || #doc_info{id=Id} <- InfoBySeq], LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), - % write out the attachments - NewFullDocInfos0 = lists:map( - fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> - Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)} - end, LookupResults), - % write out the docs - % we do this in 2 stages so the docs are written out contiguously, making - % view indexing and replication faster. NewFullDocInfos1 = lists:map( - fun(#full_doc_info{rev_tree=RevTree}=Info) -> - Info#full_doc_info{rev_tree=couch_key_tree:map_leafs( - fun(_Key, {IsDel, DocBody, Seq}) -> + fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> + Info#full_doc_info{rev_tree=couch_key_tree:map( + fun(Rev, {IsDel, Sp, Seq}, leaf) -> + DocBody = copy_doc_attachments(Db, Rev, Sp, DestFd), {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody), - {IsDel, Pos, Seq} + {IsDel, Pos, Seq}; + (_, _, branch) -> + ?REV_MISSING end, RevTree)} - end, NewFullDocInfos0), + end, LookupResults), NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1), NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos], @@ -866,7 +854,12 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) -> {ok, Fd} -> couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>), Retry = true, - {ok, Header} = couch_file:read_header(Fd); + case couch_file:read_header(Fd) of + {ok, Header} -> + ok; + no_valid_header -> + ok = couch_file:write_header(Fd, Header=#db_header{}) + end; {error, enoent} -> couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>), {ok, Fd} = couch_file:open(CompactFile, [create]), diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index 0a891712..fbfd6c6a 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -120,7 +120,19 @@ pread_binary(Fd, Pos) -> pread_iolist(Fd, Pos) -> - gen_server:call(Fd, {pread_iolist, Pos}, infinity). + case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of + {ok, IoList, <<>>} -> + {ok, IoList}; + {ok, IoList, Md5} -> + case couch_util:md5(IoList) of + Md5 -> + {ok, IoList}; + _ -> + exit({file_corruption, <<"file corruption">>}) + end; + Error -> + Error + end. %%---------------------------------------------------------------------- %% Purpose: The length of a file, in bytes. @@ -298,15 +310,10 @@ handle_call({pread_iolist, Pos}, _From, File) -> <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16), {Md5, IoList} = extract_md5(Md5AndIoList), - case couch_util:md5(IoList) of - Md5 -> - {reply, {ok, IoList}, File}; - _ -> - {stop, file_corruption, {error,file_corruption}, File} - end; + {reply, {ok, IoList, Md5}, File}; <<0:1/integer,Len:31/integer>> -> {Iolist, _} = read_raw_iolist_int(File, NextPos, Len), - {reply, {ok, Iolist}, File} + {reply, {ok, Iolist, <<>>}, File} end; handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) -> {ok, Bin} = file:pread(Fd, Pos, Bytes), diff --git a/src/couchdb/couch_httpd_misc_handlers.erl b/src/couchdb/couch_httpd_misc_handlers.erl index 13d770f1..db1b2ca1 100644 --- a/src/couchdb/couch_httpd_misc_handlers.erl +++ b/src/couchdb/couch_httpd_misc_handlers.erl @@ -101,7 +101,9 @@ handle_replicate_req(#httpd{method='POST'}=Req) -> end catch throw:{db_not_found, Msg} -> - send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]}) + send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]}); + throw:{unauthorized, Msg} -> + send_json(Req, 404, {[{error, unauthorized}, {reason, Msg}]}) end; handle_replicate_req(Req) -> send_method_not_allowed(Req, "POST"). diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl index 4fe09bf3..6701da58 100644 --- a/src/couchdb/couch_key_tree.erl +++ b/src/couchdb/couch_key_tree.erl @@ -12,104 +12,107 @@ -module(couch_key_tree). --export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]). +-export([merge/3, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]). -export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2, get_all_leafs_full/1,stem/2,map_leafs/2]). -% a key tree looks like this: -% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree] -% ChildTree -> Tree -% SiblingTree -> [] or [{SiblingKey, Value, Tree} | Tree] -% And each Key < SiblingKey - +% Tree::term() is really a tree(), but we don't want to require R13B04 yet +-type branch() :: {Key::term(), Value::term(), Tree::term()}. +-type path() :: {Start::pos_integer(), branch()}. +-type tree() :: [branch()]. % sorted by key % partial trees arranged by how much they are cut off. -merge(A, B) -> - {Merged, HasConflicts} = - lists:foldl( - fun(InsertTree, {AccTrees, AccConflicts}) -> - {ok, Merged, Conflicts} = merge_one(AccTrees, InsertTree, [], false), - {Merged, Conflicts or AccConflicts} - end, - {A, false}, B), - if HasConflicts or - ((length(Merged) =/= length(A)) and (length(Merged) =/= length(B))) -> +-spec merge([path()], path(), pos_integer()) -> {[path()], + conflicts | no_conflicts}. +merge(Paths, Path, Depth) -> + {Merged, Conflicts} = merge(Paths, Path), + {stem(Merged, Depth), Conflicts}. + +-spec merge([path()], path()) -> {[path()], conflicts | no_conflicts}. +merge(Paths, Path) -> + {ok, Merged, HasConflicts} = merge_one(Paths, Path, [], false), + if HasConflicts -> + Conflicts = conflicts; + (length(Merged) =/= length(Paths)) and (length(Merged) =/= 1) -> Conflicts = conflicts; true -> Conflicts = no_conflicts end, {lists:sort(Merged), Conflicts}. +-spec merge_one(Original::[path()], Inserted::path(), [path()], bool()) -> + {ok, Merged::[path()], NewConflicts::bool()}. merge_one([], Insert, OutAcc, ConflictsAcc) -> {ok, [Insert | OutAcc], ConflictsAcc}; -merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, OutAcc, ConflictsAcc) -> - if Start =< StartInsert -> - StartA = Start, - StartB = StartInsert, - TreeA = Tree, - TreeB = TreeInsert; - true -> - StartB = Start, - StartA = StartInsert, - TreeB = Tree, - TreeA = TreeInsert - end, - case merge_at([TreeA], StartB - StartA, TreeB) of - {ok, [CombinedTrees], Conflicts} -> - merge_one(Rest, {StartA, CombinedTrees}, OutAcc, Conflicts or ConflictsAcc); +merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, Acc, HasConflicts) -> + case merge_at([Tree], StartInsert - Start, [TreeInsert]) of + {ok, [Merged], Conflicts} -> + MergedStart = lists:min([Start, StartInsert]), + {ok, Rest ++ [{MergedStart, Merged} | Acc], Conflicts or HasConflicts}; no -> - merge_one(Rest, {StartB, TreeB}, [{StartA, TreeA} | OutAcc], ConflictsAcc) + AccOut = [{Start, Tree} | Acc], + merge_one(Rest, {StartInsert, TreeInsert}, AccOut, HasConflicts) end. +-spec merge_at(tree(), Place::integer(), tree()) -> + {ok, Merged::tree(), HasConflicts::bool()} | no. +merge_at(_Ours, _Place, []) -> + no; merge_at([], _Place, _Insert) -> no; -merge_at([{Key, Value, SubTree}|Sibs], 0, {InsertKey, InsertValue, InsertSubTree}) -> - if Key == InsertKey -> - {Merge, Conflicts} = merge_simple(SubTree, InsertSubTree), - {ok, [{Key, Value, Merge} | Sibs], Conflicts}; - true -> - case merge_at(Sibs, 0, {InsertKey, InsertValue, InsertSubTree}) of - {ok, Merged, Conflicts} -> - {ok, [{Key, Value, SubTree} | Merged], Conflicts}; - no -> - no - end - end; -merge_at([{Key, Value, SubTree}|Sibs], Place, Insert) -> - case merge_at(SubTree, Place - 1,Insert) of +merge_at([{Key, Value, SubTree}|Sibs], Place, InsertTree) when Place > 0 -> + % inserted starts later than committed, need to drill into committed subtree + case merge_at(SubTree, Place - 1, InsertTree) of {ok, Merged, Conflicts} -> {ok, [{Key, Value, Merged} | Sibs], Conflicts}; no -> - case merge_at(Sibs, Place, Insert) of + case merge_at(Sibs, Place, InsertTree) of {ok, Merged, Conflicts} -> {ok, [{Key, Value, SubTree} | Merged], Conflicts}; no -> no end + end; +merge_at(OurTree, Place, [{Key, Value, SubTree}]) when Place < 0 -> + % inserted starts earlier than committed, need to drill into insert subtree + case merge_at(OurTree, Place + 1, SubTree) of + {ok, Merged, Conflicts} -> + {ok, [{Key, Value, Merged}], Conflicts}; + no -> + no + end; +merge_at([{Key, Value, SubTree}|Sibs], 0, [{Key, _Value, InsertSubTree}]) -> + {Merged, Conflicts} = merge_simple(SubTree, InsertSubTree), + {ok, [{Key, Value, Merged} | Sibs], Conflicts}; +merge_at([{OurKey, _, _} | _], 0, [{Key, _, _}]) when OurKey > Key -> + % siblings keys are ordered, no point in continuing + no; +merge_at([Tree | Sibs], 0, InsertTree) -> + case merge_at(Sibs, 0, InsertTree) of + {ok, Merged, Conflicts} -> + {ok, [Tree | Merged], Conflicts}; + no -> + no end. % key tree functions + +-spec merge_simple(tree(), tree()) -> {Merged::tree(), NewConflicts::bool()}. merge_simple([], B) -> {B, false}; merge_simple(A, []) -> {A, false}; -merge_simple([ATree | ANextTree], [BTree | BNextTree]) -> - {AKey, AValue, ASubTree} = ATree, - {BKey, _BValue, BSubTree} = BTree, - if - AKey == BKey -> - %same key - {MergedSubTree, Conflict1} = merge_simple(ASubTree, BSubTree), - {MergedNextTree, Conflict2} = merge_simple(ANextTree, BNextTree), - {[{AKey, AValue, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2}; - AKey < BKey -> - {MTree, _} = merge_simple(ANextTree, [BTree | BNextTree]), - {[ATree | MTree], true}; - true -> - {MTree, _} = merge_simple([ATree | ANextTree], BNextTree), - {[BTree | MTree], true} - end. +merge_simple([{Key, Value, SubA} | NextA], [{Key, _, SubB} | NextB]) -> + {MergedSubTree, Conflict1} = merge_simple(SubA, SubB), + {MergedNextTree, Conflict2} = merge_simple(NextA, NextB), + {[{Key, Value, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2}; +merge_simple([{A, _, _} = Tree | Next], [{B, _, _} | _] = Insert) when A < B -> + {Merged, _} = merge_simple(Next, Insert), + {[Tree | Merged], true}; +merge_simple(Ours, [Tree | Next]) -> + {Merged, _} = merge_simple(Ours, Next), + {[Tree | Merged], true}. find_missing(_Tree, []) -> []; @@ -159,7 +162,7 @@ remove_leafs(Trees, Keys) -> fun({PathPos, Path},TreeAcc) -> [SingleTree] = lists:foldl( fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path), - {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]), + {NewTrees, _} = merge(TreeAcc, {PathPos + 1 - length(Path), SingleTree}), NewTrees end, [], FilteredPaths), {NewTree, RemovedKeys}. @@ -321,7 +324,7 @@ stem(Trees, Limit) -> fun({PathPos, Path},TreeAcc) -> [SingleTree] = lists:foldl( fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path), - {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]), + {NewTrees, _} = merge(TreeAcc, {PathPos + 1 - length(Path), SingleTree}), NewTrees end, [], Paths2). diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index ba387285..c804b49d 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -108,8 +108,12 @@ get_result(Server, PostBody, UserCtx) -> end. init(InitArgs) -> - try do_init(InitArgs) - catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end. + try + do_init(InitArgs) + catch + throw:Error -> + {stop, Error} + end. do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> process_flag(trap_exit, true), @@ -314,13 +318,19 @@ start_replication_server(Replicator) -> ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), Pid; {error, {db_not_found, DbUrl}} -> - throw({db_not_found, <<"could not open ", DbUrl/binary>>}) + throw({db_not_found, <<"could not open ", DbUrl/binary>>}); + {error, {unauthorized, DbUrl}} -> + throw({unauthorized, + <<"unauthorized to access database ", DbUrl/binary>>}) end; {error, {already_started, Pid}} -> ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), Pid; {error, {{db_not_found, DbUrl}, _}} -> - throw({db_not_found, <<"could not open ", DbUrl/binary>>}) + throw({db_not_found, <<"could not open ", DbUrl/binary>>}); + {error, {{unauthorized, DbUrl}, _}} -> + throw({unauthorized, + <<"unauthorized to access database ", DbUrl/binary>>}) end. compare_replication_logs(SrcDoc, TgtDoc) -> @@ -554,25 +564,34 @@ open_db({Props}, _UserCtx, ProxyParams, CreateTarget) -> auth = AuthProps, headers = lists:ukeymerge(1, Headers, DefaultHeaders) }, - Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams}, + Db = Db1#http_db{ + options = Db1#http_db.options ++ ProxyParams ++ + couch_rep_httpc:ssl_options(Db1) + }, couch_rep_httpc:db_exists(Db, CreateTarget); open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) -> - case CreateTarget of - true -> - ok = couch_httpd:verify_is_server_admin(UserCtx), - couch_server:create(DbName, [{user_ctx, UserCtx}]); - false -> ok - end, + try + case CreateTarget of + true -> + ok = couch_httpd:verify_is_server_admin(UserCtx), + couch_server:create(DbName, [{user_ctx, UserCtx}]); + false -> + ok + end, - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> - couch_db:monitor(Db), - Db; - {not_found, no_db_file} -> throw({db_not_found, DbName}) + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> + couch_db:monitor(Db), + Db; + {not_found, no_db_file} -> + throw({db_not_found, DbName}) + end + catch throw:{unauthorized, _} -> + throw({unauthorized, DbName}) end. schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) -> diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index 7f7d3a38..4d1afcb8 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -204,6 +204,9 @@ handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) -> handle_info({'EXIT', From, normal}, #state{changes_loop=From} = State) -> handle_feed_completion(State); +handle_info({'EXIT', From, normal}, #state{conn=From, complete=true} = State) -> + {noreply, State}; + handle_info({'EXIT', From, Reason}, #state{changes_loop=From} = State) -> ?LOG_ERROR("changes_loop died with reason ~p", [Reason]), {stop, changes_loop_died, State}; diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl index b32e4c77..e535c0d5 100644 --- a/src/couchdb/couch_rep_httpc.erl +++ b/src/couchdb/couch_rep_httpc.erl @@ -16,6 +16,7 @@ -export([db_exists/1, db_exists/2, full_url/1, request/1, redirected_request/2, redirect_url/2, spawn_worker_process/1, spawn_link_worker_process/1]). +-export([ssl_options/1]). request(#http_db{} = Req) -> do_request(Req). @@ -91,6 +92,7 @@ db_exists(Req, CanonicalUrl, CreateDB) -> end, case catch ibrowse:send_req(Url, HeadersFun(head), head, [], Options) of {ok, "200", _, _} -> + config_http(CanonicalUrl), Req#http_db{url = CanonicalUrl}; {ok, "301", RespHeaders, _} -> RedirectUrl = redirect_url(RespHeaders, Req#http_db.url), @@ -98,11 +100,26 @@ db_exists(Req, CanonicalUrl, CreateDB) -> {ok, "302", RespHeaders, _} -> RedirectUrl = redirect_url(RespHeaders, Req#http_db.url), db_exists(Req#http_db{url = RedirectUrl}, CanonicalUrl); + {ok, "401", _, _} -> + throw({unauthorized, ?l2b(Url)}); Error -> ?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]), throw({db_not_found, ?l2b(Url)}) end. +config_http(Url) -> + #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url), + ok = ibrowse:set_max_sessions(Host, Port, list_to_integer( + couch_config:get("replicator", "max_http_sessions", "20"))), + ok = ibrowse:set_max_pipeline_size(Host, Port, list_to_integer( + couch_config:get("replicator", "max_http_pipeline_size", "50"))), + ok = couch_config:register( + fun("replicator", "max_http_sessions", MaxSessions) -> + ibrowse:set_max_sessions(Host, Port, list_to_integer(MaxSessions)); + ("replicator", "max_http_pipeline_size", PipeSize) -> + ibrowse:set_max_pipeline_size(Host, Port, list_to_integer(PipeSize)) + end). + redirect_url(RespHeaders, OrigUrl) -> MochiHeaders = mochiweb_headers:make(RespHeaders), RedUrl = mochiweb_headers:get_value("Location", MochiHeaders), @@ -244,3 +261,35 @@ oauth_header(Url, QS, Action, Props) -> Params = oauth:signed_params(Method, Url, QSL, Consumer, Token, TokenSecret) -- QSL, {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}. + +ssl_options(#http_db{url = Url}) -> + case ibrowse_lib:parse_url(Url) of + #url{protocol = https} -> + Depth = list_to_integer( + couch_config:get("replicator", "ssl_certificate_max_depth", "3") + ), + SslOpts = [{depth, Depth} | + case couch_config:get("replicator", "verify_ssl_certificates") of + "true" -> + ssl_verify_options(true); + _ -> + ssl_verify_options(false) + end], + [{is_ssl, true}, {ssl_options, SslOpts}]; + #url{protocol = http} -> + [] + end. + +ssl_verify_options(Value) -> + ssl_verify_options(Value, erlang:system_info(otp_release)). + +ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" -> + CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"), + [{verify, verify_peer}, {cacertfile, CAFile}]; +ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" -> + [{verify, verify_none}]; +ssl_verify_options(true, _OTPVersion) -> + CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"), + [{verify, 2}, {cacertfile, CAFile}]; +ssl_verify_options(false, _OTPVersion) -> + [{verify, 0}]. diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index 0930599c..a7ae45a8 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -21,11 +21,8 @@ -define (BUFFER_SIZE, 1000). -define (MAX_CONCURRENT_REQUESTS, 100). --define (MAX_CONNECTIONS, 20). --define (MAX_PIPELINE_SIZE, 50). -include("couch_db.hrl"). --include("../ibrowse/ibrowse.hrl"). -record (state, { parent, @@ -53,11 +50,6 @@ next(Pid) -> init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) -> process_flag(trap_exit, true), - if is_record(Source, http_db) -> - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url), - ibrowse:set_max_sessions(Host, Port, ?MAX_CONNECTIONS), - ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE); - true -> ok end, Self = self(), ReaderLoop = spawn_link( fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl index 7a8ae055..ed6d2b25 100644 --- a/src/couchdb/couch_util.erl +++ b/src/couchdb/couch_util.erl @@ -418,8 +418,8 @@ compressible_att_type(MimeType) -> ), lists:any( fun(TypeExp) -> - Regexp = "^\\s*" ++ - re:replace(TypeExp, "\\*", ".*", [{return, list}]) ++ "\\s*$", + Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"), + "(?:\\s*;.*?)?\\s*", $$], case re:run(MimeType, Regexp, [caseless]) of {match, _} -> true; diff --git a/src/couchdb/couch_view_compactor.erl b/src/couchdb/couch_view_compactor.erl index 895556bf..9a47f5f8 100644 --- a/src/couchdb/couch_view_compactor.erl +++ b/src/couchdb/couch_view_compactor.erl @@ -48,18 +48,22 @@ compact_group(Group, EmptyGroup) -> TaskName = <<DbName/binary, ShortName/binary>>, couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>), - Fun = fun(KV, {Bt, Acc, TotalCopied}) -> + Fun = fun({DocId, _ViewIdKeys} = KV, {Bt, Acc, TotalCopied, LastId}) -> + if DocId =:= LastId -> % COUCHDB-999 + Msg = "Duplicates of ~s detected in ~s ~s - rebuild required", + exit(io_lib:format(Msg, [DocId, DbName, GroupId])); + true -> ok end, if TotalCopied rem 10000 =:= 0 -> couch_task_status:update("Copied ~p of ~p Ids (~p%)", [TotalCopied, Count, (TotalCopied*100) div Count]), {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])), - {ok, {Bt2, [], TotalCopied+1}}; + {ok, {Bt2, [], TotalCopied+1, DocId}}; true -> - {ok, {Bt, [KV|Acc], TotalCopied+1}} + {ok, {Bt, [KV|Acc], TotalCopied+1, DocId}} end end, - {ok, _, {Bt3, Uncopied, _Total}} = couch_btree:foldl(IdBtree, Fun, - {EmptyIdBtree, [], 0}), + {ok, _, {Bt3, Uncopied, _Total, _LastId}} = couch_btree:foldl(IdBtree, Fun, + {EmptyIdBtree, [], 0, nil}), {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)), NewViews = lists:map(fun({View, EmptyView}) -> diff --git a/src/ibrowse/Makefile.am b/src/ibrowse/Makefile.am index 8c5d3f8e..4cebe5d1 100644 --- a/src/ibrowse/Makefile.am +++ b/src/ibrowse/Makefile.am @@ -10,7 +10,7 @@ ## License for the specific language governing permissions and limitations under ## the License. -ibrowseebindir = $(localerlanglibdir)/ibrowse-2.1.0/ebin +ibrowseebindir = $(localerlanglibdir)/ibrowse-2.1.2/ebin ibrowse_file_collection = \ ibrowse.app.in \ diff --git a/src/ibrowse/ibrowse.app.in b/src/ibrowse/ibrowse.app.in index e8580d10..c8e42271 100644 --- a/src/ibrowse/ibrowse.app.in +++ b/src/ibrowse/ibrowse.app.in @@ -1,6 +1,6 @@ {application, ibrowse, [{description, "HTTP client application"}, - {vsn, "2.1.0"}, + {vsn, "2.1.2"}, {modules, [ ibrowse, ibrowse_http_client, ibrowse_app, diff --git a/src/ibrowse/ibrowse.erl b/src/ibrowse/ibrowse.erl index 1a42f4bc..e1051504 100644 --- a/src/ibrowse/ibrowse.erl +++ b/src/ibrowse/ibrowse.erl @@ -7,7 +7,7 @@ %%%------------------------------------------------------------------- %% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com> %% @copyright 2005-2010 Chandrashekhar Mullaparthi -%% @version 2.1.0 +%% @version 2.1.2 %% @doc The ibrowse application implements an HTTP 1.1 client in erlang. This %% module implements the API of the HTTP client. There is one named %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl index 5ff323cd..ea759488 100644 --- a/src/ibrowse/ibrowse_http_client.erl +++ b/src/ibrowse/ibrowse_http_client.erl @@ -69,7 +69,7 @@ ]). -define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024). - +-define(dec2hex(X), erlang:integer_to_list(X, 16)). %%==================================================================== %% External functions %%==================================================================== @@ -191,13 +191,21 @@ handle_info({stream_next, Req_id}, #state{socket = Socket, {noreply, State}; handle_info({stream_next, _Req_id}, State) -> + _Cur_req_id = case State#state.cur_req of + #request{req_id = Cur} -> + Cur; + _ -> + undefined + end, +%% io:format("Ignoring stream_next as ~1000.p is not cur req (~1000.p)~n", +%% [_Req_id, _Cur_req_id]), {noreply, State}; handle_info({stream_close, _Req_id}, State) -> shutting_down(State), do_close(State), do_error_reply(State, closing_on_request), - {stop, normal, ok, State}; + {stop, normal, State}; handle_info({tcp_closed, _Sock}, State) -> do_trace("TCP connection closed by peer!~n", []), @@ -369,15 +377,6 @@ accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Srtf {error, Reason} -> {error, {file_write_error, Reason}} end; -%% accumulate_response(<<>>, #state{cur_req = #request{caller_controls_socket = Ccs}, -%% socket = Socket} = State) -> -%% case Ccs of -%% true -> -%% do_setopts(Socket, [{active, once}], State); -%% false -> -%% ok -%% end, -%% State; accumulate_response(Data, #state{reply_buffer = RepBuf, rep_buf_size = RepBufSize, streamed_size = Streamed_size, @@ -544,7 +543,7 @@ do_send_body1(Source, Resp, State, TE) -> maybe_chunked_encode(Data, false) -> Data; maybe_chunked_encode(Data, true) -> - [ibrowse_lib:dec2hex(byte_size(to_binary(Data))), "\r\n", Data, "\r\n"]. + [?dec2hex(size(to_binary(Data))), "\r\n", Data, "\r\n"]. do_close(#state{socket = undefined}) -> ok; do_close(#state{socket = Sock, @@ -634,7 +633,7 @@ send_req_1(From, Path = [Server_host, $:, integer_to_list(Server_port)], {Req, Body_1} = make_request(connect, Pxy_auth_headers, Path, Path, - [], Options, State_1), + [], Options, State_1, undefined), TE = is_chunked_encoding_specified(Options), trace_request(Req), case do_send(Req, State) of @@ -683,8 +682,7 @@ send_req_1(From, path = RelPath} = Url, Headers, Method, Body, Options, Timeout, #state{status = Status, - socket = Socket, - is_ssl = Is_ssl} = State) -> + socket = Socket} = State) -> ReqId = make_req_id(), Resp_format = get_value(response_format, Options, list), Caller_socket_options = get_value(socket_options, Options, []), @@ -721,9 +719,10 @@ send_req_1(From, Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1), {Req, Body_1} = make_request(Method, Headers_1, - AbsPath, RelPath, Body, Options, State_1), + AbsPath, RelPath, Body, Options, State_1, + ReqId), trace_request(Req), - do_setopts(Socket, Caller_socket_options, Is_ssl), + do_setopts(Socket, Caller_socket_options, State_1), TE = is_chunked_encoding_specified(Options), case do_send(Req, State_1) of ok -> @@ -821,7 +820,7 @@ http_auth_digest(Username, Password) -> ibrowse_lib:encode_base64(Username ++ [$: | Password]). make_request(Method, Headers, AbsPath, RelPath, Body, Options, - #state{use_proxy = UseProxy, is_ssl = Is_ssl}) -> + #state{use_proxy = UseProxy, is_ssl = Is_ssl}, ReqId) -> HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})), Fun1 = fun({X, Y}) when is_atom(X) -> {to_lower(atom_to_list(X)), X, Y}; @@ -831,17 +830,14 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, Headers_0 = [Fun1(X) || X <- Headers], Headers_1 = case lists:keysearch("content-length", 1, Headers_0) of - false when (Body == []) orelse - (Body == <<>>) orelse - is_tuple(Body) orelse - is_function(Body) -> - Headers_0; - false when is_binary(Body) -> - [{"content-length", "content-length", integer_to_list(size(Body))} | Headers_0]; - false when is_list(Body) -> - [{"content-length", "content-length", integer_to_list(length(Body))} | Headers_0]; + false when (Body =:= [] orelse Body =:= <<>>) andalso + (Method =:= post orelse Method =:= put) -> + [{"content-length", "Content-Length", "0"} | Headers_0]; + false when is_binary(Body) orelse is_list(Body) -> + [{"content-length", "Content-Length", integer_to_list(iolist_size(Body))} | Headers_0]; _ -> - %% Content-Length is already specified + %% Content-Length is already specified or Body is a + %% function or function/state pair Headers_0 end, {Headers_2, Body_1} = @@ -860,7 +856,13 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, [{"Transfer-Encoding", "chunked"}], chunk_request_body(Body, Chunk_size_1)} end, - Headers_3 = cons_headers(Headers_2), + Headers_3 = case lists:member({include_ibrowse_req_id, true}, Options) of + true -> + [{"x-ibrowse-request-id", io_lib:format("~1000.p",[ReqId])} | Headers_2]; + false -> + Headers_2 + end, + Headers_4 = cons_headers(Headers_3), Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of true -> case Is_ssl of @@ -872,7 +874,7 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, false -> RelPath end, - {[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_3, crnl()], Body_1}. + {[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_4, crnl()], Body_1}. is_chunked_encoding_specified(Options) -> case get_value(transfer_encoding, Options, false) of @@ -927,23 +929,23 @@ chunk_request_body(Body, _ChunkSize, Acc) when Body == <<>>; Body == [] -> chunk_request_body(Body, ChunkSize, Acc) when is_binary(Body), size(Body) >= ChunkSize -> <<ChunkBody:ChunkSize/binary, Rest/binary>> = Body, - Chunk = [ibrowse_lib:dec2hex(ChunkSize),"\r\n", + Chunk = [?dec2hex(ChunkSize),"\r\n", ChunkBody, "\r\n"], chunk_request_body(Rest, ChunkSize, [Chunk | Acc]); chunk_request_body(Body, _ChunkSize, Acc) when is_binary(Body) -> BodySize = size(Body), - Chunk = [ibrowse_lib:dec2hex(BodySize),"\r\n", + Chunk = [?dec2hex(BodySize),"\r\n", Body, "\r\n"], LastChunk = "0\r\n", lists:reverse(["\r\n", LastChunk, Chunk | Acc]); chunk_request_body(Body, ChunkSize, Acc) when length(Body) >= ChunkSize -> {ChunkBody, Rest} = split_list_at(Body, ChunkSize), - Chunk = [ibrowse_lib:dec2hex(ChunkSize),"\r\n", + Chunk = [?dec2hex(ChunkSize),"\r\n", ChunkBody, "\r\n"], chunk_request_body(Rest, ChunkSize, [Chunk | Acc]); chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) -> BodySize = length(Body), - Chunk = [ibrowse_lib:dec2hex(BodySize),"\r\n", + Chunk = [?dec2hex(BodySize),"\r\n", Body, "\r\n"], LastChunk = "0\r\n", lists:reverse(["\r\n", LastChunk, Chunk | Acc]). @@ -1316,11 +1318,17 @@ reset_state(State) -> transfer_encoding = undefined }. -set_cur_request(#state{reqs = Reqs} = State) -> +set_cur_request(#state{reqs = Reqs, socket = Socket} = State) -> case queue:to_list(Reqs) of [] -> State#state{cur_req = undefined}; - [NextReq | _] -> + [#request{caller_controls_socket = Ccs} = NextReq | _] -> + case Ccs of + true -> + do_setopts(Socket, [{active, once}], State); + _ -> + ok + end, State#state{cur_req = NextReq} end. diff --git a/src/ibrowse/ibrowse_lib.erl b/src/ibrowse/ibrowse_lib.erl index e913adbe..696d0f69 100644 --- a/src/ibrowse/ibrowse_lib.erl +++ b/src/ibrowse/ibrowse_lib.erl @@ -19,9 +19,6 @@ url_encode/1, decode_rfc822_date/1, status_code/1, - dec2hex/1, - drv_ue/1, - drv_ue/2, encode_base64/1, decode_base64/1, get_value/2, @@ -33,17 +30,6 @@ get_trace_status(Host, Port) -> ibrowse:get_config_value({trace, Host, Port}, false). -drv_ue(Str) -> - [{port, Port}| _] = ets:lookup(ibrowse_table, port), - drv_ue(Str, Port). -drv_ue(Str, Port) -> - case erlang:port_control(Port, 1, Str) of - [] -> - Str; - Res -> - Res - end. - %% @doc URL-encodes a string based on RFC 1738. Returns a flat list. %% @spec url_encode(Str) -> UrlEncodedStr %% Str = string() @@ -163,11 +149,6 @@ status_code(507) -> insufficient_storage; status_code(X) when is_list(X) -> status_code(list_to_integer(X)); status_code(_) -> unknown_status_code. -%% @doc Returns a string with the hexadecimal representation of a given decimal. -%% N = integer() -- the number to represent as hex -%% @spec dec2hex(N::integer()) -> string() -dec2hex(N) -> lists:flatten(io_lib:format("~.16B", [N])). - %% @doc Implements the base64 encoding algorithm. The output data type matches in the input data type. %% @spec encode_base64(In) -> Out %% In = string() | binary() diff --git a/src/ibrowse/ibrowse_test.erl b/src/ibrowse/ibrowse_test.erl index 3ad76603..b8e0a4a5 100644 --- a/src/ibrowse/ibrowse_test.erl +++ b/src/ibrowse/ibrowse_test.erl @@ -20,7 +20,8 @@ test_chunked_streaming_once/0, i_do_async_req_list/4, test_stream_once/3, - test_stream_once/4 + test_stream_once/4, + test_20122010/0 ]). test_stream_once(Url, Method, Options) -> @@ -218,7 +219,8 @@ dump_errors(Key, Iod) -> {"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]}, {"http://jigsaw.w3.org/HTTP/CL/", get}, {"http://www.httpwatch.com/httpgallery/chunked/", get}, - {"https://github.com", get, [{ssl_options, [{depth, 2}]}]} + {"https://github.com", get, [{ssl_options, [{depth, 2}]}]}, + {local_test_fun, test_20122010, []} ]). unit_tests() -> @@ -228,6 +230,7 @@ unit_tests(Options) -> application:start(crypto), application:start(public_key), application:start(ssl), + (catch ibrowse_test_server:start_server(8181, tcp)), ibrowse:start(), Options_1 = Options ++ [{connect_timeout, 5000}], {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]), @@ -242,7 +245,9 @@ unit_tests(Options) -> end. unit_tests_1(Parent, Options) -> - lists:foreach(fun({Url, Method}) -> + lists:foreach(fun({local_test_fun, Fun_name, Args}) -> + execute_req(local_test_fun, Fun_name, Args); + ({Url, Method}) -> execute_req(Url, Method, Options); ({Url, Method, X_Opts}) -> execute_req(Url, Method, X_Opts ++ Options) @@ -394,6 +399,10 @@ maybe_stream_next(Req_id, Options) -> ok end. +execute_req(local_test_fun, Method, Args) -> + io:format(" ~-54.54w: ", [Method]), + Result = (catch apply(?MODULE, Method, Args)), + io:format("~p~n", [Result]); execute_req(Url, Method, Options) -> io:format("~7.7w, ~50.50s: ", [Method, Url]), Result = (catch ibrowse:send_req(Url, [], Method, [], Options)), @@ -430,3 +439,81 @@ ue_test(Data) -> log_msg(Fmt, Args) -> io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]). + +%%------------------------------------------------------------------------------ +%% +%%------------------------------------------------------------------------------ + +test_20122010() -> + {ok, Pid} = ibrowse:spawn_worker_process("http://localhost:8181"), + Expected_resp = <<"1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27-28-29-30-31-32-33-34-35-36-37-38-39-40-41-42-43-44-45-46-47-48-49-50-51-52-53-54-55-56-57-58-59-60-61-62-63-64-65-66-67-68-69-70-71-72-73-74-75-76-77-78-79-80-81-82-83-84-85-86-87-88-89-90-91-92-93-94-95-96-97-98-99-100">>, + Test_parent = self(), + Fun = fun() -> + do_test_20122010(Pid, Expected_resp, Test_parent) + end, + Pids = [erlang:spawn_monitor(Fun) || _ <- lists:seq(1,10)], + wait_for_workers(Pids). + +wait_for_workers([{Pid, _Ref} | Pids]) -> + receive + {Pid, success} -> + wait_for_workers(Pids) + after 5000 -> + test_failed + end; +wait_for_workers([]) -> + success. + +do_test_20122010(Pid, Expected_resp, Test_parent) -> + {ibrowse_req_id, Req_id} = ibrowse:send_req_direct( + Pid, + "http://localhost:8181/ibrowse_stream_once_chunk_pipeline_test", + [], get, [], + [{stream_to, {self(), once}}, + {include_ibrowse_req_id, true}]), + do_trace("~p -- sent request ~1000.p~n", [self(), Req_id]), + Req_id_str = lists:flatten(io_lib:format("~1000.p",[Req_id])), + receive + {ibrowse_async_headers, Req_id, "200", Headers} -> + case lists:keysearch("x-ibrowse-request-id", 1, Headers) of + {value, {_, Req_id_str}} -> + ok; + {value, {_, Req_id_1}} -> + do_trace("~p -- Sent req-id: ~1000.p. Recvd: ~1000.p~n", + [self(), Req_id, Req_id_1]), + exit(req_id_mismatch) + end + after 5000 -> + do_trace("~p -- response headers not received~n", [self()]), + exit({timeout, test_failed}) + end, + do_trace("~p -- response headers received~n", [self()]), + ok = ibrowse:stream_next(Req_id), + case do_test_20122010_1(Expected_resp, Req_id, []) of + true -> + Test_parent ! {self(), success}; + false -> + Test_parent ! {self(), failed} + end. + +do_test_20122010_1(Expected_resp, Req_id, Acc) -> + receive + {ibrowse_async_response, Req_id, Body_part} -> + ok = ibrowse:stream_next(Req_id), + do_test_20122010_1(Expected_resp, Req_id, [Body_part | Acc]); + {ibrowse_async_response_end, Req_id} -> + Acc_1 = list_to_binary(lists:reverse(Acc)), + Result = Acc_1 == Expected_resp, + do_trace("~p -- End of response. Result: ~p~n", [self(), Result]), + Result + after 1000 -> + exit({timeout, test_failed}) + end. + +do_trace(Fmt, Args) -> + do_trace(get(my_trace_flag), Fmt, Args). + +do_trace(true, Fmt, Args) -> + io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]); +do_trace(_, _, _) -> + ok. |