diff options
Diffstat (limited to 'src/leap')
| -rw-r--r-- | src/leap/common/http.py | 260 | 
1 files changed, 214 insertions, 46 deletions
| diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 8d22f2c..c93e65b 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -36,21 +36,24 @@ from zope.interface import implements  from twisted.internet import reactor  from twisted.internet import defer -from twisted.internet.defer import succeed +from twisted.python import failure  from twisted.web.client import Agent  from twisted.web.client import HTTPConnectionPool +from twisted.web.client import _HTTP11ClientFactory as HTTP11ClientFactory  from twisted.web.client import readBody  from twisted.web.http_headers import Headers  from twisted.web.iweb import IBodyProducer +from twisted.web._newclient import HTTP11ClientProtocol -def createPool(maxPersistentPerHost=10, persistent=True): -    pool = HTTPConnectionPool(reactor, persistent) -    pool.maxPersistentPerHost = maxPersistentPerHost -    return pool +__all__ = ["HTTPClient"] -_pool = createPool() + +# A default HTTP timeout is used for 2 distinct purposes: +#   1. as HTTP connection timeout, prior to connection estabilshment. +#   2. as data reception timeout, after the connection has been established. +DEFAULT_HTTP_TIMEOUT = 30  # seconds  class HTTPClient(object): @@ -66,28 +69,36 @@ class HTTPClient(object):      in order to avoid resource abuse on huge requests batches.      """ -    def __init__(self, cert_file=None, pool=_pool): +    def __init__(self, cert_file=None, timeout=DEFAULT_HTTP_TIMEOUT):          """          Init the HTTP client          :param cert_file: The path to the certificate file, if None given the                            system's CAs will be used.          :type cert_file: str -        :param pool: An optional dedicated connection pool to override the -                     default shared one. -        :type pool: HTTPConnectionPool +        :param timeout: The amount of time that this Agent will wait for the +                        peer to accept a connection and for each request to be +                        finished. If a pool is passed, then this argument is +                        ignored. +        :type timeout: float          """ -        policy = get_compatible_ssl_context_factory(cert_file) - -        self._pool = pool +        self._timeout = timeout +        self._pool = self._createPool()          self._agent = Agent(              reactor, -            policy, -            pool=pool) -        self._semaphore = defer.DeferredSemaphore(pool.maxPersistentPerHost) +            get_compatible_ssl_context_factory(cert_file), +            pool=self._pool, +            connectTimeout=self._timeout) +        self._semaphore = defer.DeferredSemaphore( +            self._pool.maxPersistentPerHost) -    def request(self, url, method='GET', body=None, headers={}): +    def _createPool(self, maxPersistentPerHost=10, persistent=True): +        pool = _HTTPConnectionPool(reactor, persistent, self._timeout) +        pool.maxPersistentPerHost = maxPersistentPerHost +        return pool + +    def _request(self, url, method, body, headers):          """          Perform an HTTP request. @@ -104,51 +115,208 @@ class HTTPClient(object):          :rtype: twisted.internet.defer.Deferred          """          if body: -            body = HTTPClient.StringBodyProducer(body) -        d = self._semaphore.run(self._agent.request, -                                method, url, headers=Headers(headers), -                                bodyProducer=body) +            body = _StringBodyProducer(body) +        d = self._agent.request( +            method, url, headers=Headers(headers), bodyProducer=body)          d.addCallback(readBody)          return d +    def request(self, url, method='GET', body=None, headers={}): +        """ +        Perform an HTTP request, but limit the maximum amount of concurrent +        connections. + +        :param url: The URL for the request. +        :type url: str +        :param method: The HTTP method of the request. +        :type method: str +        :param body: The body of the request, if any. +        :type body: str +        :param headers: The headers of the request. +        :type headers: dict + +        :return: A deferred that fires with the body of the request. +        :rtype: twisted.internet.defer.Deferred +        """ +        return self._semaphore.run(self._request, url, method, body, headers) +      def close(self):          """          Close any cached connections.          """          self._pool.closeCachedConnections() -    class StringBodyProducer(object): +# +# An IBodyProducer to write the body of an HTTP request as a string. +# + +class _StringBodyProducer(object): +    """ +    A producer that writes the body of a request to a consumer. +    """ + +    implements(IBodyProducer) + +    def __init__(self, body): +        """ +        Initialize the string produer. + +        :param body: The body of the request. +        :type body: str +        """ +        self.body = body +        self.length = len(body) + +    def startProducing(self, consumer): +        """ +        Write the body to the consumer. + +        :param consumer: Any IConsumer provider. +        :type consumer: twisted.internet.interfaces.IConsumer + +        :return: A successful deferred. +        :rtype: twisted.internet.defer.Deferred +        """ +        consumer.write(self.body) +        return defer.succeed(None) + +    def pauseProducing(self): +        pass + +    def stopProducing(self): +        pass + + +# +# Patched twisted.web classes +# + +class _HTTP11ClientProtocol(HTTP11ClientProtocol): +    """ +    A timeout-able HTTP 1.1 client protocol, that is instantiated by the +    _HTTP11ClientFactory below. +    """ + +    def __init__(self, quiescentCallback, timeout): +        """ +        Initialize the protocol. + +        :param quiescentCallback: +        :type quiescentCallback: callable +        :param timeout: A timeout, in seconds, for requests made by this +                        protocol. +        :type timeout: float +        """ +        HTTP11ClientProtocol.__init__(self, quiescentCallback) +        self._timeout = timeout +        self._timeoutCall = None + +    def request(self, request): +        """ +        Issue request over self.transport and return a Deferred which +        will fire with a Response instance or an error. + +        :param request: The object defining the parameters of the request to +                        issue. +        :type request: twisted.web._newclient.Request + +        :return: A deferred which fires after the request has finished. +        :rtype: Deferred +        """ +        d = HTTP11ClientProtocol.request(self, request) +        if self._timeout: +            self._last_buffer_len = 0 +            timeoutCall = reactor.callLater( +                self._timeout, self._doTimeout, request) +            self._timeoutCall = timeoutCall +        return d + +    def _doTimeout(self, request):          """ -        A producer that writes the body of a request to a consumer. +        Give up the request because of a timeout. + +        :param request: The object defining the parameters of the request to +                        issue. +        :type request: twisted.web._newclient.Request +        """ +        self._giveUp( +            failure.Failure( +                defer.TimeoutError( +                    "Getting %s took longer than %s seconds." +                    % (request.absoluteURI, self._timeout)))) + +    def _cancelTimeout(self):          """ +        Cancel the request timeout, when it's finished. +        """ +        if self._timeoutCall.active(): +            self._timeoutCall.cancel() +            self._timeoutCall = None + +    def _finishResponse_WAITING(self, rest): +       """ +       Cancel the timeout when finished receiving the response. +       """ +       self._cancelTimeout() +       HTTP11ClientProtocol._finishResponse_WAITING(self, rest) -        implements(IBodyProducer) +    def _finishResponse_TRANSMITTING(self, rest): +       """ +       Cancel the timeout when finished receiving the response. +       """ +       self._cancelTimeout() +       HTTP11ClientProtocol._finishResponse_TRANSMITTING(self, rest) -        def __init__(self, body): -            """ -            Initialize the string produer. +    def dataReceived(self, bytes): +        """ +        Receive some data and extend the timeout period of this request. + +        :param bytes: A string of indeterminate length. +        :type bytes: str +        """ +        HTTP11ClientProtocol.dataReceived(self, bytes) +        if self._timeoutCall and self._timeoutCall.active(): +            self._timeoutCall.reset(self._timeout) -            :param body: The body of the request. -            :type body: str -            """ -            self.body = body -            self.length = len(body) -        def startProducing(self, consumer): -            """ -            Write the body to the consumer. +class _HTTP11ClientFactory(HTTP11ClientFactory): +    """ +    A timeout-able HTTP 1.1 client protocol factory. +    """ -            :param consumer: Any IConsumer provider. -            :type consumer: twisted.internet.interfaces.IConsumer +    def __init__(self, quiescentCallback, timeout): +        """ +        :param quiescentCallback: The quiescent callback to be passed to +                                  protocol instances, used to return them to +                                  the connection pool. +        :type quiescentCallback: callable(Protocol) +        :param timeout: The timeout, in seconds, for requests made by +                        protocols created by this factory. +        :type timeout: float +        """ +        HTTP11ClientFactory.__init__(self, quiescentCallback) +        self._timeout = timeout + +    def buildProtocol(self, _): +        """ +        Build the HTTP 1.1 client protocol. +        """ +        return _HTTP11ClientProtocol(self._quiescentCallback, self._timeout) + + +class _HTTPConnectionPool(HTTPConnectionPool): +    """ +    A timeout-able HTTP connection pool. +    """ -            :return: A successful deferred. -            :rtype: twisted.internet.defer.Deferred -            """ -            consumer.write(self.body) -            return succeed(None) +    _factory = _HTTP11ClientFactory -        def pauseProducing(self): -            pass +    def __init__(self, reactor, persistent, timeout): +        HTTPConnectionPool.__init__(self, reactor, persistent=persistent) +        self._timeout = timeout -        def stopProducing(self): -            pass +    def _newConnection(self, key, endpoint): +        def quiescentCallback(protocol): +            self._putConnection(key, protocol) +        factory = self._factory(quiescentCallback, timeout=self._timeout) +        return endpoint.connect(factory) | 
