summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl96
1 files changed, 64 insertions, 32 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 29f1fc80..ff926584 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -157,30 +157,54 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) ->
end.
pull_rep(DbTarget, DbSource, SourceSeqNum) ->
- http:set_options([{max_pipeline_length, 101}, {pipeline_timeout, 5000}]),
{ok, {NewSeq, Stats}} =
enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}),
- http:set_options([{max_pipeline_length, 2}, {pipeline_timeout, 0}]),
{NewSeq, Stats}.
do_http_request(Url, Action, Headers) ->
do_http_request(Url, Action, Headers, []).
do_http_request(Url, Action, Headers, JsonBody) ->
- ?LOG_DEBUG("couch_rep HTTP client request:", []),
- ?LOG_DEBUG("\tAction: ~p", [Action]),
- ?LOG_DEBUG("\tUrl: ~p", [Url]),
- Request =
+ do_http_request(Url, Action, Headers, JsonBody, 10).
+
+do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
+ ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~p",
+ [Action, Url]);
+do_http_request(Url, Action, Headers, JsonBody, Retries) ->
+ ?LOG_DEBUG("couch_rep HTTP ~p request: ~p", [Action, Url]),
+ Body =
case JsonBody of
[] ->
- {Url, Headers};
+ <<>>;
_ ->
- {Url, Headers, "application/json; charset=utf-8", iolist_to_binary(?JSON_ENCODE(JsonBody))}
+ iolist_to_binary(?JSON_ENCODE(JsonBody))
end,
- {ok, {{_, ResponseCode,_},_Headers, ResponseBody}} = http:request(Action, Request, [], []),
- if
- ResponseCode >= 200, ResponseCode < 500 ->
- ?JSON_DECODE(ResponseBody)
+ Options = [
+ {content_type, "application/json; charset=utf-8"},
+ {max_pipeline_size, 101},
+ {transfer_encoding, {chunked, 65535}}
+ ],
+ case ibrowse:send_req(Url, Headers, Action, Body, Options) of
+ {ok, Status, ResponseHeaders, ResponseBody} ->
+ ResponseCode = list_to_integer(Status),
+ if
+ ResponseCode >= 200, ResponseCode < 300 ->
+ ?JSON_DECODE(ResponseBody);
+ ResponseCode >= 300, ResponseCode < 400 ->
+ RedirectUrl = mochiweb_headers:get_value("Location",
+ mochiweb_headers:make(ResponseHeaders)),
+ do_http_request(RedirectUrl, Action, Headers, JsonBody, Retries-1);
+ ResponseCode >= 400, ResponseCode < 500 ->
+ ?JSON_DECODE(ResponseBody);
+ ResponseCode == 500 ->
+ ?LOG_INFO("retrying couch_rep HTTP ~p request due to 500 error: ~p",
+ [Action, Url]),
+ do_http_request(Url, Action, Headers, JsonBody, Retries - 1)
+ end;
+ {error, Reason} ->
+ ?LOG_INFO("retrying couch_rep HTTP ~p request due to {error, ~p}: ~p",
+ [Action, Reason, Url]),
+ do_http_request(Url, Action, Headers, JsonBody, Retries - 1)
end.
save_docs_buffer(DbTarget, DocsBuffer, []) ->
@@ -223,20 +247,17 @@ wait_result({Pid,Ref}) ->
{'DOWN', Ref, _, _, Reason} -> exit(Reason)
end.
-enum_docs_parallel(DbS, DbT, DocInfoList) ->
- UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
+enum_docs_parallel(DbS, DbT, InfoList) ->
+ UpdateSeqs = [Seq || {_, Seq, _, _} <- InfoList],
SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end),
- Stats = pmap(fun(SrcDocInfo) ->
- #doc_info{id=Id,
- rev=Rev,
- conflict_revs=Conflicts,
- deleted_conflict_revs=DelConflicts,
- update_seq=Seq} = SrcDocInfo,
- SrcRevs = [Rev | Conflicts] ++ DelConflicts,
-
- case get_missing_revs(DbT, [{Id, SrcRevs}]) of
- {ok, [{Id, MissingRevs}]} ->
+ Stats = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) ->
+ case MissingRevs of
+ [] ->
+ SaveDocsPid ! {self(), skip, Seq},
+ receive got_it -> ok end,
+ [{missing_checked, length(SrcRevs)}];
+ _ ->
{ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]),
% only save successful reads
@@ -247,13 +268,9 @@ enum_docs_parallel(DbS, DbT, DocInfoList) ->
receive got_it -> ok end,
[{missing_checked, length(SrcRevs)},
{missing_found, length(MissingRevs)},
- {docs_read, length(Docs)}];
- {ok, []} ->
- SaveDocsPid ! {self(), skip, Seq},
- receive got_it -> ok end,
- [{missing_checked, length(SrcRevs)}]
- end
- end, DocInfoList),
+ {docs_read, length(Docs)}]
+ end
+ end, InfoList),
SaveDocsPid ! {self(), shutdown},
@@ -345,7 +362,22 @@ enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) ->
[] ->
{ok, InAcc};
_ ->
- Stats = enum_docs_parallel(DbSource, DbTarget, DocInfoList),
+ UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
+ SrcRevsList = lists:map(fun(SrcDocInfo) ->
+ #doc_info{id=Id,
+ rev=Rev,
+ conflict_revs=Conflicts,
+ deleted_conflict_revs=DelConflicts
+ } = SrcDocInfo,
+ SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+ {Id, SrcRevs}
+ end, DocInfoList),
+ {ok, MissingRevsList} = get_missing_revs(DbTarget, SrcRevsList),
+ InfoList = lists:map(fun({{Id, SrcRevs}, Seq}) ->
+ MissingRevs = proplists:get_value(Id, MissingRevsList, []),
+ {Id, Seq, SrcRevs, MissingRevs}
+ end, lists:zip(SrcRevsList, UpdateSeqs)),
+ Stats = enum_docs_parallel(DbSource, DbTarget, InfoList),
OldStats = element(2, InAcc),
TotalStats = [
{<<"missing_checked">>,