From 31757168f6ad243ec83ba52b2e022298ba08f8d1 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 25 May 2015 11:46:24 -0300 Subject: [feature] add pool of http/https connections Instead of opening one TCP connection for each HTTP request, we want to reuse connections. Also, we need to be able to verify SSL certificates. This commit implements both features in the twisted http client sync. --- client/src/leap/soledad/client/api.py | 4 +- client/src/leap/soledad/client/http_client.py | 194 ++++++++++++++++++++++++++ client/src/leap/soledad/client/http_target.py | 53 ++++--- client/src/leap/soledad/client/sqlcipher.py | 13 +- 4 files changed, 230 insertions(+), 34 deletions(-) create mode 100644 client/src/leap/soledad/client/http_client.py (limited to 'client/src/leap') diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index ffd95f6c..91e0a4a0 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -272,7 +272,8 @@ class Soledad(object): replica_uid = self._dbpool.replica_uid self._dbsyncer = SQLCipherU1DBSync( self._sqlcipher_opts, self._crypto, replica_uid, - self._defer_encryption) + SOLEDAD_CERT, + defer_encryption=self._defer_encryption) # # Closing methods @@ -630,6 +631,7 @@ class Soledad(object): Whether to defer decryption of documents, or do it inline while syncing. :type defer_decryption: bool + :return: A deferred whose callback will be invoked with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred diff --git a/client/src/leap/soledad/client/http_client.py b/client/src/leap/soledad/client/http_client.py new file mode 100644 index 00000000..b08d199e --- /dev/null +++ b/client/src/leap/soledad/client/http_client.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# http_client.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Twisted HTTP/HTTPS client. +""" + +import os + +from zope.interface import implements + +from OpenSSL.crypto import load_certificate +from OpenSSL.crypto import FILETYPE_PEM + +from twisted.internet import reactor +from twisted.internet.ssl import ClientContextFactory +from twisted.internet.ssl import CertificateOptions +from twisted.internet.defer import succeed + +from twisted.web.client import Agent +from twisted.web.client import HTTPConnectionPool +from twisted.web.client import readBody +from twisted.web.http_headers import Headers +from twisted.web.error import Error +from twisted.web.iweb import IBodyProducer + + +from leap.soledad.common.errors import InvalidAuthTokenError + + +# +# Setup a pool of connections +# + +_pool = HTTPConnectionPool(reactor, persistent=True) +_pool.maxPersistentPerHost = 10 +_agent = None + +# if we ever want to trust the system's CAs, we should use an agent like this: +# from twisted.web.client import BrowserLikePolicyForHTTPS +# _agent = Agent(reactor, BrowserLikePolicyForHTTPS(), pool=_pool) + + +# +# SSL/TLS certificate configuration +# + +def configure_certificate(cert_file): + """ + Configure an agent that verifies server certificates against a CA cert + file. + + :param cert_file: The path to the certificate file. + :type cert_file: str + """ + global _agent + cert = _load_cert(cert_file) + _agent = Agent( + reactor, + SoledadClientContextFactory(cert), + pool=_pool) + + +def _load_cert(cert_file): + """ + Load a X509 certificate from a file. + + :param cert_file: The path to the certificate file. + :type cert_file: str + + :return: The X509 certificate. + :rtype: OpenSSL.crypto.X509 + """ + if os.path.exists(cert_file): + with open(cert_file) as f: + data = f.read() + return load_certificate(FILETYPE_PEM, data) + + +class SoledadClientContextFactory(ClientContextFactory): + """ + A context factory that will verify the server's certificate against a + given CA certificate. + """ + + def __init__(self, cacert): + """ + Initialize the context factory. + + :param cacert: The CA certificate. + :type cacert: OpenSSL.crypto.X509 + """ + self._cacert = cacert + + def getContext(self, hostname, port): + opts = CertificateOptions(verify=True, caCerts=[self._cacert]) + return opts.getContext() + + +# +# HTTP request facilities +# + +def _unauth_to_invalid_token_error(failure): + """ + An errback to translate unauthorized errors to our own invalid token + class. + + :param failure: The original failure. + :type failure: twisted.python.failure.Failure + + :return: Either the original failure or an invalid auth token error. + :rtype: twisted.python.failure.Failure + """ + failure.trap(Error) + if failure.getErrorMessage() == "401 Unauthorized": + raise InvalidAuthTokenError + return failure + + +class StringBodyProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, body): + """ + Initialize the string produer. + + :param body: The body of the request. + :type body: str + """ + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A successful deferred. + :rtype: twisted.internet.defer.Deferred + """ + consumer.write(self.body) + return succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + + +def httpRequest(url, method='GET', body=None, headers={}): + """ + Perform an HTTP request. + + :param url: The URL for the request. + :type url: str + :param method: The HTTP method of the request. + :type method: str + :param body: The body of the request, if any. + :type body: str + :param headers: The headers of the request. + :type headers: dict + + :return: A deferred that fires with the body of the request. + :rtype: twisted.internet.defer.Deferred + """ + if body: + body = StringBodyProducer(body) + d = _agent.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallbacks(readBody, _unauth_to_invalid_token_error) + return d diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index 75af9cf7..dc6c0e0a 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# target.py +# http_target.py # Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify @@ -21,6 +21,7 @@ A U1DB backend for encrypting data before sending to server and decrypting after receiving. """ + import json import base64 import logging @@ -30,15 +31,12 @@ from functools import partial from twisted.internet import defer from twisted.internet import reactor -from twisted.web.client import getPage -from twisted.web.error import Error from u1db import errors from u1db import SyncTarget from u1db.remote import utils from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.errors import InvalidAuthTokenError from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.crypto import encrypt_doc @@ -47,24 +45,13 @@ from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import signal from leap.soledad.client.encdecpool import SyncDecrypterPool +from leap.soledad.client.http_client import httpRequest +from leap.soledad.client.http_client import configure_certificate logger = logging.getLogger(__name__) -def _unauth_to_invalid_token_error(failure): - failure.trap(Error) - if failure.getErrorMessage() == "401 Unauthorized": - raise InvalidAuthTokenError - return failure - - -def getSoledadPage(*args, **kwargs): - d = getPage(*args, **kwargs) - d.addErrback(_unauth_to_invalid_token_error) - return d - - class SoledadHTTPSyncTarget(SyncTarget): """ A SyncTarget that encrypts data before sending and decrypts data after @@ -76,7 +63,7 @@ class SoledadHTTPSyncTarget(SyncTarget): written to the main database. """ - def __init__(self, url, source_replica_uid, creds, crypto, + def __init__(self, url, source_replica_uid, creds, crypto, cert_file, sync_db=None, sync_enc_pool=None): """ Initialize the sync target. @@ -93,12 +80,19 @@ class SoledadHTTPSyncTarget(SyncTarget): :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt document contents when syncing. :type crypto: soledad.crypto.SoledadCrypto + :param cert_file: Path to the certificate of the ca used to validate + the SSL certificate used by the remote soledad + server. + :type cert_file: str :param sync_db: Optional. handler for the db with the symmetric encryption of the syncing documents. If None, encryption will be done in-place, instead of retreiving it from the dedicated database. :type sync_db: Sqlite handler + :param verify_ssl: Whether we should perform SSL server certificate + verification. + :type verify_ssl: bool """ if url.endswith("/"): url = url[:-1] @@ -113,6 +107,7 @@ class SoledadHTTPSyncTarget(SyncTarget): # asynchronous encryption/decryption attributes self._decryption_callback = None self._sync_decr_pool = None + configure_certificate(cert_file) def set_creds(self, creds): """ @@ -125,7 +120,7 @@ class SoledadHTTPSyncTarget(SyncTarget): token = creds['token']['token'] auth = '%s:%s' % (uuid, token) b64_token = base64.b64encode(auth) - self._auth_header = {'Authorization': 'Token %s' % b64_token} + self._auth_header = {'Authorization': ['Token %s' % b64_token]} @property def _defer_encryption(self): @@ -153,7 +148,7 @@ class SoledadHTTPSyncTarget(SyncTarget): source_replica_last_known_transaction_id) :rtype: twisted.internet.defer.Deferred """ - raw = yield getSoledadPage(self._url, headers=self._auth_header) + raw = yield httpRequest(self._url, headers=self._auth_header) res = json.loads(raw) defer.returnValue([ res['target_replica_uid'], @@ -197,12 +192,12 @@ class SoledadHTTPSyncTarget(SyncTarget): 'transaction_id': source_replica_transaction_id }) headers = self._auth_header.copy() - headers.update({'content-type': 'application/json'}) - return getSoledadPage( + headers.update({'content-type': ['application/json']}) + return httpRequest( self._url, method='PUT', headers=headers, - postdata=data) + body=data) @defer.inlineCallbacks def sync_exchange(self, docs_by_generation, source_replica_uid, @@ -295,7 +290,7 @@ class SoledadHTTPSyncTarget(SyncTarget): defer.returnValue([None, None]) headers = self._auth_header.copy() - headers.update({'content-type': 'application/x-soledad-sync-put'}) + headers.update({'content-type': ['application/x-soledad-sync-put']}) # add remote replica metadata to the request first_entries = ['['] self._prepare( @@ -335,11 +330,11 @@ class SoledadHTTPSyncTarget(SyncTarget): doc_idx=doc_idx) entries.append('\r\n]') data = ''.join(entries) - result = yield getSoledadPage( + result = yield httpRequest( self._url, method='POST', headers=headers, - postdata=data) + body=data) defer.returnValue(result) def _encrypt_doc(self, doc): @@ -385,7 +380,7 @@ class SoledadHTTPSyncTarget(SyncTarget): self._setup_sync_decr_pool() headers = self._auth_header.copy() - headers.update({'content-type': 'application/x-soledad-sync-get'}) + headers.update({'content-type': ['application/x-soledad-sync-get']}) #--------------------------------------------------------------------- # maybe receive the first document @@ -486,11 +481,11 @@ class SoledadHTTPSyncTarget(SyncTarget): ',', entries, received=received) entries.append('\r\n]') # send headers - return getSoledadPage( + return httpRequest( self._url, method='POST', headers=headers, - postdata=''.join(entries)) + body=''.join(entries)) def _insert_received_doc(self, idx, total, response): """ diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 96732325..ed9e95dc 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -434,13 +434,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ syncing_lock = defaultdict(threading.Lock) - def __init__(self, opts, soledad_crypto, replica_uid, + def __init__(self, opts, soledad_crypto, replica_uid, cert_file, defer_encryption=False): self._opts = opts self._path = opts.path self._crypto = soledad_crypto self.__replica_uid = replica_uid + self._cert_file = cert_file self._sync_db_key = opts.sync_db_key self._sync_db = None @@ -570,9 +571,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :param url: The url of the target replica to sync with. :type url: str - :param creds: - optional dictionary giving credentials. - to authorize the operation with the server. + :param creds: optional dictionary giving credentials to authorize the + operation with the server. :type creds: dict :param defer_decryption: Whether to defer the decryption process using the intermediate @@ -599,6 +599,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase): one instance synchronizing the same database replica at the same time. Because of that, this method blocks until the syncing lock can be acquired. + + :param creds: optional dictionary giving credentials to authorize the + operation with the server. + :type creds: dict """ with self.syncing_lock[self._path]: syncer = self._get_syncer(url, creds=creds) @@ -640,6 +644,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._replica_uid, creds=creds, crypto=self._crypto, + cert_file=self._cert_file, sync_db=self._sync_db, sync_enc_pool=self._sync_enc_pool)) self._syncers[url] = (h, syncer) -- cgit v1.2.3