From ace6dfe0107010b57c5da0596f69cfd10fc84a38 Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Thu, 29 Jan 2009 22:15:48 +0000 Subject: Replacement of inets with ibrowse. Fixes COUCHDB-179 and enhances replication. Thanks Jason Davies and Adam Kocoloski for the fix, Maximillian Dornseif for reporting. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@739047 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep.erl | 96 +++++++++++++++++++++++++++++++---------------- 1 file changed, 64 insertions(+), 32 deletions(-) (limited to 'src/couchdb/couch_rep.erl') diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 29f1fc80..ff926584 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -157,30 +157,54 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> end. pull_rep(DbTarget, DbSource, SourceSeqNum) -> - http:set_options([{max_pipeline_length, 101}, {pipeline_timeout, 5000}]), {ok, {NewSeq, Stats}} = enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}), - http:set_options([{max_pipeline_length, 2}, {pipeline_timeout, 0}]), {NewSeq, Stats}. do_http_request(Url, Action, Headers) -> do_http_request(Url, Action, Headers, []). do_http_request(Url, Action, Headers, JsonBody) -> - ?LOG_DEBUG("couch_rep HTTP client request:", []), - ?LOG_DEBUG("\tAction: ~p", [Action]), - ?LOG_DEBUG("\tUrl: ~p", [Url]), - Request = + do_http_request(Url, Action, Headers, JsonBody, 10). + +do_http_request(Url, Action, _Headers, _JsonBody, 0) -> + ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~p", + [Action, Url]); +do_http_request(Url, Action, Headers, JsonBody, Retries) -> + ?LOG_DEBUG("couch_rep HTTP ~p request: ~p", [Action, Url]), + Body = case JsonBody of [] -> - {Url, Headers}; + <<>>; _ -> - {Url, Headers, "application/json; charset=utf-8", iolist_to_binary(?JSON_ENCODE(JsonBody))} + iolist_to_binary(?JSON_ENCODE(JsonBody)) end, - {ok, {{_, ResponseCode,_},_Headers, ResponseBody}} = http:request(Action, Request, [], []), - if - ResponseCode >= 200, ResponseCode < 500 -> - ?JSON_DECODE(ResponseBody) + Options = [ + {content_type, "application/json; charset=utf-8"}, + {max_pipeline_size, 101}, + {transfer_encoding, {chunked, 65535}} + ], + case ibrowse:send_req(Url, Headers, Action, Body, Options) of + {ok, Status, ResponseHeaders, ResponseBody} -> + ResponseCode = list_to_integer(Status), + if + ResponseCode >= 200, ResponseCode < 300 -> + ?JSON_DECODE(ResponseBody); + ResponseCode >= 300, ResponseCode < 400 -> + RedirectUrl = mochiweb_headers:get_value("Location", + mochiweb_headers:make(ResponseHeaders)), + do_http_request(RedirectUrl, Action, Headers, JsonBody, Retries-1); + ResponseCode >= 400, ResponseCode < 500 -> + ?JSON_DECODE(ResponseBody); + ResponseCode == 500 -> + ?LOG_INFO("retrying couch_rep HTTP ~p request due to 500 error: ~p", + [Action, Url]), + do_http_request(Url, Action, Headers, JsonBody, Retries - 1) + end; + {error, Reason} -> + ?LOG_INFO("retrying couch_rep HTTP ~p request due to {error, ~p}: ~p", + [Action, Reason, Url]), + do_http_request(Url, Action, Headers, JsonBody, Retries - 1) end. save_docs_buffer(DbTarget, DocsBuffer, []) -> @@ -223,20 +247,17 @@ wait_result({Pid,Ref}) -> {'DOWN', Ref, _, _, Reason} -> exit(Reason) end. -enum_docs_parallel(DbS, DbT, DocInfoList) -> - UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList], +enum_docs_parallel(DbS, DbT, InfoList) -> + UpdateSeqs = [Seq || {_, Seq, _, _} <- InfoList], SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end), - Stats = pmap(fun(SrcDocInfo) -> - #doc_info{id=Id, - rev=Rev, - conflict_revs=Conflicts, - deleted_conflict_revs=DelConflicts, - update_seq=Seq} = SrcDocInfo, - SrcRevs = [Rev | Conflicts] ++ DelConflicts, - - case get_missing_revs(DbT, [{Id, SrcRevs}]) of - {ok, [{Id, MissingRevs}]} -> + Stats = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) -> + case MissingRevs of + [] -> + SaveDocsPid ! {self(), skip, Seq}, + receive got_it -> ok end, + [{missing_checked, length(SrcRevs)}]; + _ -> {ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]), % only save successful reads @@ -247,13 +268,9 @@ enum_docs_parallel(DbS, DbT, DocInfoList) -> receive got_it -> ok end, [{missing_checked, length(SrcRevs)}, {missing_found, length(MissingRevs)}, - {docs_read, length(Docs)}]; - {ok, []} -> - SaveDocsPid ! {self(), skip, Seq}, - receive got_it -> ok end, - [{missing_checked, length(SrcRevs)}] - end - end, DocInfoList), + {docs_read, length(Docs)}] + end + end, InfoList), SaveDocsPid ! {self(), shutdown}, @@ -345,7 +362,22 @@ enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) -> [] -> {ok, InAcc}; _ -> - Stats = enum_docs_parallel(DbSource, DbTarget, DocInfoList), + UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList], + SrcRevsList = lists:map(fun(SrcDocInfo) -> + #doc_info{id=Id, + rev=Rev, + conflict_revs=Conflicts, + deleted_conflict_revs=DelConflicts + } = SrcDocInfo, + SrcRevs = [Rev | Conflicts] ++ DelConflicts, + {Id, SrcRevs} + end, DocInfoList), + {ok, MissingRevsList} = get_missing_revs(DbTarget, SrcRevsList), + InfoList = lists:map(fun({{Id, SrcRevs}, Seq}) -> + MissingRevs = proplists:get_value(Id, MissingRevsList, []), + {Id, Seq, SrcRevs, MissingRevs} + end, lists:zip(SrcRevsList, UpdateSeqs)), + Stats = enum_docs_parallel(DbSource, DbTarget, InfoList), OldStats = element(2, InAcc), TotalStats = [ {<<"missing_checked">>, -- cgit v1.2.3