diff options
-rw-r--r-- | changes/bug_7234_add-http-request-timeout | 1 | ||||
-rw-r--r-- | src/leap/common/http.py | 260 |
2 files changed, 215 insertions, 46 deletions
diff --git a/changes/bug_7234_add-http-request-timeout b/changes/bug_7234_add-http-request-timeout new file mode 100644 index 0000000..d18b28b --- /dev/null +++ b/changes/bug_7234_add-http-request-timeout @@ -0,0 +1 @@ + o Add http request timeout. Related to #7234. diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 8d22f2c..c93e65b 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -36,21 +36,24 @@ from zope.interface import implements from twisted.internet import reactor from twisted.internet import defer -from twisted.internet.defer import succeed +from twisted.python import failure from twisted.web.client import Agent from twisted.web.client import HTTPConnectionPool +from twisted.web.client import _HTTP11ClientFactory as HTTP11ClientFactory from twisted.web.client import readBody from twisted.web.http_headers import Headers from twisted.web.iweb import IBodyProducer +from twisted.web._newclient import HTTP11ClientProtocol -def createPool(maxPersistentPerHost=10, persistent=True): - pool = HTTPConnectionPool(reactor, persistent) - pool.maxPersistentPerHost = maxPersistentPerHost - return pool +__all__ = ["HTTPClient"] -_pool = createPool() + +# A default HTTP timeout is used for 2 distinct purposes: +# 1. as HTTP connection timeout, prior to connection estabilshment. +# 2. as data reception timeout, after the connection has been established. +DEFAULT_HTTP_TIMEOUT = 30 # seconds class HTTPClient(object): @@ -66,28 +69,36 @@ class HTTPClient(object): in order to avoid resource abuse on huge requests batches. """ - def __init__(self, cert_file=None, pool=_pool): + def __init__(self, cert_file=None, timeout=DEFAULT_HTTP_TIMEOUT): """ Init the HTTP client :param cert_file: The path to the certificate file, if None given the system's CAs will be used. :type cert_file: str - :param pool: An optional dedicated connection pool to override the - default shared one. - :type pool: HTTPConnectionPool + :param timeout: The amount of time that this Agent will wait for the + peer to accept a connection and for each request to be + finished. If a pool is passed, then this argument is + ignored. + :type timeout: float """ - policy = get_compatible_ssl_context_factory(cert_file) - - self._pool = pool + self._timeout = timeout + self._pool = self._createPool() self._agent = Agent( reactor, - policy, - pool=pool) - self._semaphore = defer.DeferredSemaphore(pool.maxPersistentPerHost) + get_compatible_ssl_context_factory(cert_file), + pool=self._pool, + connectTimeout=self._timeout) + self._semaphore = defer.DeferredSemaphore( + self._pool.maxPersistentPerHost) - def request(self, url, method='GET', body=None, headers={}): + def _createPool(self, maxPersistentPerHost=10, persistent=True): + pool = _HTTPConnectionPool(reactor, persistent, self._timeout) + pool.maxPersistentPerHost = maxPersistentPerHost + return pool + + def _request(self, url, method, body, headers): """ Perform an HTTP request. @@ -104,51 +115,208 @@ class HTTPClient(object): :rtype: twisted.internet.defer.Deferred """ if body: - body = HTTPClient.StringBodyProducer(body) - d = self._semaphore.run(self._agent.request, - method, url, headers=Headers(headers), - bodyProducer=body) + body = _StringBodyProducer(body) + d = self._agent.request( + method, url, headers=Headers(headers), bodyProducer=body) d.addCallback(readBody) return d + def request(self, url, method='GET', body=None, headers={}): + """ + Perform an HTTP request, but limit the maximum amount of concurrent + connections. + + :param url: The URL for the request. + :type url: str + :param method: The HTTP method of the request. + :type method: str + :param body: The body of the request, if any. + :type body: str + :param headers: The headers of the request. + :type headers: dict + + :return: A deferred that fires with the body of the request. + :rtype: twisted.internet.defer.Deferred + """ + return self._semaphore.run(self._request, url, method, body, headers) + def close(self): """ Close any cached connections. """ self._pool.closeCachedConnections() - class StringBodyProducer(object): +# +# An IBodyProducer to write the body of an HTTP request as a string. +# + +class _StringBodyProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, body): + """ + Initialize the string produer. + + :param body: The body of the request. + :type body: str + """ + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A successful deferred. + :rtype: twisted.internet.defer.Deferred + """ + consumer.write(self.body) + return defer.succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + + +# +# Patched twisted.web classes +# + +class _HTTP11ClientProtocol(HTTP11ClientProtocol): + """ + A timeout-able HTTP 1.1 client protocol, that is instantiated by the + _HTTP11ClientFactory below. + """ + + def __init__(self, quiescentCallback, timeout): + """ + Initialize the protocol. + + :param quiescentCallback: + :type quiescentCallback: callable + :param timeout: A timeout, in seconds, for requests made by this + protocol. + :type timeout: float + """ + HTTP11ClientProtocol.__init__(self, quiescentCallback) + self._timeout = timeout + self._timeoutCall = None + + def request(self, request): + """ + Issue request over self.transport and return a Deferred which + will fire with a Response instance or an error. + + :param request: The object defining the parameters of the request to + issue. + :type request: twisted.web._newclient.Request + + :return: A deferred which fires after the request has finished. + :rtype: Deferred + """ + d = HTTP11ClientProtocol.request(self, request) + if self._timeout: + self._last_buffer_len = 0 + timeoutCall = reactor.callLater( + self._timeout, self._doTimeout, request) + self._timeoutCall = timeoutCall + return d + + def _doTimeout(self, request): """ - A producer that writes the body of a request to a consumer. + Give up the request because of a timeout. + + :param request: The object defining the parameters of the request to + issue. + :type request: twisted.web._newclient.Request + """ + self._giveUp( + failure.Failure( + defer.TimeoutError( + "Getting %s took longer than %s seconds." + % (request.absoluteURI, self._timeout)))) + + def _cancelTimeout(self): """ + Cancel the request timeout, when it's finished. + """ + if self._timeoutCall.active(): + self._timeoutCall.cancel() + self._timeoutCall = None + + def _finishResponse_WAITING(self, rest): + """ + Cancel the timeout when finished receiving the response. + """ + self._cancelTimeout() + HTTP11ClientProtocol._finishResponse_WAITING(self, rest) - implements(IBodyProducer) + def _finishResponse_TRANSMITTING(self, rest): + """ + Cancel the timeout when finished receiving the response. + """ + self._cancelTimeout() + HTTP11ClientProtocol._finishResponse_TRANSMITTING(self, rest) - def __init__(self, body): - """ - Initialize the string produer. + def dataReceived(self, bytes): + """ + Receive some data and extend the timeout period of this request. + + :param bytes: A string of indeterminate length. + :type bytes: str + """ + HTTP11ClientProtocol.dataReceived(self, bytes) + if self._timeoutCall and self._timeoutCall.active(): + self._timeoutCall.reset(self._timeout) - :param body: The body of the request. - :type body: str - """ - self.body = body - self.length = len(body) - def startProducing(self, consumer): - """ - Write the body to the consumer. +class _HTTP11ClientFactory(HTTP11ClientFactory): + """ + A timeout-able HTTP 1.1 client protocol factory. + """ - :param consumer: Any IConsumer provider. - :type consumer: twisted.internet.interfaces.IConsumer + def __init__(self, quiescentCallback, timeout): + """ + :param quiescentCallback: The quiescent callback to be passed to + protocol instances, used to return them to + the connection pool. + :type quiescentCallback: callable(Protocol) + :param timeout: The timeout, in seconds, for requests made by + protocols created by this factory. + :type timeout: float + """ + HTTP11ClientFactory.__init__(self, quiescentCallback) + self._timeout = timeout + + def buildProtocol(self, _): + """ + Build the HTTP 1.1 client protocol. + """ + return _HTTP11ClientProtocol(self._quiescentCallback, self._timeout) + + +class _HTTPConnectionPool(HTTPConnectionPool): + """ + A timeout-able HTTP connection pool. + """ - :return: A successful deferred. - :rtype: twisted.internet.defer.Deferred - """ - consumer.write(self.body) - return succeed(None) + _factory = _HTTP11ClientFactory - def pauseProducing(self): - pass + def __init__(self, reactor, persistent, timeout): + HTTPConnectionPool.__init__(self, reactor, persistent=persistent) + self._timeout = timeout - def stopProducing(self): - pass + def _newConnection(self, key, endpoint): + def quiescentCallback(protocol): + self._putConnection(key, protocol) + factory = self._factory(quiescentCallback, timeout=self._timeout) + return endpoint.connect(factory) |