summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-08-07 19:12:39 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-08-07 19:12:39 +0000
commit7f9a155b4614190aeb0ac1e6bbdb7e8bbec32a08 (patch)
tree73cf8414f81c4024c34e2e0eeaebee7ba0b62c5a /src
parentdfea1d87c21bbe8c8d894941fc17c9d617c44b9a (diff)
ibrowse wrapper for replicator, will replace do_http_request
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802145 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_db.hrl21
-rw-r--r--src/couchdb/couch_rep.erl26
-rw-r--r--src/couchdb/couch_rep_httpc.erl163
4 files changed, 199 insertions, 13 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index ec707d64..c3806310 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -75,6 +75,7 @@ source_files = \
couch_ref_counter.erl \
couch_rep.erl \
couch_rep_changes_feed.erl \
+ couch_rep_httpc.erl \
couch_rep_sup.erl \
couch_server.erl \
couch_server_sup.erl \
@@ -122,6 +123,7 @@ compiled_files = \
couch_ref_counter.beam \
couch_rep.beam \
couch_rep_changes_feed.beam \
+ couch_rep_httpc.beam \
couch_rep_sup.beam \
couch_server.beam \
couch_server_sup.beam \
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index d4e0fbd5..53b2eaf9 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -228,5 +228,26 @@
view_states=nil
}).
+-record(http_db, {
+ url,
+ auth = [],
+ resource = "",
+ headers = [
+ {"User-Agent", "CouchDb/"++couch_server:get_version()},
+ {"Accept", "application/json"},
+ {"Accept-Encoding", "gzip"}
+ ],
+ qs = [],
+ method = get,
+ body = nil,
+ options = [
+ {response_format,binary},
+ {inactivity_timeout, 30000}
+ ],
+ retries = 10,
+ pause = 1,
+ conn = nil
+}).
+
% small value used in revision trees to indicate the revision isn't stored
-define(REV_MISSING, []).
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index b63a2610..1692943a 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -82,7 +82,7 @@ replicate(Source, Target) ->
%% gen_server callbacks
%%=============================================================================
--record(http_db, {
+-record(old_http_db, {
uri,
headers,
oauth
@@ -391,7 +391,7 @@ attachment_loop(ReqId, Conn) ->
att_stub_converter(DbS, Id, Rev,
#att{name=Name,data=stub,type=Type,len=Length}=Att) ->
- #http_db{uri=DbUrl, headers=Headers} = DbS,
+ #old_http_db{uri=DbUrl, headers=Headers} = DbS,
{Pos, [RevId|_]} = Rev,
Url = lists:flatten([DbUrl, couch_util:url_encode(Id), "/", couch_util:url_encode(?b2l(Name)),
"?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
@@ -498,7 +498,7 @@ make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) ->
open_db({remote, Url, Headers, Auth})->
- {ok, #http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url};
+ {ok, #old_http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url};
open_db({local, DbName, UserCtx})->
case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
{ok, Db} -> {ok, Db, DbName};
@@ -506,7 +506,7 @@ open_db({local, DbName, UserCtx})->
end.
-close_db(#http_db{})->
+close_db(#old_http_db{})->
ok;
close_db(Db)->
couch_db:close(Db).
@@ -675,7 +675,7 @@ do_http_request0(Url, Action, Headers, JsonBody, Retries, Pause) ->
do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
end.
-ensure_full_commit(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
+ensure_full_commit(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
{ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post,
Headers, OAuth, true),
true = proplists:get_value(<<"ok">>, ResultProps),
@@ -707,13 +707,13 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
-get_db_info(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
+get_db_info(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
{DbProps} = do_http_request(DbUrl, get, Headers, OAuth),
{ok, [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps]};
get_db_info(Db) ->
couch_db:get_db_info(Db).
-get_doc_info_list(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) ->
+get_doc_info_list(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) ->
Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
++ integer_to_list(StartSeq),
{Results} = do_http_request(Url, get, Headers, OAuth),
@@ -739,7 +739,7 @@ get_doc_info_list(DbSource, StartSeq) ->
end, {0, []}),
lists:reverse(DocInfoList).
-get_missing_revs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) ->
+get_missing_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) ->
DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList],
{ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, OAuth,
{DocIdRevsList2}),
@@ -750,7 +750,7 @@ get_missing_revs(Db, DocId) ->
couch_db:get_missing_revs(Db, DocId).
-open_doc(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
+open_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
[] = Options,
case do_http_request(DbUrl ++ couch_util:url_encode(DocId), get, Headers, OAuth) of
{[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
@@ -761,7 +761,7 @@ open_doc(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
open_doc(Db, DocId, Options) ->
couch_db:open_doc(Db, DocId, Options).
-open_doc_revs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0,
+open_doc_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0,
[latest]) ->
Revs = couch_doc:rev_to_strs(Revs0),
BaseUrl = DbUrl ++ couch_util:url_encode(DocId) ++ "?revs=true&latest=true",
@@ -845,7 +845,7 @@ binary_memory(Pid) ->
lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
0, element(2,process_info(Pid, binary))).
-update_doc(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) ->
+update_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) ->
[] = Options,
Url = DbUrl ++ couch_util:url_encode(DocId),
{ResponseMembers} = do_http_request(Url, put, Headers, OAuth,
@@ -857,7 +857,7 @@ update_doc(Db, Doc, Options) ->
update_docs(_, [], _, _) ->
{ok, []};
-update_docs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) ->
+update_docs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) ->
JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
ErrorsJson =
do_http_request(DbUrl ++ "_bulk_docs", post, Headers, OAuth,
@@ -877,7 +877,7 @@ update_docs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replica
update_docs(Db, Docs, Options, UpdateType) ->
couch_db:update_docs(Db, Docs, Options, UpdateType).
-up_to_date(#http_db{}, _Seq) ->
+up_to_date(#old_http_db{}, _Seq) ->
true;
up_to_date(Source, Seq) ->
{ok, NewDb} = couch_db:open(Source#db.name, []),
diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl
new file mode 100644
index 00000000..aa2b2714
--- /dev/null
+++ b/src/couchdb/couch_rep_httpc.erl
@@ -0,0 +1,163 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_httpc).
+-include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-export([db_exists/1, full_url/1, request/1, spawn_worker_process/1,
+ spawn_link_worker_process/1]).
+
+request(Req) when is_record(Req, http_db) ->
+ do_request(Req).
+
+do_request(#http_db{url=Url} = Req) when is_binary(Url) ->
+ do_request(Req#http_db{url = ?b2l(Url)});
+
+do_request(Req) ->
+ #http_db{
+ auth = Auth,
+ headers = Headers0,
+ method = Method,
+ body = B,
+ options = Opts,
+ conn = Conn
+ } = Req,
+ Url = full_url(Req),
+ Headers = case proplists:get_value(<<"oauth">>, Auth) of
+ undefined ->
+ Headers0;
+ {OAuthProps} ->
+ [oauth_header(Url, Method, OAuthProps) | Headers0]
+ end,
+ Body = if B =:= nil -> []; true -> iolist_to_binary(?JSON_ENCODE(B)) end,
+ Resp = case Conn of
+ nil ->
+ ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity);
+ _ ->
+ ibrowse:send_req_direct(Conn, Url, Headers, Method, Body, Opts, infinity)
+ end,
+ process_response(Resp, Req).
+
+db_exists(Req) ->
+ #http_db{
+ url = Url,
+ headers = Headers
+ } = Req,
+ case catch ibrowse:send_req(Url, Headers, head) of
+ {ok, "200", _, _} ->
+ true;
+ {ok, "301", Headers, _} ->
+ MochiHeaders = mochiweb_headers:make(Headers),
+ RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders),
+ db_exists(Req#http_db{url = RedirectUrl});
+ Error ->
+ ?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]),
+ false
+ end.
+
+full_url(#http_db{url=Url} = Req) when is_binary(Url) ->
+ full_url(Req#http_db{url = ?b2l(Url)});
+
+full_url(#http_db{qs=[]} = Req) ->
+ Req#http_db.url ++ Req#http_db.resource;
+
+full_url(Req) ->
+ #http_db{
+ url = Url,
+ resource = Resource,
+ qs = QS
+ } = Req,
+ QStr = lists:map(fun({K,V}) -> io_lib:format("~s=~s",
+ [couch_util:to_list(K), couch_util:to_list(V)]) end, QS),
+ lists:flatten([Url, Resource, "?", string:join(QStr, "&")]).
+
+process_response({ok, Status, Headers, Body}, Req) ->
+ Code = list_to_integer(Status),
+ if Code =:= 200; Code =:= 201 ->
+ ?JSON_DECODE(maybe_decompress(Headers, Body));
+ Code =:= 301 ->
+ MochiHeaders = mochiweb_headers:make(Headers),
+ RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders),
+ do_request(Req#http_db{url = RedirectUrl});
+ Code >= 400, Code < 500 ->
+ ?JSON_DECODE(maybe_decompress(Headers, Body));
+ Code =:= 500; Code =:= 502 ->
+ #http_db{pause = Pause, retries = Retries} = Req,
+ ?LOG_INFO("retrying couch_rep_httpc request in ~p seconds " ++
+ % "due to remote server error: ~s~s", [Pause/1000, Req#http_db.url,
+ "due to remote server error: ~p Body ~s", [Pause, Code,
+ Body]),
+ timer:sleep(1000*Pause),
+ do_request(Req#http_db{retries = Retries-1, pause = 2*Pause})
+ end;
+
+process_response({ibrowse_req_id, Id}, _Req) ->
+ {ibrowse_req_id, Id};
+
+process_response({error, _Reason}, #http_db{url=Url, retries=0}) ->
+ ?LOG_ERROR("couch_rep_httpc request failed after 10 retries: ~s", [Url]),
+ exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
+process_response({error, Reason}, Req) ->
+ #http_db{
+ method = Method,
+ retries = Retries,
+ pause = Pause
+ } = Req,
+ ShortReason = case Reason of
+ connection_closed ->
+ connection_closed;
+ {'EXIT', {noproc, _}} ->
+ noproc;
+ {'EXIT', {normal, _}} ->
+ normal;
+ Else ->
+ Else
+ end,
+ ?LOG_DEBUG("retrying couch_rep_httpc ~p request in ~p seconds due to " ++
+ "{error, ~p}", [Method, Pause, ShortReason]),
+ % "{error}", [Method, Pause]),
+ timer:sleep(1000*Pause),
+ do_request(Req#http_db{retries = Retries-1, pause = 2*Pause}).
+
+spawn_worker_process(Req) ->
+ Url = ibrowse_lib:parse_url(Req#http_db.url),
+ {ok, Pid} = ibrowse:spawn_worker_process(Url#url.host, Url#url.port),
+ Pid.
+
+spawn_link_worker_process(Req) ->
+ Url = ibrowse_lib:parse_url(Req#http_db.url),
+ {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
+ Pid.
+
+maybe_decompress(Headers, Body) ->
+ MochiHeaders = mochiweb_headers:make(Headers),
+ case mochiweb_headers:get_value("Content-Encoding", MochiHeaders) of
+ "gzip" ->
+ zlib:gunzip(Body);
+ _ ->
+ Body
+ end.
+
+oauth_header(Url, Action, Props) ->
+ ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, Props)),
+ Token = ?b2l(proplists:get_value(<<"token">>, Props)),
+ TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, Props)),
+ ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, Props)),
+ Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
+ Method = case Action of
+ get -> "GET";
+ post -> "POST";
+ put -> "PUT"
+ end,
+ Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret),
+ {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}.