summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2015-02-11 14:03:29 -0400
committerKali Kaneko <kali@leap.se>2015-02-11 14:03:29 -0400
commit58256957af8329f49d983852063eeaec74179c4d (patch)
tree4d95bf772a8845669bf9b5c7b6dc26f58bfdaa59 /client/src/leap/soledad
parentfa8dacef003d30cd9b56f7e2b07baa3b387c1e20 (diff)
parent14f34b1f64a667bf4a146e8579f95c5d308a1f77 (diff)
Merge branch 'feature/async-api' into develop
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r--client/src/leap/soledad/client/__init__.py823
-rw-r--r--client/src/leap/soledad/client/adbapi.py270
-rw-r--r--client/src/leap/soledad/client/api.py873
-rw-r--r--client/src/leap/soledad/client/crypto.py30
-rw-r--r--client/src/leap/soledad/client/examples/README4
-rw-r--r--client/src/leap/soledad/client/examples/benchmarks/.gitignore1
-rwxr-xr-xclient/src/leap/soledad/client/examples/benchmarks/get_sample.sh3
-rw-r--r--client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py177
-rw-r--r--client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py177
-rw-r--r--client/src/leap/soledad/client/examples/compare.txt8
-rw-r--r--client/src/leap/soledad/client/examples/manifest.phk50
-rw-r--r--client/src/leap/soledad/client/examples/plot-async-db.py45
-rw-r--r--client/src/leap/soledad/client/examples/run_benchmark.py28
-rw-r--r--client/src/leap/soledad/client/examples/soledad_sync.py65
-rw-r--r--client/src/leap/soledad/client/examples/use_adbapi.py103
-rw-r--r--client/src/leap/soledad/client/examples/use_api.py67
-rw-r--r--client/src/leap/soledad/client/interfaces.py362
-rw-r--r--client/src/leap/soledad/client/mp_safe_db.py112
-rw-r--r--client/src/leap/soledad/client/pragmas.py336
-rw-r--r--client/src/leap/soledad/client/secrets.py108
-rw-r--r--client/src/leap/soledad/client/shared_db.py57
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py1473
-rw-r--r--client/src/leap/soledad/client/sync.py16
-rw-r--r--client/src/leap/soledad/client/target.py94
24 files changed, 3366 insertions, 1916 deletions
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 0750dfbe..245a8971 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -16,827 +16,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Soledad - Synchronization Of Locally Encrypted Data Among Devices.
-
-Soledad is the part of LEAP that manages storage and synchronization of
-application data. It is built on top of U1DB reference Python API and
-implements (1) a SQLCipher backend for local storage in the client, (2) a
-SyncTarget that encrypts data before syncing, and (3) a CouchDB backend for
-remote storage in the server side.
-"""
-import binascii
-import errno
-import httplib
-import logging
-import os
-import socket
-import ssl
-import urlparse
-
-
-try:
- import cchardet as chardet
-except ImportError:
- import chardet
-
-from u1db.remote import http_client
-from u1db.remote.ssl_match_hostname import match_hostname
-
-from leap.common.config import get_path_prefix
-from leap.soledad.common import (
- SHARED_DB_NAME,
- soledad_assert,
- soledad_assert_type
-)
-from leap.soledad.client.events import (
- SOLEDAD_NEW_DATA_TO_SYNC,
- SOLEDAD_DONE_DATA_SYNC,
- signal,
-)
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.client.crypto import SoledadCrypto
-from leap.soledad.client.secrets import SoledadSecrets
-from leap.soledad.client.shared_db import SoledadSharedDatabase
-from leap.soledad.client.sqlcipher import open as sqlcipher_open
-from leap.soledad.client.sqlcipher import SQLCipherDatabase
-from leap.soledad.client.target import SoledadSyncTarget
-
-
-logger = logging.getLogger(name=__name__)
-
-
-#
-# Constants
-#
-
-SOLEDAD_CERT = None
"""
-Path to the certificate file used to certify the SSL connection between
-Soledad client and server.
-"""
-
-
-#
-# Soledad: local encrypted storage and remote encrypted sync.
-#
-
-class Soledad(object):
- """
- Soledad provides encrypted data storage and sync.
-
- A Soledad instance is used to store and retrieve data in a local encrypted
- database and synchronize this database with Soledad server.
-
- This class is also responsible for bootstrapping users' account by
- creating cryptographic secrets and/or storing/fetching them on Soledad
- server.
-
- Soledad uses C{leap.common.events} to signal events. The possible events
- to be signaled are:
-
- SOLEDAD_CREATING_KEYS: emitted during bootstrap sequence when key
- generation starts.
- SOLEDAD_DONE_CREATING_KEYS: emitted during bootstrap sequence when key
- generation finishes.
- SOLEDAD_UPLOADING_KEYS: emitted during bootstrap sequence when soledad
- starts sending keys to server.
- SOLEDAD_DONE_UPLOADING_KEYS: emitted during bootstrap sequence when
- soledad finishes sending keys to server.
- SOLEDAD_DOWNLOADING_KEYS: emitted during bootstrap sequence when
- soledad starts to retrieve keys from server.
- SOLEDAD_DONE_DOWNLOADING_KEYS: emitted during bootstrap sequence when
- soledad finishes downloading keys from server.
- SOLEDAD_NEW_DATA_TO_SYNC: emitted upon call to C{need_sync()} when
- there's indeed new data to be synchronized between local database
- replica and server's replica.
- SOLEDAD_DONE_DATA_SYNC: emitted inside C{sync()} method when it has
- finished synchronizing with remote replica.
- """
-
- LOCAL_DATABASE_FILE_NAME = 'soledad.u1db'
- """
- The name of the local SQLCipher U1DB database file.
- """
-
- STORAGE_SECRETS_FILE_NAME = "soledad.json"
- """
- The name of the file where the storage secrets will be stored.
- """
-
- DEFAULT_PREFIX = os.path.join(get_path_prefix(), 'leap', 'soledad')
- """
- Prefix for default values for path.
- """
-
- def __init__(self, uuid, passphrase, secrets_path, local_db_path,
- server_url, cert_file,
- auth_token=None, secret_id=None, defer_encryption=False):
- """
- Initialize configuration, cryptographic keys and dbs.
-
- :param uuid: User's uuid.
- :type uuid: str
-
- :param passphrase: The passphrase for locking and unlocking encryption
- secrets for local and remote storage.
- :type passphrase: unicode
-
- :param secrets_path: Path for storing encrypted key used for
- symmetric encryption.
- :type secrets_path: str
-
- :param local_db_path: Path for local encrypted storage db.
- :type local_db_path: str
-
- :param server_url: URL for Soledad server. This is used either to sync
- with the user's remote db and to interact with the
- shared recovery database.
- :type server_url: str
-
- :param cert_file: Path to the certificate of the ca used
- to validate the SSL certificate used by the remote
- soledad server.
- :type cert_file: str
-
- :param auth_token: Authorization token for accessing remote databases.
- :type auth_token: str
-
- :param secret_id: The id of the storage secret to be used.
- :type secret_id: str
-
- :param defer_encryption: Whether to defer encryption/decryption of
- documents, or do it inline while syncing.
- :type defer_encryption: bool
-
- :raise BootstrapSequenceError: Raised when the secret generation and
- storage on server sequence has failed
- for some reason.
- """
- # store config params
- self._uuid = uuid
- self._passphrase = passphrase
- self._secrets_path = secrets_path
- self._local_db_path = local_db_path
- self._server_url = server_url
- # configure SSL certificate
- global SOLEDAD_CERT
- SOLEDAD_CERT = cert_file
- self._set_token(auth_token)
- self._defer_encryption = defer_encryption
-
- self._init_config()
- self._init_dirs()
-
- # init crypto variables
- self._shared_db_instance = None
- self._crypto = SoledadCrypto(self)
- self._secrets = SoledadSecrets(
- self._uuid,
- self._passphrase,
- self._secrets_path,
- self._shared_db,
- self._crypto,
- secret_id=secret_id)
-
- # initiate bootstrap sequence
- self._bootstrap() # might raise BootstrapSequenceError()
-
- def _init_config(self):
- """
- Initialize configuration using default values for missing params.
- """
- soledad_assert_type(self._passphrase, unicode)
- # initialize secrets_path
- if self._secrets_path is None:
- self._secrets_path = os.path.join(
- self.DEFAULT_PREFIX, self.STORAGE_SECRETS_FILE_NAME)
- # initialize local_db_path
- if self._local_db_path is None:
- self._local_db_path = os.path.join(
- self.DEFAULT_PREFIX, self.LOCAL_DATABASE_FILE_NAME)
- # initialize server_url
- soledad_assert(
- self._server_url is not None,
- 'Missing URL for Soledad server.')
-
- #
- # initialization/destruction methods
- #
-
- def _bootstrap(self):
- """
- Bootstrap local Soledad instance.
-
- :raise BootstrapSequenceError: Raised when the secret generation and
- storage on server sequence has failed for some reason.
- """
- try:
- self._secrets.bootstrap()
- self._init_db()
- except:
- raise
-
- def _init_dirs(self):
- """
- Create work directories.
-
- :raise OSError: in case file exists and is not a dir.
- """
- paths = map(
- lambda x: os.path.dirname(x),
- [self._local_db_path, self._secrets_path])
- for path in paths:
- try:
- if not os.path.isdir(path):
- logger.info('Creating directory: %s.' % path)
- os.makedirs(path)
- except OSError as exc:
- if exc.errno == errno.EEXIST and os.path.isdir(path):
- pass
- else:
- raise
-
- def _init_db(self):
- """
- Initialize the U1DB SQLCipher database for local storage.
-
- Currently, Soledad uses the default SQLCipher cipher, i.e.
- 'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key and
- uses the 'raw PRAGMA key' format to handle the key to SQLCipher.
- """
- key = self._secrets.get_local_storage_key()
- sync_db_key = self._secrets.get_sync_db_key()
- self._db = sqlcipher_open(
- self._local_db_path,
- binascii.b2a_hex(key), # sqlcipher only accepts the hex version
- create=True,
- document_factory=SoledadDocument,
- crypto=self._crypto,
- raw_key=True,
- defer_encryption=self._defer_encryption,
- sync_db_key=binascii.b2a_hex(sync_db_key))
-
- def close(self):
- """
- Close underlying U1DB database.
- """
- logger.debug("Closing soledad")
- if hasattr(self, '_db') and isinstance(
- self._db,
- SQLCipherDatabase):
- self._db.stop_sync()
- self._db.close()
-
- @property
- def _shared_db(self):
- """
- Return an instance of the shared recovery database object.
-
- :return: The shared database.
- :rtype: SoledadSharedDatabase
- """
- if self._shared_db_instance is None:
- self._shared_db_instance = SoledadSharedDatabase.open_database(
- urlparse.urljoin(self.server_url, SHARED_DB_NAME),
- self._uuid,
- False, # db should exist at this point.
- creds=self._creds)
- return self._shared_db_instance
-
- #
- # Document storage, retrieval and sync.
- #
-
- def put_doc(self, doc):
- """
- Update a document in the local encrypted database.
-
- ============================== WARNING ==============================
- This method converts the document's contents to unicode in-place. This
- means that after calling C{put_doc(doc)}, the contents of the
- document, i.e. C{doc.content}, might be different from before the
- call.
- ============================== WARNING ==============================
-
- :param doc: the document to update
- :type doc: SoledadDocument
-
- :return: the new revision identifier for the document
- :rtype: str
- """
- doc.content = self._convert_to_unicode(doc.content)
- return self._db.put_doc(doc)
-
- def delete_doc(self, doc):
- """
- Delete a document from the local encrypted database.
-
- :param doc: the document to delete
- :type doc: SoledadDocument
-
- :return: the new revision identifier for the document
- :rtype: str
- """
- return self._db.delete_doc(doc)
-
- def get_doc(self, doc_id, include_deleted=False):
- """
- Retrieve a document from the local encrypted database.
-
- :param doc_id: the unique document identifier
- :type doc_id: str
- :param include_deleted: if True, deleted documents will be
- returned with empty content; otherwise asking
- for a deleted document will return None
- :type include_deleted: bool
-
- :return: the document object or None
- :rtype: SoledadDocument
- """
- return self._db.get_doc(doc_id, include_deleted=include_deleted)
-
- def get_docs(self, doc_ids, check_for_conflicts=True,
- include_deleted=False):
- """
- Get the content for many documents.
-
- :param doc_ids: a list of document identifiers
- :type doc_ids: list
- :param check_for_conflicts: if set False, then the conflict check will
- be skipped, and 'None' will be returned instead of True/False
- :type check_for_conflicts: bool
-
- :return: iterable giving the Document object for each document id
- in matching doc_ids order.
- :rtype: generator
- """
- return self._db.get_docs(
- doc_ids, check_for_conflicts=check_for_conflicts,
- include_deleted=include_deleted)
-
- def get_all_docs(self, include_deleted=False):
- """Get the JSON content for all documents in the database.
-
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise deleted
- documents will not be included in the results.
- :return: (generation, [Document])
- The current generation of the database, followed by a list of
- all the documents in the database.
- """
- return self._db.get_all_docs(include_deleted)
-
- def _convert_to_unicode(self, content):
- """
- Converts content to unicode (or all the strings in content)
-
- NOTE: Even though this method supports any type, it will
- currently ignore contents of lists, tuple or any other
- iterable than dict. We don't need support for these at the
- moment
-
- :param content: content to convert
- :type content: object
-
- :rtype: object
- """
- if isinstance(content, unicode):
- return content
- elif isinstance(content, str):
- result = chardet.detect(content)
- default = "utf-8"
- encoding = result["encoding"] or default
- try:
- content = content.decode(encoding)
- except UnicodeError as e:
- logger.error("Unicode error: {0!r}. Using 'replace'".format(e))
- content = content.decode(encoding, 'replace')
- return content
- else:
- if isinstance(content, dict):
- for key in content.keys():
- content[key] = self._convert_to_unicode(content[key])
- return content
-
- def create_doc(self, content, doc_id=None):
- """
- Create a new document in the local encrypted database.
-
- :param content: the contents of the new document
- :type content: dict
- :param doc_id: an optional identifier specifying the document id
- :type doc_id: str
-
- :return: the new document
- :rtype: SoledadDocument
- """
- return self._db.create_doc(
- self._convert_to_unicode(content), doc_id=doc_id)
-
- def create_doc_from_json(self, json, doc_id=None):
- """
- Create a new document.
-
- You can optionally specify the document identifier, but the document
- must not already exist. See 'put_doc' if you want to override an
- existing document.
- If the database specifies a maximum document size and the document
- exceeds it, create will fail and raise a DocumentTooBig exception.
-
- :param json: The JSON document string
- :type json: str
- :param doc_id: An optional identifier specifying the document id.
- :type doc_id:
- :return: The new document
- :rtype: SoledadDocument
- """
- return self._db.create_doc_from_json(json, doc_id=doc_id)
-
- def create_index(self, index_name, *index_expressions):
- """
- Create an named index, which can then be queried for future lookups.
- Creating an index which already exists is not an error, and is cheap.
- Creating an index which does not match the index_expressions of the
- existing index is an error.
- Creating an index will block until the expressions have been evaluated
- and the index generated.
-
- :param index_name: A unique name which can be used as a key prefix
- :type index_name: str
- :param index_expressions: index expressions defining the index
- information.
- :type index_expressions: dict
-
- Examples:
-
- "fieldname", or "fieldname.subfieldname" to index alphabetically
- sorted on the contents of a field.
-
- "number(fieldname, width)", "lower(fieldname)"
- """
- if self._db:
- return self._db.create_index(
- index_name, *index_expressions)
-
- def delete_index(self, index_name):
- """
- Remove a named index.
-
- :param index_name: The name of the index we are removing
- :type index_name: str
- """
- if self._db:
- return self._db.delete_index(index_name)
-
- def list_indexes(self):
- """
- List the definitions of all known indexes.
-
- :return: A list of [('index-name', ['field', 'field2'])] definitions.
- :rtype: list
- """
- if self._db:
- return self._db.list_indexes()
-
- def get_from_index(self, index_name, *key_values):
- """
- Return documents that match the keys supplied.
-
- You must supply exactly the same number of values as have been defined
- in the index. It is possible to do a prefix match by using '*' to
- indicate a wildcard match. You can only supply '*' to trailing entries,
- (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.)
- It is also possible to append a '*' to the last supplied value (eg
- 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
-
- :param index_name: The index to query
- :type index_name: str
- :param key_values: values to match. eg, if you have
- an index with 3 fields then you would have:
- get_from_index(index_name, val1, val2, val3)
- :type key_values: tuple
- :return: List of [Document]
- :rtype: list
- """
- if self._db:
- return self._db.get_from_index(index_name, *key_values)
-
- def get_count_from_index(self, index_name, *key_values):
- """
- Return the count of the documents that match the keys and
- values supplied.
-
- :param index_name: The index to query
- :type index_name: str
- :param key_values: values to match. eg, if you have
- an index with 3 fields then you would have:
- get_from_index(index_name, val1, val2, val3)
- :type key_values: tuple
- :return: count.
- :rtype: int
- """
- if self._db:
- return self._db.get_count_from_index(index_name, *key_values)
-
- def get_range_from_index(self, index_name, start_value, end_value):
- """
- Return documents that fall within the specified range.
-
- Both ends of the range are inclusive. For both start_value and
- end_value, one must supply exactly the same number of values as have
- been defined in the index, or pass None. In case of a single column
- index, a string is accepted as an alternative for a tuple with a single
- value. It is possible to do a prefix match by using '*' to indicate
- a wildcard match. You can only supply '*' to trailing entries, (eg
- 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also
- possible to append a '*' to the last supplied value (eg 'val*', '*',
- '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
-
- :param index_name: The index to query
- :type index_name: str
- :param start_values: tuples of values that define the lower bound of
- the range. eg, if you have an index with 3 fields then you would
- have: (val1, val2, val3)
- :type start_values: tuple
- :param end_values: tuples of values that define the upper bound of the
- range. eg, if you have an index with 3 fields then you would have:
- (val1, val2, val3)
- :type end_values: tuple
- :return: List of [Document]
- :rtype: list
- """
- if self._db:
- return self._db.get_range_from_index(
- index_name, start_value, end_value)
-
- def get_index_keys(self, index_name):
- """
- Return all keys under which documents are indexed in this index.
-
- :param index_name: The index to query
- :type index_name: str
- :return: [] A list of tuples of indexed keys.
- :rtype: list
- """
- if self._db:
- return self._db.get_index_keys(index_name)
-
- def get_doc_conflicts(self, doc_id):
- """
- Get the list of conflicts for the given document.
-
- :param doc_id: the document id
- :type doc_id: str
-
- :return: a list of the document entries that are conflicted
- :rtype: list
- """
- if self._db:
- return self._db.get_doc_conflicts(doc_id)
-
- def resolve_doc(self, doc, conflicted_doc_revs):
- """
- Mark a document as no longer conflicted.
-
- :param doc: a document with the new content to be inserted.
- :type doc: SoledadDocument
- :param conflicted_doc_revs: a list of revisions that the new content
- supersedes.
- :type conflicted_doc_revs: list
- """
- if self._db:
- return self._db.resolve_doc(doc, conflicted_doc_revs)
-
- def sync(self, defer_decryption=True):
- """
- Synchronize the local encrypted replica with a remote replica.
-
- This method blocks until a syncing lock is acquired, so there are no
- attempts of concurrent syncs from the same client replica.
-
- :param url: the url of the target replica to sync with
- :type url: str
-
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :return: The local generation before the synchronisation was
- performed.
- :rtype: str
- """
- if self._db:
- try:
- local_gen = self._db.sync(
- urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
- creds=self._creds, autocreate=False,
- defer_decryption=defer_decryption)
- signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
- return local_gen
- except Exception as e:
- logger.error("Soledad exception when syncing: %s" % str(e))
-
- def stop_sync(self):
- """
- Stop the current syncing process.
- """
- if self._db:
- self._db.stop_sync()
-
- def need_sync(self, url):
- """
- Return if local db replica differs from remote url's replica.
-
- :param url: The remote replica to compare with local replica.
- :type url: str
-
- :return: Whether remote replica and local replica differ.
- :rtype: bool
- """
- target = SoledadSyncTarget(
- url, self._db._get_replica_uid(), creds=self._creds,
- crypto=self._crypto)
- info = target.get_sync_info(self._db._get_replica_uid())
- # compare source generation with target's last known source generation
- if self._db._get_generation() != info[4]:
- signal(SOLEDAD_NEW_DATA_TO_SYNC, self._uuid)
- return True
- return False
-
- @property
- def syncing(self):
- """
- Property, True if the syncer is syncing.
- """
- return self._db.syncing
-
- def _set_token(self, token):
- """
- Set the authentication token for remote database access.
-
- Build the credentials dictionary with the following format:
-
- self._{
- 'token': {
- 'uuid': '<uuid>'
- 'token': '<token>'
- }
-
- :param token: The authentication token.
- :type token: str
- """
- self._creds = {
- 'token': {
- 'uuid': self._uuid,
- 'token': token,
- }
- }
-
- def _get_token(self):
- """
- Return current token from credentials dictionary.
- """
- return self._creds['token']['token']
-
- token = property(_get_token, _set_token, doc='The authentication Token.')
-
- #
- # Setters/getters
- #
-
- def _get_uuid(self):
- return self._uuid
-
- uuid = property(_get_uuid, doc='The user uuid.')
-
- def get_secret_id(self):
- return self._secrets.secret_id
-
- def set_secret_id(self, secret_id):
- self._secrets.set_secret_id(secret_id)
-
- secret_id = property(
- get_secret_id,
- set_secret_id,
- doc='The active secret id.')
-
- def _set_secrets_path(self, secrets_path):
- self._secrets.secrets_path = secrets_path
-
- def _get_secrets_path(self):
- return self._secrets.secrets_path
-
- secrets_path = property(
- _get_secrets_path,
- _set_secrets_path,
- doc='The path for the file containing the encrypted symmetric secret.')
-
- def _get_local_db_path(self):
- return self._local_db_path
-
- local_db_path = property(
- _get_local_db_path,
- doc='The path for the local database replica.')
-
- def _get_server_url(self):
- return self._server_url
-
- server_url = property(
- _get_server_url,
- doc='The URL of the Soledad server.')
-
- @property
- def storage_secret(self):
- """
- Return the secret used for symmetric encryption.
- """
- return self._secrets.storage_secret
-
- @property
- def remote_storage_secret(self):
- """
- Return the secret used for encryption of remotely stored data.
- """
- return self._secrets.remote_storage_secret
-
- @property
- def secrets(self):
- return self._secrets
-
- @property
- def passphrase(self):
- return self._secrets.passphrase
-
- def change_passphrase(self, new_passphrase):
- """
- Change the passphrase that encrypts the storage secret.
-
- :param new_passphrase: The new passphrase.
- :type new_passphrase: unicode
-
- :raise NoStorageSecret: Raised if there's no storage secret available.
- """
- self._secrets.change_passphrase(new_passphrase)
-
-
-# ----------------------------------------------------------------------------
-# Monkey patching u1db to be able to provide a custom SSL cert
-# ----------------------------------------------------------------------------
-
-# We need a more reasonable timeout (in seconds)
-SOLEDAD_TIMEOUT = 120
-
-
-class VerifiedHTTPSConnection(httplib.HTTPSConnection):
- """
- HTTPSConnection verifying server side certificates.
- """
- # derived from httplib.py
-
- def connect(self):
- """
- Connect to a host on a given (SSL) port.
- """
- try:
- source = self.source_address
- sock = socket.create_connection((self.host, self.port),
- SOLEDAD_TIMEOUT, source)
- except AttributeError:
- # source_address was introduced in 2.7
- sock = socket.create_connection((self.host, self.port),
- SOLEDAD_TIMEOUT)
- if self._tunnel_host:
- self.sock = sock
- self._tunnel()
-
- highest_supported = ssl.PROTOCOL_SSLv23
-
- try:
- # needs python 2.7.9+
- # negotiate the best available version,
- # but explicitely disabled bad ones.
- ctx = ssl.SSLContext(highest_supported)
- ctx.options |= ssl.OP_NO_SSLv2
- ctx.options |= ssl.OP_NO_SSLv3
-
- ctx.load_verify_locations(cafile=SOLEDAD_CERT)
- ctx.verify_mode = ssl.CERT_REQUIRED
- self.sock = ctx.wrap_socket(sock)
-
- except AttributeError:
- self.sock = ssl.wrap_socket(
- sock, ca_certs=SOLEDAD_CERT, cert_reqs=ssl.CERT_REQUIRED,
- ssl_version=highest_supported)
-
- match_hostname(self.sock.getpeercert(), self.host)
-
-
-old__VerifiedHTTPSConnection = http_client._VerifiedHTTPSConnection
-http_client._VerifiedHTTPSConnection = VerifiedHTTPSConnection
-
-
-__all__ = ['soledad_assert', 'Soledad']
+from leap.soledad.client.api import Soledad
+from leap.soledad.common import soledad_assert
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
+
+__all__ = ['soledad_assert', 'Soledad', '__version__']
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
new file mode 100644
index 00000000..7ad10db5
--- /dev/null
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -0,0 +1,270 @@
+# -*- coding: utf-8 -*-
+# adbapi.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+An asynchronous interface to soledad using sqlcipher backend.
+It uses twisted.enterprise.adbapi.
+"""
+import re
+import os
+import sys
+import logging
+
+from functools import partial
+from threading import BoundedSemaphore
+
+from twisted.enterprise import adbapi
+from twisted.python import log
+from zope.proxy import ProxyBase, setProxiedObject
+from pysqlcipher.dbapi2 import OperationalError
+
+from leap.soledad.client import sqlcipher as soledad_sqlcipher
+
+
+logger = logging.getLogger(name=__name__)
+
+
+DEBUG_SQL = os.environ.get("LEAP_DEBUG_SQL")
+if DEBUG_SQL:
+ log.startLogging(sys.stdout)
+
+"""
+How long the SQLCipher connection should wait for the lock to go away until
+raising an exception.
+"""
+SQLCIPHER_CONNECTION_TIMEOUT = 10
+
+"""
+How many times a SQLCipher query should be retried in case of timeout.
+"""
+SQLCIPHER_MAX_RETRIES = 10
+
+
+def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
+    """
+    Build the U1DB connection pool for the database at ``opts.path``.
+
+    :param opts: Options for the SQLCipher connection.
+    :type opts: SQLCipherOptions
+    :param openfun:
+        Callback invoked after every connect() on the underlying DB-API
+        object. When omitted and the driver is "pysqlcipher", it defaults
+        to ``set_init_pragmas`` bound to ``opts``.
+    :type openfun: callable
+    :param driver: The connection driver; its ``dbapi2`` module is used.
+    :type driver: str
+
+    :return: A U1DB connection pool.
+    :rtype: U1DBConnectionPool
+    """
+    wants_default_pragmas = openfun is None and driver == "pysqlcipher"
+    if wants_default_pragmas:
+        openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts)
+    dbapi_module = "%s.dbapi2" % driver
+    return U1DBConnectionPool(
+        dbapi_module, database=opts.path, check_same_thread=False,
+        cp_openfun=openfun, timeout=SQLCIPHER_CONNECTION_TIMEOUT)
+
+
+class U1DBConnection(adbapi.Connection):
+    """
+    An adbapi connection that can also carry a U1DB wrapper instance.
+    """
+
+    # The U1DB wrapper class that reconnect() instantiates over the raw
+    # connection when ``init_u1db`` is set.
+    u1db_wrapper = soledad_sqlcipher.SoledadSQLCipherWrapper
+
+    def __init__(self, pool, init_u1db=False):
+        """
+        :param pool: The pool of connections that owns this connection.
+        :type pool: adbapi.ConnectionPool
+        :param init_u1db: Whether the u1db database should be initialized.
+        :type init_u1db: bool
+        """
+        self.init_u1db = init_u1db
+        adbapi.Connection.__init__(self, pool)
+
+    def reconnect(self):
+        """
+        Disconnect the current connection, if any, and take a new one from
+        the pool, wrapping it for U1DB when ``init_u1db`` is set.
+        """
+        stale = self._connection
+        if stale is not None:
+            self._pool.disconnect(stale)
+        self._connection = self._pool.connect()
+        if self.init_u1db:
+            self._u1db = self.u1db_wrapper(self._connection)
+
+    def __getattr__(self, name):
+        """
+        Route the requested attribute either to the U1DB wrapper or to the
+        connection.
+
+        :param name: The attribute name; a leading ``u1db_`` selects the
+                     wrapper and is stripped before the lookup.
+        :type name: str
+        """
+        if not name.startswith('u1db_'):
+            return getattr(self._connection, name)
+        wrapper_attr = re.sub('^u1db_', '', name)
+        return getattr(self._u1db, wrapper_attr)
+
+
+class U1DBTransaction(adbapi.Transaction):
+    """
+    An adbapi transaction that also exposes the U1DB wrapper's attributes.
+    """
+
+    def __getattr__(self, name):
+        """
+        Resolve ``u1db_``-prefixed names on the U1DB wrapper held by the
+        connection, and every other name on the underlying cursor.
+
+        :param name: The name of the attribute.
+        :type name: str
+        """
+        if not name.startswith('u1db_'):
+            return getattr(self._cursor, name)
+        wrapper = self._connection._u1db
+        wrapper_attr = re.sub('^u1db_', '', name)
+        return getattr(wrapper, wrapper_attr)
+
+
+class U1DBConnectionPool(adbapi.ConnectionPool):
+    """
+    Represent a pool of connections to an U1DB database.
+    """
+
+    connectionFactory = U1DBConnection
+    transactionFactory = U1DBTransaction
+
+    def __init__(self, *args, **kwargs):
+        """
+        Initialize the connection pool and prime the replica uid.
+        """
+        adbapi.ConnectionPool.__init__(self, *args, **kwargs)
+        # all u1db connections, hashed by thread-id
+        self._u1dbconnections = {}
+
+        # The replica uid: a proxy that starts out wrapping None and is
+        # pointed at the real value read through a first connection.
+        self.replica_uid = ProxyBase(None)
+        conn = self.connectionFactory(self, init_u1db=True)
+        setProxiedObject(self.replica_uid, conn._u1db._real_replica_uid)
+
+    def runU1DBQuery(self, meth, *args, **kw):
+        """
+        Execute a U1DB query in a thread, using a pooled connection.
+
+        Concurrent threads trying to update the same database may timeout
+        because of other threads holding the database lock. Because of this,
+        the query is attempted up to SQLCIPHER_MAX_RETRIES times in total.
+
+        :param meth: The U1DB wrapper method name.
+        :type meth: str
+
+        :return: a Deferred which will fire the return value of
+        'self._runU1DBQuery(Transaction(...), *args, **kw)', or a Failure.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        meth = "u1db_%s" % meth
+        semaphore = BoundedSemaphore(SQLCIPHER_MAX_RETRIES - 1)
+
+        def _errback(failure):
+            failure.trap(OperationalError)
+            if failure.getErrorMessage() == "database is locked":
+                should_retry = semaphore.acquire(False)
+                if should_retry:
+                    logger.warning(
+                        "Database operation timed out while waiting for "
+                        "lock, trying again...")
+                    return _run_interaction()
+            return failure
+
+        def _run_interaction():
+            # attach the errback to every attempt so that a retry which
+            # hits the lock again is itself retried
+            d = self.runInteraction(self._runU1DBQuery, meth, *args, **kw)
+            return d.addErrback(_errback)
+
+        return _run_interaction()
+
+    def _runU1DBQuery(self, trans, meth, *args, **kw):
+        """
+        Execute a U1DB query.
+
+        :param trans: An U1DB transaction.
+        :type trans: adbapi.Transaction
+        :param meth: the ``u1db_``-prefixed U1DB wrapper method name.
+        :type meth: str
+        """
+        meth = getattr(trans, meth)
+        return meth(*args, **kw)
+
+    def _runInteraction(self, interaction, *args, **kw):
+        """
+        Interact with the database and return the result.
+
+        :param interaction:
+            A callable object whose first argument is an
+            L{adbapi.Transaction}.
+        :type interaction: callable
+        :return: the return value of
+            'interaction(Transaction(...), *args, **kw)'; on error the
+            connection is rolled back and the exception is re-raised.
+        """
+        tid = self.threadID()
+        u1db = self._u1dbconnections.get(tid)
+        conn = self.connectionFactory(self, init_u1db=not bool(u1db))
+
+        # self.replica_uid needs no priming here: __init__ already pointed
+        # the proxy at the real replica uid, and the ProxyBase object
+        # itself is never None.
+
+        if u1db is None:
+            self._u1dbconnections[tid] = conn._u1db
+        else:
+            conn._u1db = u1db
+
+        trans = self.transactionFactory(self, conn)
+        try:
+            result = interaction(trans, *args, **kw)
+            trans.close()
+            conn.commit()
+            return result
+        except:
+            excType, excValue, excTraceback = sys.exc_info()
+            try:
+                conn.rollback()
+            except:
+                log.err(None, "Rollback failed")
+            raise excType, excValue, excTraceback
+
+    def finalClose(self):
+        """
+        A final close, only called by the shutdown trigger.
+        """
+        self.shutdownID = None
+        self.threadpool.stop()
+        self.running = False
+        for conn in self.connections.values():
+            self._close(conn)
+        for u1db in self._u1dbconnections.values():
+            self._close(u1db)
+        self.connections.clear()
+        self._u1dbconnections.clear()
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
new file mode 100644
index 00000000..88bb4969
--- /dev/null
+++ b/client/src/leap/soledad/client/api.py
@@ -0,0 +1,873 @@
+# -*- coding: utf-8 -*-
+# api.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Soledad - Synchronization Of Locally Encrypted Data Among Devices.
+
+This module holds the public api for Soledad.
+
+Soledad is the part of LEAP that manages storage and synchronization of
+application data. It is built on top of U1DB reference Python API and
+implements (1) a SQLCipher backend for local storage in the client, (2) a
+SyncTarget that encrypts data before syncing, and (3) a CouchDB backend for
+remote storage in the server side.
+"""
+import binascii
+import errno
+import httplib
+import logging
+import os
+import socket
+import ssl
+import urlparse
+
+try:
+ import cchardet as chardet
+except ImportError:
+ import chardet
+
+from u1db.remote import http_client
+from u1db.remote.ssl_match_hostname import match_hostname
+from zope.interface import implements
+
+from twisted.python import log
+
+from leap.common.config import get_path_prefix
+
+from leap.soledad.common import SHARED_DB_NAME
+from leap.soledad.common import soledad_assert
+from leap.soledad.common import soledad_assert_type
+
+from leap.soledad.client import adbapi
+from leap.soledad.client import events as soledad_events
+from leap.soledad.client import interfaces as soledad_interfaces
+from leap.soledad.client.crypto import SoledadCrypto
+from leap.soledad.client.secrets import SoledadSecrets
+from leap.soledad.client.shared_db import SoledadSharedDatabase
+from leap.soledad.client.sqlcipher import SQLCipherOptions, SQLCipherU1DBSync
+
+logger = logging.getLogger(name=__name__)
+
+#
+# Constants
+#
+
+"""
+Path to the certificate file used to certify the SSL connection between
+Soledad client and server.
+"""
+SOLEDAD_CERT = None
+
+
+class Soledad(object):
+ """
+ Soledad provides encrypted data storage and sync.
+
+ A Soledad instance is used to store and retrieve data in a local encrypted
+ database and synchronize this database with Soledad server.
+
+ This class is also responsible for bootstrapping users' account by
+ creating cryptographic secrets and/or storing/fetching them on Soledad
+ server.
+
+ Soledad uses ``leap.common.events`` to signal events. The possible events
+ to be signaled are:
+
+ SOLEDAD_CREATING_KEYS: emitted during bootstrap sequence when key
+ generation starts.
+ SOLEDAD_DONE_CREATING_KEYS: emitted during bootstrap sequence when key
+ generation finishes.
+ SOLEDAD_UPLOADING_KEYS: emitted during bootstrap sequence when soledad
+ starts sending keys to server.
+ SOLEDAD_DONE_UPLOADING_KEYS: emitted during bootstrap sequence when
+ soledad finishes sending keys to server.
+ SOLEDAD_DOWNLOADING_KEYS: emitted during bootstrap sequence when
+ soledad starts to retrieve keys from server.
+ SOLEDAD_DONE_DOWNLOADING_KEYS: emitted during bootstrap sequence when
+ soledad finishes downloading keys from server.
+ SOLEDAD_NEW_DATA_TO_SYNC: emitted upon call to C{need_sync()} when
+ there's indeed new data to be synchronized between local database
+ replica and server's replica.
+ SOLEDAD_DONE_DATA_SYNC: emitted inside C{sync()} method when it has
+ finished synchronizing with remote replica.
+ """
+ implements(soledad_interfaces.ILocalStorage,
+ soledad_interfaces.ISyncableStorage,
+ soledad_interfaces.ISecretsStorage)
+
+ local_db_file_name = 'soledad.u1db'
+ secrets_file_name = "soledad.json"
+ default_prefix = os.path.join(get_path_prefix(), 'leap', 'soledad')
+
+    def __init__(self, uuid, passphrase, secrets_path, local_db_path,
+                 server_url, cert_file, shared_db=None,
+                 auth_token=None, defer_encryption=False, syncable=True):
+        """
+        Initialize configuration, cryptographic keys and dbs.
+
+        :param uuid: User's uuid.
+        :type uuid: str
+
+        :param passphrase:
+            The passphrase for locking and unlocking encryption secrets for
+            local and remote storage.
+        :type passphrase: unicode
+
+        :param secrets_path:
+            Path for storing encrypted key used for symmetric encryption.
+            If ``None``, a default location under ``default_prefix`` is
+            used.
+        :type secrets_path: str
+
+        :param local_db_path:
+            Path for local encrypted storage db. If ``None``, a default
+            location under ``default_prefix`` is used.
+        :type local_db_path: str
+
+        :param server_url:
+            URL for Soledad server. This is used both to sync with the
+            user's remote db and to interact with the shared recovery
+            database.
+        :type server_url: str
+
+        :param cert_file:
+            Path to the certificate of the ca used to validate the SSL
+            certificate used by the remote soledad server.
+        :type cert_file: str
+
+        :param shared_db:
+            The shared database.
+        :type shared_db: HTTPDatabase
+
+        :param auth_token:
+            Authorization token for accessing remote databases.
+        :type auth_token: str
+
+        :param defer_encryption:
+            Whether to defer encryption/decryption of documents, or do it
+            inline while syncing.
+        :type defer_encryption: bool
+
+        :param syncable:
+            If set to ``False``, this database will not attempt to synchronize
+            with remote replicas (default is ``True``)
+        :type syncable: bool
+
+        :raise BootstrapSequenceError:
+            Raised when the secret initialization sequence (i.e. retrieval
+            from server or generation and storage on server) has failed for
+            some reason.
+        """
+        # store config params
+        self._uuid = uuid
+        self._passphrase = passphrase
+        # The secrets path has to be stored *before* defaults are applied and
+        # working directories are created: that way a ``None`` value falls
+        # back to the default location, and the directory that gets created
+        # is the one of the path that will really be used.
+        self._secrets_path = secrets_path
+        self._local_db_path = local_db_path
+        self._server_url = server_url
+        self._defer_encryption = defer_encryption
+
+        self.shared_db = shared_db
+
+        # configure SSL certificate
+        global SOLEDAD_CERT
+        SOLEDAD_CERT = cert_file
+
+        # init crypto variables
+        self._set_token(auth_token)
+        self._crypto = SoledadCrypto(self)
+
+        self._init_config_with_defaults()
+        self._init_working_dirs()
+
+        # Initialize shared recovery database
+        self.init_shared_db(server_url, uuid, self._creds, syncable=syncable)
+
+        # The following can raise BootstrapSequenceError, that will be
+        # propagated upwards.
+        self._init_secrets()
+        self._init_u1db_sqlcipher_backend()
+
+        if syncable:
+            self._init_u1db_syncer()
+
+ #
+ # initialization/destruction methods
+ #
+    def _init_config_with_defaults(self):
+        """
+        Fill in default values for configuration params that were not given.
+
+        The passphrase must be unicode and a server URL must be set. The
+        secrets and local db paths fall back to files under
+        ``default_prefix`` when they are missing or ``None``.
+        """
+        soledad_assert_type(self._passphrase, unicode)
+
+        fallbacks = (
+            ("_secrets_path", self.secrets_file_name),
+            ("_local_db_path", self.local_db_file_name),
+        )
+        for attr, file_name in fallbacks:
+            if getattr(self, attr, None) is None:
+                setattr(
+                    self, attr, os.path.join(self.default_prefix, file_name))
+
+        # initialize server_url
+        soledad_assert(self._server_url is not None,
+                       'Missing URL for Soledad server.')
+
+    def _init_working_dirs(self):
+        """
+        Make sure the directories holding the local db and the secrets file
+        exist.
+
+        :raise OSError: in case file exists and is not a dir.
+        """
+        db_dir = os.path.dirname(self._local_db_path)
+        secrets_dir = os.path.dirname(self._secrets_path)
+        for directory in (db_dir, secrets_dir):
+            create_path_if_not_exists(directory)
+
+    def _init_secrets(self):
+        """
+        Create the Soledad secrets object and run its bootstrap sequence.
+        """
+        secrets = SoledadSecrets(
+            self.uuid, self._passphrase, self._secrets_path,
+            self.shared_db, self._crypto)
+        self._secrets = secrets
+        secrets.bootstrap()
+
+    def _init_u1db_sqlcipher_backend(self):
+        """
+        Initialize the U1DB SQLCipher database for local storage.
+
+        Instantiates a modified twisted adbapi that will maintain a threadpool
+        with a u1db-sqlcipher connection for each thread, and will return
+        deferreds for each u1db query.
+
+        Currently, Soledad uses the default SQLCipher cipher, i.e.
+        'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key,
+        and internally the SQLCipherDatabase initialization uses the 'raw
+        PRAGMA key' format to handle the key to SQLCipher.
+        """
+        # sqlcipher only accepts the hex version
+        key = binascii.b2a_hex(self._secrets.get_local_storage_key())
+        sync_db_key = binascii.b2a_hex(self._secrets.get_sync_db_key())
+
+        self._sqlcipher_opts = SQLCipherOptions(
+            self._local_db_path, key,
+            is_raw_key=True, create=True,
+            defer_encryption=self._defer_encryption,
+            sync_db_key=sync_db_key,
+        )
+        self._dbpool = adbapi.getConnectionPool(self._sqlcipher_opts)
+
+    def _init_u1db_syncer(self):
+        """
+        Create the U1DB synchronizer, using the same SQLCipher options and
+        replica uid as the local connection pool.
+        """
+        pool_replica_uid = self._dbpool.replica_uid
+        syncer = SQLCipherU1DBSync(
+            self._sqlcipher_opts, self._crypto, pool_replica_uid,
+            self._defer_encryption)
+        self._dbsyncer = syncer
+
+ #
+ # Closing methods
+ #
+
+    def close(self):
+        """
+        Close the local connection pool and, if there is one, the syncer.
+        """
+        logger.debug("Closing soledad")
+        self._dbpool.close()
+        # The syncer only exists when the instance was created as syncable.
+        syncer = getattr(self, '_dbsyncer', None)
+        if syncer:
+            syncer.close()
+
+ #
+ # ILocalStorage
+ #
+
+    def _defer(self, meth, *args, **kw):
+        """
+        Run a U1DB method on the connection pool.
+
+        :param meth: Name of the U1DB method to run on the connection pool
+                     (e.g. ``"put_doc"``).
+        :type meth: str
+        :return: A deferred.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        run_query = self._dbpool.runU1DBQuery
+        return run_query(meth, *args, **kw)
+
+    def put_doc(self, doc):
+        """
+        Store a new revision of a document.
+
+        The put fails if the document currently has conflicts. It also fails,
+        raising a DocumentTooBig exception, if the database specifies a
+        maximum document size and the document exceeds it.
+
+        ============================== WARNING ==============================
+        The document's contents are converted to unicode in-place, so after
+        calling `put_doc(doc)` the value of `doc.content` might be different
+        from what it was before the call.
+        ============================== WARNING ==============================
+
+        :param doc: A document with new content.
+        :type doc: leap.soledad.common.document.SoledadDocument
+        :return: A deferred whose callback will be invoked with the new
+            revision identifier for the document. The document object will
+            also be updated.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        unicode_content = _convert_to_unicode(doc.content)
+        doc.content = unicode_content
+        return self._defer("put_doc", doc)
+
+    def delete_doc(self, doc):
+        """
+        Flag a document as deleted.
+
+        The operation aborts if the current revision doesn't match doc.rev.
+        It also sets doc.content to None.
+
+        :param doc: The document to delete.
+        :type doc: leap.soledad.common.document.SoledadDocument
+        :return: A deferred.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        deferred = self._defer("delete_doc", doc)
+        return deferred
+
+    def get_doc(self, doc_id, include_deleted=False):
+        """
+        Fetch a single document by its identifier.
+
+        :param doc_id: The unique document identifier
+        :type doc_id: str
+        :param include_deleted: If set to True, deleted documents will be
+            returned with empty content. Otherwise asking for a deleted
+            document will return None.
+        :type include_deleted: bool
+        :return: A deferred whose callback will be invoked with a document
+            object.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        options = {"include_deleted": include_deleted}
+        return self._defer("get_doc", doc_id, **options)
+
+    def get_docs(
+            self, doc_ids, check_for_conflicts=True, include_deleted=False):
+        """
+        Fetch several documents at once.
+
+        :param doc_ids: A list of document identifiers.
+        :type doc_ids: list
+        :param check_for_conflicts: If set to False, then the conflict check
+            will be skipped, and 'None' will be returned instead of True/False.
+        :type check_for_conflicts: bool
+        :param include_deleted: If set to True, deleted documents will be
+            returned with empty content. Otherwise deleted documents will not
+            be included in the results.
+        :type include_deleted: bool
+        :return: A deferred whose callback will be invoked with an iterable
+            giving the document object for each document id in matching
+            doc_ids order.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        options = {
+            "check_for_conflicts": check_for_conflicts,
+            "include_deleted": include_deleted,
+        }
+        return self._defer("get_docs", doc_ids, **options)
+
+    def get_all_docs(self, include_deleted=False):
+        """
+        Fetch every document stored in the database.
+
+        :param include_deleted: If set to True, deleted documents will be
+            returned with empty content. Otherwise deleted documents will not
+            be included in the results.
+        :type include_deleted: bool
+
+        :return: A deferred which, when fired, will pass a tuple
+            containing (generation, [Document]) to the callback, with the
+            current generation of the database, followed by a list of all the
+            documents in the database.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        # include_deleted is handed over positionally
+        deferred = self._defer("get_all_docs", include_deleted)
+        return deferred
+
+    def create_doc(self, content, doc_id=None):
+        """
+        Create a new document from a dictionary.
+
+        A document identifier can optionally be given, but the document
+        must not already exist. See 'put_doc' if you want to override an
+        existing document.
+        If the database specifies a maximum document size and the document
+        exceeds it, create will fail and raise a DocumentTooBig exception.
+
+        The content is converted to unicode before being stored.
+
+        :param content: A Python dictionary.
+        :type content: dict
+        :param doc_id: An optional identifier specifying the document id.
+        :type doc_id: str
+        :return: A deferred whose callback will be invoked with a document.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        unicode_content = _convert_to_unicode(content)
+        return self._defer("create_doc", unicode_content, doc_id=doc_id)
+
+    def create_doc_from_json(self, json, doc_id=None):
+        """
+        Create a new document from a JSON string.
+
+        A document identifier can optionally be given, but the document
+        must not already exist. See 'put_doc' if you want to override an
+        existing document.
+        If the database specifies a maximum document size and the document
+        exceeds it, create will fail and raise a DocumentTooBig exception.
+
+        :param json: The JSON document string
+        :type json: str
+        :param doc_id: An optional identifier specifying the document id.
+        :type doc_id: str
+        :return: A deferred whose callback will be invoked with a document.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        options = {"doc_id": doc_id}
+        return self._defer("create_doc_from_json", json, **options)
+
+    def create_index(self, index_name, *index_expressions):
+        """
+        Create a named index that can be queried in later lookups.
+
+        Creating an index which already exists is not an error, and is cheap.
+        Creating an index which does not match the index_expressions of the
+        existing index is an error.
+        Creating an index will block until the expressions have been evaluated
+        and the index generated.
+
+        :param index_name: A unique name which can be used as a key prefix
+        :type index_name: str
+        :param index_expressions: index expressions defining the index
+            information.
+
+            Examples:
+
+            "fieldname", or "fieldname.subfieldname" to index alphabetically
+            sorted on the contents of a field.
+
+            "number(fieldname, width)", "lower(fieldname)"
+        :type index_expressions: list of str
+        :return: A deferred.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        call_args = (index_name,) + index_expressions
+        return self._defer("create_index", *call_args)
+
+    def delete_index(self, index_name):
+        """
+        Delete the index with the given name.
+
+        :param index_name: The name of the index we are removing
+        :type index_name: str
+        :return: A deferred.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        deferred = self._defer("delete_index", index_name)
+        return deferred
+
+    def list_indexes(self):
+        """
+        Return the definitions of all the indexes known to the database.
+
+        :return: A deferred whose callback will be invoked with a list of
+            [('index-name', ['field', 'field2'])] definitions.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        deferred = self._defer("list_indexes")
+        return deferred
+
+    def get_from_index(self, index_name, *key_values):
+        """
+        Query an index for the documents matching the given key values.
+
+        You must supply exactly the same number of values as have been defined
+        in the index. It is possible to do a prefix match by using '*' to
+        indicate a wildcard match. You can only supply '*' to trailing entries,
+        (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.)
+        It is also possible to append a '*' to the last supplied value (eg
+        'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+        :param index_name: The index to query
+        :type index_name: str
+        :param key_values: values to match. eg, if you have
+            an index with 3 fields then you would have:
+            get_from_index(index_name, val1, val2, val3)
+        :type key_values: list
+        :return: A deferred whose callback will be invoked with a list of
+            [Document].
+        :rtype: twisted.internet.defer.Deferred
+        """
+        call_args = (index_name,) + key_values
+        return self._defer("get_from_index", *call_args)
+
+    def get_count_from_index(self, index_name, *key_values):
+        """
+        Count the documents matching a combination of index_name and key
+        values.
+
+        Extension method made from similar methods in u1db version 13.09
+
+        :param index_name: The index to query
+        :type index_name: str
+        :param key_values: values to match. eg, if you have
+            an index with 3 fields then you would have:
+            get_from_index(index_name, val1, val2, val3)
+        :type key_values: tuple
+        :return: A deferred whose callback will be invoked with the count.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        call_args = (index_name,) + key_values
+        return self._defer("get_count_from_index", *call_args)
+
+    def get_range_from_index(self, index_name, start_value, end_value):
+        """
+        Query an index for the documents that fall within a range.
+
+        Both ends of the range are inclusive. For both start_value and
+        end_value, one must supply exactly the same number of values as have
+        been defined in the index, or pass None. In case of a single column
+        index, a string is accepted as an alternative for a tuple with a single
+        value. It is possible to do a prefix match by using '*' to indicate
+        a wildcard match. You can only supply '*' to trailing entries, (eg
+        'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also
+        possible to append a '*' to the last supplied value (eg 'val*', '*',
+        '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+        :param index_name: The index to query
+        :type index_name: str
+        :param start_value: tuple of values that define the lower bound of
+            the range. eg, if you have an index with 3 fields then you would
+            have: (val1, val2, val3)
+        :type start_value: tuple
+        :param end_value: tuple of values that define the upper bound of the
+            range. eg, if you have an index with 3 fields then you would have:
+            (val1, val2, val3)
+        :type end_value: tuple
+        :return: A deferred whose callback will be invoked with a list of
+            [Document].
+        :rtype: twisted.internet.defer.Deferred
+        """
+        bounds = (start_value, end_value)
+        return self._defer("get_range_from_index", index_name, *bounds)
+
+    def get_index_keys(self, index_name):
+        """
+        List all the keys under which documents are indexed in an index.
+
+        :param index_name: The index to query
+        :type index_name: str
+        :return: A deferred whose callback will be invoked with a list of
+            tuples of indexed keys.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        deferred = self._defer("get_index_keys", index_name)
+        return deferred
+
+    def get_doc_conflicts(self, doc_id):
+        """
+        List the conflicts of a document.
+
+        The order of the conflicts is such that the first entry is the value
+        that would be returned by "get_doc".
+
+        :param doc_id: The unique document identifier
+        :type doc_id: str
+        :return: A deferred whose callback will be invoked with a list of the
+            Document entries that are conflicted.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        deferred = self._defer("get_doc_conflicts", doc_id)
+        return deferred
+
+    def resolve_doc(self, doc, conflicted_doc_revs):
+        """
+        Flag a document as no longer conflicted.
+
+        We take the list of revisions that the client knows about that it is
+        superseding. This may be a different list from the actual current
+        conflicts, in which case only those are removed as conflicted. This
+        may fail if the conflict list is significantly different from the
+        supplied information. (sync could have happened in the background from
+        the time you GET_DOC_CONFLICTS until the point where you RESOLVE)
+
+        :param doc: A Document with the new content to be inserted.
+        :type doc: SoledadDocument
+        :param conflicted_doc_revs: A list of revisions that the new content
+            supersedes.
+        :type conflicted_doc_revs: list(str)
+        :return: A deferred.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        call_args = (doc, conflicted_doc_revs)
+        return self._defer("resolve_doc", *call_args)
+
+ @property
+ def local_db_path(self):
+ """
+ Return the path of the local encrypted storage db.
+
+ :rtype: str
+ """
+ return self._local_db_path
+
+ @property
+ def uuid(self):
+ """
+ Return the user's uuid, as given at initialization time.
+
+ :rtype: str
+ """
+ return self._uuid
+
+ #
+ # ISyncableStorage
+ #
+
+    def sync(self, defer_decryption=True):
+        """
+        Synchronize documents with the server replica.
+
+        Errors are logged rather than propagated: a failure reported by the
+        syncer is handed to ``log.err``, and an exception raised while
+        starting the sync is logged through the module logger. In the latter
+        case an already-fired deferred carrying ``None`` is returned, so that
+        callers always get a deferred back.
+
+        :param defer_decryption:
+            Whether to defer decryption of documents, or do it inline while
+            syncing.
+        :type defer_decryption: bool
+        :return: A deferred whose callback will be invoked with the local
+            generation before the synchronization was performed.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        # Imported here because the module only imports ``log`` from twisted
+        # at the top.
+        from twisted.internet import defer
+
+        # -----------------------------------------------------------------
+        # TODO this needs work.
+        # Should review/write tests to check that this:
+
+        # (1) Defer to the syncer pool -- DONE (on dbsyncer)
+        # (2) Return the deferred
+        # (3) Add the callback for signaling the event (executed on reactor
+        # thread)
+        # (4) Check that the deferred is called with the local gen.
+
+        # -----------------------------------------------------------------
+
+        def on_sync_done(local_gen):
+            soledad_events.signal(
+                soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid)
+            return local_gen
+
+        sync_url = urlparse.urljoin(self._server_url, 'user-%s' % self.uuid)
+        try:
+            d = self._dbsyncer.sync(
+                sync_url,
+                creds=self._creds, autocreate=False,
+                defer_decryption=defer_decryption)
+
+            d.addCallbacks(on_sync_done, lambda err: log.err(err))
+            return d
+
+        # TODO catch the exception by adding an Errback
+        except Exception as e:
+            logger.error("Soledad exception when syncing: %s" % str(e))
+            # Keep the documented return type: without this the method
+            # would return None and callers chaining callbacks on the
+            # result would break.
+            return defer.succeed(None)
+
+    def stop_sync(self):
+        """
+        Ask the syncer to stop the synchronization.
+        """
+        syncer = self._dbsyncer
+        syncer.stop_sync()
+
+    @property
+    def syncing(self):
+        """
+        Tell whether Soledad is currently synchronizing with the server.
+
+        :return: Whether Soledad is currently synchronizing with the server.
+        :rtype: bool
+        """
+        syncer = self._dbsyncer
+        return syncer.syncing
+
+    def _set_token(self, token):
+        """
+        Store the authentication token used for remote database access.
+
+        Internally, this builds the credentials dictionary with the following
+        format:
+
+            {
+                'token': {
+                    'uuid': '<uuid>',
+                    'token': '<token>'
+                }
+            }
+
+        :param token: The authentication token.
+        :type token: str
+        """
+        token_creds = {'uuid': self.uuid, 'token': token}
+        self._creds = {'token': token_creds}
+
+    def _get_token(self):
+        """
+        Read the current token out of the credentials dictionary.
+        """
+        token_creds = self._creds['token']
+        return token_creds['token']
+
+ token = property(_get_token, _set_token, doc='The authentication Token.')
+
+ #
+ # ISecretsStorage
+ #
+
+    def init_shared_db(self, server_url, uuid, creds, syncable=True):
+        """
+        Open the shared database, unless one has already been set.
+
+        :param server_url: URL of the remote database.
+        :type server_url: str
+        :param uuid: The user's unique id.
+        :type uuid: str
+        :param creds: The credentials used to access the remote database
+            (the initializer passes the dictionary built by ``_set_token``).
+        :type creds: dict
+        :param syncable:
+            If syncable is False, the database will not attempt to sync against
+            a remote replica.
+        :type syncable: bool
+        """
+        # a shared db is only set beforehand for testing purposes
+        if self.shared_db is not None:
+            return
+
+        shared_db_url = urlparse.urljoin(server_url, SHARED_DB_NAME)
+        self.shared_db = SoledadSharedDatabase.open_database(
+            shared_db_url, uuid, creds=creds, syncable=syncable)
+
+    @property
+    def storage_secret(self):
+        """
+        The secret used for local storage encryption.
+
+        :return: The secret used for local storage encryption.
+        :rtype: str
+        """
+        secrets = self._secrets
+        return secrets.storage_secret
+
+    @property
+    def remote_storage_secret(self):
+        """
+        The secret used for encryption of remotely stored data.
+
+        :return: The secret used for remote storage encryption.
+        :rtype: str
+        """
+        secrets = self._secrets
+        return secrets.remote_storage_secret
+
+ @property
+ def secrets(self):
+ """
+ Return the secrets object created by ``_init_secrets``.
+
+ :return: The secrets object.
+ :rtype: SoledadSecrets
+ """
+ return self._secrets
+
+    def change_passphrase(self, new_passphrase):
+        """
+        Replace the passphrase that encrypts the storage secret.
+
+        :param new_passphrase: The new passphrase.
+        :type new_passphrase: unicode
+
+        :raise NoStorageSecret: Raised if there's no storage secret available.
+        """
+        secrets = self._secrets
+        secrets.change_passphrase(new_passphrase)
+
+ #
+ # Raw SQLCIPHER Queries
+ #
+
+    def raw_sqlcipher_query(self, *args, **kw):
+        """
+        Run a raw sqlcipher query in the local database.
+
+        All the arguments are passed on to the connection pool's
+        ``runQuery``, and its result is returned.
+        """
+        pool = self._dbpool
+        return pool.runQuery(*args, **kw)
+
+
+def _convert_to_unicode(content):
+    """
+    Convert content to unicode (or all the strings in content).
+
+    Byte strings are decoded using the encoding guessed by chardet, falling
+    back to utf-8 when no guess is available. Dictionaries are converted
+    in-place, value by value (keys are left untouched).
+
+    NOTE: Even though this method supports any type, it will
+    currently ignore contents of lists, tuple or any other
+    iterable than dict. We don't need support for these at the
+    moment
+
+    :param content: content to convert
+    :type content: object
+
+    :return: The converted content. Anything that is neither a string nor
+             a dict is returned unchanged.
+    :rtype: object
+    """
+    if isinstance(content, unicode):
+        return content
+
+    if isinstance(content, str):
+        default = "utf-8"
+        encoding = chardet.detect(content)["encoding"] or default
+        try:
+            return content.decode(encoding)
+        except LookupError as e:
+            # The detector may name an encoding for which Python has no
+            # codec. That surfaces as a LookupError, which is not a
+            # UnicodeError, so it needs its own fallback.
+            logger.error(
+                "Unknown encoding: {0!r}. Using {1!r} with "
+                "'replace'".format(e, default))
+            return content.decode(default, 'replace')
+        except UnicodeError as e:
+            logger.error("Unicode error: {0!r}. Using 'replace'".format(e))
+            return content.decode(encoding, 'replace')
+
+    if isinstance(content, dict):
+        for key in content.keys():
+            content[key] = _convert_to_unicode(content[key])
+    return content
+
+
+def create_path_if_not_exists(path):
+    """
+    Create the directory ``path`` (and its parents) if it is not there yet.
+
+    :param path: The directory to create.
+    :type path: str
+
+    :raise OSError: if the directory could not be created, except when the
+                    failure is that it already exists as a directory.
+    """
+    if os.path.isdir(path):
+        return
+
+    logger.info('Creating directory: %s.' % path)
+    try:
+        os.makedirs(path)
+    except OSError as exc:
+        # Somebody else may have created the directory in the meantime;
+        # that is fine as long as it really is a directory.
+        already_there = exc.errno == errno.EEXIST and os.path.isdir(path)
+        if not already_there:
+            raise
+
+# ----------------------------------------------------------------------------
+# Monkey patching u1db to be able to provide a custom SSL cert
+# ----------------------------------------------------------------------------
+
+# We need a more reasonable timeout (in seconds)
+SOLEDAD_TIMEOUT = 120
+
+
+class VerifiedHTTPSConnection(httplib.HTTPSConnection):
+    """
+    HTTPSConnection verifying server side certificates.
+
+    The server certificate is required and checked against the CA file
+    named by ``SOLEDAD_CERT``, and the certificate is matched against the
+    host being connected to.
+    """
+    # derived from httplib.py
+
+    def connect(self):
+        """
+        Connect to a host on a given (SSL) port.
+
+        If the peer certificate does not match the host, the connection is
+        shut down and closed before the error is propagated.
+        """
+        address = (self.host, self.port)
+        # Only the attribute lookup is guarded, so that an AttributeError
+        # raised while connecting is not mistaken for a missing attribute.
+        try:
+            source = self.source_address
+        except AttributeError:
+            # source_address was introduced in 2.7
+            sock = socket.create_connection(address, SOLEDAD_TIMEOUT)
+        else:
+            sock = socket.create_connection(address, SOLEDAD_TIMEOUT, source)
+        if self._tunnel_host:
+            self.sock = sock
+            self._tunnel()
+
+        self.sock = ssl.wrap_socket(sock,
+                                    ca_certs=SOLEDAD_CERT,
+                                    cert_reqs=ssl.CERT_REQUIRED)
+        try:
+            match_hostname(self.sock.getpeercert(), self.host)
+        except Exception:
+            # Do not keep a connection open to a peer that failed
+            # verification.
+            self.sock.shutdown(socket.SHUT_RDWR)
+            self.sock.close()
+            raise
+
+
+# Keep a reference to u1db's own connection class, then make u1db's http
+# client use the class defined above instead.
+old__VerifiedHTTPSConnection = http_client._VerifiedHTTPSConnection
+http_client._VerifiedHTTPSConnection = VerifiedHTTPSConnection
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index d6d9a618..950576ec 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -521,7 +521,7 @@ class SyncEncryptDecryptPool(object):
"""
Base class for encrypter/decrypter pools.
"""
- WORKERS = 5
+ WORKERS = multiprocessing.cpu_count()
def __init__(self, crypto, sync_db, write_lock):
"""
@@ -530,8 +530,8 @@ class SyncEncryptDecryptPool(object):
:param crypto: A SoledadCryto instance to perform the encryption.
:type crypto: leap.soledad.crypto.SoledadCrypto
- :param sync_db: a database connection handle
- :type sync_db: handle
+ :param sync_db: A database connection handle
+ :type sync_db: pysqlcipher.dbapi2.Connection
:param write_lock: a write lock for controlling concurrent access
to the sync_db
@@ -590,7 +590,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
of documents to be synced.
"""
# TODO implement throttling to reduce cpu usage??
- WORKERS = 5
+ WORKERS = multiprocessing.cpu_count()
TABLE_NAME = "docs_tosync"
FIELD_NAMES = "doc_id, rev, content"
@@ -909,8 +909,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if encrypted is not None:
sql += " WHERE encrypted = %d" % int(encrypted)
sql += " ORDER BY gen ASC"
- docs = self._sync_db.select(sql)
- return docs
+ return self._fetchall(sql)
def get_insertable_docs_by_gen(self):
"""
@@ -927,15 +926,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
decrypted_docs = self.get_docs_by_generation(encrypted=False)
insertable = []
for doc_id, rev, _, gen, trans_id, encrypted in all_docs:
- try:
- next_doc_id, _, next_content, _, _, _ = decrypted_docs.next()
+ for next_doc_id, _, next_content, _, _, _ in decrypted_docs:
if doc_id == next_doc_id:
content = next_content
insertable.append((doc_id, rev, content, gen, trans_id))
else:
break
- except StopIteration:
- break
return insertable
def count_docs_in_sync_db(self, encrypted=None):
@@ -955,9 +951,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
if encrypted is not None:
sql += " WHERE encrypted = %d" % int(encrypted)
- res = self._sync_db.select(sql)
- if res is not None:
- val = res.next()
+ res = self._fetchall(sql)
+ if res:
+ val = res.pop()
return val[0]
else:
return 0
@@ -1035,4 +1031,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
Empty the received docs table of the sync database.
"""
sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- res = self._sync_db.execute(sql)
+ self._sync_db.execute(sql)
+
+    def _fetchall(self, *args, **kwargs):
+        """
+        Execute a statement on the sync db and return all the resulting rows.
+
+        The arguments are passed on to the cursor's ``execute``. The cursor
+        is created and used inside a ``with self._sync_db`` block.
+
+        :return: The rows, as given by the cursor's ``fetchall``.
+        """
+        with self._sync_db:
+            cursor = self._sync_db.cursor()
+            cursor.execute(*args, **kwargs)
+            return cursor.fetchall()
diff --git a/client/src/leap/soledad/client/examples/README b/client/src/leap/soledad/client/examples/README
new file mode 100644
index 00000000..3aed8377
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/README
@@ -0,0 +1,4 @@
+Right now, this folder contains both a usage example
+and the benchmarking scripts.
+TODO: move the benchmark scripts to the root scripts/ folder,
+and leave only a minimal example here.
diff --git a/client/src/leap/soledad/client/examples/benchmarks/.gitignore b/client/src/leap/soledad/client/examples/benchmarks/.gitignore
new file mode 100644
index 00000000..2211df63
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/benchmarks/.gitignore
@@ -0,0 +1 @@
+*.txt
diff --git a/client/src/leap/soledad/client/examples/benchmarks/get_sample.sh b/client/src/leap/soledad/client/examples/benchmarks/get_sample.sh
new file mode 100755
index 00000000..1995eee1
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/benchmarks/get_sample.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+mkdir -p tmp  # scratch dir for the benchmark db; -p: no error if it already exists
+wget http://www.gutenberg.org/cache/epub/101/pg101.txt -O hacker_crackdown.txt  # sample text
diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
new file mode 100644
index 00000000..7fa1e38f
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
@@ -0,0 +1,177 @@
+# -*- coding: utf-8 -*-
+# measure_index_times.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Measure u1db retrieval times for different u1db index situations.
+"""
+from __future__ import print_function
+from functools import partial
+import datetime
+import hashlib
+import os
+import sys
+
+import u1db
+from twisted.internet import defer, reactor
+
+from leap.soledad.client import adbapi
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+
+
+folder = os.environ.get("TMPDIR", "tmp")
+numdocs = int(os.environ.get("DOCS", "1000"))
+silent = os.environ.get("SILENT", False)
+tmpdb = os.path.join(folder, "test.soledad")
+
+
+sample_file = os.environ.get("SAMPLE", "hacker_crackdown.txt")
+sample_path = os.path.join(os.curdir, sample_file)
+
+try:
+ with open(sample_file) as f:
+ SAMPLE = f.readlines()
+except Exception:
+ print("[!] Problem opening sample file. Did you download "
+ "the sample, or correctly set 'SAMPLE' env var?")
+ sys.exit(1)
+
+if numdocs > len(SAMPLE):
+ print("[!] Sorry! The requested DOCS number is larger than "
+ "the num of lines in our sample file")
+ sys.exit(1)
+
+
+def debug(*args):
+ if not silent:
+ print(*args)
+
+debug("[+] db path:", tmpdb)
+debug("[+] num docs", numdocs)
+
+if os.path.isfile(tmpdb):
+ debug("[+] Removing existing db file...")
+ os.remove(tmpdb)
+
+start_time = datetime.datetime.now()
+
+opts = SQLCipherOptions(tmpdb, "secret", create=True)
+dbpool = adbapi.getConnectionPool(opts)
+
+
+def createDoc(doc):
+ return dbpool.runU1DBQuery("create_doc", doc)
+
+db_indexes = {
+ 'by-chash': ['chash'],
+ 'by-number': ['number']}
+
+
+def create_indexes(_):
+ deferreds = []
+ for index, definition in db_indexes.items():
+ d = dbpool.runU1DBQuery("create_index", index, *definition)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+
+
+class TimeWitness(object):
+ def __init__(self, init_time):
+ self.init_time = init_time
+
+ def get_time_count(self):
+ return datetime.datetime.now() - self.init_time
+
+
+def get_from_index(_):
+ init_time = datetime.datetime.now()
+ debug("GETTING FROM INDEX...", init_time)
+
+ def printValue(res, time):
+ print("RESULT->", res)
+ print("Index Query Took: ", time.get_time_count())
+ return res
+
+ d = dbpool.runU1DBQuery(
+ "get_from_index", "by-chash",
+ #"1150c7f10fabce0a57ce13071349fc5064f15bdb0cc1bf2852f74ef3f103aff5")
+ # XXX this is line 89 from the hacker crackdown...
+        # Should accept any other optional hash as an environment variable.
+ "57793320d4997a673fc7062652da0596c36a4e9fbe31310d2281e67d56d82469")
+ d.addCallback(printValue, TimeWitness(init_time))
+ return d
+
+
+def getAllDocs():
+ return dbpool.runU1DBQuery("get_all_docs")
+
+
+def errBack(e):
+ debug("[!] ERROR FOUND!!!")
+ e.printTraceback()
+ reactor.stop()
+
+
+def countDocs(_):
+ debug("counting docs...")
+ d = getAllDocs()
+ d.addCallbacks(printResult, errBack)
+ d.addCallbacks(allDone, errBack)
+ return d
+
+
+def printResult(r, **kwargs):
+ if kwargs:
+ debug(*kwargs.values())
+ elif isinstance(r, u1db.Document):
+ debug(r.doc_id, r.content['number'])
+ else:
+ len_results = len(r[1])
+ debug("GOT %s results" % len(r[1]))
+
+ if len_results == numdocs:
+ debug("ALL GOOD")
+ else:
+ debug("[!] MISSING DOCS!!!!!")
+ raise ValueError("We didn't expect this result len")
+
+
+def allDone(_):
+ debug("ALL DONE!")
+
+ #if silent:
+ end_time = datetime.datetime.now()
+ print((end_time - start_time).total_seconds())
+ reactor.stop()
+
+
+def insert_docs(_):
+ deferreds = []
+ for i in range(numdocs):
+ payload = SAMPLE[i]
+ chash = hashlib.sha256(payload).hexdigest()
+ doc = {"number": i, "payload": payload, 'chash': chash}
+ d = createDoc(doc)
+ d.addCallbacks(partial(printResult, i=i, chash=chash, payload=payload),
+ lambda e: e.printTraceback())
+ deferreds.append(d)
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+d = create_indexes(None)
+d.addCallback(insert_docs)
+d.addCallback(get_from_index)
+d.addCallback(countDocs)
+
+reactor.run()
diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
new file mode 100644
index 00000000..c6d76e6b
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
@@ -0,0 +1,177 @@
+# -*- coding: utf-8 -*-
+# measure_index_times_custom_docid.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Measure u1db retrieval times for different u1db index situations.
+"""
+from __future__ import print_function
+from functools import partial
+import datetime
+import hashlib
+import os
+import sys
+
+import u1db
+from twisted.internet import defer, reactor
+
+from leap.soledad.client import adbapi
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+
+
+folder = os.environ.get("TMPDIR", "tmp")
+numdocs = int(os.environ.get("DOCS", "1000"))
+silent = os.environ.get("SILENT", False)
+tmpdb = os.path.join(folder, "test.soledad")
+
+
+sample_file = os.environ.get("SAMPLE", "hacker_crackdown.txt")
+sample_path = os.path.join(os.curdir, sample_file)
+
+try:
+ with open(sample_file) as f:
+ SAMPLE = f.readlines()
+except Exception:
+ print("[!] Problem opening sample file. Did you download "
+ "the sample, or correctly set 'SAMPLE' env var?")
+ sys.exit(1)
+
+if numdocs > len(SAMPLE):
+ print("[!] Sorry! The requested DOCS number is larger than "
+ "the num of lines in our sample file")
+ sys.exit(1)
+
+
+def debug(*args):
+ if not silent:
+ print(*args)
+
+debug("[+] db path:", tmpdb)
+debug("[+] num docs", numdocs)
+
+if os.path.isfile(tmpdb):
+ debug("[+] Removing existing db file...")
+ os.remove(tmpdb)
+
+start_time = datetime.datetime.now()
+
+opts = SQLCipherOptions(tmpdb, "secret", create=True)
+dbpool = adbapi.getConnectionPool(opts)
+
+
+def createDoc(doc, doc_id):
+ return dbpool.runU1DBQuery("create_doc", doc, doc_id=doc_id)
+
+db_indexes = {
+ 'by-chash': ['chash'],
+ 'by-number': ['number']}
+
+
+def create_indexes(_):
+ deferreds = []
+ for index, definition in db_indexes.items():
+ d = dbpool.runU1DBQuery("create_index", index, *definition)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+
+
+class TimeWitness(object):
+ def __init__(self, init_time):
+ self.init_time = init_time
+
+ def get_time_count(self):
+ return datetime.datetime.now() - self.init_time
+
+
+def get_from_index(_):
+ init_time = datetime.datetime.now()
+ debug("GETTING FROM INDEX...", init_time)
+
+ def printValue(res, time):
+ print("RESULT->", res)
+ print("Index Query Took: ", time.get_time_count())
+ return res
+
+ d = dbpool.runU1DBQuery(
+ "get_doc",
+ #"1150c7f10fabce0a57ce13071349fc5064f15bdb0cc1bf2852f74ef3f103aff5")
+ # XXX this is line 89 from the hacker crackdown...
+        # Should accept any other optional hash as an environment variable.
+ "57793320d4997a673fc7062652da0596c36a4e9fbe31310d2281e67d56d82469")
+ d.addCallback(printValue, TimeWitness(init_time))
+ return d
+
+
+def getAllDocs():
+ return dbpool.runU1DBQuery("get_all_docs")
+
+
+def errBack(e):
+ debug("[!] ERROR FOUND!!!")
+ e.printTraceback()
+ reactor.stop()
+
+
+def countDocs(_):
+ debug("counting docs...")
+ d = getAllDocs()
+ d.addCallbacks(printResult, errBack)
+ d.addCallbacks(allDone, errBack)
+ return d
+
+
+def printResult(r, **kwargs):
+ if kwargs:
+ debug(*kwargs.values())
+ elif isinstance(r, u1db.Document):
+ debug(r.doc_id, r.content['number'])
+ else:
+ len_results = len(r[1])
+ debug("GOT %s results" % len(r[1]))
+
+ if len_results == numdocs:
+ debug("ALL GOOD")
+ else:
+ debug("[!] MISSING DOCS!!!!!")
+ raise ValueError("We didn't expect this result len")
+
+
+def allDone(_):
+ debug("ALL DONE!")
+
+ #if silent:
+ end_time = datetime.datetime.now()
+ print((end_time - start_time).total_seconds())
+ reactor.stop()
+
+
+def insert_docs(_):
+ deferreds = []
+ for i in range(numdocs):
+ payload = SAMPLE[i]
+ chash = hashlib.sha256(payload).hexdigest()
+ doc = {"number": i, "payload": payload, 'chash': chash}
+ d = createDoc(doc, doc_id=chash)
+ d.addCallbacks(partial(printResult, i=i, chash=chash, payload=payload),
+ lambda e: e.printTraceback())
+ deferreds.append(d)
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+d = create_indexes(None)
+d.addCallback(insert_docs)
+d.addCallback(get_from_index)
+d.addCallback(countDocs)
+
+reactor.run()
diff --git a/client/src/leap/soledad/client/examples/compare.txt b/client/src/leap/soledad/client/examples/compare.txt
new file mode 100644
index 00000000..19a1325a
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/compare.txt
@@ -0,0 +1,8 @@
+TIMES=100 TMPDIR=/media/sdb5/leap python use_adbapi.py 1.34s user 0.16s system 53% cpu 2.832 total
+TIMES=100 TMPDIR=/media/sdb5/leap python use_api.py 1.22s user 0.14s system 62% cpu 2.181 total
+
+TIMES=1000 TMPDIR=/media/sdb5/leap python use_api.py 2.18s user 0.34s system 27% cpu 9.213 total
+TIMES=1000 TMPDIR=/media/sdb5/leap python use_adbapi.py 2.40s user 0.34s system 39% cpu 7.004 total
+
+TIMES=5000 TMPDIR=/media/sdb5/leap python use_api.py 6.63s user 1.27s system 13% cpu 57.882 total
+TIMES=5000 TMPDIR=/media/sdb5/leap python use_adbapi.py 6.84s user 1.26s system 36% cpu 22.367 total
diff --git a/client/src/leap/soledad/client/examples/manifest.phk b/client/src/leap/soledad/client/examples/manifest.phk
new file mode 100644
index 00000000..2c86c07d
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/manifest.phk
@@ -0,0 +1,50 @@
+The Hacker's Manifesto
+
+The Hacker's Manifesto
+by: The Mentor
+
+Another one got caught today, it's all over the papers. "Teenager
+Arrested in Computer Crime Scandal", "Hacker Arrested after Bank
+Tampering." "Damn kids. They're all alike." But did you, in your
+three-piece psychology and 1950's technobrain, ever take a look behind
+the eyes of the hacker? Did you ever wonder what made him tick, what
+forces shaped him, what may have molded him? I am a hacker, enter my
+world. Mine is a world that begins with school. I'm smarter than most of
+the other kids, this crap they teach us bores me. "Damn underachiever.
+They're all alike." I'm in junior high or high school. I've listened to
+teachers explain for the fifteenth time how to reduce a fraction. I
+understand it. "No, Ms. Smith, I didn't show my work. I did it in
+my head." "Damn kid. Probably copied it. They're all alike." I made a
+discovery today. I found a computer. Wait a second, this is cool. It does
+what I want it to. If it makes a mistake, it's because I screwed it up.
+Not because it doesn't like me, or feels threatened by me, or thinks I'm
+a smart ass, or doesn't like teaching and shouldn't be here. Damn kid.
+All he does is play games. They're all alike. And then it happened... a
+door opened to a world... rushing through the phone line like heroin
+through an addict's veins, an electronic pulse is sent out, a refuge from
+the day-to-day incompetencies is sought... a board is found. "This is
+it... this is where I belong..." I know everyone here... even if I've
+never met them, never talked to them, may never hear from them again... I
+know you all... Damn kid. Tying up the phone line again. They're all
+alike... You bet your ass we're all alike... we've been spoon-fed baby
+food at school when we hungered for steak... the bits of meat that you
+did let slip through were pre-chewed and tasteless. We've been dominated
+by sadists, or ignored by the apathetic. The few that had something to
+teach found us willing pupils, but those few are like drops of water in
+the desert. This is our world now... the world of the electron and the
+switch, the beauty of the baud. We make use of a service already existing
+without paying for what could be dirt-cheap if it wasn't run by
+profiteering gluttons, and you call us criminals. We explore... and you
+call us criminals. We seek after knowledge... and you call us criminals.
+We exist without skin color, without nationality, without religious
+bias... and you call us criminals. You build atomic bombs, you wage wars,
+you murder, cheat, and lie to us and try to make us believe it's for our
+own good, yet we're the criminals. Yes, I am a criminal. My crime is that
+of curiosity. My crime is that of judging people by what they say and
+think, not what they look like. My crime is that of outsmarting you,
+something that you will never forgive me for. I am a hacker, and this is
+my manifesto. You may stop this individual, but you can't stop us all...
+after all, we're all alike.
+
+This was the last published file written by The Mentor. Shortly after
+releasing it, he was busted by the FBI. The Mentor, sadly missed.
diff --git a/client/src/leap/soledad/client/examples/plot-async-db.py b/client/src/leap/soledad/client/examples/plot-async-db.py
new file mode 100644
index 00000000..018a1a1d
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/plot-async-db.py
@@ -0,0 +1,45 @@
+import csv
+from matplotlib import pyplot as plt
+
+FILE = "bench.csv"
+
+# config the plot
+plt.xlabel('number of inserts')
+plt.ylabel('time (seconds)')
+plt.title('SQLCipher parallelization')
+
+kwargs = {
+ 'linewidth': 1.0,
+ 'linestyle': '-',
+}
+
+series = (('sync', 'r'),
+ ('async', 'g'))
+
+data = {'mark': [],
+ 'sync': [],
+ 'async': []}
+
+with open(FILE, 'rb') as csvfile:
+ series_reader = csv.reader(csvfile, delimiter=',')
+ for m, s, a in series_reader:
+ data['mark'].append(int(m))
+ data['sync'].append(float(s))
+ data['async'].append(float(a))
+
+xmax = max(data['mark'])
+xmin = min(data['mark'])
+ymax = max(data['sync'] + data['async'])
+ymin = min(data['sync'] + data['async'])
+
+for run in series:
+ name = run[0]
+ color = run[1]
+ plt.plot(data['mark'], data[name], label=name, color=color, **kwargs)
+
+plt.axes().annotate("", xy=(xmax, ymax))
+plt.axes().annotate("", xy=(xmin, ymin))
+
+plt.grid()
+plt.legend()
+plt.show()
diff --git a/client/src/leap/soledad/client/examples/run_benchmark.py b/client/src/leap/soledad/client/examples/run_benchmark.py
new file mode 100644
index 00000000..a112cf45
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/run_benchmark.py
@@ -0,0 +1,28 @@
+"""
+Run a mini-benchmark between regular api and dbapi
+"""
+import commands
+import os
+import time
+
+TMPDIR = os.environ.get("TMPDIR", "/tmp")
+CSVFILE = 'bench.csv'
+
+cmd = "SILENT=1 TIMES={times} TMPDIR={tmpdir} python ./use_{version}api.py"
+
+parse_time = lambda r: r.split('\n')[-1]
+
+
+with open(CSVFILE, 'w') as log:
+
+ for times in range(0, 10000, 500):
+ cmd1 = cmd.format(times=times, tmpdir=TMPDIR, version="")
+ sync_time = parse_time(commands.getoutput(cmd1))
+
+ cmd2 = cmd.format(times=times, tmpdir=TMPDIR, version="adb")
+ async_time = parse_time(commands.getoutput(cmd2))
+
+ print times, sync_time, async_time
+ log.write("%s, %s, %s\n" % (times, sync_time, async_time))
+ log.flush()
+ time.sleep(2)
diff --git a/client/src/leap/soledad/client/examples/soledad_sync.py b/client/src/leap/soledad/client/examples/soledad_sync.py
new file mode 100644
index 00000000..6d0f6595
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/soledad_sync.py
@@ -0,0 +1,65 @@
+from leap.bitmask.config.providerconfig import ProviderConfig
+from leap.bitmask.crypto.srpauth import SRPAuth
+from leap.soledad.client import Soledad
+
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+
+# EDIT THIS --------------------------------------------
+user = u"USERNAME"
+uuid = u"USERUUID"
+_pass = u"USERPASS"
+server_url = "https://soledad.server.example.org:2323"
+# EDIT THIS --------------------------------------------
+
+secrets_path = "/tmp/%s.secrets" % uuid
+local_db_path = "/tmp/%s.soledad" % uuid
+cert_file = "/tmp/cacert.pem"
+provider_config = '/tmp/cdev.json'
+
+
+provider = ProviderConfig()
+provider.load(provider_config)
+
+soledad = None
+
+
+def printStuff(r):
+ print r
+
+
+def printErr(err):
+ logging.exception(err.value)
+
+
+def init_soledad(_):
+ token = srpauth.get_token()
+ print "token", token
+
+ global soledad
+ soledad = Soledad(uuid, _pass, secrets_path, local_db_path,
+ server_url, cert_file,
+ auth_token=token, defer_encryption=False)
+
+ def getall(_):
+ d = soledad.get_all_docs()
+ return d
+
+ d1 = soledad.create_doc({"test": 42})
+ d1.addCallback(getall)
+ d1.addCallbacks(printStuff, printErr)
+
+ d2 = soledad.sync()
+ d2.addCallbacks(printStuff, printErr)
+ d2.addBoth(lambda r: reactor.stop())
+
+
+srpauth = SRPAuth(provider)
+
+d = srpauth.authenticate(user, _pass)
+d.addCallbacks(init_soledad, printErr)
+
+
+from twisted.internet import reactor
+reactor.run()
diff --git a/client/src/leap/soledad/client/examples/use_adbapi.py b/client/src/leap/soledad/client/examples/use_adbapi.py
new file mode 100644
index 00000000..d7bd21f2
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/use_adbapi.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+# use_adbapi.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Example of use of the asynchronous soledad api.
+"""
+from __future__ import print_function
+import datetime
+import os
+
+import u1db
+from twisted.internet import defer, reactor
+
+from leap.soledad.client import adbapi
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+
+
+folder = os.environ.get("TMPDIR", "tmp")
+times = int(os.environ.get("TIMES", "1000"))
+silent = os.environ.get("SILENT", False)
+
+tmpdb = os.path.join(folder, "test.soledad")
+
+
+def debug(*args):
+ if not silent:
+ print(*args)
+
+debug("[+] db path:", tmpdb)
+debug("[+] times", times)
+
+if os.path.isfile(tmpdb):
+ debug("[+] Removing existing db file...")
+ os.remove(tmpdb)
+
+start_time = datetime.datetime.now()
+
+opts = SQLCipherOptions(tmpdb, "secret", create=True)
+dbpool = adbapi.getConnectionPool(opts)
+
+
+def createDoc(doc):
+ return dbpool.runU1DBQuery("create_doc", doc)
+
+
+def getAllDocs():
+ return dbpool.runU1DBQuery("get_all_docs")
+
+
+def countDocs(_):
+ debug("counting docs...")
+ d = getAllDocs()
+ d.addCallbacks(printResult, lambda e: e.printTraceback())
+ d.addBoth(allDone)
+
+
+def printResult(r):
+ if isinstance(r, u1db.Document):
+ debug(r.doc_id, r.content['number'])
+ else:
+ len_results = len(r[1])
+ debug("GOT %s results" % len(r[1]))
+
+ if len_results == times:
+ debug("ALL GOOD")
+ else:
+ raise ValueError("We didn't expect this result len")
+
+
+def allDone(_):
+ debug("ALL DONE!")
+ if silent:
+ end_time = datetime.datetime.now()
+ print((end_time - start_time).total_seconds())
+ reactor.stop()
+
+deferreds = []
+payload = open('manifest.phk').read()
+
+for i in range(times):
+ doc = {"number": i, "payload": payload}
+ d = createDoc(doc)
+ d.addCallbacks(printResult, lambda e: e.printTraceback())
+ deferreds.append(d)
+
+
+all_done = defer.gatherResults(deferreds, consumeErrors=True)
+all_done.addCallback(countDocs)
+
+reactor.run()
diff --git a/client/src/leap/soledad/client/examples/use_api.py b/client/src/leap/soledad/client/examples/use_api.py
new file mode 100644
index 00000000..e2501c98
--- /dev/null
+++ b/client/src/leap/soledad/client/examples/use_api.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# use_api.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Example of use of the soledad api.
+"""
+from __future__ import print_function
+import datetime
+import os
+
+from leap.soledad.client import sqlcipher
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+
+
+folder = os.environ.get("TMPDIR", "tmp")
+times = int(os.environ.get("TIMES", "1000"))
+silent = os.environ.get("SILENT", False)
+
+tmpdb = os.path.join(folder, "test.soledad")
+
+
+def debug(*args):
+ if not silent:
+ print(*args)
+
+debug("[+] db path:", tmpdb)
+debug("[+] times", times)
+
+if os.path.isfile(tmpdb):
+ debug("[+] Removing existing db file...")
+ os.remove(tmpdb)
+
+start_time = datetime.datetime.now()
+
+opts = SQLCipherOptions(tmpdb, "secret", create=True)
+db = sqlcipher.SQLCipherDatabase(opts)
+
+
+def allDone():
+ debug("ALL DONE!")
+
+payload = open('manifest.phk').read()
+
+for i in range(times):
+ doc = {"number": i, "payload": payload}
+ d = db.create_doc(doc)
+ debug(d.doc_id, d.content['number'])
+
+debug("Count", len(db.get_all_docs()[1]))
+if silent:
+ end_time = datetime.datetime.now()
+ print((end_time - start_time).total_seconds())
+
+allDone()
diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py
new file mode 100644
index 00000000..4f7b0779
--- /dev/null
+++ b/client/src/leap/soledad/client/interfaces.py
@@ -0,0 +1,362 @@
+# -*- coding: utf-8 -*-
+# interfaces.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Interfaces used by the Soledad Client.
+"""
+from zope.interface import Interface, Attribute
+
+
+class ILocalStorage(Interface):
+ """
+ I implement core methods for the u1db local storage of documents and
+ indexes.
+ """
+ local_db_path = Attribute(
+ "The path for the local database replica")
+ local_db_file_name = Attribute(
+ "The name of the local SQLCipher U1DB database file")
+ uuid = Attribute("The user uuid")
+ default_prefix = Attribute(
+ "Prefix for default values for path")
+
+ def put_doc(self, doc):
+ """
+ Update a document in the local encrypted database.
+
+ :param doc: the document to update
+ :type doc: SoledadDocument
+
+ :return:
+ a deferred that will fire with the new revision identifier for
+ the document
+ :rtype: Deferred
+ """
+
+ def delete_doc(self, doc):
+ """
+ Delete a document from the local encrypted database.
+
+ :param doc: the document to delete
+ :type doc: SoledadDocument
+
+ :return:
+ a deferred that will fire with ...
+ :rtype: Deferred
+ """
+
+ def get_doc(self, doc_id, include_deleted=False):
+ """
+ Retrieve a document from the local encrypted database.
+
+ :param doc_id: the unique document identifier
+ :type doc_id: str
+ :param include_deleted:
+ if True, deleted documents will be returned with empty content;
+ otherwise asking for a deleted document will return None
+ :type include_deleted: bool
+
+ :return:
+ A deferred that will fire with the document object, containing a
+ SoledadDocument, or None if it could not be found
+ :rtype: Deferred
+ """
+
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the content for many documents.
+
+ :param doc_ids: a list of document identifiers
+ :type doc_ids: list
+ :param check_for_conflicts: if set False, then the conflict check will
+ be skipped, and 'None' will be returned instead of True/False
+ :type check_for_conflicts: bool
+
+ :return:
+ A deferred that will fire with an iterable giving the Document
+ object for each document id in matching doc_ids order.
+ :rtype: Deferred
+ """
+
+ def get_all_docs(self, include_deleted=False):
+ """
+ Get the JSON content for all documents in the database.
+
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return:
+ A deferred that will fire with (generation, [Document]): that is,
+ the current generation of the database, followed by a list of all
+ the documents in the database.
+ :rtype: Deferred
+ """
+
+ def create_doc(self, content, doc_id=None):
+ """
+ Create a new document in the local encrypted database.
+
+ :param content: the contents of the new document
+ :type content: dict
+ :param doc_id: an optional identifier specifying the document id
+ :type doc_id: str
+
+ :return:
+            A deferred that will fire with the new document (SoledadDocument
+ instance).
+ :rtype: Deferred
+ """
+
+ def create_doc_from_json(self, json, doc_id=None):
+ """
+ Create a new document.
+
+ You can optionally specify the document identifier, but the document
+ must not already exist. See 'put_doc' if you want to override an
+ existing document.
+ If the database specifies a maximum document size and the document
+ exceeds it, create will fail and raise a DocumentTooBig exception.
+
+ :param json: The JSON document string
+ :type json: str
+ :param doc_id: An optional identifier specifying the document id.
+ :type doc_id:
+ :return:
+ A deferred that will fire with the new document (A SoledadDocument
+ instance)
+ :rtype: Deferred
+ """
+
+ def create_index(self, index_name, *index_expressions):
+ """
+        Create a named index, which can then be queried for future lookups.
+ Creating an index which already exists is not an error, and is cheap.
+ Creating an index which does not match the index_expressions of the
+ existing index is an error.
+ Creating an index will block until the expressions have been evaluated
+ and the index generated.
+
+ :param index_name: A unique name which can be used as a key prefix
+ :type index_name: str
+ :param index_expressions:
+ index expressions defining the index information.
+ :type index_expressions: dict
+
+ Examples:
+
+ "fieldname", or "fieldname.subfieldname" to index alphabetically
+ sorted on the contents of a field.
+
+ "number(fieldname, width)", "lower(fieldname)"
+ """
+
+ def delete_index(self, index_name):
+ """
+ Remove a named index.
+
+ :param index_name: The name of the index we are removing
+ :type index_name: str
+ """
+
+ def list_indexes(self):
+ """
+ List the definitions of all known indexes.
+
+ :return: A list of [('index-name', ['field', 'field2'])] definitions.
+ :rtype: Deferred
+ """
+
+ def get_from_index(self, index_name, *key_values):
+ """
+ Return documents that match the keys supplied.
+
+ You must supply exactly the same number of values as have been defined
+ in the index. It is possible to do a prefix match by using '*' to
+ indicate a wildcard match. You can only supply '*' to trailing entries,
+ (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.)
+ It is also possible to append a '*' to the last supplied value (eg
+ 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param key_values: values to match. eg, if you have
+ an index with 3 fields then you would have:
+ get_from_index(index_name, val1, val2, val3)
+ :type key_values: tuple
+ :return: List of [Document]
+ :rtype: list
+ """
+
+ def get_count_from_index(self, index_name, *key_values):
+ """
+ Return the count of the documents that match the keys and
+ values supplied.
+
+ :param index_name: The index to query
+ :type index_name: str
+        :param key_values: values to match. eg, if you have
+                           an index with 3 fields then you would have:
+                           get_count_from_index(index_name, val1, val2, val3)
+ :type key_values: tuple
+ :return: count.
+ :rtype: int
+ """
+
+ def get_range_from_index(self, index_name, start_value, end_value):
+ """
+ Return documents that fall within the specified range.
+
+ Both ends of the range are inclusive. For both start_value and
+ end_value, one must supply exactly the same number of values as have
+ been defined in the index, or pass None. In case of a single column
+ index, a string is accepted as an alternative for a tuple with a single
+ value. It is possible to do a prefix match by using '*' to indicate
+ a wildcard match. You can only supply '*' to trailing entries, (eg
+ 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also
+ possible to append a '*' to the last supplied value (eg 'val*', '*',
+ '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+ :param index_name: The index to query
+ :type index_name: str
+        :param start_value: tuples of values that define the lower bound of
+            the range. eg, if you have an index with 3 fields then you would
+            have: (val1, val2, val3)
+        :type start_value: tuple
+        :param end_value: tuples of values that define the upper bound of the
+            range. eg, if you have an index with 3 fields then you would have:
+            (val1, val2, val3)
+        :type end_value: tuple
+ :return: A deferred that will fire with a list of [Document]
+ :rtype: Deferred
+ """
+
+ def get_index_keys(self, index_name):
+ """
+ Return all keys under which documents are indexed in this index.
+
+ :param index_name: The index to query
+ :type index_name: str
+ :return:
+ A deferred that will fire with a list of tuples of indexed keys.
+ :rtype: Deferred
+ """
+
+ def get_doc_conflicts(self, doc_id):
+ """
+ Get the list of conflicts for the given document.
+
+ :param doc_id: the document id
+ :type doc_id: str
+
+ :return:
+ A deferred that will fire with a list of the document entries that
+ are conflicted.
+ :rtype: Deferred
+ """
+
+ def resolve_doc(self, doc, conflicted_doc_revs):
+ """
+ Mark a document as no longer conflicted.
+
+ :param doc: a document with the new content to be inserted.
+ :type doc: SoledadDocument
+ :param conflicted_doc_revs:
+            A list of the revisions that the new
+            content supersedes.
+ :type conflicted_doc_revs: list
+ """
+
+
+class ISyncableStorage(Interface):
+ """
+ I implement methods to synchronize with a remote replica.
+ """
+ replica_uid = Attribute("The uid of the local replica")
+ syncing = Attribute(
+ "Property, True if the syncer is syncing.")
+ token = Attribute("The authentication Token.")
+
+ def sync(self, defer_decryption=True):
+ """
+ Synchronize the local encrypted replica with a remote replica.
+
+ This method blocks until a syncing lock is acquired, so there are no
+ attempts of concurrent syncs from the same client replica.
+
+ :param url: the url of the target replica to sync with
+ :type url: str
+
+ :param defer_decryption:
+ Whether to defer the decryption process using the intermediate
+ database. If False, decryption will be done inline.
+ :type defer_decryption: bool
+
+ :return:
+ A deferred that will fire with the local generation before the
+ synchronisation was performed.
+        :rtype: Deferred
+ """
+
+ def stop_sync(self):
+ """
+ Stop the current syncing process.
+ """
+
+
+class ISecretsStorage(Interface):
+ """
+ I implement methods needed for initializing and accessing secrets, that are
+ synced against the Shared Recovery Database.
+ """
+ secrets_file_name = Attribute(
+ "The name of the file where the storage secrets will be stored")
+
+ storage_secret = Attribute("")
+ remote_storage_secret = Attribute("")
+ shared_db = Attribute("The shared db object")
+
+    # XXX this is used internally from secrets, so it might be good to
+    # preserve as a public boundary with other components.
+
+ # We should also probably document its interface.
+ secrets = Attribute("A SoledadSecrets object containing access to secrets")
+
+ def init_shared_db(self, server_url, uuid, creds):
+ """
+ Initialize the shared recovery database.
+
+ :param server_url:
+ :type server_url:
+ :param uuid:
+ :type uuid:
+ :param creds:
+ :type creds:
+ """
+
+ def change_passphrase(self, new_passphrase):
+ """
+ Change the passphrase that encrypts the storage secret.
+
+ :param new_passphrase: The new passphrase.
+ :type new_passphrase: unicode
+
+ :raise NoStorageSecret: Raised if there's no storage secret available.
+ """
+
+ # XXX not in use. Uncomment if we ever decide to allow
+ # multiple secrets.
+ # secret_id = Attribute("The id of the storage secret to be used")
diff --git a/client/src/leap/soledad/client/mp_safe_db.py b/client/src/leap/soledad/client/mp_safe_db.py
deleted file mode 100644
index 780b7153..00000000
--- a/client/src/leap/soledad/client/mp_safe_db.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# -*- coding: utf-8 -*-
-# mp_safe_db.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-Multiprocessing-safe SQLite database.
-"""
-
-
-from threading import Thread
-from Queue import Queue
-from pysqlcipher import dbapi2
-
-
-# Thanks to http://code.activestate.com/recipes/526618/
-
-class MPSafeSQLiteDB(Thread):
- """
- A multiprocessing-safe SQLite database accessor.
- """
-
- CLOSE = "--close--"
- NO_MORE = "--no more--"
-
- def __init__(self, db_path):
- """
- Initialize the process
- """
- Thread.__init__(self)
- self._db_path = db_path
- self._requests = Queue()
- self.start()
-
- def run(self):
- """
- Run the multiprocessing-safe database accessor.
- """
- conn = dbapi2.connect(self._db_path)
- while True:
- req, arg, res = self._requests.get()
- if req == self.CLOSE:
- break
- with conn:
- cursor = conn.cursor()
- cursor.execute(req, arg)
- if res:
- for rec in cursor.fetchall():
- res.put(rec)
- res.put(self.NO_MORE)
- conn.close()
-
- def execute(self, req, arg=None, res=None):
- """
- Execute a request on the database.
-
- :param req: The request to be executed.
- :type req: str
- :param arg: The arguments for the request.
- :type arg: tuple
- :param res: A queue to write request results.
- :type res: multiprocessing.Queue
- """
- self._requests.put((req, arg or tuple(), res))
-
- def select(self, req, arg=None):
- """
- Run a select query on the database and yield results.
-
- :param req: The request to be executed.
- :type req: str
- :param arg: The arguments for the request.
- :type arg: tuple
- """
- res = Queue()
- self.execute(req, arg, res)
- while True:
- rec=res.get()
- if rec == self.NO_MORE:
- break
- yield rec
-
- def close(self):
- """
- Close the database connection.
- """
- self.execute(self.CLOSE)
- self.join()
-
- def cursor(self):
- """
- Return a fake cursor object.
-
- Not really a cursor, but allows for calling db.cursor().execute().
-
- :return: Self.
- :rtype: MPSafeSQLiteDatabase
- """
- return self
diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py
new file mode 100644
index 00000000..2e9c53a3
--- /dev/null
+++ b/client/src/leap/soledad/client/pragmas.py
@@ -0,0 +1,336 @@
+# -*- coding: utf-8 -*-
+# pragmas.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Different pragmas used in the initialization of the SQLCipher database.
+"""
+import logging
+import string
+
+logger = logging.getLogger(__name__)
+
+
+def set_crypto_pragmas(db_handle, sqlcipher_opts):
+ """
+ Set cryptographic params (key, cipher, KDF number of iterations and
+ cipher page size).
+
+ :param db_handle:
+ :type db_handle:
+ :param sqlcipher_opts: options for the SQLCipherDatabase
+ :type sqlcipher_opts: SQLCipherOptions
+ """
+ # XXX assert CryptoOptions
+ opts = sqlcipher_opts
+ _set_key(db_handle, opts.key, opts.is_raw_key)
+ _set_cipher(db_handle, opts.cipher)
+ _set_kdf_iter(db_handle, opts.kdf_iter)
+ _set_cipher_page_size(db_handle, opts.cipher_page_size)
+
+
+def _set_key(db_handle, key, is_raw_key):
+ """
+ Set the ``key`` for use with the database.
+
+ The process of creating a new, encrypted database is called 'keying'
+ the database. SQLCipher uses just-in-time key derivation at the point
+ it is first needed for an operation. This means that the key (and any
+ options) must be set before the first operation on the database. As
+ soon as the database is touched (e.g. SELECT, CREATE TABLE, UPDATE,
+ etc.) and pages need to be read or written, the key is prepared for
+ use.
+
+ Implementation Notes:
+
+ * PRAGMA key should generally be called as the first operation on a
+ database.
+
+ :param key: The key for use with the database.
+ :type key: str
+ :param is_raw_key:
+ Whether C{key} is a raw 64-char hex string or a passphrase that should
+ be hashed to obtain the encryption key.
+ :type is_raw_key: bool
+ """
+ if is_raw_key:
+ _set_key_raw(db_handle, key)
+ else:
+ _set_key_passphrase(db_handle, key)
+
+
+def _set_key_passphrase(db_handle, passphrase):
+ """
+ Set a passphrase for encryption key derivation.
+
+ The key itself can be a passphrase, which is converted to a key using
+ PBKDF2 key derivation. The result is used as the encryption key for
+ the database. By using this method, there is no way to alter the KDF;
+ if you want to do so you should use a raw key instead and derive the
+ key using your own KDF.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param passphrase: The passphrase used to derive the encryption key.
+ :type passphrase: str
+ """
+ db_handle.cursor().execute("PRAGMA key = '%s'" % passphrase)
+
+
+def _set_key_raw(db_handle, key):
+ """
+ Set a raw hexadecimal encryption key.
+
+ It is possible to specify an exact byte sequence using a blob literal.
+ With this method, it is the calling application's responsibility to
+ ensure that the data provided is a 64 character hex string, which will
+ be converted directly to 32 bytes (256 bits) of key data.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param key: A 64 character hex string.
+ :type key: str
+ """
+ if not all(c in string.hexdigits for c in key):
+ raise NotAnHexString(key)
+ db_handle.cursor().execute('PRAGMA key = "x\'%s"' % key)
+
+
+def _set_cipher(db_handle, cipher='aes-256-cbc'):
+ """
+ Set the cipher and mode to use for symmetric encryption.
+
+ SQLCipher uses aes-256-cbc as the default cipher and mode of
+ operation. It is possible to change this, though not generally
+ recommended, using PRAGMA cipher.
+
+ SQLCipher makes direct use of libssl, so all cipher options available
+ to libssl are also available for use with SQLCipher. See `man enc` for
+ OpenSSL's supported ciphers.
+
+ Implementation Notes:
+
+ * PRAGMA cipher must be called after PRAGMA key and before the first
+ actual database operation or it will have no effect.
+
+ * If a non-default value is used with PRAGMA cipher to create a database,
+ it must also be called every time that database is opened.
+
+ * SQLCipher does not implement its own encryption. Instead it uses the
+ widely available and peer-reviewed OpenSSL libcrypto for all
+ cryptographic functions.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param cipher: The cipher and mode to use.
+ :type cipher: str
+ """
+ db_handle.cursor().execute("PRAGMA cipher = '%s'" % cipher)
+
+
+def _set_kdf_iter(db_handle, kdf_iter=4000):
+ """
+ Set the number of iterations for the key derivation function.
+
+ SQLCipher uses PBKDF2 key derivation to strengthen the key and make it
+ resistant to brute force and dictionary attacks. The default
+ configuration uses 4000 PBKDF2 iterations (effectively 16,000 SHA1
+ operations). PRAGMA kdf_iter can be used to increase or decrease the
+ number of iterations used.
+
+ Implementation Notes:
+
+ * PRAGMA kdf_iter must be called after PRAGMA key and before the first
+ actual database operation or it will have no effect.
+
+ * If a non-default value is used with PRAGMA kdf_iter to create a database,
+ it must also be called every time that database is opened.
+
+ * It is not recommended to reduce the number of iterations if a
+ passphrase is in use.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param kdf_iter: The number of iterations to use.
+ :type kdf_iter: int
+ """
+ db_handle.cursor().execute("PRAGMA kdf_iter = '%d'" % kdf_iter)
+
+
+def _set_cipher_page_size(db_handle, cipher_page_size=1024):
+ """
+ Set the page size of the encrypted database.
+
+ SQLCipher 2 introduced the new PRAGMA cipher_page_size that can be
+ used to adjust the page size for the encrypted database. The default
+ page size is 1024 bytes, but it can be desirable for some applications
+ to use a larger page size for increased performance. For instance,
+ some recent testing shows that increasing the page size can noticeably
+ improve performance (5-30%) for certain queries that manipulate a
+ large number of pages (e.g. selects without an index, large inserts in
+ a transaction, big deletes).
+
+ To adjust the page size, call the pragma immediately after setting the
+ key for the first time and each subsequent time that you open the
+ database.
+
+ Implementation Notes:
+
+ * PRAGMA cipher_page_size must be called after PRAGMA key and before
+ the first actual database operation or it will have no effect.
+
+ * If a non-default value is used with PRAGMA cipher_page_size to create a
+ database, it must also be called every time that database is opened.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param cipher_page_size: The page size.
+ :type cipher_page_size: int
+ """
+ db_handle.cursor().execute(
+ "PRAGMA cipher_page_size = '%d'" % cipher_page_size)
+
+
+# XXX UNUSED ?
+def set_rekey(db_handle, new_key, is_raw_key):
+ """
+ Change the key of an existing encrypted database.
+
+ To change the key on an existing encrypted database, it must first be
+ unlocked with the current encryption key. Once the database is
+ readable and writeable, PRAGMA rekey can be used to re-encrypt every
+ page in the database with a new key.
+
+ * PRAGMA rekey must be called after PRAGMA key. It can be called at any
+ time once the database is readable.
+
+ * PRAGMA rekey cannot be used to encrypt a standard SQLite
+ database! It is only useful for changing the key on an existing
+ database.
+
+ * Previous versions of SQLCipher provided a PRAGMA rekey_cipher and
+ PRAGMA rekey_kdf_iter. These are deprecated and should not be
+ used. Instead, use sqlcipher_export().
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param new_key: The new key.
+ :type new_key: str
+ :param is_raw_key: Whether C{new_key} is a raw 64-char hex string or a
+ passphrase that should be hashed to obtain the encryption
+ key.
+ :type is_raw_key: bool
+ """
+ if is_raw_key:
+ _set_rekey_raw(db_handle, new_key)
+ else:
+ _set_rekey_passphrase(db_handle, new_key)
+
+
+def _set_rekey_passphrase(db_handle, passphrase):
+ """
+ Change the passphrase for encryption key derivation.
+
+ The key itself can be a passphrase, which is converted to a key using
+ PBKDF2 key derivation. The result is used as the encryption key for
+ the database.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param passphrase: The passphrase used to derive the encryption key.
+ :type passphrase: str
+ """
+ db_handle.cursor().execute("PRAGMA rekey = '%s'" % passphrase)
+
+
+def _set_rekey_raw(db_handle, key):
+ """
+ Change the raw hexadecimal encryption key.
+
+ It is possible to specify an exact byte sequence using a blob literal.
+ With this method, it is the calling application's responsibility to
+ ensure that the data provided is a 64 character hex string, which will
+ be converted directly to 32 bytes (256 bits) of key data.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param key: A 64 character hex string.
+ :type key: str
+ """
+ if not all(c in string.hexdigits for c in key):
+ raise NotAnHexString(key)
+ db_handle.cursor().execute('PRAGMA rekey = "x\'%s"' % key)
+
+
+def set_synchronous_off(db_handle):
+ """
+ Change the setting of the "synchronous" flag to OFF.
+ """
+ logger.debug("SQLCIPHER: SETTING SYNCHRONOUS OFF")
+ db_handle.cursor().execute('PRAGMA synchronous=OFF')
+
+
+def set_synchronous_normal(db_handle):
+ """
+ Change the setting of the "synchronous" flag to NORMAL.
+ """
+ logger.debug("SQLCIPHER: SETTING SYNCHRONOUS NORMAL")
+ db_handle.cursor().execute('PRAGMA synchronous=NORMAL')
+
+
+def set_mem_temp_store(db_handle):
+ """
+ Use an in-memory store for temporary tables.
+ """
+ logger.debug("SQLCIPHER: SETTING TEMP_STORE MEMORY")
+ db_handle.cursor().execute('PRAGMA temp_store=MEMORY')
+
+
+def set_write_ahead_logging(db_handle):
+ """
+ Enable write-ahead logging, and set the autocheckpoint to 50 pages.
+
+ Setting the autocheckpoint to a small value, we make the reads not
+ suffer too much performance degradation.
+
+ From the sqlite docs:
+
+ "There is a tradeoff between average read performance and average write
+ performance. To maximize the read performance, one wants to keep the
+ WAL as small as possible and hence run checkpoints frequently, perhaps
+ as often as every COMMIT. To maximize write performance, one wants to
+ amortize the cost of each checkpoint over as many writes as possible,
+ meaning that one wants to run checkpoints infrequently and let the WAL
+ grow as large as possible before each checkpoint. The decision of how
+ often to run checkpoints may therefore vary from one application to
+ another depending on the relative read and write performance
+ requirements of the application. The default strategy is to run a
+ checkpoint once the WAL reaches 1000 pages"
+ """
+ logger.debug("SQLCIPHER: SETTING WRITE-AHEAD LOGGING")
+ db_handle.cursor().execute('PRAGMA journal_mode=WAL')
+
+ # The optimum value can still use a little bit of tuning, but we favor
+ # small sizes of the WAL file to get fast reads, since we assume that
+ # the writes will be quick enough to not block too much.
+
+ db_handle.cursor().execute('PRAGMA wal_autocheckpoint=50')
+
+
+class NotAnHexString(Exception):
+ """
+ Raised when trying to (raw) key the database with a non-hex string.
+ """
+ pass
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index 970ac82f..af781a26 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -132,6 +132,7 @@ class SoledadSecrets(object):
UUID_KEY = 'uuid'
STORAGE_SECRETS_KEY = 'storage_secrets'
+ ACTIVE_SECRET_KEY = 'active_secret'
SECRET_KEY = 'secret'
CIPHER_KEY = 'cipher'
LENGTH_KEY = 'length'
@@ -144,8 +145,7 @@ class SoledadSecrets(object):
Keys used to access storage secrets in recovery documents.
"""
- def __init__(self, uuid, passphrase, secrets_path, shared_db, crypto,
- secret_id=None):
+ def __init__(self, uuid, passphrase, secrets_path, shared_db, crypto):
"""
Initialize the secrets manager.
@@ -161,17 +161,20 @@ class SoledadSecrets(object):
:type shared_db: leap.soledad.client.shared_db.SoledadSharedDatabase
:param crypto: A soledad crypto object.
:type crypto: SoledadCrypto
- :param secret_id: The id of the storage secret to be used.
- :type secret_id: str
"""
+ # XXX removed since not in use
+ # We will pick the first secret available.
+ # param secret_id: The id of the storage secret to be used.
+
self._uuid = uuid
self._passphrase = passphrase
self._secrets_path = secrets_path
self._shared_db = shared_db
self._crypto = crypto
- self._secret_id = secret_id
self._secrets = {}
+ self._secret_id = None
+
def bootstrap(self):
"""
Bootstrap secrets.
@@ -247,7 +250,8 @@ class SoledadSecrets(object):
try:
self._load_secrets() # try to load from disk
except IOError as e:
- logger.warning('IOError while loading secrets from disk: %s' % str(e))
+ logger.warning(
+ 'IOError while loading secrets from disk: %s' % str(e))
return False
return self.storage_secret is not None
@@ -262,10 +266,13 @@ class SoledadSecrets(object):
content = None
with open(self._secrets_path, 'r') as f:
content = json.loads(f.read())
- _, mac = self._import_recovery_document(content)
+ _, mac, active_secret = self._import_recovery_document(content)
# choose first secret if no secret_id was given
if self._secret_id is None:
- self.set_secret_id(self._secrets.items()[0][0])
+ if active_secret is None:
+ self.set_secret_id(self._secrets.items()[0][0])
+ else:
+ self.set_secret_id(active_secret)
# enlarge secret if needed
enlarged = False
if len(self._secrets[self._secret_id]) < self.GEN_SECRET_LENGTH:
@@ -286,18 +293,24 @@ class SoledadSecrets(object):
:raises BootstrapSequenceError: Raised when unable to store secrets in
shared database.
"""
- doc = self._get_secrets_from_shared_db()
+ if self._shared_db.syncable:
+ doc = self._get_secrets_from_shared_db()
+ else:
+ doc = None
- if doc:
+ if doc is not None:
logger.info(
'Found cryptographic secrets in shared recovery '
'database.')
- _, mac = self._import_recovery_document(doc.content)
+ _, mac, active_secret = self._import_recovery_document(doc.content)
if mac is False:
self.put_secrets_in_shared_db()
self._store_secrets() # save new secrets in local file
if self._secret_id is None:
- self.set_secret_id(self._secrets.items()[0][0])
+ if active_secret is None:
+ self.set_secret_id(self._secrets.items()[0][0])
+ else:
+ self.set_secret_id(active_secret)
else:
# STAGE 3 - there are no secrets in server also, so
# generate a secret and store it in remote db.
@@ -305,21 +318,24 @@ class SoledadSecrets(object):
'No cryptographic secrets found, creating new '
' secrets...')
self.set_secret_id(self._gen_secret())
- try:
- self._put_secrets_in_shared_db()
- except Exception as ex:
- # storing generated secret in shared db failed for
- # some reason, so we erase the generated secret and
- # raise.
+
+ if self._shared_db.syncable:
try:
- os.unlink(self._secrets_path)
- except OSError as e:
- if e.errno != errno.ENOENT: # no such file or directory
- logger.exception(e)
- logger.exception(ex)
- raise BootstrapSequenceError(
- 'Could not store generated secret in the shared '
- 'database, bailing out...')
+ self._put_secrets_in_shared_db()
+ except Exception as ex:
+ # storing generated secret in shared db failed for
+ # some reason, so we erase the generated secret and
+ # raise.
+ try:
+ os.unlink(self._secrets_path)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ # no such file or directory
+ logger.exception(e)
+ logger.exception(ex)
+ raise BootstrapSequenceError(
+ 'Could not store generated secret in the shared '
+ 'database, bailing out...')
#
# Shared DB related methods
@@ -354,6 +370,7 @@ class SoledadSecrets(object):
'secret': '<encrypted storage_secret>',
},
},
+ 'active_secret': '<secret_id>',
'kdf': 'scrypt',
'kdf_salt': '<b64 repr of salt>',
'kdf_length: <key length>,
@@ -379,13 +396,14 @@ class SoledadSecrets(object):
# create the recovery document
data = {
self.STORAGE_SECRETS_KEY: encrypted_secrets,
+ self.ACTIVE_SECRET_KEY: self._secret_id,
self.KDF_KEY: self.KDF_SCRYPT,
self.KDF_SALT_KEY: binascii.b2a_base64(salt),
self.KDF_LENGTH_KEY: len(key),
crypto.MAC_METHOD_KEY: crypto.MacMethods.HMAC,
crypto.MAC_KEY: hmac.new(
key,
- json.dumps(encrypted_secrets),
+ json.dumps(encrypted_secrets, sort_keys=True),
sha256).hexdigest(),
}
return data
@@ -401,8 +419,9 @@ class SoledadSecrets(object):
:param data: The recovery document.
:type data: dict
- :return: A tuple containing the number of imported secrets and whether
- there was MAC informationa available for authenticating.
+ :return: A tuple containing the number of imported secrets, whether
+ there was MAC information available for authenticating, and
+ the secret_id of the last active secret.
:rtype: (int, bool)
"""
soledad_assert(self.STORAGE_SECRETS_KEY in data)
@@ -421,7 +440,8 @@ class SoledadSecrets(object):
buflen=32)
mac = hmac.new(
key,
- json.dumps(data[self.STORAGE_SECRETS_KEY]),
+ json.dumps(
+ data[self.STORAGE_SECRETS_KEY], sort_keys=True),
sha256).hexdigest()
else:
raise crypto.UnknownMacMethodError('Unknown MAC method: %s.' %
@@ -431,7 +451,13 @@ class SoledadSecrets(object):
'contents.')
# include secrets in the secret pool.
secret_count = 0
- for secret_id, encrypted_secret in data[self.STORAGE_SECRETS_KEY].items():
+ secrets = data[self.STORAGE_SECRETS_KEY].items()
+ active_secret = None
+ # XXX remove check for existence of key (included for backwards
+ # compatibility)
+ if self.ACTIVE_SECRET_KEY in data:
+ active_secret = data[self.ACTIVE_SECRET_KEY]
+ for secret_id, encrypted_secret in secrets:
if secret_id not in self._secrets:
try:
self._secrets[secret_id] = \
@@ -440,7 +466,7 @@ class SoledadSecrets(object):
except SecretsException as e:
logger.error("Failed to decrypt storage secret: %s"
% str(e))
- return secret_count, mac
+ return secret_count, mac, active_secret
def _get_secrets_from_shared_db(self):
"""
@@ -661,8 +687,8 @@ class SoledadSecrets(object):
self._secrets_path = secrets_path
secrets_path = property(
- _get_secrets_path,
- _set_secrets_path,
+ _get_secrets_path,
+ _set_secrets_path,
doc='The path for the file containing the encrypted symmetric secret.')
@property
@@ -686,7 +712,7 @@ class SoledadSecrets(object):
Return the secret for remote storage.
"""
key_start = 0
- key_end = self.REMOTE_STORAGE_SECRET_LENGTH
+ key_end = self.REMOTE_STORAGE_SECRET_LENGTH
return self.storage_secret[key_start:key_end]
#
@@ -700,8 +726,10 @@ class SoledadSecrets(object):
:return: The local storage secret.
:rtype: str
"""
- pwd_start = self.REMOTE_STORAGE_SECRET_LENGTH + self.SALT_LENGTH
- pwd_end = self.REMOTE_STORAGE_SECRET_LENGTH + self.LOCAL_STORAGE_SECRET_LENGTH
+ secret_len = self.REMOTE_STORAGE_SECRET_LENGTH
+ lsecret_len = self.LOCAL_STORAGE_SECRET_LENGTH
+ pwd_start = secret_len + self.SALT_LENGTH
+ pwd_end = secret_len + lsecret_len
return self.storage_secret[pwd_start:pwd_end]
def _get_local_storage_salt(self):
@@ -728,9 +756,9 @@ class SoledadSecrets(object):
buflen=32, # we need a key with 256 bits (32 bytes)
)
- #
- # sync db key
- #
+ #
+ # sync db key
+ #
def _get_sync_db_salt(self):
"""
diff --git a/client/src/leap/soledad/client/shared_db.py b/client/src/leap/soledad/client/shared_db.py
index 52e51c6f..f1a2642e 100644
--- a/client/src/leap/soledad/client/shared_db.py
+++ b/client/src/leap/soledad/client/shared_db.py
@@ -14,19 +14,11 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
A shared database for storing/retrieving encrypted key material.
"""
-
-import simplejson as json
-
-
from u1db.remote import http_database
-
-from leap.soledad.common import SHARED_DB_LOCK_DOC_ID_PREFIX
from leap.soledad.client.auth import TokenBasedAuth
@@ -34,6 +26,9 @@ from leap.soledad.client.auth import TokenBasedAuth
# Soledad shared database
# ----------------------------------------------------------------------------
+# TODO could have a hierarchy of soledad exceptions.
+
+
class NoTokenForAuth(Exception):
"""
No token was found for token-based authentication.
@@ -46,6 +41,12 @@ class Unauthorized(Exception):
"""
+class ImproperlyConfiguredError(Exception):
+ """
+ Wrong parameters in the database configuration.
+ """
+
+
class SoledadSharedDatabase(http_database.HTTPDatabase, TokenBasedAuth):
"""
This is a shared recovery database that enables users to store their
@@ -54,6 +55,10 @@ class SoledadSharedDatabase(http_database.HTTPDatabase, TokenBasedAuth):
# TODO: prevent client from messing with the shared DB.
# TODO: define and document API.
+ # If syncable is False, the database will not attempt to sync against
+ # a remote replica. Default is True.
+ syncable = True
+
#
# Token auth methods.
#
@@ -90,9 +95,7 @@ class SoledadSharedDatabase(http_database.HTTPDatabase, TokenBasedAuth):
#
@staticmethod
- def open_database(url, uuid, create, creds=None):
- # TODO: users should not be able to create the shared database, so we
- # have to remove this from here in the future.
+ def open_database(url, uuid, creds=None, syncable=True):
"""
Open a Soledad shared database.
@@ -100,17 +103,23 @@ class SoledadSharedDatabase(http_database.HTTPDatabase, TokenBasedAuth):
:type url: str
:param uuid: The user's unique id.
:type uuid: str
- :param create: Should the database be created if it does not already
- exist?
- :type create: bool
- :param token: An authentication token for accessing the shared db.
- :type token: str
+ :param creds: A tuple containing the authentication method and
+ credentials.
+ :type creds: tuple
+ :param syncable:
+ If syncable is False, the database will not attempt to sync against
+ a remote replica.
+ :type syncable: bool
:return: The shared database in the given url.
:rtype: SoledadSharedDatabase
"""
+ # XXX fix below, doesn't work with tests.
+ #if syncable and not url.startswith('https://'):
+ # raise ImproperlyConfiguredError(
+ # "Remote soledad server must be an https URI")
db = SoledadSharedDatabase(url, uuid, creds=creds)
- db.open(create)
+ db.syncable = syncable
return db
@staticmethod
@@ -153,9 +162,12 @@ class SoledadSharedDatabase(http_database.HTTPDatabase, TokenBasedAuth):
:raise HTTPError: Raised if any HTTP error occurs.
"""
- res, headers = self._request_json('PUT', ['lock', self._uuid],
- body={})
- return res['token'], res['timeout']
+ if self.syncable:
+ res, headers = self._request_json(
+ 'PUT', ['lock', self._uuid], body={})
+ return res['token'], res['timeout']
+ else:
+ return None, None
def unlock(self, token):
"""
@@ -166,5 +178,6 @@ class SoledadSharedDatabase(http_database.HTTPDatabase, TokenBasedAuth):
:raise HTTPError:
"""
- res, headers = self._request_json('DELETE', ['lock', self._uuid],
- params={'token': token})
+ if self.syncable:
+ _, _ = self._request_json(
+ 'DELETE', ['lock', self._uuid], params={'token': token})
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 45629045..91821c25 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -44,120 +44,178 @@ handled by Soledad should be created by SQLCipher >= 2.0.
import logging
import multiprocessing
import os
-import string
import threading
-import time
import json
+import u1db
+
+from u1db import errors as u1db_errors
+from u1db.backends import sqlite_backend
from hashlib import sha256
from contextlib import contextmanager
from collections import defaultdict
from httplib import CannotSendRequest
-from pysqlcipher import dbapi2
-from u1db.backends import sqlite_backend
-from u1db import errors as u1db_errors
-from taskthread import TimerTask
+from pysqlcipher import dbapi2 as sqlcipher_dbapi2
-from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
+from twisted.internet import reactor
+from twisted.internet.task import LoopingCall
+from twisted.internet.threads import deferToThreadPool
+from twisted.python.threadpool import ThreadPool
+from twisted.python import log
+
+from leap.soledad.client import crypto
from leap.soledad.client.target import SoledadSyncTarget
from leap.soledad.client.target import PendingReceivedDocsSyncError
from leap.soledad.client.sync import SoledadSynchronizer
-from leap.soledad.client.mp_safe_db import MPSafeSQLiteDB
+
+from leap.soledad.client import pragmas
from leap.soledad.common import soledad_assert
from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
+
# Monkey-patch u1db.backends.sqlite_backend with pysqlcipher.dbapi2
-sqlite_backend.dbapi2 = dbapi2
+sqlite_backend.dbapi2 = sqlcipher_dbapi2
-# It seems that, as long as we are not using old sqlite versions, serialized
-# mode is enabled by default at compile time. So accessing db connections from
-# different threads should be safe, as long as no attempt is made to use them
-# from multiple threads with no locking.
-# See https://sqlite.org/threadsafe.html
-# and http://bugs.python.org/issue16509
-SQLITE_CHECK_SAME_THREAD = False
+def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
+ """
+ Initialize a SQLCipher database.
-# We set isolation_level to None to setup autocommit mode.
-# See: http://docs.python.org/2/library/sqlite3.html#controlling-transactions
-# This avoids problems with sequential operations using the same soledad object
-# trying to open new transactions
-# (The error was:
-# OperationalError:cannot start a transaction within a transaction.)
-SQLITE_ISOLATION_LEVEL = None
+ :param opts:
+ :type opts: SQLCipherOptions
+ :param on_init: a tuple of queries to be executed on initialization
+ :type on_init: tuple
+ :return: pysqlcipher.dbapi2.Connection
+ """
+ # Note: There seemed to be a bug in sqlite 3.5.9 (with python2.6)
+ # where without re-opening the database on Windows, it
+ # doesn't see the transaction that was just committed
+ # Removing from here now, look at the pysqlite implementation if the
+ # bug shows up in windows.
+ if not os.path.isfile(opts.path) and not opts.create:
+ raise u1db_errors.DatabaseDoesNotExist()
-def open(path, password, create=True, document_factory=None, crypto=None,
- raw_key=False, cipher='aes-256-cbc', kdf_iter=4000,
- cipher_page_size=1024, defer_encryption=False, sync_db_key=None):
- """
- Open a database at the given location.
-
- *** IMPORTANT ***
-
- Don't forget to close the database after use by calling the close()
- method otherwise some resources might not be freed and you may experience
- several kinds of leakages.
-
- *** IMPORTANT ***
-
- Will raise u1db.errors.DatabaseDoesNotExist if create=False and the
- database does not already exist.
-
- :param path: The filesystem path for the database to open.
- :type path: str
- :param create: True/False, should the database be created if it doesn't
- already exist?
- :param create: bool
- :param document_factory: A function that will be called with the same
- parameters as Document.__init__.
- :type document_factory: callable
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
- :param raw_key: Whether C{password} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the encyrption key.
- :type raw_key: bool
- :param cipher: The cipher and mode to use.
- :type cipher: str
- :param kdf_iter: The number of iterations to use.
- :type kdf_iter: int
- :param cipher_page_size: The page size.
- :type cipher_page_size: int
- :param defer_encryption: Whether to defer encryption/decryption of
- documents, or do it inline while syncing.
- :type defer_encryption: bool
-
- :return: An instance of Database.
- :rtype SQLCipherDatabase
- """
- return SQLCipherDatabase.open_database(
- path, password, create=create, document_factory=document_factory,
- crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter,
- cipher_page_size=cipher_page_size, defer_encryption=defer_encryption,
- sync_db_key=sync_db_key)
+ conn = sqlcipher_dbapi2.connect(
+ opts.path, check_same_thread=check_same_thread)
+ set_init_pragmas(conn, opts, extra_queries=on_init)
+ return conn
-#
-# Exceptions
-#
+_db_init_lock = threading.Lock()
-class DatabaseIsNotEncrypted(Exception):
+
+def set_init_pragmas(conn, opts=None, extra_queries=None):
"""
- Exception raised when trying to open non-encrypted databases.
+ Set the initialization pragmas.
+
+ This includes the crypto pragmas, and any other options that must
+ be passed early to sqlcipher db.
"""
- pass
+ soledad_assert(opts is not None)
+ extra_queries = [] if extra_queries is None else extra_queries
+ with _db_init_lock:
+ # only one execution path should initialize the db
+ _set_init_pragmas(conn, opts, extra_queries)
+
+
+def _set_init_pragmas(conn, opts, extra_queries):
+
+ sync_off = os.environ.get('LEAP_SQLITE_NOSYNC')
+ memstore = os.environ.get('LEAP_SQLITE_MEMSTORE')
+ nowal = os.environ.get('LEAP_SQLITE_NOWAL')
+
+ pragmas.set_crypto_pragmas(conn, opts)
+ if not nowal:
+ pragmas.set_write_ahead_logging(conn)
+ if sync_off:
+ pragmas.set_synchronous_off(conn)
+ else:
+ pragmas.set_synchronous_normal(conn)
+ if memstore:
+ pragmas.set_mem_temp_store(conn)
-class NotAnHexString(Exception):
+ for query in extra_queries:
+ conn.cursor().execute(query)
+
+
+class SQLCipherOptions(object):
"""
- Raised when trying to (raw) key the database with a non-hex string.
+ A container with options for the initialization of an SQLCipher database.
"""
- pass
+
+ @classmethod
+ def copy(cls, source, path=None, key=None, create=None,
+ is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None,
+ defer_encryption=None, sync_db_key=None):
+ """
+ Return a copy of C{source} with parameters different than None
+ replaced by new values.
+ """
+ return SQLCipherOptions(
+ path if path else source.path,
+ key if key else source.key,
+ create=create if create else source.create,
+ is_raw_key=is_raw_key if is_raw_key else source.is_raw_key,
+ cipher=cipher if cipher else source.cipher,
+ kdf_iter=kdf_iter if kdf_iter else source.kdf_iter,
+ cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size,
+ defer_encryption=defer_encryption if defer_encryption else source.defer_encryption,
+ sync_db_key=sync_db_key if sync_db_key else source.sync_db_key)
+
+ def __init__(self, path, key, create=True, is_raw_key=False,
+ cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024,
+ defer_encryption=False, sync_db_key=None):
+ """
+ :param path: The filesystem path for the database to open.
+ :type path: str
+ :param create:
+ True/False, should the database be created if it doesn't
+ already exist?
+ :type create: bool
+ :param is_raw_key:
+ Whether ``password`` is a raw 64-char hex string or a passphrase
+ that should be hashed to obtain the encryption key.
+ :type is_raw_key: bool
+ :param cipher: The cipher and mode to use.
+ :type cipher: str
+ :param kdf_iter: The number of iterations to use.
+ :type kdf_iter: int
+ :param cipher_page_size: The page size.
+ :type cipher_page_size: int
+ :param defer_encryption:
+ Whether to defer encryption/decryption of documents, or do it
+ inline while syncing.
+ :type defer_encryption: bool
+ """
+ self.path = path
+ self.key = key
+ self.is_raw_key = is_raw_key
+ self.create = create
+ self.cipher = cipher
+ self.kdf_iter = kdf_iter
+ self.cipher_page_size = cipher_page_size
+ self.defer_encryption = defer_encryption
+ self.sync_db_key = sync_db_key
+
+ def __str__(self):
+ """
+ Return string representation of options, for easy debugging.
+
+ :return: String representation of options.
+ :rtype: str
+ """
+ attr_names = filter(lambda a: not a.startswith('_'), dir(self))
+ attr_str = []
+ for a in attr_names:
+ attr_str.append(a + "=" + str(getattr(self, a)))
+ name = self.__class__.__name__
+ return "%s(%s)" % (name, ', '.join(attr_str))
#
@@ -170,11 +228,219 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
defer_encryption = False
+ # The attribute _index_storage_value will be used as the lookup key for the
+ # implementation of the SQLCipher storage backend.
_index_storage_value = 'expand referenced encrypted'
- k_lock = threading.Lock()
- create_doc_lock = threading.Lock()
- update_indexes_lock = threading.Lock()
- _sync_watcher = None
+
+ def __init__(self, opts):
+ """
+ Connect to an existing SQLCipher database, creating a new sqlcipher
+ database file if needed.
+
+ *** IMPORTANT ***
+
+ Don't forget to close the database after use by calling the close()
+ method otherwise some resources might not be freed and you may
+ experience several kinds of leakages.
+
+ *** IMPORTANT ***
+
+ :param opts: options for initialization of the SQLCipher database.
+ :type opts: SQLCipherOptions
+ """
+ # ensure the db is encrypted if the file already exists
+ if os.path.isfile(opts.path):
+ _assert_db_is_encrypted(opts)
+
+ # connect to the sqlcipher database
+ self._db_handle = initialize_sqlcipher_db(opts)
+
+ # TODO ---------------------------------------------------
+ # Everything else in this initialization has to be factored
+ # out, so it can be used from SoledadSQLCipherWrapper.__init__
+ # too.
+ # ---------------------------------------------------------
+
+ self._ensure_schema()
+ self.set_document_factory(soledad_doc_factory)
+ self._prime_replica_uid()
+
+ def _prime_replica_uid(self):
+ """
+ In the u1db implementation, _replica_uid is a property
+ that returns the value in _real_replica_uid, and does
+ a db query if no value found.
+ Here we prime the replica uid during initialization so
+ that we don't have to wait for the query afterwards.
+ """
+ self._real_replica_uid = None
+ self._get_replica_uid()
+
+ def _extra_schema_init(self, c):
+ """
+ Add any extra fields, etc to the basic table definitions.
+
+ This method is called by u1db.backends.sqlite_backend._initialize()
+ method, which is executed when the database schema is created. Here,
+ we use it to include the "syncable" property for LeapDocuments.
+
+ :param c: The cursor for querying the database.
+ :type c: dbapi2.cursor
+ """
+ c.execute(
+ 'ALTER TABLE document '
+ 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
+
+ #
+ # Document operations
+ #
+
+ def put_doc(self, doc):
+ """
+ Overwrite the put_doc method, to enqueue the modified document for
+ encryption before sync.
+
+ :param doc: The document to be put.
+ :type doc: u1db.Document
+
+ :return: The new document revision.
+ :rtype: str
+ """
+ doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
+
+ # TODO XXX move to API XXX
+ if self.defer_encryption:
+ self.sync_queue.put_nowait(doc)
+ return doc_rev
+
+ #
+ # SQLCipher API methods
+ #
+
+ # Extra query methods: extensions to the base u1db sqlite implementation.
+
+ def get_count_from_index(self, index_name, *key_values):
+ """
+ Return the count for a given combination of index_name
+ and key values.
+
+ Extension method made from similar methods in u1db version 13.09
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param key_values: values to match. eg, if you have
+ an index with 3 fields then you would have:
+ get_count_from_index(index_name, val1, val2, val3)
+ :type key_values: tuple
+ :return: count.
+ :rtype: int
+ """
+ c = self._db_handle.cursor()
+ definition = self._get_index_definition(index_name)
+
+ if len(key_values) != len(definition):
+ raise u1db_errors.InvalidValueForIndex()
+ tables = ["document_fields d%d" % i for i in range(len(definition))]
+ novalue_where = ["d.doc_id = d%d.doc_id"
+ " AND d%d.field_name = ?"
+ % (i, i) for i in range(len(definition))]
+ exact_where = [novalue_where[i]
+ + (" AND d%d.value = ?" % (i,))
+ for i in range(len(definition))]
+ args = []
+ where = []
+ for idx, (field, value) in enumerate(zip(definition, key_values)):
+ args.append(field)
+ where.append(exact_where[idx])
+ args.append(value)
+
+ tables = ["document_fields d%d" % i for i in range(len(definition))]
+ statement = (
+ "SELECT COUNT(*) FROM document d, %s WHERE %s " % (
+ ', '.join(tables),
+ ' AND '.join(where),
+ ))
+ try:
+ c.execute(statement, tuple(args))
+ except sqlcipher_dbapi2.OperationalError, e:
+ raise sqlcipher_dbapi2.OperationalError(
+ str(e) + '\nstatement: %s\nargs: %s\n' % (statement, args))
+ res = c.fetchall()
+ return res[0][0]
+
+ def close(self):
+ """
+ Close db connections.
+ """
+ # TODO should be handled by adbapi instead
+ # TODO syncdb should be stopped first
+
+ if logger is not None: # logger might be none if called from __del__
+ logger.debug("SQLCipher backend: closing")
+
+ # close the actual database
+ if getattr(self, '_db_handle', False):
+ self._db_handle.close()
+ self._db_handle = None
+
+ # indexes
+
+ def _put_and_update_indexes(self, old_doc, doc):
+ """
+ Update a document and all indexes related to it.
+
+ :param old_doc: The old version of the document.
+ :type old_doc: u1db.Document
+ :param doc: The new version of the document.
+ :type doc: u1db.Document
+ """
+ sqlite_backend.SQLitePartialExpandDatabase._put_and_update_indexes(
+ self, old_doc, doc)
+ c = self._db_handle.cursor()
+ c.execute('UPDATE document SET syncable=? WHERE doc_id=?',
+ (doc.syncable, doc.doc_id))
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """
+ Get just the document content, without fancy handling.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise asking for a deleted
+ document will return None.
+ :type include_deleted: bool
+
+ :return: a Document object.
+ :type: u1db.Document
+ """
+ doc = sqlite_backend.SQLitePartialExpandDatabase._get_doc(
+ self, doc_id, check_for_conflicts)
+ if doc:
+ c = self._db_handle.cursor()
+ c.execute('SELECT syncable FROM document WHERE doc_id=?',
+ (doc.doc_id,))
+ result = c.fetchone()
+ doc.syncable = bool(result[0])
+ return doc
+
+ def __del__(self):
+ """
+ Free resources when deleting or garbage collecting the database.
+
+ This is only here to minimize problems if someone ever forgets to call
+ the close() method after using the database; you should not rely on
+ garbage collecting to free up the database resources.
+ """
+ self.close()
+
+
+class SQLCipherU1DBSync(SQLCipherDatabase):
+ """
+ Soledad syncer implementation.
+ """
+
+ _sync_loop = None
_sync_enc_pool = None
"""
@@ -187,268 +453,164 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
A dictionary that hold locks which avoid multiple sync attempts from the
same database replica.
"""
+ # XXX We do not need the lock here now. Remove.
encrypting_lock = threading.Lock()
"""
- Period or recurrence of the periodic encrypting task, in seconds.
+ Period or recurrence of the Looping Call that will do the encryption to the
+ syncdb (in seconds).
"""
- ENCRYPT_TASK_PERIOD = 1
+ ENCRYPT_LOOP_PERIOD = 1
- syncing_lock = defaultdict(threading.Lock)
"""
A dictionary that hold locks which avoid multiple sync attempts from the
same database replica.
"""
+ syncing_lock = defaultdict(threading.Lock)
- def __init__(self, sqlcipher_file, password, document_factory=None,
- crypto=None, raw_key=False, cipher='aes-256-cbc',
- kdf_iter=4000, cipher_page_size=1024, sync_db_key=None):
- """
- Connect to an existing SQLCipher database, creating a new sqlcipher
- database file if needed.
-
- *** IMPORTANT ***
-
- Don't forget to close the database after use by calling the close()
- method otherwise some resources might not be freed and you may
- experience several kinds of leakages.
-
- *** IMPORTANT ***
-
- :param sqlcipher_file: The path for the SQLCipher file.
- :type sqlcipher_file: str
- :param password: The password that protects the SQLCipher db.
- :type password: str
- :param document_factory: A function that will be called with the same
- parameters as Document.__init__.
- :type document_factory: callable
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
- :param raw_key: Whether password is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the
- encyrption key.
- :type raw_key: bool
- :param cipher: The cipher and mode to use.
- :type cipher: str
- :param kdf_iter: The number of iterations to use.
- :type kdf_iter: int
- :param cipher_page_size: The page size.
- :type cipher_page_size: int
- """
- # ensure the db is encrypted if the file already exists
- if os.path.exists(sqlcipher_file):
- self.assert_db_is_encrypted(
- sqlcipher_file, password, raw_key, cipher, kdf_iter,
- cipher_page_size)
+ def __init__(self, opts, soledad_crypto, replica_uid,
+ defer_encryption=False):
- # connect to the sqlcipher database
- with self.k_lock:
- self._db_handle = dbapi2.connect(
- sqlcipher_file,
- isolation_level=SQLITE_ISOLATION_LEVEL,
- check_same_thread=SQLITE_CHECK_SAME_THREAD)
- # set SQLCipher cryptographic parameters
- self._set_crypto_pragmas(
- self._db_handle, password, raw_key, cipher, kdf_iter,
- cipher_page_size)
- if os.environ.get('LEAP_SQLITE_NOSYNC'):
- self._pragma_synchronous_off(self._db_handle)
- else:
- self._pragma_synchronous_normal(self._db_handle)
- if os.environ.get('LEAP_SQLITE_MEMSTORE'):
- self._pragma_mem_temp_store(self._db_handle)
- self._pragma_write_ahead_logging(self._db_handle)
- self._real_replica_uid = None
- self._ensure_schema()
- self._crypto = crypto
+ self._opts = opts
+ self._path = opts.path
+ self._crypto = soledad_crypto
+ self.__replica_uid = replica_uid
- # define sync-db attrs
- self._sqlcipher_file = sqlcipher_file
- self._sync_db_key = sync_db_key
+ self._sync_db_key = opts.sync_db_key
self._sync_db = None
self._sync_db_write_lock = None
self._sync_enc_pool = None
self.sync_queue = None
- if self.defer_encryption:
- # initialize sync db
- self._init_sync_db()
- # initialize syncing queue encryption pool
- self._sync_enc_pool = SyncEncrypterPool(
- self._crypto, self._sync_db, self._sync_db_write_lock)
- self._sync_watcher = TimerTask(self._encrypt_syncing_docs,
- self.ENCRYPT_TASK_PERIOD)
- self._sync_watcher.start()
-
- def factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
- syncable=True):
- return SoledadDocument(doc_id=doc_id, rev=rev, json=json,
- has_conflicts=has_conflicts,
- syncable=syncable)
- self.set_document_factory(factory)
# we store syncers in a dictionary indexed by the target URL. We also
# store a hash of the auth info in case auth info expires and we need
# to rebuild the syncer for that target. The final self._syncers
# format is the following:
#
- # self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
+ # self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
+
self._syncers = {}
+ self._sync_db_write_lock = threading.Lock()
+ self.sync_queue = multiprocessing.Queue()
- @classmethod
- def _open_database(cls, sqlcipher_file, password, document_factory=None,
- crypto=None, raw_key=False, cipher='aes-256-cbc',
- kdf_iter=4000, cipher_page_size=1024,
- defer_encryption=False, sync_db_key=None):
- """
- Open a SQLCipher database.
-
- :param sqlcipher_file: The path for the SQLCipher file.
- :type sqlcipher_file: str
- :param password: The password that protects the SQLCipher db.
- :type password: str
- :param document_factory: A function that will be called with the same
- parameters as Document.__init__.
- :type document_factory: callable
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
- :param raw_key: Whether C{password} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the encyrption key.
- :type raw_key: bool
- :param cipher: The cipher and mode to use.
- :type cipher: str
- :param kdf_iter: The number of iterations to use.
- :type kdf_iter: int
- :param cipher_page_size: The page size.
- :type cipher_page_size: int
- :param defer_encryption: Whether to defer encryption/decryption of
- documents, or do it inline while syncing.
- :type defer_encryption: bool
+ self.running = False
+ self._sync_threadpool = None
+ self._initialize_sync_threadpool()
- :return: The database object.
- :rtype: SQLCipherDatabase
- """
- cls.defer_encryption = defer_encryption
- if not os.path.isfile(sqlcipher_file):
- raise u1db_errors.DatabaseDoesNotExist()
-
- tries = 2
- # Note: There seems to be a bug in sqlite 3.5.9 (with python2.6)
- # where without re-opening the database on Windows, it
- # doesn't see the transaction that was just committed
- while True:
-
- with cls.k_lock:
- db_handle = dbapi2.connect(
- sqlcipher_file,
- check_same_thread=SQLITE_CHECK_SAME_THREAD)
-
- try:
- # set cryptographic params
- cls._set_crypto_pragmas(
- db_handle, password, raw_key, cipher, kdf_iter,
- cipher_page_size)
- c = db_handle.cursor()
- # XXX if we use it here, it should be public
- v, err = cls._which_index_storage(c)
- except Exception as exc:
- logger.warning("ERROR OPENING DATABASE!")
- logger.debug("error was: %r" % exc)
- v, err = None, exc
- finally:
- db_handle.close()
- if v is not None:
- break
- # possibly another process is initializing it, wait for it to be
- # done
- if tries == 0:
- raise err # go for the richest error?
- tries -= 1
- time.sleep(cls.WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL)
- return SQLCipherDatabase._sqlite_registry[v](
- sqlcipher_file, password, document_factory=document_factory,
- crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter,
- cipher_page_size=cipher_page_size, sync_db_key=sync_db_key)
+ self._reactor = reactor
+ self._reactor.callWhenRunning(self._start)
- @classmethod
- def open_database(cls, sqlcipher_file, password, create, backend_cls=None,
- document_factory=None, crypto=None, raw_key=False,
- cipher='aes-256-cbc', kdf_iter=4000,
- cipher_page_size=1024, defer_encryption=False,
- sync_db_key=None):
- """
- Open a SQLCipher database.
+ self.ready = True
+ self._db_handle = None
+ self._initialize_syncer_main_db()
- *** IMPORTANT ***
+ if defer_encryption:
+ self._initialize_sync_db(opts)
- Don't forget to close the database after use by calling the close()
- method otherwise some resources might not be freed and you may
- experience several kinds of leakages.
+ # initialize syncing queue encryption pool
+ self._sync_enc_pool = crypto.SyncEncrypterPool(
+ self._crypto, self._sync_db, self._sync_db_write_lock)
- *** IMPORTANT ***
+ # -----------------------------------------------------------------
+ # From the documentation: If f returns a deferred, rescheduling
+ # will not take place until the deferred has fired. The result
+ # value is ignored.
- :param sqlcipher_file: The path for the SQLCipher file.
- :type sqlcipher_file: str
+ # TODO use this to avoid multiple sync attempts if the sync has not
+ # finished!
+ # -----------------------------------------------------------------
- :param password: The password that protects the SQLCipher db.
- :type password: str
+ # XXX this was called sync_watcher --- trace any remnants
+ self._sync_loop = LoopingCall(self._encrypt_syncing_docs)
+ self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)
- :param create: Should the datbase be created if it does not already
- exist?
- :type create: bool
+ self.shutdownID = None
- :param backend_cls: A class to use as backend.
- :type backend_cls: type
+ @property
+ def _replica_uid(self):
+ return str(self.__replica_uid)
- :param document_factory: A function that will be called with the same
- parameters as Document.__init__.
- :type document_factory: callable
+ def _start(self):
+ if not self.running:
+ self._sync_threadpool.start()
+ self.shutdownID = self._reactor.addSystemEventTrigger(
+ 'during', 'shutdown', self.finalClose)
+ self.running = True
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
+ def _defer_to_sync_threadpool(self, meth, *args, **kwargs):
+ return deferToThreadPool(
+ self._reactor, self._sync_threadpool, meth, *args, **kwargs)
- :param raw_key: Whether C{password} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the
- encyrption key.
- :type raw_key: bool
+ def _initialize_syncer_main_db(self):
- :param cipher: The cipher and mode to use.
- :type cipher: str
+ def init_db():
- :param kdf_iter: The number of iterations to use.
- :type kdf_iter: int
+ # XXX DEBUG -----------------------------------------
+ # REMOVE ME when merging.
- :param cipher_page_size: The page size.
- :type cipher_page_size: int
+ #import thread
+ #print "initializing in thread", thread.get_ident()
+ # ---------------------------------------------------
+ self._db_handle = initialize_sqlcipher_db(
+ self._opts, check_same_thread=False)
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(soledad_doc_factory)
- :param defer_encryption: Whether to defer encryption/decryption of
- documents, or do it inline while syncing.
- :type defer_encryption: bool
+ return self._defer_to_sync_threadpool(init_db)
- :return: The database object.
- :rtype: SQLCipherDatabase
+ def _initialize_sync_threadpool(self):
"""
- cls.defer_encryption = defer_encryption
- try:
- return cls._open_database(
- sqlcipher_file, password, document_factory=document_factory,
- crypto=crypto, raw_key=raw_key, cipher=cipher,
- kdf_iter=kdf_iter, cipher_page_size=cipher_page_size,
- defer_encryption=defer_encryption, sync_db_key=sync_db_key)
- except u1db_errors.DatabaseDoesNotExist:
- if not create:
- raise
- # TODO: remove backend class from here.
- if backend_cls is None:
- # default is SQLCipherPartialExpandDatabase
- backend_cls = SQLCipherDatabase
- return backend_cls(
- sqlcipher_file, password, document_factory=document_factory,
- crypto=crypto, raw_key=raw_key, cipher=cipher,
- kdf_iter=kdf_iter, cipher_page_size=cipher_page_size,
- sync_db_key=sync_db_key)
+ Initialize a ThreadPool with exactly one thread, that will be used to
+ run all the network blocking calls for syncing on a separate thread.
+
+ TODO this needs to be ported away from urllib and into twisted async
+ calls, and then we can ditch this syncing thread and reintegrate into
+ the main reactor.
+ """
+ self._sync_threadpool = ThreadPool(0, 1)
+
+ def _initialize_sync_db(self, opts):
+ """
+ Initialize the Symmetrically-Encrypted document to be synced database,
+ and the queue to communicate with subprocess workers.
+
+ :param opts:
+ :type opts: SQLCipherOptions
+ """
+ soledad_assert(opts.sync_db_key is not None)
+ sync_db_path = None
+ if opts.path != ":memory:":
+ sync_db_path = "%s-sync" % opts.path
+ else:
+ sync_db_path = ":memory:"
+
+ # we copy incoming options because the opts object might be used
+ # somewhere else
+ sync_opts = SQLCipherOptions.copy(
+ opts, path=sync_db_path, create=True)
+ self._sync_db = initialize_sqlcipher_db(
+ sync_opts, on_init=self._sync_db_extra_init,
+ check_same_thread=False)
+ pragmas.set_crypto_pragmas(self._sync_db, opts)
+ # ---------------------------------------------------------
+
+ @property
+ def _sync_db_extra_init(self):
+ """
+ Queries for creating tables for the local sync documents db if needed.
+ They are passed as extra initialization to initialize_sqlcipher_db.
+
+ :rtype: tuple of strings
+ """
+ maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"
+ encr = crypto.SyncEncrypterPool
+ decr = crypto.SyncDecrypterPool
+ sql_encr_table_query = (maybe_create % (
+ encr.TABLE_NAME, encr.FIELD_NAMES))
+ sql_decr_table_query = (maybe_create % (
+ decr.TABLE_NAME, decr.FIELD_NAMES))
+ return (sql_encr_table_query, sql_decr_table_query)
def sync(self, url, creds=None, autocreate=True, defer_decryption=True):
"""
@@ -460,27 +622,49 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:param url: The url of the target replica to sync with.
:type url: str
- :param creds: optional dictionary giving credentials.
+ :param creds:
+ optional dictionary giving credentials.
to authorize the operation with the server.
:type creds: dict
:param autocreate: Ask the target to create the db if non-existent.
:type autocreate: bool
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
+ :param defer_decryption:
+ Whether to defer the decryption process using the intermediate
+ database. If False, decryption will be done inline.
:type defer_decryption: bool
- :return: The local generation before the synchronisation was performed.
- :rtype: int
- """
+ :return:
+ A Deferred, that will fire with the local generation (type `int`)
+ before the synchronisation was performed.
+ :rtype: deferred
+ """
+ if not self.ready:
+ print "not ready yet..."
+ # XXX ---------------------------------------------------------
+ # This might happen because the database has not yet been
+ # initialized (it's deferred to the threadpool).
+ # A good strategy might be to return a deferred that will
+ # callLater this same function after a timeout (deferLater)
+ # Might want to keep track of retries and cancel too.
+ # --------------------------------------------------------------
+ kwargs = {'creds': creds, 'autocreate': autocreate,
+ 'defer_decryption': defer_decryption}
+ return self._defer_to_sync_threadpool(self._sync, url, **kwargs)
+
+ def _sync(self, url, creds=None, autocreate=True, defer_decryption=True):
res = None
+
# the following context manager blocks until the syncing lock can be
# acquired.
- if defer_decryption:
- self._init_sync_db()
- with self.syncer(url, creds=creds) as syncer:
+ # TODO review, I think this is no longer needed with a 1-thread
+ # threadpool.
+
+ log.msg("in _sync")
+ self.__url = url
+ with self._syncer(url, creds=creds) as syncer:
# XXX could mark the critical section here...
try:
+ log.msg('syncer sync...')
res = syncer.sync(autocreate=autocreate,
defer_decryption=defer_decryption)
@@ -500,12 +684,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
Interrupt all ongoing syncs.
"""
+ self._stop_sync()
+
+ def _stop_sync(self):
for url in self._syncers:
_, syncer = self._syncers[url]
syncer.stop()
@contextmanager
- def syncer(self, url, creds=None):
+ def _syncer(self, url, creds=None):
"""
Accesor for synchronizer.
@@ -514,13 +701,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
Because of that, this method blocks until the syncing lock can be
acquired.
"""
- with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]:
+ with self.syncing_lock[self._path]:
syncer = self._get_syncer(url, creds=creds)
yield syncer
@property
def syncing(self):
- lock = SQLCipherDatabase.syncing_lock[self._get_replica_uid()]
+ lock = self.syncing_lock[self._path]
acquired_lock = lock.acquire(False)
if acquired_lock is False:
return True
@@ -529,7 +716,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
def _get_syncer(self, url, creds=None):
"""
- Get a synchronizer for C{url} using C{creds}.
+ Get a synchronizer for ``url`` using ``creds``.
:param url: The url of the target replica to sync with.
:type url: str
@@ -550,6 +737,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
syncer = SoledadSynchronizer(
self,
SoledadSyncTarget(url,
+ # XXX is the replica_uid ready?
self._replica_uid,
creds=creds,
crypto=self._crypto,
@@ -562,58 +750,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
syncer.num_inserted = 0
return syncer
- def _extra_schema_init(self, c):
- """
- Add any extra fields, etc to the basic table definitions.
-
- This method is called by u1db.backends.sqlite_backend._initialize()
- method, which is executed when the database schema is created. Here,
- we use it to include the "syncable" property for LeapDocuments.
-
- :param c: The cursor for querying the database.
- :type c: dbapi2.cursor
- """
- c.execute(
- 'ALTER TABLE document '
- 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
-
- def _init_sync_db(self):
- """
- Initialize the Symmetrically-Encrypted document to be synced database,
- and the queue to communicate with subprocess workers.
- """
- if self._sync_db is None:
- soledad_assert(self._sync_db_key is not None)
- sync_db_path = None
- if self._sqlcipher_file != ":memory:":
- sync_db_path = "%s-sync" % self._sqlcipher_file
- else:
- sync_db_path = ":memory:"
- self._sync_db = MPSafeSQLiteDB(sync_db_path)
- # protect the sync db with a password
- if self._sync_db_key is not None:
- self._set_crypto_pragmas(
- self._sync_db, self._sync_db_key, False,
- 'aes-256-cbc', 4000, 1024)
- self._sync_db_write_lock = threading.Lock()
- self._create_sync_db_tables()
- self.sync_queue = multiprocessing.Queue()
-
- def _create_sync_db_tables(self):
- """
- Create tables for the local sync documents db if needed.
- """
- encr = SyncEncrypterPool
- decr = SyncDecrypterPool
- sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
- encr.TABLE_NAME, encr.FIELD_NAMES))
- sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
- decr.TABLE_NAME, decr.FIELD_NAMES))
-
- with self._sync_db_write_lock:
- self._sync_db.execute(sql_encr)
- self._sync_db.execute(sql_decr)
-
#
# Symmetric encryption of syncing docs
#
@@ -624,8 +760,11 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
to be encrypted in the sync db. They will be read by the
SoledadSyncTarget during the sync_exchange.
- Called periodical from the TimerTask self._sync_watcher.
+ Called periodically from the LoopingCall self._sync_loop.
"""
+ # TODO should return a deferred that would fire when the encryption is
+ # done. See note on __init__
+
lock = self.encrypting_lock
# optional wait flag used to avoid blocking
if not lock.acquire(False):
@@ -643,488 +782,28 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
finally:
lock.release()
- #
- # Document operations
- #
-
- def put_doc(self, doc):
- """
- Overwrite the put_doc method, to enqueue the modified document for
- encryption before sync.
-
- :param doc: The document to be put.
- :type doc: u1db.Document
-
- :return: The new document revision.
- :rtype: str
- """
- doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(
- self, doc)
- if self.defer_encryption:
- self.sync_queue.put_nowait(doc)
- return doc_rev
-
- # indexes
-
- def _put_and_update_indexes(self, old_doc, doc):
- """
- Update a document and all indexes related to it.
-
- :param old_doc: The old version of the document.
- :type old_doc: u1db.Document
- :param doc: The new version of the document.
- :type doc: u1db.Document
- """
- with self.update_indexes_lock:
- sqlite_backend.SQLitePartialExpandDatabase._put_and_update_indexes(
- self, old_doc, doc)
- c = self._db_handle.cursor()
- c.execute('UPDATE document SET syncable=? '
- 'WHERE doc_id=?',
- (doc.syncable, doc.doc_id))
-
- def _get_doc(self, doc_id, check_for_conflicts=False):
- """
- Get just the document content, without fancy handling.
-
- :param doc_id: The unique document identifier
- :type doc_id: str
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise asking for a deleted
- document will return None.
- :type include_deleted: bool
-
- :return: a Document object.
- :type: u1db.Document
- """
- doc = sqlite_backend.SQLitePartialExpandDatabase._get_doc(
- self, doc_id, check_for_conflicts)
- if doc:
- c = self._db_handle.cursor()
- c.execute('SELECT syncable FROM document '
- 'WHERE doc_id=?',
- (doc.doc_id,))
- result = c.fetchone()
- doc.syncable = bool(result[0])
- return doc
-
- #
- # SQLCipher API methods
- #
-
- @classmethod
- def assert_db_is_encrypted(cls, sqlcipher_file, key, raw_key, cipher,
- kdf_iter, cipher_page_size):
- """
- Assert that C{sqlcipher_file} contains an encrypted database.
-
- When opening an existing database, PRAGMA key will not immediately
- throw an error if the key provided is incorrect. To test that the
- database can be successfully opened with the provided key, it is
- necessary to perform some operation on the database (i.e. read from
- it) and confirm it is success.
-
- The easiest way to do this is select off the sqlite_master table,
- which will attempt to read the first page of the database and will
- parse the schema.
-
- :param sqlcipher_file: The path for the SQLCipher file.
- :type sqlcipher_file: str
- :param key: The key that protects the SQLCipher db.
- :type key: str
- :param raw_key: Whether C{key} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the encyrption key.
- :type raw_key: bool
- :param cipher: The cipher and mode to use.
- :type cipher: str
- :param kdf_iter: The number of iterations to use.
- :type kdf_iter: int
- :param cipher_page_size: The page size.
- :type cipher_page_size: int
- """
- try:
- # try to open an encrypted database with the regular u1db
- # backend should raise a DatabaseError exception.
- sqlite_backend.SQLitePartialExpandDatabase(sqlcipher_file)
- raise DatabaseIsNotEncrypted()
- except dbapi2.DatabaseError:
- # assert that we can access it using SQLCipher with the given
- # key
- with cls.k_lock:
- db_handle = dbapi2.connect(
- sqlcipher_file,
- isolation_level=SQLITE_ISOLATION_LEVEL,
- check_same_thread=SQLITE_CHECK_SAME_THREAD)
- cls._set_crypto_pragmas(
- db_handle, key, raw_key, cipher,
- kdf_iter, cipher_page_size)
- db_handle.cursor().execute(
- 'SELECT count(*) FROM sqlite_master')
-
- @classmethod
- def _set_crypto_pragmas(cls, db_handle, key, raw_key, cipher, kdf_iter,
- cipher_page_size):
- """
- Set cryptographic params (key, cipher, KDF number of iterations and
- cipher page size).
- """
- cls._pragma_key(db_handle, key, raw_key)
- cls._pragma_cipher(db_handle, cipher)
- cls._pragma_kdf_iter(db_handle, kdf_iter)
- cls._pragma_cipher_page_size(db_handle, cipher_page_size)
-
- @classmethod
- def _pragma_key(cls, db_handle, key, raw_key):
- """
- Set the C{key} for use with the database.
-
- The process of creating a new, encrypted database is called 'keying'
- the database. SQLCipher uses just-in-time key derivation at the point
- it is first needed for an operation. This means that the key (and any
- options) must be set before the first operation on the database. As
- soon as the database is touched (e.g. SELECT, CREATE TABLE, UPDATE,
- etc.) and pages need to be read or written, the key is prepared for
- use.
-
- Implementation Notes:
-
- * PRAGMA key should generally be called as the first operation on a
- database.
-
- :param key: The key for use with the database.
- :type key: str
- :param raw_key: Whether C{key} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the encyrption key.
- :type raw_key: bool
- """
- if raw_key:
- cls._pragma_key_raw(db_handle, key)
- else:
- cls._pragma_key_passphrase(db_handle, key)
-
- @classmethod
- def _pragma_key_passphrase(cls, db_handle, passphrase):
- """
- Set a passphrase for encryption key derivation.
-
- The key itself can be a passphrase, which is converted to a key using
- PBKDF2 key derivation. The result is used as the encryption key for
- the database. By using this method, there is no way to alter the KDF;
- if you want to do so you should use a raw key instead and derive the
- key using your own KDF.
-
- :param db_handle: A handle to the SQLCipher database.
- :type db_handle: pysqlcipher.Connection
- :param passphrase: The passphrase used to derive the encryption key.
- :type passphrase: str
- """
- db_handle.cursor().execute("PRAGMA key = '%s'" % passphrase)
-
- @classmethod
- def _pragma_key_raw(cls, db_handle, key):
- """
- Set a raw hexadecimal encryption key.
-
- It is possible to specify an exact byte sequence using a blob literal.
- With this method, it is the calling application's responsibility to
- ensure that the data provided is a 64 character hex string, which will
- be converted directly to 32 bytes (256 bits) of key data.
-
- :param db_handle: A handle to the SQLCipher database.
- :type db_handle: pysqlcipher.Connection
- :param key: A 64 character hex string.
- :type key: str
- """
- if not all(c in string.hexdigits for c in key):
- raise NotAnHexString(key)
- db_handle.cursor().execute('PRAGMA key = "x\'%s"' % key)
-
- @classmethod
- def _pragma_cipher(cls, db_handle, cipher='aes-256-cbc'):
- """
- Set the cipher and mode to use for symmetric encryption.
-
- SQLCipher uses aes-256-cbc as the default cipher and mode of
- operation. It is possible to change this, though not generally
- recommended, using PRAGMA cipher.
-
- SQLCipher makes direct use of libssl, so all cipher options available
- to libssl are also available for use with SQLCipher. See `man enc` for
- OpenSSL's supported ciphers.
-
- Implementation Notes:
-
- * PRAGMA cipher must be called after PRAGMA key and before the first
- actual database operation or it will have no effect.
-
- * If a non-default value is used PRAGMA cipher to create a database,
- it must also be called every time that database is opened.
-
- * SQLCipher does not implement its own encryption. Instead it uses the
- widely available and peer-reviewed OpenSSL libcrypto for all
- cryptographic functions.
-
- :param db_handle: A handle to the SQLCipher database.
- :type db_handle: pysqlcipher.Connection
- :param cipher: The cipher and mode to use.
- :type cipher: str
- """
- db_handle.cursor().execute("PRAGMA cipher = '%s'" % cipher)
-
- @classmethod
- def _pragma_kdf_iter(cls, db_handle, kdf_iter=4000):
- """
- Set the number of iterations for the key derivation function.
-
- SQLCipher uses PBKDF2 key derivation to strengthen the key and make it
- resistent to brute force and dictionary attacks. The default
- configuration uses 4000 PBKDF2 iterations (effectively 16,000 SHA1
- operations). PRAGMA kdf_iter can be used to increase or decrease the
- number of iterations used.
+ def get_generation(self):
+ # FIXME
+ # XXX this SHOULD BE a callback
+ return self._get_generation()
- Implementation Notes:
-
- * PRAGMA kdf_iter must be called after PRAGMA key and before the first
- actual database operation or it will have no effect.
-
- * If a non-default value is used PRAGMA kdf_iter to create a database,
- it must also be called every time that database is opened.
-
- * It is not recommended to reduce the number of iterations if a
- passphrase is in use.
-
- :param db_handle: A handle to the SQLCipher database.
- :type db_handle: pysqlcipher.Connection
- :param kdf_iter: The number of iterations to use.
- :type kdf_iter: int
- """
- db_handle.cursor().execute("PRAGMA kdf_iter = '%d'" % kdf_iter)
-
- @classmethod
- def _pragma_cipher_page_size(cls, db_handle, cipher_page_size=1024):
- """
- Set the page size of the encrypted database.
-
- SQLCipher 2 introduced the new PRAGMA cipher_page_size that can be
- used to adjust the page size for the encrypted database. The default
- page size is 1024 bytes, but it can be desirable for some applications
- to use a larger page size for increased performance. For instance,
- some recent testing shows that increasing the page size can noticeably
- improve performance (5-30%) for certain queries that manipulate a
- large number of pages (e.g. selects without an index, large inserts in
- a transaction, big deletes).
-
- To adjust the page size, call the pragma immediately after setting the
- key for the first time and each subsequent time that you open the
- database.
-
- Implementation Notes:
-
- * PRAGMA cipher_page_size must be called after PRAGMA key and before
- the first actual database operation or it will have no effect.
-
- * If a non-default value is used PRAGMA cipher_page_size to create a
- database, it must also be called every time that database is opened.
-
- :param db_handle: A handle to the SQLCipher database.
- :type db_handle: pysqlcipher.Connection
- :param cipher_page_size: The page size.
- :type cipher_page_size: int
- """
- db_handle.cursor().execute(
- "PRAGMA cipher_page_size = '%d'" % cipher_page_size)
-
- @classmethod
- def _pragma_rekey(cls, db_handle, new_key, raw_key):
- """
- Change the key of an existing encrypted database.
-
- To change the key on an existing encrypted database, it must first be
- unlocked with the current encryption key. Once the database is
- readable and writeable, PRAGMA rekey can be used to re-encrypt every
- page in the database with a new key.
-
- * PRAGMA rekey must be called after PRAGMA key. It can be called at any
- time once the database is readable.
-
- * PRAGMA rekey can not be used to encrypted a standard SQLite
- database! It is only useful for changing the key on an existing
- database.
-
- * Previous versions of SQLCipher provided a PRAGMA rekey_cipher and
- code>PRAGMA rekey_kdf_iter. These are deprecated and should not be
- used. Instead, use sqlcipher_export().
-
- :param db_handle: A handle to the SQLCipher database.
- :type db_handle: pysqlcipher.Connection
- :param new_key: The new key.
- :type new_key: str
- :param raw_key: Whether C{password} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the encyrption key.
- :type raw_key: bool
- """
- # XXX change key param!
- if raw_key:
- cls._pragma_rekey_raw(db_handle, key)
- else:
- cls._pragma_rekey_passphrase(db_handle, key)
-
- @classmethod
- def _pragma_rekey_passphrase(cls, db_handle, passphrase):
- """
- Change the passphrase for encryption key derivation.
-
- The key itself can be a passphrase, which is converted to a key using
- PBKDF2 key derivation. The result is used as the encryption key for
- the database.
-
- :param db_handle: A handle to the SQLCipher database.
- :type db_handle: pysqlcipher.Connection
- :param passphrase: The passphrase used to derive the encryption key.
- :type passphrase: str
- """
- db_handle.cursor().execute("PRAGMA rekey = '%s'" % passphrase)
-
- @classmethod
- def _pragma_rekey_raw(cls, db_handle, key):
- """
- Change the raw hexadecimal encryption key.
-
- It is possible to specify an exact byte sequence using a blob literal.
- With this method, it is the calling application's responsibility to
- ensure that the data provided is a 64 character hex string, which will
- be converted directly to 32 bytes (256 bits) of key data.
-
- :param db_handle: A handle to the SQLCipher database.
- :type db_handle: pysqlcipher.Connection
- :param key: A 64 character hex string.
- :type key: str
+ def finalClose(self):
"""
- if not all(c in string.hexdigits for c in key):
- raise NotAnHexString(key)
- # XXX change passphrase param!
- db_handle.cursor().execute('PRAGMA rekey = "x\'%s"' % passphrase)
-
- @classmethod
- def _pragma_synchronous_off(cls, db_handle):
- """
- Change the setting of the "synchronous" flag to OFF.
- """
- logger.debug("SQLCIPHER: SETTING SYNCHRONOUS OFF")
- db_handle.cursor().execute('PRAGMA synchronous=OFF')
-
- @classmethod
- def _pragma_synchronous_normal(cls, db_handle):
- """
- Change the setting of the "synchronous" flag to NORMAL.
- """
- logger.debug("SQLCIPHER: SETTING SYNCHRONOUS NORMAL")
- db_handle.cursor().execute('PRAGMA synchronous=NORMAL')
-
- @classmethod
- def _pragma_mem_temp_store(cls, db_handle):
- """
- Use a in-memory store for temporary tables.
- """
- logger.debug("SQLCIPHER: SETTING TEMP_STORE MEMORY")
- db_handle.cursor().execute('PRAGMA temp_store=MEMORY')
-
- @classmethod
- def _pragma_write_ahead_logging(cls, db_handle):
- """
- Enable write-ahead logging, and set the autocheckpoint to 50 pages.
-
- Setting the autocheckpoint to a small value, we make the reads not
- suffer too much performance degradation.
-
- From the sqlite docs:
-
- "There is a tradeoff between average read performance and average write
- performance. To maximize the read performance, one wants to keep the
- WAL as small as possible and hence run checkpoints frequently, perhaps
- as often as every COMMIT. To maximize write performance, one wants to
- amortize the cost of each checkpoint over as many writes as possible,
- meaning that one wants to run checkpoints infrequently and let the WAL
- grow as large as possible before each checkpoint. The decision of how
- often to run checkpoints may therefore vary from one application to
- another depending on the relative read and write performance
- requirements of the application. The default strategy is to run a
- checkpoint once the WAL reaches 1000 pages"
- """
- logger.debug("SQLCIPHER: SETTING WRITE-AHEAD LOGGING")
- db_handle.cursor().execute('PRAGMA journal_mode=WAL')
- # The optimum value can still use a little bit of tuning, but we favor
- # small sizes of the WAL file to get fast reads, since we assume that
- # the writes will be quick enough to not block too much.
-
- # TODO
- # As a further improvement, we might want to set autocheckpoint to 0
- # here and do the checkpoints manually in a separate thread, to avoid
- # any blocks in the main thread (we should run a loopingcall from here)
- db_handle.cursor().execute('PRAGMA wal_autocheckpoint=50')
-
- # Extra query methods: extensions to the base sqlite implmentation.
-
- def get_count_from_index(self, index_name, *key_values):
- """
- Returns the count for a given combination of index_name
- and key values.
-
- Extension method made from similar methods in u1db version 13.09
-
- :param index_name: The index to query
- :type index_name: str
- :param key_values: values to match. eg, if you have
- an index with 3 fields then you would have:
- get_from_index(index_name, val1, val2, val3)
- :type key_values: tuple
- :return: count.
- :rtype: int
+ This should only be called by the shutdown trigger.
"""
- c = self._db_handle.cursor()
- definition = self._get_index_definition(index_name)
-
- if len(key_values) != len(definition):
- raise u1db_errors.InvalidValueForIndex()
- tables = ["document_fields d%d" % i for i in range(len(definition))]
- novalue_where = ["d.doc_id = d%d.doc_id"
- " AND d%d.field_name = ?"
- % (i, i) for i in range(len(definition))]
- exact_where = [novalue_where[i]
- + (" AND d%d.value = ?" % (i,))
- for i in range(len(definition))]
- args = []
- where = []
- for idx, (field, value) in enumerate(zip(definition, key_values)):
- args.append(field)
- where.append(exact_where[idx])
- args.append(value)
-
- tables = ["document_fields d%d" % i for i in range(len(definition))]
- statement = (
- "SELECT COUNT(*) FROM document d, %s WHERE %s " % (
- ', '.join(tables),
- ' AND '.join(where),
- ))
- try:
- c.execute(statement, tuple(args))
- except dbapi2.OperationalError, e:
- raise dbapi2.OperationalError(
- str(e) + '\nstatement: %s\nargs: %s\n' % (statement, args))
- res = c.fetchall()
- return res[0][0]
+ self.shutdownID = None
+ self._sync_threadpool.stop()
+ self.running = False
def close(self):
"""
- Close db_handle and close syncer.
+ Close the syncer and the sync db in an orderly fashion.
"""
- if logger is not None: # logger might be none if called from __del__
- logger.debug("Sqlcipher backend: closing")
- # stop the sync watcher for deferred encryption
- if self._sync_watcher is not None:
- self._sync_watcher.stop()
- self._sync_watcher.shutdown()
- self._sync_watcher = None
+ # stop the sync loop for deferred encryption
+ if self._sync_loop is not None:
+ self._sync_loop.reset()
+ self._sync_loop.stop()
+ self._sync_loop = None
# close all open syncers
for url in self._syncers:
_, syncer = self._syncers[url]
@@ -1134,10 +813,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
if self._sync_enc_pool is not None:
self._sync_enc_pool.close()
self._sync_enc_pool = None
- # close the actual database
- if self._db_handle is not None:
- self._db_handle.close()
- self._db_handle = None
+
# close the sync database
if self._sync_db is not None:
self._sync_db.close()
@@ -1148,19 +824,92 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
del self.sync_queue
self.sync_queue = None
- def __del__(self):
- """
- Free resources when deleting or garbage collecting the database.
- This is only here to minimze problems if someone ever forgets to call
- the close() method after using the database; you should not rely on
- garbage collecting to free up the database resources.
- """
- self.close()
+class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
+ """
+ A very simple wrapper for u1db around sqlcipher backend.
- @property
- def replica_uid(self):
- return self._get_replica_uid()
+ Instead of initializing the database on the fly, it just uses an existing
+ connection that is passed to it in the initializer.
+
+ It can be used in tests and debug runs to initialize the adbapi with plain
+ sqlite connections, decoupled from the sqlcipher layer.
+ """
+
+ def __init__(self, conn):
+ self._db_handle = conn
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self._factory = u1db.Document
+
+
+class SoledadSQLCipherWrapper(SQLCipherDatabase):
+ """
+ A wrapper for u1db that uses the Soledad-extended sqlcipher backend.
+
+ Instead of initializing the database on the fly, it just uses an existing
+ connection that is passed to it in the initializer.
+
+ It can be used from adbapi to initialize a soledad database after
+ getting a regular connection to a sqlcipher database.
+ """
+ def __init__(self, conn):
+ self._db_handle = conn
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(soledad_doc_factory)
+ self._prime_replica_uid()
+
+
+def _assert_db_is_encrypted(opts):
+ """
+ Assert that the sqlcipher file contains an encrypted database.
+ When opening an existing database, PRAGMA key will not immediately
+ throw an error if the key provided is incorrect. To test that the
+ database can be successfully opened with the provided key, it is
+ necessary to perform some operation on the database (e.g. read from
+ it) and confirm that it succeeds.
+
+ The easiest way to do this is select off the sqlite_master table,
+ which will attempt to read the first page of the database and will
+ parse the schema.
+
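+ :param opts: the SQLCipher options; ``opts.path`` is the database file to
+ check, and the whole object is passed to initialize_sqlcipher_db.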
+ """
+ # Trying to open an encrypted database with the regular u1db
+ # backend should raise a DatabaseError exception.
+ # If the regular backend succeeds, then we need to stop because
+ # the database was not properly initialized.
+ try:
+ sqlite_backend.SQLitePartialExpandDatabase(opts.path)
+ except sqlcipher_dbapi2.DatabaseError:
+ # assert that we can access it using SQLCipher with the given
+ # key
+ dummy_query = ('SELECT count(*) FROM sqlite_master',)
+ initialize_sqlcipher_db(opts, on_init=dummy_query)
+ else:
+ raise DatabaseIsNotEncrypted()
+
+#
+# Exceptions
+#
+
+
+class DatabaseIsNotEncrypted(Exception):
+ """
+ Exception raised when trying to open non-encrypted databases.
+ """
+ pass
+
+
+def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
+ syncable=True):
+ """
+ Return a default Soledad Document.
+ Used in the initialization for SQLCipherDatabase
+ """
+ return SoledadDocument(doc_id=doc_id, rev=rev, json=json,
+ has_conflicts=has_conflicts, syncable=syncable)
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 0297c75c..1a5e2989 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -14,21 +14,16 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Soledad synchronization utilities.
-
Extend u1db Synchronizer with the ability to:
- * Defer the update of the known replica uid until all the decryption of
+ * Postpone the update of the known replica uid until all the decryption of
the incoming messages has been processed.
* Be interrupted and recovered.
"""
-
-
import logging
import traceback
from threading import Lock
@@ -52,6 +47,8 @@ class SoledadSynchronizer(Synchronizer):
Also modified to allow for interrupting the synchronization process.
"""
+ # TODO can delegate the syncing to the api object, living in the reactor
+ # thread, and use a simple flag.
syncing_lock = Lock()
def stop(self):
@@ -118,9 +115,10 @@ class SoledadSynchronizer(Synchronizer):
" target generation: %d\n"
" target trans id: %s\n"
" target my gen: %d\n"
- " target my trans_id: %s"
+ " target my trans_id: %s\n"
+ " source replica_uid: %s\n"
% (self.target_replica_uid, target_gen, target_trans_id,
- target_my_gen, target_my_trans_id))
+ target_my_gen, target_my_trans_id, self.source._replica_uid))
# make sure we'll have access to target replica uid once it exists
if self.target_replica_uid is None:
@@ -236,6 +234,8 @@ class SoledadSynchronizer(Synchronizer):
# release if something in the syncdb-decrypt goes wrong. we could keep
# track of the release date and cleanup unrealistic sync entries after
# some time.
+
+ # TODO use cancellable deferreds instead
locked = self.syncing_lock.locked()
return locked
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 651d3ee5..dd61c070 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -14,14 +14,10 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
A U1DB backend for encrypting data before sending to server and decrypting
after receiving.
"""
-
-
import cStringIO
import gzip
import logging
@@ -34,7 +30,7 @@ from time import sleep
from uuid import uuid4
import simplejson as json
-from taskthread import TimerTask
+
from u1db import errors
from u1db.remote import utils, http_errors
from u1db.remote.http_target import HTTPSyncTarget
@@ -42,6 +38,8 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
from zope.proxy import ProxyBase
from zope.proxy import sameProxiedObjects, setProxiedObject
+from twisted.internet.task import LoopingCall
+
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted
@@ -190,7 +188,7 @@ class DocumentSyncerThread(threading.Thread):
self._doc_syncer.failure_callback(
self._idx, self._total, self._exception)
- self._failed_method(self)
+ self._failed_method()
# we do not release the callback lock here because we
# failed and so we don't want other threads to succeed.
@@ -350,7 +348,7 @@ class DocumentSyncerPool(object):
self._threads.remove(syncer_thread)
self._semaphore_pool.release()
- def cancel_threads(self, calling_thread):
+ def cancel_threads(self):
"""
Stop all threads in the pool.
"""
@@ -755,7 +753,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
Period of recurrence of the periodic decrypting task, in seconds.
"""
- DECRYPT_TASK_PERIOD = 0.5
+ DECRYPT_LOOP_PERIOD = 0.5
#
# Modified HTTPSyncTarget methods.
@@ -796,13 +794,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_exchange_lock = threading.Lock()
self.source_replica_uid = source_replica_uid
self._defer_decryption = False
+ self._syncer_pool = None
# deferred decryption attributes
self._sync_db = None
self._sync_db_write_lock = None
self._decryption_callback = None
self._sync_decr_pool = None
- self._sync_watcher = None
+ self._sync_loop = None
if sync_db and sync_db_write_lock is not None:
self._sync_db = sync_db
self._sync_db_write_lock = sync_db_write_lock
@@ -828,23 +827,22 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_decr_pool.close()
self._sync_decr_pool = None
- def _setup_sync_watcher(self):
+ def _setup_sync_loop(self):
"""
- Set up the sync watcher for deferred decryption.
+ Set up the sync loop for deferred decryption.
"""
- if self._sync_watcher is None:
- self._sync_watcher = TimerTask(
- self._decrypt_syncing_received_docs,
- delay=self.DECRYPT_TASK_PERIOD)
+ if self._sync_loop is None:
+ self._sync_loop = LoopingCall(
+ self._decrypt_syncing_received_docs)
+ self._sync_loop.start(self.DECRYPT_LOOP_PERIOD)
- def _teardown_sync_watcher(self):
+ def _teardown_sync_loop(self):
"""
- Tear down the sync watcher.
+ Tear down the sync loop.
"""
- if self._sync_watcher is not None:
- self._sync_watcher.stop()
- self._sync_watcher.shutdown()
- self._sync_watcher = None
+ if self._sync_loop is not None:
+ self._sync_loop.stop()
+ self._sync_loop = None
def _get_replica_uid(self, url):
"""
@@ -955,7 +953,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
headers, return_doc_cb, ensure_callback, sync_id,
- syncer_pool, defer_decryption=False):
+ defer_decryption=False):
"""
Fetch sync documents from the remote database and insert them in the
local database.
@@ -1016,7 +1014,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
break
# launch a thread to fetch one document from target
- t = syncer_pool.new_syncer_thread(
+ t = self._syncer_pool.new_syncer_thread(
idx, number_of_changes,
last_callback_lock=last_callback_lock)
@@ -1050,6 +1048,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
number_of_changes, _, _ = t.result
+ else:
+ raise t.exception
first_request = False
# make sure all threads finished and we have up-to-date info
@@ -1060,6 +1060,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
last_successful_thread = t
+ else:
+ raise t.exception
# get information about last successful thread
if last_successful_thread is not None:
@@ -1131,7 +1133,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if defer_decryption and self._sync_db is not None:
self._sync_exchange_lock.acquire()
self._setup_sync_decr_pool()
- self._setup_sync_watcher()
+ self._setup_sync_loop()
self._defer_decryption = True
else:
# fall back
@@ -1165,9 +1167,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
logger.debug("Soledad sync send status: %s" % msg)
defer_encryption = self._sync_db is not None
- syncer_pool = DocumentSyncerPool(
+ self._syncer_pool = DocumentSyncerPool(
self._raw_url, self._raw_creds, url, headers, ensure_callback,
- self.stop)
+ self.stop_syncer)
threads = []
last_callback_lock = None
sent = 0
@@ -1212,7 +1214,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# -------------------------------------------------------------
# end of symmetric encryption
# -------------------------------------------------------------
- t = syncer_pool.new_syncer_thread(
+ t = self._syncer_pool.new_syncer_thread(
sent + 1, total, last_request_lock=last_request_lock,
last_callback_lock=last_callback_lock)
@@ -1267,6 +1269,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if t.success:
synced.append((doc.doc_id, doc.rev))
last_successful_thread = t
+ else:
+ raise t.exception
# delete documents from the sync database
if defer_encryption:
@@ -1285,17 +1289,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id, syncer_pool,
+ return_doc_cb, ensure_callback, sync_id,
defer_decryption=defer_decryption)
- syncer_pool.cleanup()
+ self._syncer_pool.cleanup()
# decrypt docs in case of deferred decryption
if defer_decryption:
- self._sync_watcher.start()
while self.clear_to_sync() is False:
- sleep(self.DECRYPT_TASK_PERIOD)
- self._teardown_sync_watcher()
+ sleep(self.DECRYPT_LOOP_PERIOD)
+ self._teardown_sync_loop()
self._teardown_sync_decr_pool()
self._sync_exchange_lock.release()
@@ -1306,6 +1309,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_trans_id = trans_id_after_send
self.stop()
+ self._syncer_pool = None
return cur_target_gen, cur_target_trans_id
def start(self):
@@ -1315,6 +1319,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
with self._stop_lock:
self._stopped = False
+
+ def stop_syncer(self):
+ with self._stop_lock:
+ self._stopped = True
+
def stop(self):
"""
Mark current sync session as stopped.
@@ -1323,8 +1332,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
enough information to the synchronizer so the sync session can be
recovered afterwards.
"""
- with self._stop_lock:
- self._stopped = True
+ self.stop_syncer()
+ if self._syncer_pool:
+ self._syncer_pool.cancel_threads()
@property
def stopped(self):
@@ -1351,11 +1361,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
encr = SyncEncrypterPool
sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
encr.TABLE_NAME,))
- res = self._sync_db.select(sql, (doc_id, doc_rev))
- try:
- val = res.next()
+ res = self._fetchall(sql, (doc_id, doc_rev))
+ if res:
+ val = res.pop()
return val[0]
- except StopIteration:
+ else:
# no doc found
return None
@@ -1460,7 +1470,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
Decrypt the documents received from remote replica and insert them
into the local one.
- Called periodically from TimerTask self._sync_watcher.
+ Called periodically from LoopingCall self._sync_loop.
"""
if sameProxiedObjects(
self._insert_doc_cb.get(self.source_replica_uid),
@@ -1497,3 +1507,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type token: str
"""
TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+ def _fetchall(self, *args, **kwargs):
+ with self._sync_db:
+ c = self._sync_db.cursor()
+ c.execute(*args, **kwargs)
+ return c.fetchall()