summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--changes/bug_7234_add-http-request-timeout1
-rw-r--r--src/leap/common/http.py260
2 files changed, 215 insertions, 46 deletions
diff --git a/changes/bug_7234_add-http-request-timeout b/changes/bug_7234_add-http-request-timeout
new file mode 100644
index 0000000..d18b28b
--- /dev/null
+++ b/changes/bug_7234_add-http-request-timeout
@@ -0,0 +1 @@
+ o Add http request timeout. Related to #7234.
diff --git a/src/leap/common/http.py b/src/leap/common/http.py
index 8d22f2c..c93e65b 100644
--- a/src/leap/common/http.py
+++ b/src/leap/common/http.py
@@ -36,21 +36,24 @@ from zope.interface import implements
from twisted.internet import reactor
from twisted.internet import defer
-from twisted.internet.defer import succeed
+from twisted.python import failure
from twisted.web.client import Agent
from twisted.web.client import HTTPConnectionPool
+from twisted.web.client import _HTTP11ClientFactory as HTTP11ClientFactory
from twisted.web.client import readBody
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
+from twisted.web._newclient import HTTP11ClientProtocol
-def createPool(maxPersistentPerHost=10, persistent=True):
- pool = HTTPConnectionPool(reactor, persistent)
- pool.maxPersistentPerHost = maxPersistentPerHost
- return pool
+__all__ = ["HTTPClient"]
-_pool = createPool()
+
+# A default HTTP timeout is used for 2 distinct purposes:
+# 1. as HTTP connection timeout, prior to connection estabilshment.
+# 2. as data reception timeout, after the connection has been established.
+DEFAULT_HTTP_TIMEOUT = 30 # seconds
class HTTPClient(object):
@@ -66,28 +69,36 @@ class HTTPClient(object):
in order to avoid resource abuse on huge requests batches.
"""
- def __init__(self, cert_file=None, pool=_pool):
+ def __init__(self, cert_file=None, timeout=DEFAULT_HTTP_TIMEOUT):
"""
Init the HTTP client
:param cert_file: The path to the certificate file, if None given the
system's CAs will be used.
:type cert_file: str
- :param pool: An optional dedicated connection pool to override the
- default shared one.
- :type pool: HTTPConnectionPool
+ :param timeout: The amount of time that this Agent will wait for the
+ peer to accept a connection and for each request to be
+ finished. If a pool is passed, then this argument is
+ ignored.
+ :type timeout: float
"""
- policy = get_compatible_ssl_context_factory(cert_file)
-
- self._pool = pool
+ self._timeout = timeout
+ self._pool = self._createPool()
self._agent = Agent(
reactor,
- policy,
- pool=pool)
- self._semaphore = defer.DeferredSemaphore(pool.maxPersistentPerHost)
+ get_compatible_ssl_context_factory(cert_file),
+ pool=self._pool,
+ connectTimeout=self._timeout)
+ self._semaphore = defer.DeferredSemaphore(
+ self._pool.maxPersistentPerHost)
- def request(self, url, method='GET', body=None, headers={}):
+ def _createPool(self, maxPersistentPerHost=10, persistent=True):
+ pool = _HTTPConnectionPool(reactor, persistent, self._timeout)
+ pool.maxPersistentPerHost = maxPersistentPerHost
+ return pool
+
+ def _request(self, url, method, body, headers):
"""
Perform an HTTP request.
@@ -104,51 +115,208 @@ class HTTPClient(object):
:rtype: twisted.internet.defer.Deferred
"""
if body:
- body = HTTPClient.StringBodyProducer(body)
- d = self._semaphore.run(self._agent.request,
- method, url, headers=Headers(headers),
- bodyProducer=body)
+ body = _StringBodyProducer(body)
+ d = self._agent.request(
+ method, url, headers=Headers(headers), bodyProducer=body)
d.addCallback(readBody)
return d
+ def request(self, url, method='GET', body=None, headers={}):
+ """
+ Perform an HTTP request, but limit the maximum amount of concurrent
+ connections.
+
+ :param url: The URL for the request.
+ :type url: str
+ :param method: The HTTP method of the request.
+ :type method: str
+ :param body: The body of the request, if any.
+ :type body: str
+ :param headers: The headers of the request.
+ :type headers: dict
+
+ :return: A deferred that fires with the body of the request.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._semaphore.run(self._request, url, method, body, headers)
+
def close(self):
"""
Close any cached connections.
"""
self._pool.closeCachedConnections()
- class StringBodyProducer(object):
+#
+# An IBodyProducer to write the body of an HTTP request as a string.
+#
+
+class _StringBodyProducer(object):
+ """
+ A producer that writes the body of a request to a consumer.
+ """
+
+ implements(IBodyProducer)
+
+ def __init__(self, body):
+ """
+ Initialize the string produer.
+
+ :param body: The body of the request.
+ :type body: str
+ """
+ self.body = body
+ self.length = len(body)
+
+ def startProducing(self, consumer):
+ """
+ Write the body to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A successful deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ consumer.write(self.body)
+ return defer.succeed(None)
+
+ def pauseProducing(self):
+ pass
+
+ def stopProducing(self):
+ pass
+
+
+#
+# Patched twisted.web classes
+#
+
+class _HTTP11ClientProtocol(HTTP11ClientProtocol):
+ """
+ A timeout-able HTTP 1.1 client protocol, that is instantiated by the
+ _HTTP11ClientFactory below.
+ """
+
+ def __init__(self, quiescentCallback, timeout):
+ """
+ Initialize the protocol.
+
+ :param quiescentCallback:
+ :type quiescentCallback: callable
+ :param timeout: A timeout, in seconds, for requests made by this
+ protocol.
+ :type timeout: float
+ """
+ HTTP11ClientProtocol.__init__(self, quiescentCallback)
+ self._timeout = timeout
+ self._timeoutCall = None
+
+ def request(self, request):
+ """
+ Issue request over self.transport and return a Deferred which
+ will fire with a Response instance or an error.
+
+ :param request: The object defining the parameters of the request to
+ issue.
+ :type request: twisted.web._newclient.Request
+
+ :return: A deferred which fires after the request has finished.
+ :rtype: Deferred
+ """
+ d = HTTP11ClientProtocol.request(self, request)
+ if self._timeout:
+ self._last_buffer_len = 0
+ timeoutCall = reactor.callLater(
+ self._timeout, self._doTimeout, request)
+ self._timeoutCall = timeoutCall
+ return d
+
+ def _doTimeout(self, request):
"""
- A producer that writes the body of a request to a consumer.
+ Give up the request because of a timeout.
+
+ :param request: The object defining the parameters of the request to
+ issue.
+ :type request: twisted.web._newclient.Request
+ """
+ self._giveUp(
+ failure.Failure(
+ defer.TimeoutError(
+ "Getting %s took longer than %s seconds."
+ % (request.absoluteURI, self._timeout))))
+
+ def _cancelTimeout(self):
"""
+ Cancel the request timeout, when it's finished.
+ """
+ if self._timeoutCall.active():
+ self._timeoutCall.cancel()
+ self._timeoutCall = None
+
+ def _finishResponse_WAITING(self, rest):
+ """
+ Cancel the timeout when finished receiving the response.
+ """
+ self._cancelTimeout()
+ HTTP11ClientProtocol._finishResponse_WAITING(self, rest)
- implements(IBodyProducer)
+ def _finishResponse_TRANSMITTING(self, rest):
+ """
+ Cancel the timeout when finished receiving the response.
+ """
+ self._cancelTimeout()
+ HTTP11ClientProtocol._finishResponse_TRANSMITTING(self, rest)
- def __init__(self, body):
- """
- Initialize the string produer.
+ def dataReceived(self, bytes):
+ """
+ Receive some data and extend the timeout period of this request.
+
+ :param bytes: A string of indeterminate length.
+ :type bytes: str
+ """
+ HTTP11ClientProtocol.dataReceived(self, bytes)
+ if self._timeoutCall and self._timeoutCall.active():
+ self._timeoutCall.reset(self._timeout)
- :param body: The body of the request.
- :type body: str
- """
- self.body = body
- self.length = len(body)
- def startProducing(self, consumer):
- """
- Write the body to the consumer.
+class _HTTP11ClientFactory(HTTP11ClientFactory):
+ """
+ A timeout-able HTTP 1.1 client protocol factory.
+ """
- :param consumer: Any IConsumer provider.
- :type consumer: twisted.internet.interfaces.IConsumer
+ def __init__(self, quiescentCallback, timeout):
+ """
+ :param quiescentCallback: The quiescent callback to be passed to
+ protocol instances, used to return them to
+ the connection pool.
+ :type quiescentCallback: callable(Protocol)
+ :param timeout: The timeout, in seconds, for requests made by
+ protocols created by this factory.
+ :type timeout: float
+ """
+ HTTP11ClientFactory.__init__(self, quiescentCallback)
+ self._timeout = timeout
+
+ def buildProtocol(self, _):
+ """
+ Build the HTTP 1.1 client protocol.
+ """
+ return _HTTP11ClientProtocol(self._quiescentCallback, self._timeout)
+
+
+class _HTTPConnectionPool(HTTPConnectionPool):
+ """
+ A timeout-able HTTP connection pool.
+ """
- :return: A successful deferred.
- :rtype: twisted.internet.defer.Deferred
- """
- consumer.write(self.body)
- return succeed(None)
+ _factory = _HTTP11ClientFactory
- def pauseProducing(self):
- pass
+ def __init__(self, reactor, persistent, timeout):
+ HTTPConnectionPool.__init__(self, reactor, persistent=persistent)
+ self._timeout = timeout
- def stopProducing(self):
- pass
+ def _newConnection(self, key, endpoint):
+ def quiescentCallback(protocol):
+ self._putConnection(key, protocol)
+ factory = self._factory(quiescentCallback, timeout=self._timeout)
+ return endpoint.connect(factory)