summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_rep_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/couch/src/couch_rep_reader.erl')
-rw-r--r--apps/couch/src/couch_rep_reader.erl63
1 files changed, 34 insertions, 29 deletions
diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl
index 46633994..a7ae45a8 100644
--- a/apps/couch/src/couch_rep_reader.erl
+++ b/apps/couch/src/couch_rep_reader.erl
@@ -20,12 +20,9 @@
-import(couch_util, [url_encode/1]).
-define (BUFFER_SIZE, 1000).
--define (MAX_CONCURRENT_REQUESTS, 10).
--define (MAX_CONNECTIONS, 20).
--define (MAX_PIPELINE_SIZE, 50).
+-define (MAX_CONCURRENT_REQUESTS, 100).
-include("couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
-record (state, {
parent,
@@ -53,14 +50,9 @@ next(Pid) ->
init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
process_flag(trap_exit, true),
- if is_record(Source, http_db) ->
- #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
- ibrowse:set_max_sessions(Host, Port, ?MAX_CONNECTIONS),
- ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
- true -> ok end,
Self = self(),
ReaderLoop = spawn_link(
- fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
+ fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end
),
MissingRevs = case MissingRevs_or_DocIds of
Pid when is_pid(Pid) ->
@@ -230,7 +222,7 @@ update_sequence_lists(Seq, State) ->
opened_seqs = Opened
}.
-open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
+open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) ->
%% all this logic just splits up revision lists that are too long for
%% MochiWeb into multiple requests
BaseQS = [{revs,true}, {latest,true}, {att_encoding_info,true}],
@@ -246,36 +238,48 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
JsonResults = lists:flatten([couch_rep_httpc:request(R) || R <- Requests]),
Transform =
- fun({[{<<"missing">>, Rev}]}) ->
- {{not_found, missing}, couch_doc:parse_rev(Rev)};
- ({[{<<"ok">>, Json}]}) ->
+ fun({[{<<"ok">>, Json}]}, Acc) ->
#doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
- Doc#doc{atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]}
+ Doc1 = Doc#doc{
+ atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
+ },
+ [Doc1 | Acc];
+ ({ErrorProps}, Acc) ->
+ Err = couch_util:get_value(<<"error">>, ErrorProps,
+ ?JSON_ENCODE({ErrorProps})),
+ ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s",
+ [DocId, couch_util:url_strip_password(Url), Err]),
+ Acc
end,
- [Transform(Result) || Result <- JsonResults].
+ lists:reverse(lists:foldl(Transform, [], JsonResults)).
-open_doc(#http_db{} = DbS, DocId) ->
+open_doc(#http_db{url = Url} = DbS, DocId) ->
% get latest rev of the doc
Req = DbS#http_db{
resource=url_encode(DocId),
qs=[{att_encoding_info, true}]
},
- case couch_rep_httpc:request(Req) of
- {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} ->
- [];
- Json ->
+ {Props} = Json = couch_rep_httpc:request(Req),
+ case couch_util:get_value(<<"_id">>, Props) of
+ Id when is_binary(Id) ->
#doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
[Doc#doc{
atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
- }]
+ }];
+ undefined ->
+ Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)),
+ ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s",
+ [DocId, couch_util:url_strip_password(Url), Err]),
+ []
end.
-reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
- case Source of
+reader_loop(ReaderServer, Parent, Source1, DocIds) when is_list(DocIds) ->
+ case Source1 of
#http_db{} ->
[gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
infinity) || Id <- DocIds];
_LocalDb ->
+ {ok, Source} = gen_server:call(Parent, get_source_db, infinity),
Docs = lists:foldr(fun(Id, Acc) ->
case couch_db:open_doc(Source, Id) of
{ok, Doc} ->
@@ -288,7 +292,7 @@ reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
end,
exit(complete);
-reader_loop(ReaderServer, Source, MissingRevsServer) ->
+reader_loop(ReaderServer, Parent, Source, MissingRevsServer) ->
case couch_rep_missing_revs:next(MissingRevsServer) of
complete ->
exit(complete);
@@ -301,22 +305,23 @@ reader_loop(ReaderServer, Source, MissingRevsServer) ->
#http_db{} ->
[gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs},
infinity) || {Id,Seq,Revs} <- SortedIdsRevs],
- reader_loop(ReaderServer, Source, MissingRevsServer);
+ reader_loop(ReaderServer, Parent, Source, MissingRevsServer);
_Local ->
- Source2 = maybe_reopen_db(Source, HighSeq),
+ {ok, Source1} = gen_server:call(Parent, get_source_db, infinity),
+ Source2 = maybe_reopen_db(Source1, HighSeq),
lists:foreach(fun({Id,Seq,Revs}) ->
{ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]),
JustTheDocs = [Doc || {ok, Doc} <- Docs],
gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs},
infinity)
end, SortedIdsRevs),
- reader_loop(ReaderServer, Source2, MissingRevsServer)
+ couch_db:close(Source2),
+ reader_loop(ReaderServer, Parent, Source2, MissingRevsServer)
end
end.
maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
{ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]),
- couch_db:close(Db),
NewDb;
maybe_reopen_db(Db, _HighSeq) ->
Db.