summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-07-08 19:15:56 -0300
committerRuben Pollan <meskio@sindominio.net>2015-07-22 10:04:22 -0400
commit34f6b07e08540fd9b2ec473d1e4e5a15be4feacc (patch)
tree63eb157e711eee447bbd96adb17f7139dbedc48a
parent46870f1e26ef159bf6fe4744aba3b00a5e81cc3b (diff)
[bug] add http request timeout
The connectTimeout parameter of twisted.web.client.Agent only acts on the connection setup, and the Agent will wait forever for incoming data after the connection has been established. This commit adds a timeout for the connection, and will cancel the deferred if the result has not been received after a certain number of seconds.
-rw-r--r--changes/bug_7234_add-http-request-timeout1
-rw-r--r--src/leap/common/http.py260
2 files changed, 215 insertions, 46 deletions
diff --git a/changes/bug_7234_add-http-request-timeout b/changes/bug_7234_add-http-request-timeout
new file mode 100644
index 0000000..d18b28b
--- /dev/null
+++ b/changes/bug_7234_add-http-request-timeout
@@ -0,0 +1 @@
+ o Add http request timeout. Related to #7234.
diff --git a/src/leap/common/http.py b/src/leap/common/http.py
index 8d22f2c..c93e65b 100644
--- a/src/leap/common/http.py
+++ b/src/leap/common/http.py
@@ -36,21 +36,24 @@ from zope.interface import implements
from twisted.internet import reactor
from twisted.internet import defer
-from twisted.internet.defer import succeed
+from twisted.python import failure
from twisted.web.client import Agent
from twisted.web.client import HTTPConnectionPool
+from twisted.web.client import _HTTP11ClientFactory as HTTP11ClientFactory
from twisted.web.client import readBody
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
+from twisted.web._newclient import HTTP11ClientProtocol
-def createPool(maxPersistentPerHost=10, persistent=True):
- pool = HTTPConnectionPool(reactor, persistent)
- pool.maxPersistentPerHost = maxPersistentPerHost
- return pool
+__all__ = ["HTTPClient"]
-_pool = createPool()
+
+# A default HTTP timeout is used for 2 distinct purposes:
+# 1. as HTTP connection timeout, prior to connection estabilshment.
+# 2. as data reception timeout, after the connection has been established.
+DEFAULT_HTTP_TIMEOUT = 30 # seconds
class HTTPClient(object):
@@ -66,28 +69,36 @@ class HTTPClient(object):
in order to avoid resource abuse on huge requests batches.
"""
- def __init__(self, cert_file=None, pool=_pool):
+ def __init__(self, cert_file=None, timeout=DEFAULT_HTTP_TIMEOUT):
"""
Init the HTTP client
:param cert_file: The path to the certificate file, if None given the
system's CAs will be used.
:type cert_file: str
- :param pool: An optional dedicated connection pool to override the
- default shared one.
- :type pool: HTTPConnectionPool
+ :param timeout: The amount of time that this Agent will wait for the
+ peer to accept a connection and for each request to be
+ finished. If a pool is passed, then this argument is
+ ignored.
+ :type timeout: float
"""
- policy = get_compatible_ssl_context_factory(cert_file)
-
- self._pool = pool
+ self._timeout = timeout
+ self._pool = self._createPool()
self._agent = Agent(
reactor,
- policy,
- pool=pool)
- self._semaphore = defer.DeferredSemaphore(pool.maxPersistentPerHost)
+ get_compatible_ssl_context_factory(cert_file),
+ pool=self._pool,
+ connectTimeout=self._timeout)
+ self._semaphore = defer.DeferredSemaphore(
+ self._pool.maxPersistentPerHost)
- def request(self, url, method='GET', body=None, headers={}):
+ def _createPool(self, maxPersistentPerHost=10, persistent=True):
+ pool = _HTTPConnectionPool(reactor, persistent, self._timeout)
+ pool.maxPersistentPerHost = maxPersistentPerHost
+ return pool
+
+ def _request(self, url, method, body, headers):
"""
Perform an HTTP request.
@@ -104,51 +115,208 @@ class HTTPClient(object):
:rtype: twisted.internet.defer.Deferred
"""
if body:
- body = HTTPClient.StringBodyProducer(body)
- d = self._semaphore.run(self._agent.request,
- method, url, headers=Headers(headers),
- bodyProducer=body)
+ body = _StringBodyProducer(body)
+ d = self._agent.request(
+ method, url, headers=Headers(headers), bodyProducer=body)
d.addCallback(readBody)
return d
+ def request(self, url, method='GET', body=None, headers={}):
+ """
+ Perform an HTTP request, but limit the maximum amount of concurrent
+ connections.
+
+ :param url: The URL for the request.
+ :type url: str
+ :param method: The HTTP method of the request.
+ :type method: str
+ :param body: The body of the request, if any.
+ :type body: str
+ :param headers: The headers of the request.
+ :type headers: dict
+
+ :return: A deferred that fires with the body of the request.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._semaphore.run(self._request, url, method, body, headers)
+
def close(self):
"""
Close any cached connections.
"""
self._pool.closeCachedConnections()
- class StringBodyProducer(object):
+#
+# An IBodyProducer to write the body of an HTTP request as a string.
+#
+
+class _StringBodyProducer(object):
+ """
+ A producer that writes the body of a request to a consumer.
+ """
+
+ implements(IBodyProducer)
+
+ def __init__(self, body):
+ """
+ Initialize the string produer.
+
+ :param body: The body of the request.
+ :type body: str
+ """
+ self.body = body
+ self.length = len(body)
+
+ def startProducing(self, consumer):
+ """
+ Write the body to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A successful deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ consumer.write(self.body)
+ return defer.succeed(None)
+
+ def pauseProducing(self):
+ pass
+
+ def stopProducing(self):
+ pass
+
+
+#
+# Patched twisted.web classes
+#
+
+class _HTTP11ClientProtocol(HTTP11ClientProtocol):
+ """
+ A timeout-able HTTP 1.1 client protocol, that is instantiated by the
+ _HTTP11ClientFactory below.
+ """
+
+ def __init__(self, quiescentCallback, timeout):
+ """
+ Initialize the protocol.
+
+ :param quiescentCallback:
+ :type quiescentCallback: callable
+ :param timeout: A timeout, in seconds, for requests made by this
+ protocol.
+ :type timeout: float
+ """
+ HTTP11ClientProtocol.__init__(self, quiescentCallback)
+ self._timeout = timeout
+ self._timeoutCall = None
+
+ def request(self, request):
+ """
+ Issue request over self.transport and return a Deferred which
+ will fire with a Response instance or an error.
+
+ :param request: The object defining the parameters of the request to
+ issue.
+ :type request: twisted.web._newclient.Request
+
+ :return: A deferred which fires after the request has finished.
+ :rtype: Deferred
+ """
+ d = HTTP11ClientProtocol.request(self, request)
+ if self._timeout:
+ self._last_buffer_len = 0
+ timeoutCall = reactor.callLater(
+ self._timeout, self._doTimeout, request)
+ self._timeoutCall = timeoutCall
+ return d
+
+ def _doTimeout(self, request):
"""
- A producer that writes the body of a request to a consumer.
+ Give up the request because of a timeout.
+
+ :param request: The object defining the parameters of the request to
+ issue.
+ :type request: twisted.web._newclient.Request
+ """
+ self._giveUp(
+ failure.Failure(
+ defer.TimeoutError(
+ "Getting %s took longer than %s seconds."
+ % (request.absoluteURI, self._timeout))))
+
+ def _cancelTimeout(self):
"""
+ Cancel the request timeout, when it's finished.
+ """
+ if self._timeoutCall.active():
+ self._timeoutCall.cancel()
+ self._timeoutCall = None
+
+ def _finishResponse_WAITING(self, rest):
+ """
+ Cancel the timeout when finished receiving the response.
+ """
+ self._cancelTimeout()
+ HTTP11ClientProtocol._finishResponse_WAITING(self, rest)
- implements(IBodyProducer)
+ def _finishResponse_TRANSMITTING(self, rest):
+ """
+ Cancel the timeout when finished receiving the response.
+ """
+ self._cancelTimeout()
+ HTTP11ClientProtocol._finishResponse_TRANSMITTING(self, rest)
- def __init__(self, body):
- """
- Initialize the string produer.
+ def dataReceived(self, bytes):
+ """
+ Receive some data and extend the timeout period of this request.
+
+ :param bytes: A string of indeterminate length.
+ :type bytes: str
+ """
+ HTTP11ClientProtocol.dataReceived(self, bytes)
+ if self._timeoutCall and self._timeoutCall.active():
+ self._timeoutCall.reset(self._timeout)
- :param body: The body of the request.
- :type body: str
- """
- self.body = body
- self.length = len(body)
- def startProducing(self, consumer):
- """
- Write the body to the consumer.
+class _HTTP11ClientFactory(HTTP11ClientFactory):
+ """
+ A timeout-able HTTP 1.1 client protocol factory.
+ """
- :param consumer: Any IConsumer provider.
- :type consumer: twisted.internet.interfaces.IConsumer
+ def __init__(self, quiescentCallback, timeout):
+ """
+ :param quiescentCallback: The quiescent callback to be passed to
+ protocol instances, used to return them to
+ the connection pool.
+ :type quiescentCallback: callable(Protocol)
+ :param timeout: The timeout, in seconds, for requests made by
+ protocols created by this factory.
+ :type timeout: float
+ """
+ HTTP11ClientFactory.__init__(self, quiescentCallback)
+ self._timeout = timeout
+
+ def buildProtocol(self, _):
+ """
+ Build the HTTP 1.1 client protocol.
+ """
+ return _HTTP11ClientProtocol(self._quiescentCallback, self._timeout)
+
+
+class _HTTPConnectionPool(HTTPConnectionPool):
+ """
+ A timeout-able HTTP connection pool.
+ """
- :return: A successful deferred.
- :rtype: twisted.internet.defer.Deferred
- """
- consumer.write(self.body)
- return succeed(None)
+ _factory = _HTTP11ClientFactory
- def pauseProducing(self):
- pass
+ def __init__(self, reactor, persistent, timeout):
+ HTTPConnectionPool.__init__(self, reactor, persistent=persistent)
+ self._timeout = timeout
- def stopProducing(self):
- pass
+ def _newConnection(self, key, endpoint):
+ def quiescentCallback(protocol):
+ self._putConnection(key, protocol)
+ factory = self._factory(quiescentCallback, timeout=self._timeout)
+ return endpoint.connect(factory)