diff options
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r-- | client/src/leap/soledad/client/__init__.py | 68 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 112 |
2 files changed, 146 insertions, 34 deletions
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index a0b3f45a..46e3cd5f 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -34,6 +34,8 @@ import urlparse import hmac from hashlib import sha256 +from threading import Lock +from collections import defaultdict try: import cchardet as chardet @@ -52,6 +54,7 @@ from leap.soledad.common.errors import ( InvalidTokenError, NotLockedError, AlreadyLockedError, + LockTimedOutError, ) from leap.soledad.common.crypto import ( MacMethods, @@ -245,6 +248,12 @@ class Soledad(object): Prefix for default values for path. """ + syncing_lock = defaultdict(Lock) + """ + A dictionary that hold locks which avoid multiple sync attempts from the + same database replica. + """ + def __init__(self, uuid, passphrase, secrets_path, local_db_path, server_url, cert_file, auth_token=None, secret_id=None): """ @@ -402,6 +411,8 @@ class Soledad(object): token, timeout = self._shared_db.lock() except AlreadyLockedError: raise BootstrapSequenceError('Database is already locked.') + except LockTimedOutError: + raise BootstrapSequenceError('Lock operation timed out.') try: self._get_or_gen_crypto_secrets() @@ -767,7 +778,7 @@ class Soledad(object): ============================== WARNING ============================== This method converts the document's contents to unicode in-place. This - meanse that after calling C{put_doc(doc)}, the contents of the + means that after calling C{put_doc(doc)}, the contents of the document, i.e. C{doc.content}, might be different from before the call. ============================== WARNING ============================== @@ -824,9 +835,9 @@ class Soledad(object): in matching doc_ids order. :rtype: generator """ - return self._db.get_docs(doc_ids, - check_for_conflicts=check_for_conflicts, - include_deleted=include_deleted) + return self._db.get_docs( + doc_ids, check_for_conflicts=check_for_conflicts, + include_deleted=include_deleted) def get_all_docs(self, include_deleted=False): """Get the JSON content for all documents in the database. @@ -842,7 +853,7 @@ class Soledad(object): def _convert_to_unicode(self, content): """ - Converts content to utf8 (or all the strings in content) + Converts content to unicode (or all the strings in content) NOTE: Even though this method supports any type, it will currently ignore contents of lists, tuple or any other @@ -857,13 +868,14 @@ class Soledad(object): if isinstance(content, unicode): return content elif isinstance(content, str): + result = chardet.detect(content) + default = "utf-8" + encoding = result["encoding"] or default try: - result = chardet.detect(content) - default = "utf-8" - encoding = result["encoding"] or default content = content.decode(encoding) - except UnicodeError: - pass + except UnicodeError as e: + logger.error("Unicode error: {0!r}. Using 'replace'".format(e)) + content = content.decode(encoding, 'replace') return content else: if isinstance(content, dict): @@ -928,7 +940,8 @@ class Soledad(object): "number(fieldname, width)", "lower(fieldname)" """ if self._db: - return self._db.create_index(index_name, *index_expressions) + return self._db.create_index( + index_name, *index_expressions) def delete_index(self, index_name): """ @@ -1063,6 +1076,9 @@ class Soledad(object): """ Synchronize the local encrypted replica with a remote replica. + This method blocks until a syncing lock is acquired, so there are no + attempts of concurrent syncs from the same client replica. + :param url: the url of the target replica to sync with :type url: str @@ -1071,11 +1087,13 @@ class Soledad(object): :rtype: str """ if self._db: - local_gen = self._db.sync( - urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), - creds=self._creds, autocreate=True) - signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) - return local_gen + # acquire lock before attempt to sync + with Soledad.syncing_lock[self._db._get_replica_uid()]: + local_gen = self._db.sync( + urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), + creds=self._creds, autocreate=False) + signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) + return local_gen def need_sync(self, url): """ @@ -1193,7 +1211,7 @@ class Soledad(object): """ soledad_assert(self.STORAGE_SECRETS_KEY in data) # check mac of the recovery document - mac_auth = False + #mac_auth = False # XXX ? mac = None if MAC_KEY in data: soledad_assert(data[MAC_KEY] is not None) @@ -1216,7 +1234,7 @@ class Soledad(object): if mac != data[MAC_KEY]: raise WrongMac('Could not authenticate recovery document\'s ' 'contents.') - mac_auth = True + #mac_auth = True # XXX ? # include secrets in the secret pool. secrets = 0 for secret_id, secret_data in data[self.STORAGE_SECRETS_KEY].items(): @@ -1293,9 +1311,17 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection): # derived from httplib.py def connect(self): - "Connect to a host on a given (SSL) port." - sock = socket.create_connection((self.host, self.port), - SOLEDAD_TIMEOUT, self.source_address) + """ + Connect to a host on a given (SSL) port. + """ + try: + source = self.source_address + sock = socket.create_connection((self.host, self.port), + SOLEDAD_TIMEOUT, source) + except AttributeError: + # source_address was introduced in 2.7 + sock = socket.create_connection((self.host, self.port), + SOLEDAD_TIMEOUT) if self._tunnel_host: self.sock = sock self._tunnel() diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 43c871c3..3aea340d 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -91,10 +91,10 @@ def open(path, password, create=True, document_factory=None, crypto=None, database does not already exist. :param path: The filesystem path for the database to open. - :param type: str + :type path: str :param create: True/False, should the database be created if it doesn't already exist? - :param type: bool + :param create: bool :param document_factory: A function that will be called with the same parameters as Document.__init__. :type document_factory: callable @@ -147,26 +147,30 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): _index_storage_value = 'expand referenced encrypted' k_lock = threading.Lock() + create_doc_lock = threading.Lock() + update_indexes_lock = threading.Lock() _syncer = None def __init__(self, sqlcipher_file, password, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024): """ - Create a new sqlcipher file. + Connect to an existing SQLCipher database, creating a new sqlcipher + database file if needed. :param sqlcipher_file: The path for the SQLCipher file. :type sqlcipher_file: str :param password: The password that protects the SQLCipher db. :type password: str :param document_factory: A function that will be called with the same - parameters as Document.__init__. + parameters as Document.__init__. :type document_factory: callable :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. + document contents when syncing. :type crypto: soledad.crypto.SoledadCrypto - :param raw_key: Whether C{password} is a raw 64-char hex string or a - passphrase that should be hashed to obtain the encyrption key. + :param raw_key: Whether password is a raw 64-char hex string or a + passphrase that should be hashed to obtain the + encyrption key. :type raw_key: bool :param cipher: The cipher and mode to use. :type cipher: str @@ -190,6 +194,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self._set_crypto_pragmas( self._db_handle, password, raw_key, cipher, kdf_iter, cipher_page_size) + if os.environ.get('LEAP_SQLITE_NOSYNC'): + self._pragma_synchronous_off(self._db_handle) + else: + self._pragma_synchronous_normal(self._db_handle) + if os.environ.get('LEAP_SQLITE_MEMSTORE'): + self._pragma_mem_temp_store(self._db_handle) + self._pragma_write_ahead_logging(self._db_handle) self._real_replica_uid = None self._ensure_schema() self._crypto = crypto @@ -396,6 +407,22 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): 'ALTER TABLE document ' 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') + def create_doc(self, content, doc_id=None): + """ + Create a new document in the local encrypted database. + + :param content: the contents of the new document + :type content: dict + :param doc_id: an optional identifier specifying the document id + :type doc_id: str + + :return: the new document + :rtype: SoledadDocument + """ + with self.create_doc_lock: + return sqlite_backend.SQLitePartialExpandDatabase.create_doc( + self, content, doc_id=doc_id) + def _put_and_update_indexes(self, old_doc, doc): """ Update a document and all indexes related to it. @@ -405,12 +432,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :param doc: The new version of the document. :type doc: u1db.Document """ - sqlite_backend.SQLitePartialExpandDatabase._put_and_update_indexes( - self, old_doc, doc) - c = self._db_handle.cursor() - c.execute('UPDATE document SET syncable=? ' - 'WHERE doc_id=?', - (doc.syncable, doc.doc_id)) + with self.update_indexes_lock: + sqlite_backend.SQLitePartialExpandDatabase._put_and_update_indexes( + self, old_doc, doc) + c = self._db_handle.cursor() + c.execute('UPDATE document SET syncable=? ' + 'WHERE doc_id=?', + (doc.syncable, doc.doc_id)) def _get_doc(self, doc_id, check_for_conflicts=False): """ @@ -734,6 +762,64 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): # XXX change passphrase param! db_handle.cursor().execute('PRAGMA rekey = "x\'%s"' % passphrase) + @classmethod + def _pragma_synchronous_off(cls, db_handle): + """ + Change the setting of the "synchronous" flag to OFF. + """ + logger.debug("SQLCIPHER: SETTING SYNCHRONOUS OFF") + db_handle.cursor().execute('PRAGMA synchronous=OFF') + + @classmethod + def _pragma_synchronous_normal(cls, db_handle): + """ + Change the setting of the "synchronous" flag to NORMAL. + """ + logger.debug("SQLCIPHER: SETTING SYNCHRONOUS NORMAL") + db_handle.cursor().execute('PRAGMA synchronous=NORMAL') + + @classmethod + def _pragma_mem_temp_store(cls, db_handle): + """ + Use a in-memory store for temporary tables. + """ + logger.debug("SQLCIPHER: SETTING TEMP_STORE MEMORY") + db_handle.cursor().execute('PRAGMA temp_store=MEMORY') + + @classmethod + def _pragma_write_ahead_logging(cls, db_handle): + """ + Enable write-ahead logging, and set the autocheckpoint to 50 pages. + + Setting the autocheckpoint to a small value, we make the reads not + suffer too much performance degradation. + + From the sqlite docs: + + "There is a tradeoff between average read performance and average write + performance. To maximize the read performance, one wants to keep the + WAL as small as possible and hence run checkpoints frequently, perhaps + as often as every COMMIT. To maximize write performance, one wants to + amortize the cost of each checkpoint over as many writes as possible, + meaning that one wants to run checkpoints infrequently and let the WAL + grow as large as possible before each checkpoint. The decision of how + often to run checkpoints may therefore vary from one application to + another depending on the relative read and write performance + requirements of the application. The default strategy is to run a + checkpoint once the WAL reaches 1000 pages" + """ + logger.debug("SQLCIPHER: SETTING WRITE-AHEAD LOGGING") + db_handle.cursor().execute('PRAGMA journal_mode=WAL') + # The optimum value can still use a little bit of tuning, but we favor + # small sizes of the WAL file to get fast reads, since we assume that + # the writes will be quick enough to not block too much. + + # TODO + # As a further improvement, we might want to set autocheckpoint to 0 + # here and do the checkpoints manually in a separate thread, to avoid + # any blocks in the main thread (we should run a loopingcall from here) + db_handle.cursor().execute('PRAGMA wal_autocheckpoint=50') + # Extra query methods: extensions to the base sqlite implmentation. def get_count_from_index(self, index_name, *key_values): |