summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/client')
-rw-r--r--src/leap/soledad/client/__init__.py30
-rw-r--r--src/leap/soledad/client/_crypto.py557
-rw-r--r--src/leap/soledad/client/_db/__init__.py0
-rw-r--r--src/leap/soledad/client/_db/adbapi.py298
-rw-r--r--src/leap/soledad/client/_db/blobs.py554
-rw-r--r--src/leap/soledad/client/_db/dbschema.sql42
-rw-r--r--src/leap/soledad/client/_db/pragmas.py379
-rw-r--r--src/leap/soledad/client/_db/sqlcipher.py633
-rw-r--r--src/leap/soledad/client/_db/sqlite.py930
-rw-r--r--src/leap/soledad/client/_document.py254
-rw-r--r--src/leap/soledad/client/_http.py74
-rw-r--r--src/leap/soledad/client/_pipes.py78
-rw-r--r--src/leap/soledad/client/_recovery_code.py33
-rw-r--r--src/leap/soledad/client/_secrets/__init__.py129
-rw-r--r--src/leap/soledad/client/_secrets/crypto.py138
-rw-r--r--src/leap/soledad/client/_secrets/storage.py120
-rw-r--r--src/leap/soledad/client/_secrets/util.py63
-rw-r--r--src/leap/soledad/client/api.py848
-rw-r--r--src/leap/soledad/client/auth.py69
-rw-r--r--src/leap/soledad/client/crypto.py448
-rw-r--r--src/leap/soledad/client/events.py54
-rw-r--r--src/leap/soledad/client/examples/README4
-rw-r--r--src/leap/soledad/client/examples/benchmarks/.gitignore1
-rwxr-xr-xsrc/leap/soledad/client/examples/benchmarks/get_sample.sh3
-rw-r--r--src/leap/soledad/client/examples/benchmarks/measure_index_times.py179
-rw-r--r--src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py179
-rw-r--r--src/leap/soledad/client/examples/compare.txt8
-rw-r--r--src/leap/soledad/client/examples/manifest.phk50
-rw-r--r--src/leap/soledad/client/examples/plot-async-db.py45
-rw-r--r--src/leap/soledad/client/examples/run_benchmark.py30
-rw-r--r--src/leap/soledad/client/examples/soledad_sync.py63
-rw-r--r--src/leap/soledad/client/examples/use_adbapi.py105
-rw-r--r--src/leap/soledad/client/examples/use_api.py69
-rw-r--r--src/leap/soledad/client/http_target/__init__.py94
-rw-r--r--src/leap/soledad/client/http_target/api.py248
-rw-r--r--src/leap/soledad/client/http_target/fetch.py161
-rw-r--r--src/leap/soledad/client/http_target/fetch_protocol.py157
-rw-r--r--src/leap/soledad/client/http_target/send.py107
-rw-r--r--src/leap/soledad/client/http_target/send_protocol.py75
-rw-r--r--src/leap/soledad/client/http_target/support.py220
-rw-r--r--src/leap/soledad/client/interfaces.py368
-rw-r--r--src/leap/soledad/client/shared_db.py134
-rw-r--r--src/leap/soledad/client/sync.py231
43 files changed, 8262 insertions, 0 deletions
diff --git a/src/leap/soledad/client/__init__.py b/src/leap/soledad/client/__init__.py
new file mode 100644
index 00000000..bcad78db
--- /dev/null
+++ b/src/leap/soledad/client/__init__.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Soledad - Synchronization Of Locally Encrypted Data Among Devices.
+"""
+from leap.soledad.common import soledad_assert
+
+from .api import Soledad
+from ._document import Document, AttachmentStates
+from ._version import get_versions
+
+__version__ = get_versions()['version']
+del get_versions
+
+__all__ = ['soledad_assert', 'Soledad', 'Document', 'AttachmentStates',
+ '__version__']
diff --git a/src/leap/soledad/client/_crypto.py b/src/leap/soledad/client/_crypto.py
new file mode 100644
index 00000000..8cedf52e
--- /dev/null
+++ b/src/leap/soledad/client/_crypto.py
@@ -0,0 +1,557 @@
+# -*- coding: utf-8 -*-
+# _crypto.py
+# Copyright (C) 2016 LEAP Encryption Access Project
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Cryptographic operations for the soledad client.
+
+This module implements streaming crypto operations.
+It replaces the old client.crypto module, that will be deprecated in soledad
+0.12.
+
+The algorithm for encrypting and decrypting is as follow:
+
+The KEY is a 32 bytes value.
+The IV is a random 16 bytes value.
+The PREAMBLE is a packed_structure with encryption metadata, such as IV.
+The SEPARATOR is a space.
+
+Encryption
+----------
+
+IV = os.urandom(16)
+PREAMBLE = BLOB_SIGNATURE_MAGIC, ENC_SCHEME, ENC_METHOD, time, IV, doc_id, rev,
+and size.
+
+PREAMBLE = base64_encoded(PREAMBLE)
+CIPHERTEXT = base64_encoded(AES_GCM(KEY, cleartext) + resulting_tag) if armor
+
+CIPHERTEXT = AES_GCM(KEY, cleartext) + resulting_tag if not armor
+# "resulting_tag" came from AES-GCM encryption. It will be the last 16 bytes of
+# our ciphertext.
+
+encrypted_payload = PREAMBLE + SEPARATOR + CIPHERTEXT
+
+Decryption
+----------
+
+Ciphertext and Tag CAN come encoded in base64 (with armor=True) or raw (with
+armor=False). Preamble will always come encoded in base64.
+
+PREAMBLE, CIPHERTEXT = PAYLOAD.SPLIT(' ', 1)
+
+PREAMBLE = base64_decode(PREAMBLE)
+CIPHERTEXT = base64_decode(CIPHERTEXT) if armor else CIPHERTEXT
+
+CIPHERTEXT, TAG = CIPHERTEXT[:-16], CIPHERTEXT[-16:]
+CLEARTEXT = aes_gcm_decrypt(KEY, IV, CIPHERTEXT, TAG, associated_data=PREAMBLE)
+
+AES-GCM will check preamble authenticity as well, since we are using
+Authenticated Encryption with Associated Data (AEAD). Ciphertext and associated
+data (PREAMBLE) authenticity will both be checked together during decryption.
+PREAMBLE consistency (if it matches the desired document, for instance) is
+checked during PREAMBLE reading.
+"""
+
+
+import base64
+import hashlib
+import warnings
+import hmac
+import os
+import struct
+import time
+
+from io import BytesIO
+from collections import namedtuple
+
+from twisted.internet import defer
+from twisted.internet import interfaces
+from twisted.web.client import FileBodyProducer
+
+from leap.soledad.common import soledad_assert
+from cryptography.exceptions import InvalidTag
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends import default_backend
+
+from zope.interface import implementer
+
+
+SECRET_LENGTH = 64
+SEPARATOR = ' ' # Anything that doesn't belong to base64 encoding
+
+CRYPTO_BACKEND = default_backend()
+
+PACMAN = struct.Struct('2sbbQ16s255p255pQ')
+LEGACY_PACMAN = struct.Struct('2sbbQ16s255p255p')
+BLOB_SIGNATURE_MAGIC = '\x13\x37'
+
+
+ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1)
+ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2)
+DocInfo = namedtuple('DocInfo', 'doc_id rev')
+
+
+class EncryptionDecryptionError(Exception):
+ pass
+
+
+class InvalidBlob(Exception):
+ pass
+
+
+class SoledadCrypto(object):
+ """
+ This class provides convenient methods for document encryption and
+ decryption using BlobEncryptor and BlobDecryptor classes.
+ """
+ def __init__(self, secret):
+ """
+ Initialize the crypto object.
+
+ :param secret: The Soledad remote storage secret.
+ :type secret: str
+ """
+ self.secret = secret
+
+ def encrypt_doc(self, doc):
+ """
+ Creates and configures a BlobEncryptor, asking it to start encryption
+ and wrapping the result as a simple JSON string with a "raw" key.
+
+ :param doc: the document to be encrypted.
+ :type doc: Document
+ :return: A deferred whose callback will be invoked with a JSON string
+ containing the ciphertext as the value of "raw" key.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ def put_raw(blob):
+ raw = blob.getvalue()
+ return '{"raw": "' + raw + '"}'
+
+ content = BytesIO(str(doc.get_json()))
+ info = DocInfo(doc.doc_id, doc.rev)
+ del doc
+ encryptor = BlobEncryptor(info, content, secret=self.secret)
+ d = encryptor.encrypt()
+ d.addCallback(put_raw)
+ return d
+
+ def decrypt_doc(self, doc):
+ """
+ Creates and configures a BlobDecryptor, asking it decrypt and returning
+ the decrypted cleartext content from the encrypted document.
+
+ :param doc: the document to be decrypted.
+ :type doc: Document
+ :return: The decrypted cleartext content of the document.
+ :rtype: str
+ """
+ info = DocInfo(doc.doc_id, doc.rev)
+ ciphertext = BytesIO()
+ payload = doc.content['raw']
+ del doc
+ ciphertext.write(str(payload))
+ decryptor = BlobDecryptor(info, ciphertext, secret=self.secret)
+ return decryptor.decrypt()
+
+
+def encrypt_sym(data, key, method=ENC_METHOD.aes_256_gcm):
+ """
+ Encrypt data using AES-256 cipher in selected mode.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param key: The key used to encrypt data (must be 256 bits long).
+ :type key: str
+
+ :return: A tuple with the initialization vector and the ciphertext, both
+ encoded as base64.
+ :rtype: (str, str)
+ """
+ mode = _mode_by_method(method)
+ encryptor = AESWriter(key, mode=mode)
+ encryptor.write(data)
+ _, ciphertext = encryptor.end()
+ iv = base64.b64encode(encryptor.iv)
+ tag = encryptor.tag or ''
+ return iv, ciphertext + tag
+
+
+def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm):
+ """
+ Decrypt data using AES-256 cipher in selected mode.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param key: The symmetric key used to decrypt data (must be 256 bits
+ long).
+ :type key: str
+ :param iv: The base64 encoded initialization vector.
+ :type iv: str
+
+ :return: The decrypted data.
+ :rtype: str
+ """
+ _iv = base64.b64decode(str(iv))
+ mode = _mode_by_method(method)
+ tag = None
+ if mode == modes.GCM:
+ data, tag = data[:-16], data[-16:]
+ decryptor = AESWriter(key, _iv, tag=tag, mode=mode)
+ decryptor.write(data)
+ _, plaintext = decryptor.end()
+ return plaintext
+
+
+# TODO maybe rename this to Encryptor, since it will be used by blobs an non
+# blobs in soledad.
+class BlobEncryptor(object):
+ """
+ Produces encrypted data from the cleartext data associated with a given
+ Document using AES-256 cipher in GCM mode.
+
+ The production happens using a Twisted's FileBodyProducer, which uses a
+ Cooperator to schedule calls and can be paused/resumed. Each call takes at
+ most 65536 bytes from the input.
+
+ Both the production input and output are file descriptors, so they can be
+ applied to a stream of data.
+ """
+ # TODO
+ # This class needs further work to allow for proper streaming.
+ # Right now we HAVE TO WAIT until the end of the stream before encoding the
+ # result. It should be possible to do that just encoding the chunks and
+ # passing them to a sink, but for that we have to encode the chunks at
+ # proper alignment (3 bytes?) with b64 if armor is defined.
+
+ def __init__(self, doc_info, content_fd, secret=None, armor=True,
+ sink=None):
+ if not secret:
+ raise EncryptionDecryptionError('no secret given')
+
+ self.doc_id = doc_info.doc_id
+ self.rev = doc_info.rev
+ self.armor = armor
+
+ self._content_fd = content_fd
+ self._content_size = self._get_rounded_size(content_fd)
+ self._producer = FileBodyProducer(content_fd, readSize=2**16)
+
+ self.sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret)
+ self._aes = AESWriter(self.sym_key, _buffer=sink)
+ self._aes.authenticate(self._encode_preamble())
+
+ def _get_rounded_size(self, fd):
+ """
+ Returns a rounded value in order to minimize information leaks due to
+ the original size being exposed.
+ """
+ fd.seek(0, os.SEEK_END)
+ size = _ceiling(fd.tell())
+ fd.seek(0)
+ return size
+
+ @property
+ def iv(self):
+ return self._aes.iv
+
+ @property
+ def tag(self):
+ return self._aes.tag
+
+ def encrypt(self):
+ """
+ Starts producing encrypted data from the cleartext data.
+
+ :return: A deferred which will be fired when encryption ends and whose
+ callback will be invoked with the resulting ciphertext.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # XXX pass a sink to aes?
+ d = self._producer.startProducing(self._aes)
+ d.addCallback(lambda _: self._end_crypto_stream_and_encode_result())
+ return d
+
+ def _encode_preamble(self):
+ current_time = int(time.time())
+
+ preamble = PACMAN.pack(
+ BLOB_SIGNATURE_MAGIC,
+ ENC_SCHEME.symkey,
+ ENC_METHOD.aes_256_gcm,
+ current_time,
+ self.iv,
+ str(self.doc_id),
+ str(self.rev),
+ self._content_size)
+ return preamble
+
+ def _end_crypto_stream_and_encode_result(self):
+
+ # TODO ---- this needs to be refactored to allow PROPER streaming
+ # We should write the preamble as soon as possible,
+ # Is it possible to write the AES stream as soon as it is encrypted by
+ # chunks?
+ # FIXME also, it needs to be able to encode chunks with base64 if armor
+
+ preamble, encrypted = self._aes.end()
+ result = BytesIO()
+ result.write(
+ base64.urlsafe_b64encode(preamble))
+ result.write(SEPARATOR)
+
+ if self.armor:
+ result.write(
+ base64.urlsafe_b64encode(encrypted + self.tag))
+ else:
+ result.write(encrypted + self.tag)
+
+ result.seek(0)
+ return defer.succeed(result)
+
+
+# TODO maybe rename this to just Decryptor, since it will be used by blobs
+# and non blobs in soledad.
+class BlobDecryptor(object):
+ """
+ Decrypts an encrypted blob associated with a given Document.
+
+ Will raise an exception if the blob doesn't have the expected structure, or
+ if the GCM tag doesn't verify.
+ """
+ def __init__(self, doc_info, ciphertext_fd, result=None,
+ secret=None, armor=True, start_stream=True, tag=None):
+ if not secret:
+ raise EncryptionDecryptionError('no secret given')
+
+ self.doc_id = doc_info.doc_id
+ self.rev = doc_info.rev
+ self.fd = ciphertext_fd
+ self.armor = armor
+ self._producer = None
+ self.result = result or BytesIO()
+ sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret)
+ self.size = None
+ self.tag = None
+
+ preamble, iv = self._consume_preamble()
+ soledad_assert(preamble)
+ soledad_assert(iv)
+
+ self._aes = AESWriter(sym_key, iv, self.result, tag=tag or self.tag)
+ self._aes.authenticate(preamble)
+ if start_stream:
+ self._start_stream()
+
+ @property
+ def decrypted_content_size(self):
+ return self._aes.written
+
+ def _start_stream(self):
+ self._producer = FileBodyProducer(self.fd, readSize=2**16)
+
+ def _consume_preamble(self):
+ """
+ Consume the preamble and write remaining bytes as ciphertext. This
+ function is called during a stream and can be holding both, so we need
+ to consume only preamble and store the remaining.
+ """
+ self.fd.seek(0)
+ try:
+ parts = self.fd.getvalue().split(SEPARATOR, 1)
+ preamble = base64.urlsafe_b64decode(parts[0])
+ if len(parts) == 2:
+ ciphertext = parts[1]
+ if self.armor:
+ ciphertext = base64.urlsafe_b64decode(ciphertext)
+ self.tag, ciphertext = ciphertext[-16:], ciphertext[:-16]
+ self.fd.seek(0)
+ self.fd.write(ciphertext)
+ self.fd.seek(len(ciphertext))
+ self.fd.truncate()
+ self.fd.seek(0)
+
+ except (TypeError, ValueError):
+ raise InvalidBlob
+
+ try:
+ if len(preamble) == LEGACY_PACMAN.size:
+ warnings.warn("Decrypting a legacy document without size. " +
+ "This will be deprecated in 0.12. Doc was: " +
+ "doc_id: %s rev: %s" % (self.doc_id, self.rev),
+ Warning)
+ unpacked_data = LEGACY_PACMAN.unpack(preamble)
+ magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
+ elif len(preamble) == PACMAN.size:
+ unpacked_data = PACMAN.unpack(preamble)
+ magic, sch, meth, ts, iv, doc_id, rev, doc_size = unpacked_data
+ self.size = doc_size
+ else:
+ raise InvalidBlob("Unexpected preamble size %d", len(preamble))
+ except struct.error as e:
+ raise InvalidBlob(e)
+
+ if magic != BLOB_SIGNATURE_MAGIC:
+ raise InvalidBlob
+ # TODO check timestamp. Just as a sanity check, but for instance
+ # we can refuse to process something that is in the future or
+ # too far in the past (1984 would be nice, hehe)
+ if sch != ENC_SCHEME.symkey:
+ raise InvalidBlob('Invalid scheme: %s' % sch)
+ if meth != ENC_METHOD.aes_256_gcm:
+ raise InvalidBlob('Invalid encryption scheme: %s' % meth)
+ if rev != self.rev:
+ msg = 'Invalid revision. Expected: %s, was: %s' % (self.rev, rev)
+ raise InvalidBlob(msg)
+ if doc_id != self.doc_id:
+ msg = 'Invalid doc_id. '
+ + 'Expected: %s, was: %s' % (self.doc_id, doc_id)
+ raise InvalidBlob(msg)
+
+ return preamble, iv
+
+ def _end_stream(self):
+ try:
+ self._aes.end()
+ except InvalidTag:
+ raise InvalidBlob('Invalid Tag. Blob authentication failed.')
+ fd = self.result
+ fd.seek(0)
+ return self.result
+
+ def decrypt(self):
+ """
+ Starts producing encrypted data from the cleartext data.
+
+ :return: A deferred which will be fired when encryption ends and whose
+ callback will be invoked with the resulting ciphertext.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ d = self.startProducing()
+ d.addCallback(lambda _: self._end_stream())
+ return d
+
+ def startProducing(self):
+ if not self._producer:
+ self._start_stream()
+ return self._producer.startProducing(self._aes)
+
+ def endStream(self):
+ self._end_stream()
+
+ def write(self, data):
+ self._aes.write(data)
+
+ def close(self):
+ result = self._aes.end()
+ return result
+
+
+@implementer(interfaces.IConsumer)
+class AESWriter(object):
+ """
+ A Twisted's Consumer implementation that takes an input file descriptor and
+ applies AES-256 cipher in GCM mode.
+
+ It is used both for encryption and decryption of a stream, depending of the
+ value of the tag parameter. If you pass a tag, it will operate in
+ decryption mode, verifying the authenticity of the preamble and ciphertext.
+ If no tag is passed, encryption mode is assumed, which will generate a tag.
+ """
+
+ def __init__(self, key, iv=None, _buffer=None, tag=None, mode=modes.GCM):
+ if len(key) != 32:
+ raise EncryptionDecryptionError('key is not 256 bits')
+
+ if tag is not None:
+ # if tag, we're decrypting
+ assert iv is not None
+
+ self.iv = iv or os.urandom(16)
+ self.buffer = _buffer or BytesIO()
+ cipher = _get_aes_cipher(key, self.iv, tag, mode)
+ cipher = cipher.decryptor() if tag else cipher.encryptor()
+ self.cipher, self.aead = cipher, ''
+ self.written = 0
+
+ def authenticate(self, data):
+ self.aead += data
+ self.cipher.authenticate_additional_data(data)
+
+ @property
+ def tag(self):
+ return getattr(self.cipher, 'tag', None)
+
+ def write(self, data):
+ self.written += len(data)
+ self.buffer.write(self.cipher.update(data))
+
+ def end(self):
+ self.buffer.write(self.cipher.finalize())
+ return self.aead, self.buffer.getvalue()
+
+
+def is_symmetrically_encrypted(content):
+ """
+ Returns True if the document was symmetrically encrypted.
+ 'EzcB' is the base64 encoding of \x13\x37 magic number and 1 (symmetrically
+ encrypted value for enc_scheme flag).
+
+ :param doc: The document content as string
+ :type doc: str
+
+ :rtype: bool
+ """
+ sym_signature = '{"raw": "EzcB'
+ return content and content.startswith(sym_signature)
+
+
+# utils
+
+
+def _hmac_sha256(key, data):
+ return hmac.new(key, data, hashlib.sha256).digest()
+
+
+def _get_sym_key_for_doc(doc_id, secret):
+ key = secret[SECRET_LENGTH:]
+ return _hmac_sha256(key, doc_id)
+
+
+def _get_aes_cipher(key, iv, tag, mode=modes.GCM):
+ mode = mode(iv, tag) if mode == modes.GCM else mode(iv)
+ return Cipher(algorithms.AES(key), mode, backend=CRYPTO_BACKEND)
+
+
+def _mode_by_method(method):
+ if method == ENC_METHOD.aes_256_gcm:
+ return modes.GCM
+ else:
+ return modes.CTR
+
+
+def _ceiling(size):
+ """
+ Some simplistic ceiling scheme that uses powers of 2.
+ We report everything below 4096 bytes as that minimum threshold.
+ See #8759 for research pending for less simplistic/aggresive strategies.
+ """
+ for i in xrange(12, 31):
+ step = 2 ** i
+ if size < step:
+ return step
diff --git a/src/leap/soledad/client/_db/__init__.py b/src/leap/soledad/client/_db/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/soledad/client/_db/__init__.py
diff --git a/src/leap/soledad/client/_db/adbapi.py b/src/leap/soledad/client/_db/adbapi.py
new file mode 100644
index 00000000..5c28d108
--- /dev/null
+++ b/src/leap/soledad/client/_db/adbapi.py
@@ -0,0 +1,298 @@
+# -*- coding: utf-8 -*-
+# adbapi.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+An asyncrhonous interface to soledad using sqlcipher backend.
+It uses twisted.enterprise.adbapi.
+"""
+import re
+import sys
+
+from functools import partial
+
+from twisted.enterprise import adbapi
+from twisted.internet.defer import DeferredSemaphore
+from twisted.python import compat
+from zope.proxy import ProxyBase, setProxiedObject
+
+from leap.soledad.common.log import getLogger
+from leap.soledad.common.errors import DatabaseAccessError
+
+from . import sqlcipher
+from . import pragmas
+
+if sys.version_info[0] < 3:
+ from pysqlcipher import dbapi2
+else:
+ from pysqlcipher3 import dbapi2
+
+
+logger = getLogger(__name__)
+
+
+"""
+How long the SQLCipher connection should wait for the lock to go away until
+raising an exception.
+"""
+SQLCIPHER_CONNECTION_TIMEOUT = 10
+
+"""
+How many times a SQLCipher query should be retried in case of timeout.
+"""
+SQLCIPHER_MAX_RETRIES = 20
+
+
+def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
+ """
+ Return a connection pool.
+
+ :param opts:
+ Options for the SQLCipher connection.
+ :type opts: SQLCipherOptions
+ :param openfun:
+ Callback invoked after every connect() on the underlying DB-API
+ object.
+ :type openfun: callable
+ :param driver:
+ The connection driver.
+ :type driver: str
+
+ :return: A U1DB connection pool.
+ :rtype: U1DBConnectionPool
+ """
+ if openfun is None and driver == "pysqlcipher":
+ openfun = partial(pragmas.set_init_pragmas, opts=opts)
+ return U1DBConnectionPool(
+ opts,
+ # the following params are relayed "as is" to twisted's
+ # ConnectionPool.
+ "%s.dbapi2" % driver, opts.path, timeout=SQLCIPHER_CONNECTION_TIMEOUT,
+ check_same_thread=False, cp_openfun=openfun)
+
+
+class U1DBConnection(adbapi.Connection):
+ """
+ A wrapper for a U1DB connection instance.
+ """
+
+ u1db_wrapper = sqlcipher.SoledadSQLCipherWrapper
+ """
+ The U1DB wrapper to use.
+ """
+
+ def __init__(self, pool, init_u1db=False):
+ """
+ :param pool: The pool of connections to that owns this connection.
+ :type pool: adbapi.ConnectionPool
+ :param init_u1db: Wether the u1db database should be initialized.
+ :type init_u1db: bool
+ """
+ self.init_u1db = init_u1db
+ try:
+ adbapi.Connection.__init__(self, pool)
+ except dbapi2.DatabaseError as e:
+ raise DatabaseAccessError(
+ 'Error initializing connection to sqlcipher database: %s'
+ % str(e))
+
+ def reconnect(self):
+ """
+ Reconnect to the U1DB database.
+ """
+ if self._connection is not None:
+ self._pool.disconnect(self._connection)
+ self._connection = self._pool.connect()
+
+ if self.init_u1db:
+ self._u1db = self.u1db_wrapper(
+ self._connection,
+ self._pool.opts)
+
+ def __getattr__(self, name):
+ """
+ Route the requested attribute either to the U1DB wrapper or to the
+ connection.
+
+ :param name: The name of the attribute.
+ :type name: str
+ """
+ if name.startswith('u1db_'):
+ attr = re.sub('^u1db_', '', name)
+ return getattr(self._u1db, attr)
+ else:
+ return getattr(self._connection, name)
+
+
+class U1DBTransaction(adbapi.Transaction):
+ """
+ A wrapper for a U1DB 'cursor' object.
+ """
+
+ def __getattr__(self, name):
+ """
+ Route the requested attribute either to the U1DB wrapper of the
+ connection or to the actual connection cursor.
+
+ :param name: The name of the attribute.
+ :type name: str
+ """
+ if name.startswith('u1db_'):
+ attr = re.sub('^u1db_', '', name)
+ return getattr(self._connection._u1db, attr)
+ else:
+ return getattr(self._cursor, name)
+
+
+class U1DBConnectionPool(adbapi.ConnectionPool):
+ """
+ Represent a pool of connections to an U1DB database.
+ """
+
+ connectionFactory = U1DBConnection
+ transactionFactory = U1DBTransaction
+
+ def __init__(self, opts, *args, **kwargs):
+ """
+ Initialize the connection pool.
+ """
+ self.opts = opts
+ try:
+ adbapi.ConnectionPool.__init__(self, *args, **kwargs)
+ except dbapi2.DatabaseError as e:
+ raise DatabaseAccessError(
+ 'Error initializing u1db connection pool: %s' % str(e))
+
+ # all u1db connections, hashed by thread-id
+ self._u1dbconnections = {}
+
+ # The replica uid, primed by the connections on init.
+ self.replica_uid = ProxyBase(None)
+
+ try:
+ conn = self.connectionFactory(
+ self, init_u1db=True)
+ replica_uid = conn._u1db._real_replica_uid
+ setProxiedObject(self.replica_uid, replica_uid)
+ except DatabaseAccessError as e:
+ self.threadpool.stop()
+ raise DatabaseAccessError(
+ "Error initializing connection factory: %s" % str(e))
+
+ def runU1DBQuery(self, meth, *args, **kw):
+ """
+ Execute a U1DB query in a thread, using a pooled connection.
+
+ Concurrent threads trying to update the same database may timeout
+ because of other threads holding the database lock. Because of this,
+ we will retry SQLCIPHER_MAX_RETRIES times and fail after that.
+
+ :param meth: The U1DB wrapper method name.
+ :type meth: str
+
+ :return: a Deferred which will fire the return value of
+ 'self._runU1DBQuery(Transaction(...), *args, **kw)', or a Failure.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ meth = "u1db_%s" % meth
+ semaphore = DeferredSemaphore(SQLCIPHER_MAX_RETRIES)
+
+ def _run_interaction():
+ return self.runInteraction(
+ self._runU1DBQuery, meth, *args, **kw)
+
+ def _errback(failure):
+ failure.trap(dbapi2.OperationalError)
+ if failure.getErrorMessage() == "database is locked":
+ logger.warn("database operation timed out")
+ should_retry = semaphore.acquire()
+ if should_retry:
+ logger.warn("trying again...")
+ return _run_interaction()
+ logger.warn("giving up!")
+ return failure
+
+ d = _run_interaction()
+ d.addErrback(_errback)
+ return d
+
+ def _runU1DBQuery(self, trans, meth, *args, **kw):
+ """
+ Execute a U1DB query.
+
+ :param trans: An U1DB transaction.
+ :type trans: adbapi.Transaction
+ :param meth: the U1DB wrapper method name.
+ :type meth: str
+ """
+ meth = getattr(trans, meth)
+ return meth(*args, **kw)
+ # XXX should return a fetchall?
+
+ # XXX add _runOperation too
+
+ def _runInteraction(self, interaction, *args, **kw):
+ """
+ Interact with the database and return the result.
+
+ :param interaction:
+ A callable object whose first argument is an
+ L{adbapi.Transaction}.
+ :type interaction: callable
+ :return: a Deferred which will fire the return value of
+ 'interaction(Transaction(...), *args, **kw)', or a Failure.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ tid = self.threadID()
+ u1db = self._u1dbconnections.get(tid)
+ conn = self.connectionFactory(
+ self, init_u1db=not bool(u1db))
+
+ if self.replica_uid is None:
+ replica_uid = conn._u1db._real_replica_uid
+ setProxiedObject(self.replica_uid, replica_uid)
+
+ if u1db is None:
+ self._u1dbconnections[tid] = conn._u1db
+ else:
+ conn._u1db = u1db
+
+ trans = self.transactionFactory(self, conn)
+ try:
+ result = interaction(trans, *args, **kw)
+ trans.close()
+ conn.commit()
+ return result
+ except:
+ excType, excValue, excTraceback = sys.exc_info()
+ try:
+ conn.rollback()
+ except:
+ logger.error(None, "Rollback failed")
+ compat.reraise(excValue, excTraceback)
+
+ def finalClose(self):
+ """
+ A final close, only called by the shutdown trigger.
+ """
+ self.shutdownID = None
+ if self.threadpool.started:
+ self.threadpool.stop()
+ self.running = False
+ for conn in self.connections.values():
+ self._close(conn)
+ for u1db in self._u1dbconnections.values():
+ self._close(u1db)
+ self.connections.clear()
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
new file mode 100644
index 00000000..10b90c71
--- /dev/null
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -0,0 +1,554 @@
+# -*- coding: utf-8 -*-
+# _blobs.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Clientside BlobBackend Storage.
+"""
+
+from urlparse import urljoin
+
+import binascii
+import os
+import base64
+
+from io import BytesIO
+from functools import partial
+
+from twisted.logger import Logger
+from twisted.enterprise import adbapi
+from twisted.internet import defer
+from twisted.web.client import FileBodyProducer
+
+import treq
+
+from leap.soledad.common.errors import SoledadError
+from leap.common.files import mkdir_p
+
+from .._document import BlobDoc
+from .._crypto import DocInfo
+from .._crypto import BlobEncryptor
+from .._crypto import BlobDecryptor
+from .._http import HTTPClient
+from .._pipes import TruncatedTailPipe
+from .._pipes import PreamblePipe
+
+from . import pragmas
+from . import sqlcipher
+
+
+logger = Logger()
+FIXED_REV = 'ImmutableRevision' # Blob content is immutable
+
+
+class BlobAlreadyExistsError(SoledadError):
+ pass
+
+
+class ConnectionPool(adbapi.ConnectionPool):
+
+ def insertAndGetLastRowid(self, *args, **kwargs):
+ """
+ Execute an SQL query and return the last rowid.
+
+ See: https://sqlite.org/c3ref/last_insert_rowid.html
+ """
+ return self.runInteraction(
+ self._insertAndGetLastRowid, *args, **kwargs)
+
+ def _insertAndGetLastRowid(self, trans, *args, **kw):
+ trans.execute(*args, **kw)
+ return trans.lastrowid
+
+ def blob(self, table, column, irow, flags):
+ """
+ Open a BLOB for incremental I/O.
+
+ Return a handle to the BLOB that would be selected by:
+
+ SELECT column FROM table WHERE rowid = irow;
+
+ See: https://sqlite.org/c3ref/blob_open.html
+
+ :param table: The table in which to lookup the blob.
+ :type table: str
+ :param column: The column where the BLOB is located.
+ :type column: str
+ :param rowid: The rowid of the BLOB.
+ :type rowid: int
+ :param flags: If zero, BLOB is opened for read-only. If non-zero,
+ BLOB is opened for RW.
+ :type flags: int
+
+ :return: A BLOB handle.
+ :rtype: pysqlcipher.dbapi.Blob
+ """
+ return self.runInteraction(self._blob, table, column, irow, flags)
+
+ def _blob(self, trans, table, column, irow, flags):
+ # TODO: should not use transaction private variable here
+ handle = trans._connection.blob(table, column, irow, flags)
+ return handle
+
+
+def check_http_status(code):
+ if code == 409:
+ raise BlobAlreadyExistsError()
+ elif code != 200:
+ raise SoledadError("Server Error")
+
+
+class DecrypterBuffer(object):
+
+ def __init__(self, blob_id, secret, tag):
+ self.doc_info = DocInfo(blob_id, FIXED_REV)
+ self.secret = secret
+ self.tag = tag
+ self.preamble_pipe = PreamblePipe(self._make_decryptor)
+
+ def _make_decryptor(self, preamble):
+ self.decrypter = BlobDecryptor(
+ self.doc_info, preamble,
+ secret=self.secret,
+ armor=False,
+ start_stream=False,
+ tag=self.tag)
+ return TruncatedTailPipe(self.decrypter, tail_size=len(self.tag))
+
+ def write(self, data):
+ self.preamble_pipe.write(data)
+
+ def close(self):
+ real_size = self.decrypter.decrypted_content_size
+ return self.decrypter._end_stream(), real_size
+
+
+class BlobManager(object):
+ """
+ Ideally, the decrypting flow goes like this:
+
+ - GET a blob from remote server.
+ - Decrypt the preamble
+ - Allocate a zeroblob in the sqlcipher sink
+ - Mark the blob as unusable (ie, not verified)
+ - Decrypt the payload incrementally, and write chunks to sqlcipher
+ ** Is it possible to use a small buffer for the aes writer w/o
+ ** allocating all the memory in openssl?
+ - Finalize the AES decryption
+ - If preamble + payload verifies correctly, mark the blob as usable
+
+ """
+
+ def __init__(
+ self, local_path, remote, key, secret, user, token=None,
+ cert_file=None):
+ if local_path:
+ mkdir_p(os.path.dirname(local_path))
+ self.local = SQLiteBlobBackend(local_path, key)
+ self.remote = remote
+ self.secret = secret
+ self.user = user
+ self._client = HTTPClient(user, token, cert_file)
+
+ def close(self):
+ if hasattr(self, 'local') and self.local:
+ return self.local.close()
+
+ @defer.inlineCallbacks
+ def remote_list(self):
+ uri = urljoin(self.remote, self.user + '/')
+ data = yield self._client.get(uri)
+ defer.returnValue((yield data.json()))
+
+ def local_list(self):
+ return self.local.list()
+
+ @defer.inlineCallbacks
+ def send_missing(self):
+ our_blobs = yield self.local_list()
+ server_blobs = yield self.remote_list()
+ missing = [b_id for b_id in our_blobs if b_id not in server_blobs]
+ logger.info("Amount of documents missing on server: %s" % len(missing))
+ # TODO: Send concurrently when we are able to stream directly from db
+ for blob_id in missing:
+ fd = yield self.local.get(blob_id)
+ logger.info("Upload local blob: %s" % blob_id)
+ yield self._encrypt_and_upload(blob_id, fd)
+
+ @defer.inlineCallbacks
+ def fetch_missing(self):
+ # TODO: Use something to prioritize user requests over general new docs
+ our_blobs = yield self.local_list()
+ server_blobs = yield self.remote_list()
+ docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs]
+ logger.info("Fetching new docs from server: %s" % len(docs_we_want))
+ # TODO: Fetch concurrently when we are able to stream directly into db
+ for blob_id in docs_we_want:
+ logger.info("Fetching new doc: %s" % blob_id)
+ yield self.get(blob_id)
+
+ @defer.inlineCallbacks
+ def put(self, doc, size):
+ if (yield self.local.exists(doc.blob_id)):
+ error_message = "Blob already exists: %s" % doc.blob_id
+ raise BlobAlreadyExistsError(error_message)
+ fd = doc.blob_fd
+ # TODO this is a tee really, but ok... could do db and upload
+ # concurrently. not sure if we'd gain something.
+ yield self.local.put(doc.blob_id, fd, size=size)
+ # In fact, some kind of pipe is needed here, where each write on db
+ # handle gets forwarded into a write on the connection handle
+ fd = yield self.local.get(doc.blob_id)
+ yield self._encrypt_and_upload(doc.blob_id, fd)
+
+ @defer.inlineCallbacks
+ def get(self, blob_id):
+ local_blob = yield self.local.get(blob_id)
+ if local_blob:
+ logger.info("Found blob in local database: %s" % blob_id)
+ defer.returnValue(local_blob)
+
+ result = yield self._download_and_decrypt(blob_id)
+
+ if not result:
+ defer.returnValue(None)
+ blob, size = result
+
+ if blob:
+ logger.info("Got decrypted blob of type: %s" % type(blob))
+ blob.seek(0)
+ yield self.local.put(blob_id, blob, size=size)
+ defer.returnValue((yield self.local.get(blob_id)))
+ else:
+ # XXX we shouldn't get here, but we will...
+ # lots of ugly error handling possible:
+ # 1. retry, might be network error
+ # 2. try later, maybe didn't finished streaming
+ # 3.. resignation, might be error while verifying
+ logger.error('sorry, dunno what happened')
+
+ @defer.inlineCallbacks
+ def _encrypt_and_upload(self, blob_id, fd):
+ # TODO ------------------------------------------
+ # this is wrong, is doing 2 stages.
+ # the crypto producer can be passed to
+ # the uploader and react as data is written.
+ # try to rewrite as a tube: pass the fd to aes and let aes writer
+ # produce data to the treq request fd.
+ # ------------------------------------------------
+ logger.info("Staring upload of blob: %s" % blob_id)
+ doc_info = DocInfo(blob_id, FIXED_REV)
+ uri = urljoin(self.remote, self.user + "/" + blob_id)
+ crypter = BlobEncryptor(doc_info, fd, secret=self.secret,
+ armor=False)
+ fd = yield crypter.encrypt()
+ response = yield self._client.put(uri, data=fd)
+ check_http_status(response.code)
+ logger.info("Finished upload: %s" % (blob_id,))
+
+ @defer.inlineCallbacks
+ def _download_and_decrypt(self, blob_id):
+ logger.info("Staring download of blob: %s" % blob_id)
+ # TODO this needs to be connected in a tube
+ uri = urljoin(self.remote, self.user + '/' + blob_id)
+ data = yield self._client.get(uri)
+
+ if data.code == 404:
+ logger.warn("Blob not found in server: %s" % blob_id)
+ defer.returnValue(None)
+ elif not data.headers.hasHeader('Tag'):
+ logger.error("Server didn't send a tag header for: %s" % blob_id)
+ defer.returnValue(None)
+ tag = data.headers.getRawHeaders('Tag')[0]
+ tag = base64.urlsafe_b64decode(tag)
+ buf = DecrypterBuffer(blob_id, self.secret, tag)
+
+ # incrementally collect the body of the response
+ yield treq.collect(data, buf.write)
+ fd, size = buf.close()
+ logger.info("Finished download: (%s, %d)" % (blob_id, size))
+ defer.returnValue((fd, size))
+
+ @defer.inlineCallbacks
+ def delete(self, blob_id):
+ logger.info("Staring deletion of blob: %s" % blob_id)
+ yield self._delete_from_remote(blob_id)
+ if (yield self.local.exists(blob_id)):
+ yield self.local.delete(blob_id)
+
+ def _delete_from_remote(self, blob_id):
+ # TODO this needs to be connected in a tube
+ uri = urljoin(self.remote, self.user + '/' + blob_id)
+ return self._client.delete(uri)
+
+
+class SQLiteBlobBackend(object):
+
+ def __init__(self, path, key=None):
+ self.path = os.path.abspath(
+ os.path.join(path, 'soledad_blob.db'))
+ mkdir_p(os.path.dirname(self.path))
+ if not key:
+ raise ValueError('key cannot be None')
+ backend = 'pysqlcipher.dbapi2'
+ opts = sqlcipher.SQLCipherOptions(
+ '/tmp/ignored', binascii.b2a_hex(key),
+ is_raw_key=True, create=True)
+ pragmafun = partial(pragmas.set_init_pragmas, opts=opts)
+ openfun = _sqlcipherInitFactory(pragmafun)
+
+ self.dbpool = ConnectionPool(
+ backend, self.path, check_same_thread=False, timeout=5,
+ cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool')
+
+ def close(self):
+ from twisted._threads import AlreadyQuit
+ try:
+ self.dbpool.close()
+ except AlreadyQuit:
+ pass
+
+ @defer.inlineCallbacks
+ def put(self, blob_id, blob_fd, size=None):
+ logger.info("Saving blob in local database...")
+ insert = 'INSERT INTO blobs (blob_id, payload) VALUES (?, zeroblob(?))'
+ irow = yield self.dbpool.insertAndGetLastRowid(insert, (blob_id, size))
+ handle = yield self.dbpool.blob('blobs', 'payload', irow, 1)
+ blob_fd.seek(0)
+ # XXX I have to copy the buffer here so that I'm able to
+ # return a non-closed file to the caller (blobmanager.get)
+ # FIXME should remove this duplication!
+ # have a look at how treq does cope with closing the handle
+ # for uploading a file
+ producer = FileBodyProducer(blob_fd)
+ done = yield producer.startProducing(handle)
+ logger.info("Finished saving blob in local database.")
+ defer.returnValue(done)
+
+ @defer.inlineCallbacks
+ def get(self, blob_id):
+ # TODO we can also stream the blob value using sqlite
+ # incremental interface for blobs - and just return the raw fd instead
+ select = 'SELECT payload FROM blobs WHERE blob_id = ?'
+ result = yield self.dbpool.runQuery(select, (blob_id,))
+ if result:
+ defer.returnValue(BytesIO(str(result[0][0])))
+
+ @defer.inlineCallbacks
+ def list(self):
+ query = 'select blob_id from blobs'
+ result = yield self.dbpool.runQuery(query)
+ if result:
+ defer.returnValue([b_id[0] for b_id in result])
+ else:
+ defer.returnValue([])
+
+ @defer.inlineCallbacks
+ def exists(self, blob_id):
+ query = 'SELECT blob_id from blobs WHERE blob_id = ?'
+ result = yield self.dbpool.runQuery(query, (blob_id,))
+ defer.returnValue(bool(len(result)))
+
+ def delete(self, blob_id):
+ query = 'DELETE FROM blobs WHERE blob_id = ?'
+ return self.dbpool.runQuery(query, (blob_id,))
+
+
+def _init_blob_table(conn):
+ maybe_create = (
+ "CREATE TABLE IF NOT EXISTS "
+ "blobs ("
+ "blob_id PRIMARY KEY, "
+ "payload BLOB)")
+ conn.execute(maybe_create)
+
+
+def _sqlcipherInitFactory(fun):
+ def _initialize(conn):
+ fun(conn)
+ _init_blob_table(conn)
+ return _initialize
+
+
+#
+# testing facilities
+#
+
+@defer.inlineCallbacks
+def testit(reactor):
+ # configure logging to stdout
+ from twisted.python import log
+ import sys
+ log.startLogging(sys.stdout)
+
+ # parse command line arguments
+ import argparse
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--url', default='http://localhost:9000/')
+ parser.add_argument('--path', default='/tmp/blobs')
+ parser.add_argument('--secret', default='secret')
+ parser.add_argument('--uuid', default='user')
+ parser.add_argument('--token', default=None)
+ parser.add_argument('--cert-file', default='')
+
+ subparsers = parser.add_subparsers(help='sub-command help', dest='action')
+
+ # parse upload command
+ parser_upload = subparsers.add_parser(
+ 'upload', help='upload blob and bypass local db')
+ parser_upload.add_argument('payload')
+ parser_upload.add_argument('blob_id')
+
+ # parse download command
+ parser_download = subparsers.add_parser(
+ 'download', help='download blob and bypass local db')
+ parser_download.add_argument('blob_id')
+ parser_download.add_argument('--output-file', default='/tmp/incoming-file')
+
+ # parse put command
+ parser_put = subparsers.add_parser(
+ 'put', help='put blob in local db and upload')
+ parser_put.add_argument('payload')
+ parser_put.add_argument('blob_id')
+
+ # parse get command
+ parser_get = subparsers.add_parser(
+ 'get', help='get blob from local db, get if needed')
+ parser_get.add_argument('blob_id')
+
+ # parse delete command
+ parser_get = subparsers.add_parser(
+ 'delete', help='delete blob from local and remote db')
+ parser_get.add_argument('blob_id')
+
+ # parse list command
+ parser_get = subparsers.add_parser(
+ 'list', help='list local and remote blob ids')
+
+ # parse send_missing command
+ parser_get = subparsers.add_parser(
+ 'send_missing', help='send all pending upload blobs')
+
+ # parse send_missing command
+ parser_get = subparsers.add_parser(
+ 'fetch_missing', help='fetch all new server blobs')
+
+ # parse arguments
+ args = parser.parse_args()
+
+ # TODO convert these into proper unittests
+
+ def _manager():
+ mkdir_p(os.path.dirname(args.path))
+ manager = BlobManager(
+ args.path, args.url,
+ 'A' * 32, args.secret,
+ args.uuid, args.token, args.cert_file)
+ return manager
+
+ @defer.inlineCallbacks
+ def _upload(blob_id, payload):
+ logger.info(":: Starting upload only: %s" % str((blob_id, payload)))
+ manager = _manager()
+ with open(payload, 'r') as fd:
+ yield manager._encrypt_and_upload(blob_id, fd)
+ logger.info(":: Finished upload only: %s" % str((blob_id, payload)))
+
+ @defer.inlineCallbacks
+ def _download(blob_id):
+ logger.info(":: Starting download only: %s" % blob_id)
+ manager = _manager()
+ result = yield manager._download_and_decrypt(blob_id)
+ logger.info(":: Result of download: %s" % str(result))
+ if result:
+ fd, _ = result
+ with open(args.output_file, 'w') as f:
+ logger.info(":: Writing data to %s" % args.output_file)
+ f.write(fd.read())
+ logger.info(":: Finished download only: %s" % blob_id)
+
+ @defer.inlineCallbacks
+ def _put(blob_id, payload):
+ logger.info(":: Starting full put: %s" % blob_id)
+ manager = _manager()
+ size = os.path.getsize(payload)
+ with open(payload) as fd:
+ doc = BlobDoc(fd, blob_id)
+ result = yield manager.put(doc, size=size)
+ logger.info(":: Result of put: %s" % str(result))
+ logger.info(":: Finished full put: %s" % blob_id)
+
+ @defer.inlineCallbacks
+ def _get(blob_id):
+ logger.info(":: Starting full get: %s" % blob_id)
+ manager = _manager()
+ fd = yield manager.get(blob_id)
+ if fd:
+ logger.info(":: Result of get: " + fd.getvalue())
+ logger.info(":: Finished full get: %s" % blob_id)
+
+ @defer.inlineCallbacks
+ def _delete(blob_id):
+ logger.info(":: Starting deletion of: %s" % blob_id)
+ manager = _manager()
+ yield manager.delete(blob_id)
+ logger.info(":: Finished deletion of: %s" % blob_id)
+
+ @defer.inlineCallbacks
+ def _list():
+ logger.info(":: Listing local blobs")
+ manager = _manager()
+ local_list = yield manager.local_list()
+ logger.info(":: Local list: %s" % local_list)
+ logger.info(":: Listing remote blobs")
+ remote_list = yield manager.remote_list()
+ logger.info(":: Remote list: %s" % remote_list)
+
+ @defer.inlineCallbacks
+ def _send_missing():
+ logger.info(":: Sending local pending upload docs")
+ manager = _manager()
+ yield manager.send_missing()
+ logger.info(":: Finished sending missing docs")
+
+ @defer.inlineCallbacks
+ def _fetch_missing():
+ logger.info(":: Fetching remote new docs")
+ manager = _manager()
+ yield manager.fetch_missing()
+ logger.info(":: Finished fetching new docs")
+
+ if args.action == 'upload':
+ yield _upload(args.blob_id, args.payload)
+ elif args.action == 'download':
+ yield _download(args.blob_id)
+ elif args.action == 'put':
+ yield _put(args.blob_id, args.payload)
+ elif args.action == 'get':
+ yield _get(args.blob_id)
+ elif args.action == 'delete':
+ yield _delete(args.blob_id)
+ elif args.action == 'list':
+ yield _list()
+ elif args.action == 'send_missing':
+ yield _send_missing()
+ elif args.action == 'fetch_missing':
+ yield _fetch_missing()
+
+
+if __name__ == '__main__':
+ from twisted.internet.task import react
+ react(testit)
diff --git a/src/leap/soledad/client/_db/dbschema.sql b/src/leap/soledad/client/_db/dbschema.sql
new file mode 100644
index 00000000..ae027fc5
--- /dev/null
+++ b/src/leap/soledad/client/_db/dbschema.sql
@@ -0,0 +1,42 @@
+-- Database schema
+CREATE TABLE transaction_log (
+ generation INTEGER PRIMARY KEY AUTOINCREMENT,
+ doc_id TEXT NOT NULL,
+ transaction_id TEXT NOT NULL
+);
+CREATE TABLE document (
+ doc_id TEXT PRIMARY KEY,
+ doc_rev TEXT NOT NULL,
+ content TEXT
+);
+CREATE TABLE document_fields (
+ doc_id TEXT NOT NULL,
+ field_name TEXT NOT NULL,
+ value TEXT
+);
+CREATE INDEX document_fields_field_value_doc_idx
+ ON document_fields(field_name, value, doc_id);
+
+CREATE TABLE sync_log (
+ replica_uid TEXT PRIMARY KEY,
+ known_generation INTEGER,
+ known_transaction_id TEXT
+);
+CREATE TABLE conflicts (
+ doc_id TEXT,
+ doc_rev TEXT,
+ content TEXT,
+ CONSTRAINT conflicts_pkey PRIMARY KEY (doc_id, doc_rev)
+);
+CREATE TABLE index_definitions (
+ name TEXT,
+ offset INT,
+ field TEXT,
+ CONSTRAINT index_definitions_pkey PRIMARY KEY (name, offset)
+);
+create index index_definitions_field on index_definitions(field);
+CREATE TABLE u1db_config (
+ name TEXT PRIMARY KEY,
+ value TEXT
+);
+INSERT INTO u1db_config VALUES ('sql_schema', '0');
diff --git a/src/leap/soledad/client/_db/pragmas.py b/src/leap/soledad/client/_db/pragmas.py
new file mode 100644
index 00000000..870ed63e
--- /dev/null
+++ b/src/leap/soledad/client/_db/pragmas.py
@@ -0,0 +1,379 @@
+# -*- coding: utf-8 -*-
+# pragmas.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Different pragmas used in the initialization of the SQLCipher database.
+"""
+import string
+import threading
+import os
+
+from leap.soledad.common import soledad_assert
+from leap.soledad.common.log import getLogger
+
+
+logger = getLogger(__name__)
+
+
+_db_init_lock = threading.Lock()
+
+
+def set_init_pragmas(conn, opts=None, extra_queries=None):
+ """
+ Set the initialization pragmas.
+
+ This includes the crypto pragmas, and any other options that must
+ be passed early to sqlcipher db.
+ """
+ soledad_assert(opts is not None)
+ extra_queries = [] if extra_queries is None else extra_queries
+ with _db_init_lock:
+ # only one execution path should initialize the db
+ _set_init_pragmas(conn, opts, extra_queries)
+
+
+def _set_init_pragmas(conn, opts, extra_queries):
+
+ sync_off = os.environ.get('LEAP_SQLITE_NOSYNC')
+ memstore = os.environ.get('LEAP_SQLITE_MEMSTORE')
+ nowal = os.environ.get('LEAP_SQLITE_NOWAL')
+
+ set_crypto_pragmas(conn, opts)
+
+ if not nowal:
+ set_write_ahead_logging(conn)
+ if sync_off:
+ set_synchronous_off(conn)
+ else:
+ set_synchronous_normal(conn)
+ if memstore:
+ set_mem_temp_store(conn)
+
+ for query in extra_queries:
+ conn.cursor().execute(query)
+
+
+def set_crypto_pragmas(db_handle, sqlcipher_opts):
+ """
+ Set cryptographic params (key, cipher, KDF number of iterations and
+ cipher page size).
+
+ :param db_handle:
+ :type db_handle:
+ :param sqlcipher_opts: options for the SQLCipherDatabase
+ :type sqlcipher_opts: SQLCipherOpts instance
+ """
+ # XXX assert CryptoOptions
+ opts = sqlcipher_opts
+ _set_key(db_handle, opts.key, opts.is_raw_key)
+ _set_cipher(db_handle, opts.cipher)
+ _set_kdf_iter(db_handle, opts.kdf_iter)
+ _set_cipher_page_size(db_handle, opts.cipher_page_size)
+
+
+def _set_key(db_handle, key, is_raw_key):
+ """
+ Set the ``key`` for use with the database.
+
+ The process of creating a new, encrypted database is called 'keying'
+ the database. SQLCipher uses just-in-time key derivation at the point
+ it is first needed for an operation. This means that the key (and any
+ options) must be set before the first operation on the database. As
+ soon as the database is touched (e.g. SELECT, CREATE TABLE, UPDATE,
+ etc.) and pages need to be read or written, the key is prepared for
+ use.
+
+ Implementation Notes:
+
+ * PRAGMA key should generally be called as the first operation on a
+ database.
+
+ :param key: The key for use with the database.
+ :type key: str
+ :param is_raw_key:
+ Whether C{key} is a raw 64-char hex string or a passphrase that should
+ be hashed to obtain the encyrption key.
+ :type is_raw_key: bool
+ """
+ if is_raw_key:
+ _set_key_raw(db_handle, key)
+ else:
+ _set_key_passphrase(db_handle, key)
+
+
+def _set_key_passphrase(db_handle, passphrase):
+ """
+ Set a passphrase for encryption key derivation.
+
+ The key itself can be a passphrase, which is converted to a key using
+ PBKDF2 key derivation. The result is used as the encryption key for
+ the database. By using this method, there is no way to alter the KDF;
+ if you want to do so you should use a raw key instead and derive the
+ key using your own KDF.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param passphrase: The passphrase used to derive the encryption key.
+ :type passphrase: str
+ """
+ db_handle.cursor().execute("PRAGMA key = '%s'" % passphrase)
+
+
+def _set_key_raw(db_handle, key):
+ """
+ Set a raw hexadecimal encryption key.
+
+ It is possible to specify an exact byte sequence using a blob literal.
+ With this method, it is the calling application's responsibility to
+ ensure that the data provided is a 64 character hex string, which will
+ be converted directly to 32 bytes (256 bits) of key data.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param key: A 64 character hex string.
+ :type key: str
+ """
+ if not all(c in string.hexdigits for c in key):
+ raise NotAnHexString(key)
+ db_handle.cursor().execute('PRAGMA key = "x\'%s"' % key)
+
+
+def _set_cipher(db_handle, cipher='aes-256-cbc'):
+ """
+ Set the cipher and mode to use for symmetric encryption.
+
+ SQLCipher uses aes-256-cbc as the default cipher and mode of
+ operation. It is possible to change this, though not generally
+ recommended, using PRAGMA cipher.
+
+ SQLCipher makes direct use of libssl, so all cipher options available
+ to libssl are also available for use with SQLCipher. See `man enc` for
+ OpenSSL's supported ciphers.
+
+ Implementation Notes:
+
+ * PRAGMA cipher must be called after PRAGMA key and before the first
+ actual database operation or it will have no effect.
+
+ * If a non-default value is used PRAGMA cipher to create a database,
+ it must also be called every time that database is opened.
+
+ * SQLCipher does not implement its own encryption. Instead it uses the
+ widely available and peer-reviewed OpenSSL libcrypto for all
+ cryptographic functions.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param cipher: The cipher and mode to use.
+ :type cipher: str
+ """
+ db_handle.cursor().execute("PRAGMA cipher = '%s'" % cipher)
+
+
+def _set_kdf_iter(db_handle, kdf_iter=4000):
+ """
+ Set the number of iterations for the key derivation function.
+
+ SQLCipher uses PBKDF2 key derivation to strengthen the key and make it
+ resistent to brute force and dictionary attacks. The default
+ configuration uses 4000 PBKDF2 iterations (effectively 16,000 SHA1
+ operations). PRAGMA kdf_iter can be used to increase or decrease the
+ number of iterations used.
+
+ Implementation Notes:
+
+ * PRAGMA kdf_iter must be called after PRAGMA key and before the first
+ actual database operation or it will have no effect.
+
+ * If a non-default value is used PRAGMA kdf_iter to create a database,
+ it must also be called every time that database is opened.
+
+ * It is not recommended to reduce the number of iterations if a
+ passphrase is in use.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param kdf_iter: The number of iterations to use.
+ :type kdf_iter: int
+ """
+ db_handle.cursor().execute("PRAGMA kdf_iter = '%d'" % kdf_iter)
+
+
+def _set_cipher_page_size(db_handle, cipher_page_size=1024):
+ """
+ Set the page size of the encrypted database.
+
+ SQLCipher 2 introduced the new PRAGMA cipher_page_size that can be
+ used to adjust the page size for the encrypted database. The default
+ page size is 1024 bytes, but it can be desirable for some applications
+ to use a larger page size for increased performance. For instance,
+ some recent testing shows that increasing the page size can noticeably
+ improve performance (5-30%) for certain queries that manipulate a
+ large number of pages (e.g. selects without an index, large inserts in
+ a transaction, big deletes).
+
+ To adjust the page size, call the pragma immediately after setting the
+ key for the first time and each subsequent time that you open the
+ database.
+
+ Implementation Notes:
+
+ * PRAGMA cipher_page_size must be called after PRAGMA key and before
+ the first actual database operation or it will have no effect.
+
+ * If a non-default value is used PRAGMA cipher_page_size to create a
+ database, it must also be called every time that database is opened.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param cipher_page_size: The page size.
+ :type cipher_page_size: int
+ """
+ db_handle.cursor().execute(
+ "PRAGMA cipher_page_size = '%d'" % cipher_page_size)
+
+
+# XXX UNUSED ?
+def set_rekey(db_handle, new_key, is_raw_key):
+ """
+ Change the key of an existing encrypted database.
+
+ To change the key on an existing encrypted database, it must first be
+ unlocked with the current encryption key. Once the database is
+ readable and writeable, PRAGMA rekey can be used to re-encrypt every
+ page in the database with a new key.
+
+ * PRAGMA rekey must be called after PRAGMA key. It can be called at any
+ time once the database is readable.
+
+ * PRAGMA rekey can not be used to encrypted a standard SQLite
+ database! It is only useful for changing the key on an existing
+ database.
+
+ * Previous versions of SQLCipher provided a PRAGMA rekey_cipher and
+ code>PRAGMA rekey_kdf_iter. These are deprecated and should not be
+ used. Instead, use sqlcipher_export().
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param new_key: The new key.
+ :type new_key: str
+ :param is_raw_key: Whether C{password} is a raw 64-char hex string or a
+ passphrase that should be hashed to obtain the encyrption
+ key.
+ :type is_raw_key: bool
+ """
+ if is_raw_key:
+ _set_rekey_raw(db_handle, new_key)
+ else:
+ _set_rekey_passphrase(db_handle, new_key)
+
+
+def _set_rekey_passphrase(db_handle, passphrase):
+ """
+ Change the passphrase for encryption key derivation.
+
+ The key itself can be a passphrase, which is converted to a key using
+ PBKDF2 key derivation. The result is used as the encryption key for
+ the database.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param passphrase: The passphrase used to derive the encryption key.
+ :type passphrase: str
+ """
+ db_handle.cursor().execute("PRAGMA rekey = '%s'" % passphrase)
+
+
+def _set_rekey_raw(db_handle, key):
+ """
+ Change the raw hexadecimal encryption key.
+
+ It is possible to specify an exact byte sequence using a blob literal.
+ With this method, it is the calling application's responsibility to
+ ensure that the data provided is a 64 character hex string, which will
+ be converted directly to 32 bytes (256 bits) of key data.
+
+ :param db_handle: A handle to the SQLCipher database.
+ :type db_handle: pysqlcipher.Connection
+ :param key: A 64 character hex string.
+ :type key: str
+ """
+ if not all(c in string.hexdigits for c in key):
+ raise NotAnHexString(key)
+ db_handle.cursor().execute('PRAGMA rekey = "x\'%s"' % key)
+
+
+def set_synchronous_off(db_handle):
+ """
+ Change the setting of the "synchronous" flag to OFF.
+ """
+ logger.debug("sqlcipher: setting synchronous off")
+ db_handle.cursor().execute('PRAGMA synchronous=OFF')
+
+
+def set_synchronous_normal(db_handle):
+ """
+ Change the setting of the "synchronous" flag to NORMAL.
+ """
+ logger.debug("sqlcipher: setting synchronous normal")
+ db_handle.cursor().execute('PRAGMA synchronous=NORMAL')
+
+
+def set_mem_temp_store(db_handle):
+ """
+ Use a in-memory store for temporary tables.
+ """
+ logger.debug("sqlcipher: setting temp_store memory")
+ db_handle.cursor().execute('PRAGMA temp_store=MEMORY')
+
+
+def set_write_ahead_logging(db_handle):
+ """
+ Enable write-ahead logging, and set the autocheckpoint to 50 pages.
+
+ Setting the autocheckpoint to a small value, we make the reads not
+ suffer too much performance degradation.
+
+ From the sqlite docs:
+
+ "There is a tradeoff between average read performance and average write
+ performance. To maximize the read performance, one wants to keep the
+ WAL as small as possible and hence run checkpoints frequently, perhaps
+ as often as every COMMIT. To maximize write performance, one wants to
+ amortize the cost of each checkpoint over as many writes as possible,
+ meaning that one wants to run checkpoints infrequently and let the WAL
+ grow as large as possible before each checkpoint. The decision of how
+ often to run checkpoints may therefore vary from one application to
+ another depending on the relative read and write performance
+ requirements of the application. The default strategy is to run a
+ checkpoint once the WAL reaches 1000 pages"
+ """
+ logger.debug("sqlcipher: setting write-ahead logging")
+ db_handle.cursor().execute('PRAGMA journal_mode=WAL')
+
+ # The optimum value can still use a little bit of tuning, but we favor
+ # small sizes of the WAL file to get fast reads, since we assume that
+ # the writes will be quick enough to not block too much.
+
+ db_handle.cursor().execute('PRAGMA wal_autocheckpoint=50')
+
+
+class NotAnHexString(Exception):
+ """
+ Raised when trying to (raw) key the database with a non-hex string.
+ """
+ pass
diff --git a/src/leap/soledad/client/_db/sqlcipher.py b/src/leap/soledad/client/_db/sqlcipher.py
new file mode 100644
index 00000000..d22017bd
--- /dev/null
+++ b/src/leap/soledad/client/_db/sqlcipher.py
@@ -0,0 +1,633 @@
+# -*- coding: utf-8 -*-
+# sqlcipher.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A U1DB backend that uses SQLCipher as its persistence layer.
+
+The SQLCipher API (http://sqlcipher.net/sqlcipher-api/) is fully implemented,
+with the exception of the following statements:
+
+ * PRAGMA cipher_use_hmac
+ * PRAGMA cipher_default_use_mac
+
+SQLCipher 2.0 introduced a per-page HMAC to validate that the page data has
+not be tampered with. By default, when creating or opening a database using
+SQLCipher 2, SQLCipher will attempt to use an HMAC check. This change in
+database format means that SQLCipher 2 can't operate on version 1.1.x
+databases by default. Thus, in order to provide backward compatibility with
+SQLCipher 1.1.x, PRAGMA cipher_use_hmac can be used to disable the HMAC
+functionality on specific databases.
+
+In some very specific cases, it is not possible to call PRAGMA cipher_use_hmac
+as one of the first operations on a database. An example of this is when
+trying to ATTACH a 1.1.x database to the main database. In these cases PRAGMA
+cipher_default_use_hmac can be used to globally alter the default use of HMAC
+when opening a database.
+
+So, as the statements above were introduced for backwards compatibility with
+SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
+handled by Soledad should be created by SQLCipher >= 2.0.
+"""
+import os
+import sys
+
+from functools import partial
+
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.enterprise import adbapi
+
+from leap.soledad.common.log import getLogger
+from leap.soledad.common.l2db import errors as u1db_errors
+from leap.soledad.common.errors import DatabaseAccessError
+
+from leap.soledad.client.http_target import SoledadHTTPSyncTarget
+from leap.soledad.client.sync import SoledadSynchronizer
+
+from .._document import Document
+from . import sqlite
+from . import pragmas
+
+if sys.version_info[0] < 3:
+ from pysqlcipher import dbapi2 as sqlcipher_dbapi2
+else:
+ from pysqlcipher3 import dbapi2 as sqlcipher_dbapi2
+
+logger = getLogger(__name__)
+
+
+# Monkey-patch u1db.backends.sqlite with pysqlcipher.dbapi2
+sqlite.dbapi2 = sqlcipher_dbapi2
+
+
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
+def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
+ """
+ Initialize a SQLCipher database.
+
+ :param opts:
+ :type opts: SQLCipherOptions
+ :param on_init: a tuple of queries to be executed on initialization
+ :type on_init: tuple
+ :return: pysqlcipher.dbapi2.Connection
+ """
+ # Note: There seemed to be a bug in sqlite 3.5.9 (with python2.6)
+ # where without re-opening the database on Windows, it
+ # doesn't see the transaction that was just committed
+ # Removing from here now, look at the pysqlite implementation if the
+ # bug shows up in windows.
+
+ if not os.path.isfile(opts.path) and not opts.create:
+ raise u1db_errors.DatabaseDoesNotExist()
+
+ conn = sqlcipher_dbapi2.connect(
+ opts.path, check_same_thread=check_same_thread)
+ pragmas.set_init_pragmas(conn, opts, extra_queries=on_init)
+ return conn
+
+
+def initialize_sqlcipher_adbapi_db(opts, extra_queries=None):
+ from leap.soledad.client import sqlcipher_adbapi
+ return sqlcipher_adbapi.getConnectionPool(
+ opts, extra_queries=extra_queries)
+
+
+class SQLCipherOptions(object):
+ """
+ A container with options for the initialization of an SQLCipher database.
+ """
+
+ @classmethod
+ def copy(cls, source, path=None, key=None, create=None,
+ is_raw_key=None, cipher=None, kdf_iter=None,
+ cipher_page_size=None, sync_db_key=None):
+ """
+ Return a copy of C{source} with parameters different than None
+ replaced by new values.
+ """
+ local_vars = locals()
+ args = []
+ kwargs = {}
+
+ for name in ["path", "key"]:
+ val = local_vars[name]
+ if val is not None:
+ args.append(val)
+ else:
+ args.append(getattr(source, name))
+
+ for name in ["create", "is_raw_key", "cipher", "kdf_iter",
+ "cipher_page_size", "sync_db_key"]:
+ val = local_vars[name]
+ if val is not None:
+ kwargs[name] = val
+ else:
+ kwargs[name] = getattr(source, name)
+
+ return SQLCipherOptions(*args, **kwargs)
+
+ def __init__(self, path, key, create=True, is_raw_key=False,
+ cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024,
+ sync_db_key=None):
+ """
+ :param path: The filesystem path for the database to open.
+ :type path: str
+ :param create:
+ True/False, should the database be created if it doesn't
+ already exist?
+ :param create: bool
+ :param is_raw_key:
+ Whether ``password`` is a raw 64-char hex string or a passphrase
+ that should be hashed to obtain the encyrption key.
+ :type raw_key: bool
+ :param cipher: The cipher and mode to use.
+ :type cipher: str
+ :param kdf_iter: The number of iterations to use.
+ :type kdf_iter: int
+ :param cipher_page_size: The page size.
+ :type cipher_page_size: int
+ """
+ self.path = path
+ self.key = key
+ self.is_raw_key = is_raw_key
+ self.create = create
+ self.cipher = cipher
+ self.kdf_iter = kdf_iter
+ self.cipher_page_size = cipher_page_size
+ self.sync_db_key = sync_db_key
+
+ def __str__(self):
+ """
+ Return string representation of options, for easy debugging.
+
+ :return: String representation of options.
+ :rtype: str
+ """
+ attr_names = filter(lambda a: not a.startswith('_'), dir(self))
+ attr_str = []
+ for a in attr_names:
+ attr_str.append(a + "=" + str(getattr(self, a)))
+ name = self.__class__.__name__
+ return "%s(%s)" % (name, ', '.join(attr_str))
+
+
+#
+# The SQLCipher database
+#
+
+class SQLCipherDatabase(sqlite.SQLitePartialExpandDatabase):
+ """
+ A U1DB implementation that uses SQLCipher as its persistence layer.
+ """
+
+ # The attribute _index_storage_value will be used as the lookup key for the
+ # implementation of the SQLCipher storage backend.
+ _index_storage_value = 'expand referenced encrypted'
+
+ def __init__(self, opts):
+ """
+ Connect to an existing SQLCipher database, creating a new sqlcipher
+ database file if needed.
+
+ *** IMPORTANT ***
+
+ Don't forget to close the database after use by calling the close()
+ method otherwise some resources might not be freed and you may
+ experience several kinds of leakages.
+
+ *** IMPORTANT ***
+
+ :param opts: options for initialization of the SQLCipher database.
+ :type opts: SQLCipherOptions
+ """
+ # ensure the db is encrypted if the file already exists
+ if os.path.isfile(opts.path):
+ self._db_handle = _assert_db_is_encrypted(opts)
+ else:
+ # connect to the sqlcipher database
+ self._db_handle = initialize_sqlcipher_db(opts)
+
+ # TODO ---------------------------------------------------
+ # Everything else in this initialization has to be factored
+ # out, so it can be used from SoledadSQLCipherWrapper.__init__
+ # too.
+ # ---------------------------------------------------------
+
+ self._ensure_schema()
+ self.set_document_factory(doc_factory)
+ self._prime_replica_uid()
+
+ def _prime_replica_uid(self):
+ """
+ In the u1db implementation, _replica_uid is a property
+ that returns the value in _real_replica_uid, and does
+ a db query if no value found.
+ Here we prime the replica uid during initialization so
+ that we don't have to wait for the query afterwards.
+ """
+ self._real_replica_uid = None
+ self._get_replica_uid()
+
+ def _extra_schema_init(self, c):
+ """
+ Add any extra fields, etc to the basic table definitions.
+
+ This method is called by u1db.backends.sqlite_backend._initialize()
+ method, which is executed when the database schema is created. Here,
+ we use it to include the "syncable" property for LeapDocuments.
+
+ :param c: The cursor for querying the database.
+ :type c: dbapi2.cursor
+ """
+ c.execute(
+ 'ALTER TABLE document '
+ 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
+
+ #
+ # SQLCipher API methods
+ #
+
+ # Extra query methods: extensions to the base u1db sqlite implmentation.
+
+ def get_count_from_index(self, index_name, *key_values):
+ """
+ Return the count for a given combination of index_name
+ and key values.
+
+ Extension method made from similar methods in u1db version 13.09
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param key_values: values to match. eg, if you have
+ an index with 3 fields then you would have:
+ get_from_index(index_name, val1, val2, val3)
+ :type key_values: tuple
+ :return: count.
+ :rtype: int
+ """
+ c = self._db_handle.cursor()
+ definition = self._get_index_definition(index_name)
+
+ if len(key_values) != len(definition):
+ raise u1db_errors.InvalidValueForIndex()
+ tables = ["document_fields d%d" % i for i in range(len(definition))]
+ novalue_where = ["d.doc_id = d%d.doc_id"
+ " AND d%d.field_name = ?"
+ % (i, i) for i in range(len(definition))]
+ exact_where = [novalue_where[i] + (" AND d%d.value = ?" % (i,))
+ for i in range(len(definition))]
+ args = []
+ where = []
+ for idx, (field, value) in enumerate(zip(definition, key_values)):
+ args.append(field)
+ where.append(exact_where[idx])
+ args.append(value)
+
+ tables = ["document_fields d%d" % i for i in range(len(definition))]
+ statement = (
+ "SELECT COUNT(*) FROM document d, %s WHERE %s " % (
+ ', '.join(tables),
+ ' AND '.join(where),
+ ))
+ try:
+ c.execute(statement, tuple(args))
+ except sqlcipher_dbapi2.OperationalError as e:
+ raise sqlcipher_dbapi2.OperationalError(
+ str(e) + '\nstatement: %s\nargs: %s\n' % (statement, args))
+ res = c.fetchall()
+ return res[0][0]
+
+ def close(self):
+ """
+ Close db connections.
+ """
+ # TODO should be handled by adbapi instead
+ # TODO syncdb should be stopped first
+
+ if logger is not None: # logger might be none if called from __del__
+ logger.debug("SQLCipher backend: closing")
+
+ # close the actual database
+ if getattr(self, '_db_handle', False):
+ self._db_handle.close()
+ self._db_handle = None
+
+ # indexes
+
+ def _put_and_update_indexes(self, old_doc, doc):
+ """
+ Update a document and all indexes related to it.
+
+ :param old_doc: The old version of the document.
+ :type old_doc: u1db.Document
+ :param doc: The new version of the document.
+ :type doc: u1db.Document
+ """
+ sqlite.SQLitePartialExpandDatabase._put_and_update_indexes(
+ self, old_doc, doc)
+ c = self._db_handle.cursor()
+ c.execute('UPDATE document SET syncable=? WHERE doc_id=?',
+ (doc.syncable, doc.doc_id))
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """
+ Get just the document content, without fancy handling.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise asking for a deleted
+ document will return None.
+ :type include_deleted: bool
+
+ :return: a Document object.
+ :type: u1db.Document
+ """
+ doc = sqlite.SQLitePartialExpandDatabase._get_doc(
+ self, doc_id, check_for_conflicts)
+ if doc:
+ c = self._db_handle.cursor()
+ c.execute('SELECT syncable FROM document WHERE doc_id=?',
+ (doc.doc_id,))
+ result = c.fetchone()
+ doc.syncable = bool(result[0])
+ return doc
+
+ def __del__(self):
+ """
+ Free resources when deleting or garbage collecting the database.
+
+ This is only here to minimze problems if someone ever forgets to call
+ the close() method after using the database; you should not rely on
+ garbage collecting to free up the database resources.
+ """
+ self.close()
+
+
+class SQLCipherU1DBSync(SQLCipherDatabase):
+ """
+ Soledad syncer implementation.
+ """
+
+ """
+ The name of the local symmetrically encrypted documents to
+ sync database file.
+ """
+ LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'
+
+ """
+ Period or recurrence of the Looping Call that will do the encryption to the
+ syncdb (in seconds).
+ """
+ ENCRYPT_LOOP_PERIOD = 1
+
+ def __init__(self, opts, soledad_crypto, replica_uid, cert_file):
+ self._opts = opts
+ self._path = opts.path
+ self._crypto = soledad_crypto
+ self.__replica_uid = replica_uid
+ self._cert_file = cert_file
+
+ # storage for the documents received during a sync
+ self.received_docs = []
+
+ self.running = False
+ self._db_handle = None
+
+ # initialize the main db before scheduling a start
+ self._initialize_main_db()
+ self._reactor = reactor
+ self._reactor.callWhenRunning(self._start)
+
+ if DO_STATS:
+ self.sync_phase = None
+
+ def commit(self):
+ self._db_handle.commit()
+
+ @property
+ def _replica_uid(self):
+ return str(self.__replica_uid)
+
+ def _start(self):
+ if not self.running:
+ self.running = True
+
+ def _initialize_main_db(self):
+ try:
+ self._db_handle = initialize_sqlcipher_db(
+ self._opts, check_same_thread=False)
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(doc_factory)
+ except sqlcipher_dbapi2.DatabaseError as e:
+ raise DatabaseAccessError(str(e))
+
+ @defer.inlineCallbacks
+ def sync(self, url, creds=None):
+ """
+ Synchronize documents with remote replica exposed at url.
+
+ It is not safe to initiate more than one sync process and let them run
+ concurrently. It is responsibility of the caller to ensure that there
+ are no concurrent sync processes running. This is currently controlled
+ by the main Soledad object because it may also run post-sync hooks,
+ which should be run while the lock is locked.
+
+ :param url: The url of the target replica to sync with.
+ :type url: str
+ :param creds: optional dictionary giving credentials to authorize the
+ operation with the server.
+ :type creds: dict
+
+ :return:
+ A Deferred, that will fire with the local generation (type `int`)
+ before the synchronisation was performed.
+ :rtype: Deferred
+ """
+ syncer = self._get_syncer(url, creds=creds)
+ if DO_STATS:
+ self.sync_phase = syncer.sync_phase
+ self.syncer = syncer
+ self.sync_exchange_phase = syncer.sync_exchange_phase
+ local_gen_before_sync = yield syncer.sync()
+ self.received_docs = syncer.received_docs
+ defer.returnValue(local_gen_before_sync)
+
+ def _get_syncer(self, url, creds=None):
+ """
+ Get a synchronizer for ``url`` using ``creds``.
+
+ :param url: The url of the target replica to sync with.
+ :type url: str
+ :param creds: optional dictionary giving credentials.
+ to authorize the operation with the server.
+ :type creds: dict
+
+ :return: A synchronizer.
+ :rtype: Synchronizer
+ """
+ return SoledadSynchronizer(
+ self,
+ SoledadHTTPSyncTarget(
+ url,
+ # XXX is the replica_uid ready?
+ self._replica_uid,
+ creds=creds,
+ crypto=self._crypto,
+ cert_file=self._cert_file))
+
+ #
+ # Symmetric encryption of syncing docs
+ #
+
+ def get_generation(self):
+ # FIXME
+ # XXX this SHOULD BE a callback
+ return self._get_generation()
+
+
+class U1DBSQLiteBackend(sqlite.SQLitePartialExpandDatabase):
+ """
+ A very simple wrapper for u1db around sqlcipher backend.
+
+ Instead of initializing the database on the fly, it just uses an existing
+ connection that is passed to it in the initializer.
+
+ It can be used in tests and debug runs to initialize the adbapi with plain
+ sqlite connections, decoupled from the sqlcipher layer.
+ """
+
+ def __init__(self, conn):
+ self._db_handle = conn
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self._factory = Document
+
+
+class SoledadSQLCipherWrapper(SQLCipherDatabase):
+ """
+ A wrapper for u1db that uses the Soledad-extended sqlcipher backend.
+
+ Instead of initializing the database on the fly, it just uses an existing
+ connection that is passed to it in the initializer.
+
+ It can be used from adbapi to initialize a soledad database after
+ getting a regular connection to a sqlcipher database.
+ """
+ def __init__(self, conn, opts):
+ self._db_handle = conn
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(doc_factory)
+ self._prime_replica_uid()
+
+
+def _assert_db_is_encrypted(opts):
+ """
+ Assert that the sqlcipher file contains an encrypted database.
+
+ When opening an existing database, PRAGMA key will not immediately
+ throw an error if the key provided is incorrect. To test that the
+ database can be successfully opened with the provided key, it is
+ necessary to perform some operation on the database (i.e. read from
+ it) and confirm it is success.
+
+ The easiest way to do this is select off the sqlite_master table,
+ which will attempt to read the first page of the database and will
+ parse the schema.
+
+ :param opts:
+ """
+ # We try to open an encrypted database with the regular u1db
+ # backend should raise a DatabaseError exception.
+ # If the regular backend succeeds, then we need to stop because
+ # the database was not properly initialized.
+ try:
+ sqlite.SQLitePartialExpandDatabase(opts.path)
+ except sqlcipher_dbapi2.DatabaseError:
+ # assert that we can access it using SQLCipher with the given
+ # key
+ dummy_query = ('SELECT count(*) FROM sqlite_master',)
+ return initialize_sqlcipher_db(opts, on_init=dummy_query)
+ else:
+ raise DatabaseIsNotEncrypted()
+
+#
+# Exceptions
+#
+
+
+class DatabaseIsNotEncrypted(Exception):
+ """
+ Exception raised when trying to open non-encrypted databases.
+ """
+ pass
+
+
+def doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
+ syncable=True):
+ """
+ Return a default Soledad Document.
+ Used in the initialization for SQLCipherDatabase
+ """
+ return Document(doc_id=doc_id, rev=rev, json=json,
+ has_conflicts=has_conflicts, syncable=syncable)
+
+
+sqlite.SQLiteDatabase.register_implementation(SQLCipherDatabase)
+
+
+#
+# twisted.enterprise.adbapi SQLCipher implementation
+#
+
+SQLCIPHER_CONNECTION_TIMEOUT = 10
+
+
+def getConnectionPool(opts, extra_queries=None):
+ openfun = partial(
+ pragmas.set_init_pragmas,
+ opts=opts,
+ extra_queries=extra_queries)
+ return SQLCipherConnectionPool(
+ database=opts.path,
+ check_same_thread=False,
+ cp_openfun=openfun,
+ timeout=SQLCIPHER_CONNECTION_TIMEOUT)
+
+
+class SQLCipherConnection(adbapi.Connection):
+ pass
+
+
+class SQLCipherTransaction(adbapi.Transaction):
+ pass
+
+
+class SQLCipherConnectionPool(adbapi.ConnectionPool):
+
+ connectionFactory = SQLCipherConnection
+ transactionFactory = SQLCipherTransaction
+
+ def __init__(self, *args, **kwargs):
+ adbapi.ConnectionPool.__init__(
+ self, "pysqlcipher.dbapi2", *args, **kwargs)
diff --git a/src/leap/soledad/client/_db/sqlite.py b/src/leap/soledad/client/_db/sqlite.py
new file mode 100644
index 00000000..4f7b1259
--- /dev/null
+++ b/src/leap/soledad/client/_db/sqlite.py
@@ -0,0 +1,930 @@
+# Copyright 2011 Canonical Ltd.
+# Copyright 2016 LEAP Encryption Access Project
+#
+# This file is part of u1db.
+#
+# u1db is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation.
+#
+# u1db is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with u1db. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+A L2DB implementation that uses SQLite as its persistence layer.
+"""
+
+import errno
+import os
+import json
+import sys
+import time
+import uuid
+import pkg_resources
+
+from sqlite3 import dbapi2
+
+from leap.soledad.common.l2db.backends import CommonBackend, CommonSyncTarget
+from leap.soledad.common.l2db import (
+ Document, errors,
+ query_parser, vectorclock)
+
+
+class SQLiteDatabase(CommonBackend):
+ """A U1DB implementation that uses SQLite as its persistence layer."""
+
+ _sqlite_registry = {}
+
+ def __init__(self, sqlite_file, document_factory=None):
+ """Create a new sqlite file."""
+ self._db_handle = dbapi2.connect(sqlite_file)
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self._factory = document_factory or Document
+
+ def set_document_factory(self, factory):
+ self._factory = factory
+
+ def get_sync_target(self):
+ return SQLiteSyncTarget(self)
+
+ @classmethod
+ def _which_index_storage(cls, c):
+ try:
+ c.execute("SELECT value FROM u1db_config"
+ " WHERE name = 'index_storage'")
+ except dbapi2.OperationalError as e:
+ # The table does not exist yet
+ return None, e
+ else:
+ return c.fetchone()[0], None
+
+ WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL = 0.5
+
+ @classmethod
+ def _open_database(cls, sqlite_file, document_factory=None):
+ if not os.path.isfile(sqlite_file):
+ raise errors.DatabaseDoesNotExist()
+ tries = 2
+ while True:
+ # Note: There seems to be a bug in sqlite 3.5.9 (with python2.6)
+ # where without re-opening the database on Windows, it
+ # doesn't see the transaction that was just committed
+ db_handle = dbapi2.connect(sqlite_file)
+ c = db_handle.cursor()
+ v, err = cls._which_index_storage(c)
+ db_handle.close()
+ if v is not None:
+ break
+ # possibly another process is initializing it, wait for it to be
+ # done
+ if tries == 0:
+ raise err # go for the richest error?
+ tries -= 1
+ time.sleep(cls.WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL)
+ return SQLiteDatabase._sqlite_registry[v](
+ sqlite_file, document_factory=document_factory)
+
+ @classmethod
+ def open_database(cls, sqlite_file, create, backend_cls=None,
+ document_factory=None):
+ try:
+ return cls._open_database(
+ sqlite_file, document_factory=document_factory)
+ except errors.DatabaseDoesNotExist:
+ if not create:
+ raise
+ if backend_cls is None:
+ # default is SQLitePartialExpandDatabase
+ backend_cls = SQLitePartialExpandDatabase
+ return backend_cls(sqlite_file, document_factory=document_factory)
+
+ @staticmethod
+ def delete_database(sqlite_file):
+ try:
+ os.unlink(sqlite_file)
+ except OSError as ex:
+ if ex.errno == errno.ENOENT:
+ raise errors.DatabaseDoesNotExist()
+ raise
+
+ @staticmethod
+ def register_implementation(klass):
+ """Register that we implement an SQLiteDatabase.
+
+ The attribute _index_storage_value will be used as the lookup key.
+ """
+ SQLiteDatabase._sqlite_registry[klass._index_storage_value] = klass
+
+ def _get_sqlite_handle(self):
+ """Get access to the underlying sqlite database.
+
+ This should only be used by the test suite, etc, for examining the
+ state of the underlying database.
+ """
+ return self._db_handle
+
+ def _close_sqlite_handle(self):
+ """Release access to the underlying sqlite database."""
+ self._db_handle.close()
+
+ def close(self):
+ self._close_sqlite_handle()
+
+ def _is_initialized(self, c):
+ """Check if this database has been initialized."""
+ c.execute("PRAGMA case_sensitive_like=ON")
+ try:
+ c.execute("SELECT value FROM u1db_config"
+ " WHERE name = 'sql_schema'")
+ except dbapi2.OperationalError:
+ # The table does not exist yet
+ val = None
+ else:
+ val = c.fetchone()
+ if val is not None:
+ return True
+ return False
+
+ def _initialize(self, c):
+ """Create the schema in the database."""
+ # read the script with sql commands
+ # TODO: Change how we set up the dependency. Most likely use something
+ # like lp:dirspec to grab the file from a common resource
+ # directory. Doesn't specifically need to be handled until we get
+ # to the point of packaging this.
+ schema_content = pkg_resources.resource_string(
+ __name__, 'dbschema.sql')
+ # Note: We'd like to use c.executescript() here, but it seems that
+ # executescript always commits, even if you set
+ # isolation_level = None, so if we want to properly handle
+ # exclusive locking and rollbacks between processes, we need
+ # to execute it line-by-line
+ for line in schema_content.split(';'):
+ if not line:
+ continue
+ c.execute(line)
+ # add extra fields
+ self._extra_schema_init(c)
+ # A unique identifier should be set for this replica. Implementations
+ # don't have to strictly use uuid here, but we do want the uid to be
+ # unique amongst all databases that will sync with each other.
+ # We might extend this to using something with hostname for easier
+ # debugging.
+ self._set_replica_uid_in_transaction(uuid.uuid4().hex)
+ c.execute("INSERT INTO u1db_config VALUES" " ('index_storage', ?)",
+ (self._index_storage_value,))
+
+ def _ensure_schema(self):
+ """Ensure that the database schema has been created."""
+ old_isolation_level = self._db_handle.isolation_level
+ c = self._db_handle.cursor()
+ if self._is_initialized(c):
+ return
+ try:
+ # autocommit/own mgmt of transactions
+ self._db_handle.isolation_level = None
+ with self._db_handle:
+ # only one execution path should initialize the db
+ c.execute("begin exclusive")
+ if self._is_initialized(c):
+ return
+ self._initialize(c)
+ finally:
+ self._db_handle.isolation_level = old_isolation_level
+
+ def _extra_schema_init(self, c):
+ """Add any extra fields, etc to the basic table definitions."""
+
+ def _parse_index_definition(self, index_field):
+ """Parse a field definition for an index, returning a Getter."""
+ # Note: We may want to keep a Parser object around, and cache the
+ # Getter objects for a greater length of time. Specifically, if
+ # you create a bunch of indexes, and then insert 50k docs, you'll
+ # re-parse the indexes between puts. The time to insert the docs
+ # is still likely to dominate put_doc time, though.
+ parser = query_parser.Parser()
+ getter = parser.parse(index_field)
+ return getter
+
+ def _update_indexes(self, doc_id, raw_doc, getters, db_cursor):
+ """Update document_fields for a single document.
+
+ :param doc_id: Identifier for this document
+ :param raw_doc: The python dict representation of the document.
+ :param getters: A list of [(field_name, Getter)]. Getter.get will be
+ called to evaluate the index definition for this document, and the
+ results will be inserted into the db.
+ :param db_cursor: An sqlite Cursor.
+ :return: None
+ """
+ values = []
+ for field_name, getter in getters:
+ for idx_value in getter.get(raw_doc):
+ values.append((doc_id, field_name, idx_value))
+ if values:
+ db_cursor.executemany(
+ "INSERT INTO document_fields VALUES (?, ?, ?)", values)
+
+ def _set_replica_uid(self, replica_uid):
+ """Force the replica_uid to be set."""
+ with self._db_handle:
+ self._set_replica_uid_in_transaction(replica_uid)
+
+ def _set_replica_uid_in_transaction(self, replica_uid):
+ """Set the replica_uid. A transaction should already be held."""
+ c = self._db_handle.cursor()
+ c.execute("INSERT OR REPLACE INTO u1db_config"
+ " VALUES ('replica_uid', ?)",
+ (replica_uid,))
+ self._real_replica_uid = replica_uid
+
+ def _get_replica_uid(self):
+ if self._real_replica_uid is not None:
+ return self._real_replica_uid
+ c = self._db_handle.cursor()
+ c.execute("SELECT value FROM u1db_config WHERE name = 'replica_uid'")
+ val = c.fetchone()
+ if val is None:
+ return None
+ self._real_replica_uid = val[0]
+ return self._real_replica_uid
+
+ _replica_uid = property(_get_replica_uid)
+
+ def _get_generation(self):
+ c = self._db_handle.cursor()
+ c.execute('SELECT max(generation) FROM transaction_log')
+ val = c.fetchone()[0]
+ if val is None:
+ return 0
+ return val
+
+ def _get_generation_info(self):
+ c = self._db_handle.cursor()
+ c.execute(
+ 'SELECT max(generation), transaction_id FROM transaction_log ')
+ val = c.fetchone()
+ if val[0] is None:
+ return(0, '')
+ return val
+
+ def _get_trans_id_for_gen(self, generation):
+ if generation == 0:
+ return ''
+ c = self._db_handle.cursor()
+ c.execute(
+ 'SELECT transaction_id FROM transaction_log WHERE generation = ?',
+ (generation,))
+ val = c.fetchone()
+ if val is None:
+ raise errors.InvalidGeneration
+ return val[0]
+
+ def _get_transaction_log(self):
+ c = self._db_handle.cursor()
+ c.execute("SELECT doc_id, transaction_id FROM transaction_log"
+ " ORDER BY generation")
+ return c.fetchall()
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """Get just the document content, without fancy handling."""
+ c = self._db_handle.cursor()
+ if check_for_conflicts:
+ c.execute(
+ "SELECT document.doc_rev, document.content, "
+ "count(conflicts.doc_rev) FROM document LEFT OUTER JOIN "
+ "conflicts ON conflicts.doc_id = document.doc_id WHERE "
+ "document.doc_id = ? GROUP BY document.doc_id, "
+ "document.doc_rev, document.content;", (doc_id,))
+ else:
+ c.execute(
+ "SELECT doc_rev, content, 0 FROM document WHERE doc_id = ?",
+ (doc_id,))
+ val = c.fetchone()
+ if val is None:
+ return None
+ doc_rev, content, conflicts = val
+ doc = self._factory(doc_id, doc_rev, content)
+ doc.has_conflicts = conflicts > 0
+ return doc
+
+ def _has_conflicts(self, doc_id):
+ c = self._db_handle.cursor()
+ c.execute("SELECT 1 FROM conflicts WHERE doc_id = ? LIMIT 1",
+ (doc_id,))
+ val = c.fetchone()
+ if val is None:
+ return False
+ else:
+ return True
+
+ def get_doc(self, doc_id, include_deleted=False):
+ doc = self._get_doc(doc_id, check_for_conflicts=True)
+ if doc is None:
+ return None
+ if doc.is_tombstone() and not include_deleted:
+ return None
+ return doc
+
+ def get_all_docs(self, include_deleted=False):
+ """Get all documents from the database."""
+ generation = self._get_generation()
+ results = []
+ c = self._db_handle.cursor()
+ c.execute(
+ "SELECT document.doc_id, document.doc_rev, document.content, "
+ "count(conflicts.doc_rev) FROM document LEFT OUTER JOIN conflicts "
+ "ON conflicts.doc_id = document.doc_id GROUP BY document.doc_id, "
+ "document.doc_rev, document.content;")
+ rows = c.fetchall()
+ for doc_id, doc_rev, content, conflicts in rows:
+ if content is None and not include_deleted:
+ continue
+ doc = self._factory(doc_id, doc_rev, content)
+ doc.has_conflicts = conflicts > 0
+ results.append(doc)
+ return (generation, results)
+
+ def put_doc(self, doc):
+ if doc.doc_id is None:
+ raise errors.InvalidDocId()
+ self._check_doc_id(doc.doc_id)
+ self._check_doc_size(doc)
+ with self._db_handle:
+ old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if old_doc and old_doc.has_conflicts:
+ raise errors.ConflictedDoc()
+ if old_doc and doc.rev is None and old_doc.is_tombstone():
+ new_rev = self._allocate_doc_rev(old_doc.rev)
+ else:
+ if old_doc is not None:
+ if old_doc.rev != doc.rev:
+ raise errors.RevisionConflict()
+ else:
+ if doc.rev is not None:
+ raise errors.RevisionConflict()
+ new_rev = self._allocate_doc_rev(doc.rev)
+ doc.rev = new_rev
+ self._put_and_update_indexes(old_doc, doc)
+ return new_rev
+
+ def _expand_to_fields(self, doc_id, base_field, raw_doc, save_none):
+ """Convert a dict representation into named fields.
+
+ So something like: {'key1': 'val1', 'key2': 'val2'}
+ gets converted into: [(doc_id, 'key1', 'val1', 0)
+ (doc_id, 'key2', 'val2', 0)]
+ :param doc_id: Just added to every record.
+ :param base_field: if set, these are nested keys, so each field should
+ be appropriately prefixed.
+ :param raw_doc: The python dictionary.
+ """
+ # TODO: Handle lists
+ values = []
+ for field_name, value in raw_doc.iteritems():
+ if value is None and not save_none:
+ continue
+ if base_field:
+ full_name = base_field + '.' + field_name
+ else:
+ full_name = field_name
+ if value is None or isinstance(value, (int, float, basestring)):
+ values.append((doc_id, full_name, value, len(values)))
+ else:
+ subvalues = self._expand_to_fields(doc_id, full_name, value,
+ save_none)
+ for _, subfield_name, val, _ in subvalues:
+ values.append((doc_id, subfield_name, val, len(values)))
+ return values
+
+ def _put_and_update_indexes(self, old_doc, doc):
+ """Actually insert a document into the database.
+
+ This both updates the existing documents content, and any indexes that
+ refer to this document.
+ """
+ raise NotImplementedError(self._put_and_update_indexes)
+
+ def whats_changed(self, old_generation=0):
+ c = self._db_handle.cursor()
+ c.execute("SELECT generation, doc_id, transaction_id"
+ " FROM transaction_log"
+ " WHERE generation > ? ORDER BY generation DESC",
+ (old_generation,))
+ results = c.fetchall()
+ cur_gen = old_generation
+ seen = set()
+ changes = []
+ newest_trans_id = ''
+ for generation, doc_id, trans_id in results:
+ if doc_id not in seen:
+ changes.append((doc_id, generation, trans_id))
+ seen.add(doc_id)
+ if changes:
+ cur_gen = changes[0][1] # max generation
+ newest_trans_id = changes[0][2]
+ changes.reverse()
+ else:
+ c.execute("SELECT generation, transaction_id"
+ " FROM transaction_log ORDER BY generation DESC LIMIT 1")
+ results = c.fetchone()
+ if not results:
+ cur_gen = 0
+ newest_trans_id = ''
+ else:
+ cur_gen, newest_trans_id = results
+
+ return cur_gen, newest_trans_id, changes
+
+ def delete_doc(self, doc):
+ with self._db_handle:
+ old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if old_doc is None:
+ raise errors.DocumentDoesNotExist
+ if old_doc.rev != doc.rev:
+ raise errors.RevisionConflict()
+ if old_doc.is_tombstone():
+ raise errors.DocumentAlreadyDeleted
+ if old_doc.has_conflicts:
+ raise errors.ConflictedDoc()
+ new_rev = self._allocate_doc_rev(doc.rev)
+ doc.rev = new_rev
+ doc.make_tombstone()
+ self._put_and_update_indexes(old_doc, doc)
+ return new_rev
+
+ def _get_conflicts(self, doc_id):
+ c = self._db_handle.cursor()
+ c.execute("SELECT doc_rev, content FROM conflicts WHERE doc_id = ?",
+ (doc_id,))
+ return [self._factory(doc_id, doc_rev, content)
+ for doc_rev, content in c.fetchall()]
+
+ def get_doc_conflicts(self, doc_id):
+ with self._db_handle:
+ conflict_docs = self._get_conflicts(doc_id)
+ if not conflict_docs:
+ return []
+ this_doc = self._get_doc(doc_id)
+ this_doc.has_conflicts = True
+ return [this_doc] + conflict_docs
+
+ def _get_replica_gen_and_trans_id(self, other_replica_uid):
+ c = self._db_handle.cursor()
+ c.execute("SELECT known_generation, known_transaction_id FROM sync_log"
+ " WHERE replica_uid = ?",
+ (other_replica_uid,))
+ val = c.fetchone()
+ if val is None:
+ other_gen = 0
+ trans_id = ''
+ else:
+ other_gen = val[0]
+ trans_id = val[1]
+ return other_gen, trans_id
+
+ def _set_replica_gen_and_trans_id(self, other_replica_uid,
+ other_generation, other_transaction_id):
+ with self._db_handle:
+ self._do_set_replica_gen_and_trans_id(
+ other_replica_uid, other_generation, other_transaction_id)
+
+ def _do_set_replica_gen_and_trans_id(self, other_replica_uid,
+ other_generation,
+ other_transaction_id):
+ c = self._db_handle.cursor()
+ c.execute("INSERT OR REPLACE INTO sync_log VALUES (?, ?, ?)",
+ (other_replica_uid, other_generation,
+ other_transaction_id))
+
+ def _put_doc_if_newer(self, doc, save_conflict, replica_uid=None,
+ replica_gen=None, replica_trans_id=None):
+ return super(SQLiteDatabase, self)._put_doc_if_newer(
+ doc,
+ save_conflict=save_conflict,
+ replica_uid=replica_uid, replica_gen=replica_gen,
+ replica_trans_id=replica_trans_id)
+
+ def _add_conflict(self, c, doc_id, my_doc_rev, my_content):
+ c.execute("INSERT INTO conflicts VALUES (?, ?, ?)",
+ (doc_id, my_doc_rev, my_content))
+
+ def _delete_conflicts(self, c, doc, conflict_revs):
+ deleting = [(doc.doc_id, c_rev) for c_rev in conflict_revs]
+ c.executemany("DELETE FROM conflicts"
+ " WHERE doc_id=? AND doc_rev=?", deleting)
+ doc.has_conflicts = self._has_conflicts(doc.doc_id)
+
+ def _prune_conflicts(self, doc, doc_vcr):
+ if self._has_conflicts(doc.doc_id):
+ autoresolved = False
+ c_revs_to_prune = []
+ for c_doc in self._get_conflicts(doc.doc_id):
+ c_vcr = vectorclock.VectorClockRev(c_doc.rev)
+ if doc_vcr.is_newer(c_vcr):
+ c_revs_to_prune.append(c_doc.rev)
+ elif doc.same_content_as(c_doc):
+ c_revs_to_prune.append(c_doc.rev)
+ doc_vcr.maximize(c_vcr)
+ autoresolved = True
+ if autoresolved:
+ doc_vcr.increment(self._replica_uid)
+ doc.rev = doc_vcr.as_str()
+ c = self._db_handle.cursor()
+ self._delete_conflicts(c, doc, c_revs_to_prune)
+
+ def _force_doc_sync_conflict(self, doc):
+ my_doc = self._get_doc(doc.doc_id)
+ c = self._db_handle.cursor()
+ self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
+ self._add_conflict(c, doc.doc_id, my_doc.rev, my_doc.get_json())
+ doc.has_conflicts = True
+ self._put_and_update_indexes(my_doc, doc)
+
+ def resolve_doc(self, doc, conflicted_doc_revs):
+ with self._db_handle:
+ cur_doc = self._get_doc(doc.doc_id)
+ # TODO: https://bugs.launchpad.net/u1db/+bug/928274
+ # I think we have a logic bug in resolve_doc
+ # Specifically, cur_doc.rev is always in the final vector
+ # clock of revisions that we supersede, even if it wasn't in
+ # conflicted_doc_revs. We still add it as a conflict, but the
+ # fact that _put_doc_if_newer propagates resolutions means I
+ # think that conflict could accidentally be resolved. We need
+ # to add a test for this case first. (create a rev, create a
+ # conflict, create another conflict, resolve the first rev
+ # and first conflict, then make sure that the resolved
+ # rev doesn't supersede the second conflict rev.) It *might*
+ # not matter, because the superseding rev is in as a
+ # conflict, but it does seem incorrect
+ new_rev = self._ensure_maximal_rev(cur_doc.rev,
+ conflicted_doc_revs)
+ superseded_revs = set(conflicted_doc_revs)
+ c = self._db_handle.cursor()
+ doc.rev = new_rev
+ if cur_doc.rev in superseded_revs:
+ self._put_and_update_indexes(cur_doc, doc)
+ else:
+ self._add_conflict(c, doc.doc_id, new_rev, doc.get_json())
+ # TODO: Is there some way that we could construct a rev that would
+ # end up in superseded_revs, such that we add a conflict, and
+ # then immediately delete it?
+ self._delete_conflicts(c, doc, superseded_revs)
+
+ def list_indexes(self):
+ """Return the list of indexes and their definitions."""
+ c = self._db_handle.cursor()
+ # TODO: How do we test the ordering?
+ c.execute("SELECT name, field FROM index_definitions"
+ " ORDER BY name, offset")
+ definitions = []
+ cur_name = None
+ for name, field in c.fetchall():
+ if cur_name != name:
+ definitions.append((name, []))
+ cur_name = name
+ definitions[-1][-1].append(field)
+ return definitions
+
+ def _get_index_definition(self, index_name):
+ """Return the stored definition for a given index_name."""
+ c = self._db_handle.cursor()
+ c.execute("SELECT field FROM index_definitions"
+ " WHERE name = ? ORDER BY offset", (index_name,))
+ fields = [x[0] for x in c.fetchall()]
+ if not fields:
+ raise errors.IndexDoesNotExist
+ return fields
+
+ @staticmethod
+ def _strip_glob(value):
+ """Remove the trailing * from a value."""
+ assert value[-1] == '*'
+ return value[:-1]
+
+ def _format_query(self, definition, key_values):
+ # First, build the definition. We join the document_fields table
+ # against itself, as many times as the 'width' of our definition.
+ # We then do a query for each key_value, one-at-a-time.
+ # Note: All of these strings are static, we could cache them, etc.
+ tables = ["document_fields d%d" % i for i in range(len(definition))]
+ novalue_where = ["d.doc_id = d%d.doc_id"
+ " AND d%d.field_name = ?"
+ % (i, i) for i in range(len(definition))]
+ wildcard_where = [novalue_where[i] +
+ (" AND d%d.value NOT NULL" % (i,))
+ for i in range(len(definition))]
+ exact_where = [novalue_where[i] +
+ (" AND d%d.value = ?" % (i,))
+ for i in range(len(definition))]
+ like_where = [novalue_where[i] +
+ (" AND d%d.value GLOB ?" % (i,))
+ for i in range(len(definition))]
+ is_wildcard = False
+ # Merge the lists together, so that:
+ # [field1, field2, field3], [val1, val2, val3]
+ # Becomes:
+ # (field1, val1, field2, val2, field3, val3)
+ args = []
+ where = []
+ for idx, (field, value) in enumerate(zip(definition, key_values)):
+ args.append(field)
+ if value.endswith('*'):
+ if value == '*':
+ where.append(wildcard_where[idx])
+ else:
+ # This is a glob match
+ if is_wildcard:
+ # We can't have a partial wildcard following
+ # another wildcard
+ raise errors.InvalidGlobbing
+ where.append(like_where[idx])
+ args.append(value)
+ is_wildcard = True
+ else:
+ if is_wildcard:
+ raise errors.InvalidGlobbing
+ where.append(exact_where[idx])
+ args.append(value)
+ statement = (
+ "SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM "
+ "document d, %s LEFT OUTER JOIN conflicts c ON c.doc_id = "
+ "d.doc_id WHERE %s GROUP BY d.doc_id, d.doc_rev, d.content ORDER "
+ "BY %s;" % (', '.join(tables), ' AND '.join(where), ', '.join(
+ ['d%d.value' % i for i in range(len(definition))])))
+ return statement, args
+
+ def get_from_index(self, index_name, *key_values):
+ definition = self._get_index_definition(index_name)
+ if len(key_values) != len(definition):
+ raise errors.InvalidValueForIndex()
+ statement, args = self._format_query(definition, key_values)
+ c = self._db_handle.cursor()
+ try:
+ c.execute(statement, tuple(args))
+ except dbapi2.OperationalError as e:
+ raise dbapi2.OperationalError(
+ str(e) +
+ '\nstatement: %s\nargs: %s\n' % (statement, args))
+ res = c.fetchall()
+ results = []
+ for row in res:
+ doc = self._factory(row[0], row[1], row[2])
+ doc.has_conflicts = row[3] > 0
+ results.append(doc)
+ return results
+
+ def _format_range_query(self, definition, start_value, end_value):
+ tables = ["document_fields d%d" % i for i in range(len(definition))]
+ novalue_where = [
+ "d.doc_id = d%d.doc_id AND d%d.field_name = ?" % (i, i) for i in
+ range(len(definition))]
+ wildcard_where = [
+ novalue_where[i] + (" AND d%d.value NOT NULL" % (i,)) for i in
+ range(len(definition))]
+ like_where = [
+ novalue_where[i] + (
+ " AND (d%d.value < ? OR d%d.value GLOB ?)" % (i, i)) for i in
+ range(len(definition))]
+ range_where_lower = [
+ novalue_where[i] + (" AND d%d.value >= ?" % (i,)) for i in
+ range(len(definition))]
+ range_where_upper = [
+ novalue_where[i] + (" AND d%d.value <= ?" % (i,)) for i in
+ range(len(definition))]
+ args = []
+ where = []
+ if start_value:
+ if isinstance(start_value, basestring):
+ start_value = (start_value,)
+ if len(start_value) != len(definition):
+ raise errors.InvalidValueForIndex()
+ is_wildcard = False
+ for idx, (field, value) in enumerate(zip(definition, start_value)):
+ args.append(field)
+ if value.endswith('*'):
+ if value == '*':
+ where.append(wildcard_where[idx])
+ else:
+ # This is a glob match
+ if is_wildcard:
+ # We can't have a partial wildcard following
+ # another wildcard
+ raise errors.InvalidGlobbing
+ where.append(range_where_lower[idx])
+ args.append(self._strip_glob(value))
+ is_wildcard = True
+ else:
+ if is_wildcard:
+ raise errors.InvalidGlobbing
+ where.append(range_where_lower[idx])
+ args.append(value)
+ if end_value:
+ if isinstance(end_value, basestring):
+ end_value = (end_value,)
+ if len(end_value) != len(definition):
+ raise errors.InvalidValueForIndex()
+ is_wildcard = False
+ for idx, (field, value) in enumerate(zip(definition, end_value)):
+ args.append(field)
+ if value.endswith('*'):
+ if value == '*':
+ where.append(wildcard_where[idx])
+ else:
+ # This is a glob match
+ if is_wildcard:
+ # We can't have a partial wildcard following
+ # another wildcard
+ raise errors.InvalidGlobbing
+ where.append(like_where[idx])
+ args.append(self._strip_glob(value))
+ args.append(value)
+ is_wildcard = True
+ else:
+ if is_wildcard:
+ raise errors.InvalidGlobbing
+ where.append(range_where_upper[idx])
+ args.append(value)
+ statement = (
+ "SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM "
+ "document d, %s LEFT OUTER JOIN conflicts c ON c.doc_id = "
+ "d.doc_id WHERE %s GROUP BY d.doc_id, d.doc_rev, d.content ORDER "
+ "BY %s;" % (', '.join(tables), ' AND '.join(where), ', '.join(
+ ['d%d.value' % i for i in range(len(definition))])))
+ return statement, args
+
+ def get_range_from_index(self, index_name, start_value=None,
+ end_value=None):
+ """Return all documents with key values in the specified range."""
+ definition = self._get_index_definition(index_name)
+ statement, args = self._format_range_query(
+ definition, start_value, end_value)
+ c = self._db_handle.cursor()
+ try:
+ c.execute(statement, tuple(args))
+ except dbapi2.OperationalError as e:
+ raise dbapi2.OperationalError(
+ str(e) +
+ '\nstatement: %s\nargs: %s\n' % (statement, args))
+ res = c.fetchall()
+ results = []
+ for row in res:
+ doc = self._factory(row[0], row[1], row[2])
+ doc.has_conflicts = row[3] > 0
+ results.append(doc)
+ return results
+
+ def get_index_keys(self, index_name):
+ c = self._db_handle.cursor()
+ definition = self._get_index_definition(index_name)
+ value_fields = ', '.join([
+ 'd%d.value' % i for i in range(len(definition))])
+ tables = ["document_fields d%d" % i for i in range(len(definition))]
+ novalue_where = [
+ "d.doc_id = d%d.doc_id AND d%d.field_name = ?" % (i, i) for i in
+ range(len(definition))]
+ where = [
+ novalue_where[i] + (" AND d%d.value NOT NULL" % (i,)) for i in
+ range(len(definition))]
+ statement = (
+ "SELECT %s FROM document d, %s WHERE %s GROUP BY %s;" % (
+ value_fields, ', '.join(tables), ' AND '.join(where),
+ value_fields))
+ try:
+ c.execute(statement, tuple(definition))
+ except dbapi2.OperationalError as e:
+ raise dbapi2.OperationalError(
+ str(e) +
+ '\nstatement: %s\nargs: %s\n' % (statement, tuple(definition)))
+ return c.fetchall()
+
+ def delete_index(self, index_name):
+ with self._db_handle:
+ c = self._db_handle.cursor()
+ c.execute("DELETE FROM index_definitions WHERE name = ?",
+ (index_name,))
+ c.execute(
+ "DELETE FROM document_fields WHERE document_fields.field_name "
+ " NOT IN (SELECT field from index_definitions)")
+
+
+class SQLiteSyncTarget(CommonSyncTarget):
+
+ def get_sync_info(self, source_replica_uid):
+ source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id(
+ source_replica_uid)
+ my_gen, my_trans_id = self._db._get_generation_info()
+ return (
+ self._db._replica_uid, my_gen, my_trans_id, source_gen,
+ source_trans_id)
+
+ def record_sync_info(self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ if self._trace_hook:
+ self._trace_hook('record_sync_info')
+ self._db._set_replica_gen_and_trans_id(
+ source_replica_uid, source_replica_generation,
+ source_replica_transaction_id)
+
+
+class SQLitePartialExpandDatabase(SQLiteDatabase):
+ """An SQLite Backend that expands documents into a document_field table.
+
+ It stores the original document text in document.doc. For fields that are
+ indexed, the data goes into document_fields.
+ """
+
+ _index_storage_value = 'expand referenced'
+
+ def _get_indexed_fields(self):
+ """Determine what fields are indexed."""
+ c = self._db_handle.cursor()
+ c.execute("SELECT field FROM index_definitions")
+ return set([x[0] for x in c.fetchall()])
+
+ def _evaluate_index(self, raw_doc, field):
+ parser = query_parser.Parser()
+ getter = parser.parse(field)
+ return getter.get(raw_doc)
+
+ def _put_and_update_indexes(self, old_doc, doc):
+ c = self._db_handle.cursor()
+ if doc and not doc.is_tombstone():
+ raw_doc = json.loads(doc.get_json())
+ else:
+ raw_doc = {}
+ if old_doc is not None:
+ c.execute("UPDATE document SET doc_rev=?, content=?"
+ " WHERE doc_id = ?",
+ (doc.rev, doc.get_json(), doc.doc_id))
+ c.execute("DELETE FROM document_fields WHERE doc_id = ?",
+ (doc.doc_id,))
+ else:
+ c.execute("INSERT INTO document (doc_id, doc_rev, content)"
+ " VALUES (?, ?, ?)",
+ (doc.doc_id, doc.rev, doc.get_json()))
+ indexed_fields = self._get_indexed_fields()
+ if indexed_fields:
+ # It is expected that len(indexed_fields) is shorter than
+ # len(raw_doc)
+ getters = [(field, self._parse_index_definition(field))
+ for field in indexed_fields]
+ self._update_indexes(doc.doc_id, raw_doc, getters, c)
+ trans_id = self._allocate_transaction_id()
+ c.execute("INSERT INTO transaction_log(doc_id, transaction_id)"
+ " VALUES (?, ?)", (doc.doc_id, trans_id))
+
+ def create_index(self, index_name, *index_expressions):
+ with self._db_handle:
+ c = self._db_handle.cursor()
+ cur_fields = self._get_indexed_fields()
+ definition = [(index_name, idx, field)
+ for idx, field in enumerate(index_expressions)]
+ try:
+ c.executemany("INSERT INTO index_definitions VALUES (?, ?, ?)",
+ definition)
+ except dbapi2.IntegrityError as e:
+ stored_def = self._get_index_definition(index_name)
+ if stored_def == [x[-1] for x in definition]:
+ return
+ raise errors.IndexNameTakenError(
+ str(e) +
+ str(sys.exc_info()[2])
+ )
+ new_fields = set(
+ [f for f in index_expressions if f not in cur_fields])
+ if new_fields:
+ self._update_all_indexes(new_fields)
+
+ def _iter_all_docs(self):
+ c = self._db_handle.cursor()
+ c.execute("SELECT doc_id, content FROM document")
+ while True:
+ next_rows = c.fetchmany()
+ if not next_rows:
+ break
+ for row in next_rows:
+ yield row
+
+ def _update_all_indexes(self, new_fields):
+ """Iterate all the documents, and add content to document_fields.
+
+ :param new_fields: The index definitions that need to be added.
+ """
+ getters = [(field, self._parse_index_definition(field))
+ for field in new_fields]
+ c = self._db_handle.cursor()
+ for doc_id, doc in self._iter_all_docs():
+ if doc is None:
+ continue
+ raw_doc = json.loads(doc)
+ self._update_indexes(doc_id, raw_doc, getters, c)
+
+
+SQLiteDatabase.register_implementation(SQLitePartialExpandDatabase)
diff --git a/src/leap/soledad/client/_document.py b/src/leap/soledad/client/_document.py
new file mode 100644
index 00000000..9c8577cb
--- /dev/null
+++ b/src/leap/soledad/client/_document.py
@@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+# _document.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Public interfaces for adding extra client features to the generic
+SoledadDocument.
+"""
+
+import weakref
+import uuid
+
+from twisted.internet import defer
+
+from zope.interface import Interface
+from zope.interface import implementer
+
+from leap.soledad.common.document import SoledadDocument
+
+
+class IDocumentWithAttachment(Interface):
+ """
+ A document that can have an attachment.
+ """
+
+ def set_store(self, store):
+ """
+ Set the store used by this file to manage attachments.
+
+ :param store: The store used to manage attachments.
+ :type store: Soledad
+ """
+
+ def put_attachment(self, fd):
+ """
+ Attach data to this document.
+
+ Add the attachment to local storage, enqueue for upload.
+
+ The document content will be updated with a pointer to the attachment,
+ but the document has to be manually put in the database to reflect
+ modifications.
+
+ :param fd: A file-like object whose content will be attached to this
+ document.
+ :type fd: file-like
+
+ :return: A deferred which fires when the attachment has been added to
+ local storage.
+ :rtype: Deferred
+ """
+
+ def get_attachment(self):
+ """
+ Return the data attached to this document.
+
+ If document content contains a pointer to the attachment, try to get
+ the attachment from local storage and, if not found, from remote
+ storage.
+
+ :return: A deferred which fires with a file like-object whose content
+ is the attachment of this document, or None if nothing is
+ attached.
+ :rtype: Deferred
+ """
+
+ def delete_attachment(self):
+ """
+ Delete the attachment of this document.
+
+ The pointer to the attachment will be removed from the document
+ content, but the document has to be manually put in the database to
+ reflect modifications.
+
+ :return: A deferred which fires when the attachment has been deleted
+ from local storage.
+ :rtype: Deferred
+ """
+
+ def get_attachment_state(self):
+ """
+ Return the state of the attachment of this document.
+
+ The state is a member of AttachmentStates and is of one of NONE,
+ LOCAL, REMOTE or SYNCED.
+
+ :return: A deferred which fires with The state of the attachment of
+ this document.
+ :rtype: Deferred
+ """
+
+ def is_dirty(self):
+ """
+ Return whether this document's content differs from the contents stored
+ in local database.
+
+ :return: A deferred which fires with True or False, depending on
+ whether this document is dirty or not.
+ :rtype: Deferred
+ """
+
+ def upload_attachment(self):
+ """
+ Upload this document's attachment.
+
+ :return: A deferred which fires with the state of the attachment after
+ it's been uploaded, or NONE if there's no attachment for this
+ document.
+ :rtype: Deferred
+ """
+
+ def download_attachment(self):
+ """
+ Download this document's attachment.
+
+ :return: A deferred which fires with the state of the attachment after
+ it's been downloaded, or NONE if there's no attachment for
+ this document.
+ :rtype: Deferred
+ """
+
+
+class BlobDoc(object):
+
+ # TODO probably not needed, but convenient for testing for now.
+
+ def __init__(self, content, blob_id):
+
+ self.blob_id = blob_id
+ self.is_blob = True
+ self.blob_fd = content
+ if blob_id is None:
+ blob_id = uuid.uuid4().get_hex()
+ self.blob_id = blob_id
+
+
+class AttachmentStates(object):
+ NONE = 0
+ LOCAL = 1
+ REMOTE = 2
+ SYNCED = 4
+
+
+@implementer(IDocumentWithAttachment)
+class Document(SoledadDocument):
+
+ def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False,
+ syncable=True, store=None):
+ SoledadDocument.__init__(self, doc_id=doc_id, rev=rev, json=json,
+ has_conflicts=has_conflicts,
+ syncable=syncable)
+ self.set_store(store)
+
+ #
+ # properties
+ #
+
+ @property
+ def _manager(self):
+ if not self.store or not hasattr(self.store, 'blobmanager'):
+ raise Exception('No blob manager found to manage attachments.')
+ return self.store.blobmanager
+
+ @property
+ def _blob_id(self):
+ if self.content and 'blob_id' in self.content:
+ return self.content['blob_id']
+ return None
+
+ def get_store(self):
+ return self._store() if self._store else None
+
+ def set_store(self, store):
+ self._store = weakref.ref(store) if store else None
+
+ store = property(get_store, set_store)
+
+ #
+ # attachment api
+ #
+
+ def put_attachment(self, fd):
+ # add pointer to content
+ blob_id = self._blob_id or str(uuid.uuid4())
+ if not self.content:
+ self.content = {}
+ self.content['blob_id'] = blob_id
+ # put using manager
+ blob = BlobDoc(fd, blob_id)
+ fd.seek(0, 2)
+ size = fd.tell()
+ fd.seek(0)
+ return self._manager.put(blob, size)
+
+ def get_attachment(self):
+ if not self._blob_id:
+ return defer.succeed(None)
+ return self._manager.get(self._blob_id)
+
+ def delete_attachment(self):
+ raise NotImplementedError
+
+ @defer.inlineCallbacks
+ def get_attachment_state(self):
+ state = AttachmentStates.NONE
+
+ if not self._blob_id:
+ defer.returnValue(state)
+
+ local_list = yield self._manager.local_list()
+ if self._blob_id in local_list:
+ state |= AttachmentStates.LOCAL
+
+ remote_list = yield self._manager.remote_list()
+ if self._blob_id in remote_list:
+ state |= AttachmentStates.REMOTE
+
+ defer.returnValue(state)
+
+ @defer.inlineCallbacks
+ def is_dirty(self):
+ stored = yield self.store.get_doc(self.doc_id)
+ if stored.content != self.content:
+ defer.returnValue(True)
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
+ def upload_attachment(self):
+ if not self._blob_id:
+ defer.returnValue(AttachmentStates.NONE)
+
+ fd = yield self._manager.get_blob(self._blob_id)
+ # TODO: turn following method into a public one
+ yield self._manager._encrypt_and_upload(self._blob_id, fd)
+ defer.returnValue(self.get_attachment_state())
+
+ @defer.inlineCallbacks
+ def download_attachment(self):
+ if not self._blob_id:
+ defer.returnValue(None)
+ yield self.get_attachment()
+ defer.returnValue(self.get_attachment_state())
diff --git a/src/leap/soledad/client/_http.py b/src/leap/soledad/client/_http.py
new file mode 100644
index 00000000..2a6b9e39
--- /dev/null
+++ b/src/leap/soledad/client/_http.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# _http.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A twisted-based, TLS-pinned, token-authenticated HTTP client.
+"""
+import base64
+
+from twisted.internet import reactor
+from twisted.web.iweb import IAgent
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+
+from treq.client import HTTPClient as _HTTPClient
+
+from zope.interface import implementer
+
+from leap.common.certs import get_compatible_ssl_context_factory
+
+
+__all__ = ['HTTPClient', 'PinnedTokenAgent']
+
+
+class HTTPClient(_HTTPClient):
+
+ def __init__(self, uuid, token, cert_file):
+ self._agent = PinnedTokenAgent(uuid, token, cert_file)
+ super(self.__class__, self).__init__(self._agent)
+
+ def set_token(self, token):
+ self._agent.set_token(token)
+
+
+@implementer(IAgent)
+class PinnedTokenAgent(Agent):
+
+ def __init__(self, uuid, token, cert_file):
+ self._uuid = uuid
+ self._token = None
+ self._creds = None
+ self.set_token(token)
+ # pin this agent with the platform TLS certificate
+ factory = get_compatible_ssl_context_factory(cert_file)
+ Agent.__init__(self, reactor, contextFactory=factory)
+
+ def set_token(self, token):
+ self._token = token
+ self._creds = self._encoded_creds()
+
+ def _encoded_creds(self):
+ creds = '%s:%s' % (self._uuid, self._token)
+ encoded = base64.b64encode(creds)
+ return 'Token %s' % encoded
+
+ def request(self, method, uri, headers=None, bodyProducer=None):
+ # authenticate the request
+ headers = headers or Headers()
+ headers.addRawHeader('Authorization', self._creds)
+ # perform the authenticated request
+ return Agent.request(
+ self, method, uri, headers=headers, bodyProducer=bodyProducer)
diff --git a/src/leap/soledad/client/_pipes.py b/src/leap/soledad/client/_pipes.py
new file mode 100644
index 00000000..eef3f1f9
--- /dev/null
+++ b/src/leap/soledad/client/_pipes.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# _pipes.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Components for piping data on streams.
+"""
+from io import BytesIO
+
+
+__all__ = ['TruncatedTailPipe', 'PreamblePipe']
+
+
+class TruncatedTailPipe(object):
+ """
+ Truncate the last `tail_size` bytes from the stream.
+ """
+
+ def __init__(self, output=None, tail_size=16):
+ self.tail_size = tail_size
+ self.output = output or BytesIO()
+ self.buffer = BytesIO()
+
+ def write(self, data):
+ self.buffer.write(data)
+ if self.buffer.tell() > self.tail_size:
+ self._truncate_tail()
+
+ def _truncate_tail(self):
+ overflow_size = self.buffer.tell() - self.tail_size
+ self.buffer.seek(0)
+ self.output.write(self.buffer.read(overflow_size))
+ remaining = self.buffer.read()
+ self.buffer.seek(0)
+ self.buffer.write(remaining)
+ self.buffer.truncate()
+
+ def close(self):
+ return self.output
+
+
+class PreamblePipe(object):
+ """
+ Consumes data until a space is found, then calls a callback with it and
+ starts forwarding data to consumer returned by this callback.
+ """
+
+ def __init__(self, callback):
+ self.callback = callback
+ self.preamble = BytesIO()
+ self.output = None
+
+ def write(self, data):
+ if not self.output:
+ self._write_preamble(data)
+ else:
+ self.output.write(data)
+
+ def _write_preamble(self, data):
+ if ' ' not in data:
+ self.preamble.write(data)
+ return
+ preamble_chunk, remaining = data.split(' ', 1)
+ self.preamble.write(preamble_chunk)
+ self.output = self.callback(self.preamble)
+ self.output.write(remaining)
diff --git a/src/leap/soledad/client/_recovery_code.py b/src/leap/soledad/client/_recovery_code.py
new file mode 100644
index 00000000..04235a29
--- /dev/null
+++ b/src/leap/soledad/client/_recovery_code.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# _recovery_code.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import binascii
+
+from leap.soledad.common.log import getLogger
+
+logger = getLogger(__name__)
+
+
+class RecoveryCode(object):
+
+ # When we turn this string to hex, it will double in size
+ code_length = 6
+
+ def generate(self):
+ logger.info("generating new recovery code...")
+ return binascii.hexlify(os.urandom(self.code_length))
diff --git a/src/leap/soledad/client/_secrets/__init__.py b/src/leap/soledad/client/_secrets/__init__.py
new file mode 100644
index 00000000..b6c81cda
--- /dev/null
+++ b/src/leap/soledad/client/_secrets/__init__.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+# _secrets/__init__.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import scrypt
+
+from leap.soledad.common.log import getLogger
+
+from leap.soledad.client._secrets.storage import SecretsStorage
+from leap.soledad.client._secrets.crypto import SecretsCrypto
+from leap.soledad.client._secrets.util import emit, UserDataMixin
+
+
+logger = getLogger(__name__)
+
+
+class Secrets(UserDataMixin):
+
+ lengths = {
+ 'remote_secret': 512, # remote_secret is used to encrypt remote data.
+ 'local_salt': 64, # local_salt is used in conjunction with
+ 'local_secret': 448, # local_secret to derive a local_key for storage
+ }
+
+ def __init__(self, soledad):
+ self._soledad = soledad
+ self._secrets = {}
+ self.crypto = SecretsCrypto(soledad)
+ self.storage = SecretsStorage(soledad)
+ self._bootstrap()
+
+ #
+ # bootstrap
+ #
+
+ def _bootstrap(self):
+
+ # attempt to load secrets from local storage
+ encrypted = self.storage.load_local()
+ if encrypted:
+ self._secrets = self.crypto.decrypt(encrypted)
+ # maybe update the format of storage of local secret.
+ if encrypted['version'] < self.crypto.VERSION:
+ self.store_secrets()
+ return
+
+ # no secret was found in local storage, so this is a first run of
+ # soledad for this user in this device. It is mandatory that we check
+ # if there's a secret stored in server.
+ encrypted = self.storage.load_remote()
+ if encrypted:
+ self._secrets = self.crypto.decrypt(encrypted)
+ self.store_secrets()
+ return
+
+ # we have *not* found a secret neither in local nor in remote storage,
+ # so we have to generate a new one, and then store it.
+ self._secrets = self._generate()
+ self.store_secrets()
+
+ #
+ # generation
+ #
+
+ @emit('creating')
+ def _generate(self):
+ logger.info("generating new set of secrets...")
+ secrets = {}
+ for name, length in self.lengths.iteritems():
+ secret = os.urandom(length)
+ secrets[name] = secret
+ logger.info("new set of secrets successfully generated")
+ return secrets
+
+ #
+ # crypto
+ #
+
+ def store_secrets(self):
+ # TODO: we have to improve the logic here, as we want to make sure that
+ # whatever is stored locally should only be used after remote storage
+ # is successful. Otherwise, this soledad could start encrypting with a
+ # secret while another soledad in another device could start encrypting
+ # with another secret, which would lead to decryption failures during
+ # sync.
+ encrypted = self.crypto.encrypt(self._secrets)
+ self.storage.save_local(encrypted)
+ self.storage.save_remote(encrypted)
+
+ #
+ # secrets
+ #
+
+ @property
+ def remote_secret(self):
+ return self._secrets.get('remote_secret')
+
+ @property
+ def local_salt(self):
+ return self._secrets.get('local_salt')
+
+ @property
+ def local_secret(self):
+ return self._secrets.get('local_secret')
+
+ @property
+ def local_key(self):
+ # local storage key is scrypt-derived from `local_secret` and
+ # `local_salt` above
+ secret = scrypt.hash(
+ password=self.local_secret,
+ salt=self.local_salt,
+ buflen=32, # we need a key with 256 bits (32 bytes)
+ )
+ return secret
diff --git a/src/leap/soledad/client/_secrets/crypto.py b/src/leap/soledad/client/_secrets/crypto.py
new file mode 100644
index 00000000..8148151d
--- /dev/null
+++ b/src/leap/soledad/client/_secrets/crypto.py
@@ -0,0 +1,138 @@
+# -*- coding: utf-8 -*-
+# _secrets/crypto.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import binascii
+import json
+import os
+import scrypt
+
+from leap.soledad.common import soledad_assert
+from leap.soledad.common.log import getLogger
+
+from leap.soledad.client._crypto import encrypt_sym, decrypt_sym, ENC_METHOD
+from leap.soledad.client._secrets.util import SecretsError
+
+
+logger = getLogger(__name__)
+
+
+class SecretsCrypto(object):
+
+ VERSION = 2
+
+ def __init__(self, soledad):
+ self._soledad = soledad
+
+ def _get_key(self, salt):
+ passphrase = self._soledad.passphrase.encode('utf8')
+ key = scrypt.hash(passphrase, salt, buflen=32)
+ return key
+
+ #
+ # encryption
+ #
+
+ def encrypt(self, secrets):
+ encoded = {}
+ for name, value in secrets.iteritems():
+ encoded[name] = binascii.b2a_base64(value)
+ plaintext = json.dumps(encoded)
+ salt = os.urandom(64) # TODO: get salt length from somewhere else
+ key = self._get_key(salt)
+ iv, ciphertext = encrypt_sym(plaintext, key,
+ method=ENC_METHOD.aes_256_gcm)
+ encrypted = {
+ 'version': self.VERSION,
+ 'kdf': 'scrypt',
+ 'kdf_salt': binascii.b2a_base64(salt),
+ 'kdf_length': len(key),
+ 'cipher': ENC_METHOD.aes_256_gcm,
+ 'length': len(plaintext),
+ 'iv': str(iv),
+ 'secrets': binascii.b2a_base64(ciphertext),
+ }
+ return encrypted
+
+ #
+ # decryption
+ #
+
+ def decrypt(self, data):
+ version = data.setdefault('version', 1)
+ method = getattr(self, '_decrypt_v%d' % version)
+ try:
+ return method(data)
+ except Exception as e:
+ logger.error('error decrypting secrets: %r' % e)
+ raise SecretsError(e)
+
+ def _decrypt_v1(self, data):
+ # get encrypted secret from dictionary: the old format allowed for
+ # storage of more than one secret, but this feature was never used and
+ # soledad has been using only one secret so far. As there is a corner
+ # case where the old 'active_secret' key might not be set, we just
+ # ignore it and pop the only secret found in the 'storage_secrets' key.
+ secret_id = data['storage_secrets'].keys().pop()
+ encrypted = data['storage_secrets'][secret_id]
+
+ # assert that we know how to decrypt the secret
+ soledad_assert('cipher' in encrypted)
+ cipher = encrypted['cipher']
+ if cipher == 'aes256':
+ cipher = ENC_METHOD.aes_256_ctr
+ soledad_assert(cipher in ENC_METHOD)
+
+ # decrypt
+ salt = binascii.a2b_base64(encrypted['kdf_salt'])
+ key = self._get_key(salt)
+ separator = ':'
+ iv, ciphertext = encrypted['secret'].split(separator, 1)
+ ciphertext = binascii.a2b_base64(ciphertext)
+ plaintext = self._decrypt(key, iv, ciphertext, encrypted, cipher)
+
+ # create secrets dictionary
+ secrets = {
+ 'remote_secret': plaintext[0:512],
+ 'local_salt': plaintext[512:576],
+ 'local_secret': plaintext[576:1024],
+ }
+ return secrets
+
+ def _decrypt_v2(self, encrypted):
+ cipher = encrypted['cipher']
+ soledad_assert(cipher in ENC_METHOD)
+
+ salt = binascii.a2b_base64(encrypted['kdf_salt'])
+ key = self._get_key(salt)
+ iv = encrypted['iv']
+ ciphertext = binascii.a2b_base64(encrypted['secrets'])
+ plaintext = self._decrypt(
+ key, iv, ciphertext, encrypted, cipher)
+ encoded = json.loads(plaintext)
+ secrets = {}
+ for name, value in encoded.iteritems():
+ secrets[name] = binascii.a2b_base64(value)
+ return secrets
+
+ def _decrypt(self, key, iv, ciphertext, encrypted, method):
+ # assert some properties of the stored secret
+ soledad_assert(encrypted['kdf'] == 'scrypt')
+ soledad_assert(encrypted['kdf_length'] == len(key))
+ # decrypt
+ plaintext = decrypt_sym(ciphertext, key, iv, method)
+ soledad_assert(encrypted['length'] == len(plaintext))
+ return plaintext
diff --git a/src/leap/soledad/client/_secrets/storage.py b/src/leap/soledad/client/_secrets/storage.py
new file mode 100644
index 00000000..85713a48
--- /dev/null
+++ b/src/leap/soledad/client/_secrets/storage.py
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+# _secrets/storage.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import six.moves.urllib.parse as urlparse
+
+from hashlib import sha256
+
+from leap.soledad.common import SHARED_DB_NAME
+from leap.soledad.common.log import getLogger
+
+from leap.soledad.client.shared_db import SoledadSharedDatabase
+from leap.soledad.client._document import Document
+from leap.soledad.client._secrets.util import emit, UserDataMixin
+
+
+logger = getLogger(__name__)
+
+
+class SecretsStorage(UserDataMixin):
+
+ def __init__(self, soledad):
+ self._soledad = soledad
+ self._shared_db = self._soledad.shared_db or self._init_shared_db()
+ self.__remote_doc = None
+
+ @property
+ def _creds(self):
+ uuid = self._soledad.uuid
+ token = self._soledad.token
+ return {'token': {'uuid': uuid, 'token': token}}
+
+ #
+ # local storage
+ #
+
+ def load_local(self):
+ path = self._soledad.secrets_path
+ logger.info("trying to load secrets from disk: %s" % path)
+ try:
+ with open(path, 'r') as f:
+ encrypted = json.loads(f.read())
+ logger.info("secrets loaded successfully from disk")
+ return encrypted
+ except IOError:
+ logger.warn("secrets not found in disk")
+ return None
+
+ def save_local(self, encrypted):
+ path = self._soledad.secrets_path
+ json_data = json.dumps(encrypted)
+ with open(path, 'w') as f:
+ f.write(json_data)
+
+ #
+ # remote storage
+ #
+
+ def _init_shared_db(self):
+ url = urlparse.urljoin(self._soledad.server_url, SHARED_DB_NAME)
+ creds = self._creds
+ db = SoledadSharedDatabase.open_database(url, creds)
+ return db
+
+ def _remote_doc_id(self):
+ passphrase = self._soledad.passphrase.encode('utf8')
+ uuid = self._soledad.uuid
+ text = '%s%s' % (passphrase, uuid)
+ digest = sha256(text).hexdigest()
+ return digest
+
+ @property
+ def _remote_doc(self):
+ if not self.__remote_doc and self._shared_db:
+ doc = self._get_remote_doc()
+ self.__remote_doc = doc
+ return self.__remote_doc
+
+ @emit('downloading')
+ def _get_remote_doc(self):
+ logger.info('trying to load secrets from server...')
+ doc = self._shared_db.get_doc(self._remote_doc_id())
+ if doc:
+ logger.info('secrets loaded successfully from server')
+ else:
+ logger.warn('secrets not found in server')
+ return doc
+
+ def load_remote(self):
+ doc = self._remote_doc
+ if not doc:
+ return None
+ encrypted = doc.content
+ return encrypted
+
+ @emit('uploading')
+ def save_remote(self, encrypted):
+ doc = self._remote_doc
+ if not doc:
+ doc = Document(doc_id=self._remote_doc_id())
+ doc.content = encrypted
+ db = self._shared_db
+ if not db:
+ logger.warn('no shared db found')
+ return
+ db.put_doc(doc)
diff --git a/src/leap/soledad/client/_secrets/util.py b/src/leap/soledad/client/_secrets/util.py
new file mode 100644
index 00000000..6401889b
--- /dev/null
+++ b/src/leap/soledad/client/_secrets/util.py
@@ -0,0 +1,63 @@
+# -*- coding:utf-8 -*-
+# _secrets/util.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+from leap.soledad.client import events
+
+
+class SecretsError(Exception):
+ pass
+
+
+class UserDataMixin(object):
+ """
+ When emitting an event, we have to pass a dictionary containing user data.
+ This class only defines a property so we don't have to define it in
+ multiple places.
+ """
+
+ @property
+ def _user_data(self):
+ uuid = self._soledad.uuid
+ userid = self._soledad.userid
+ # TODO: seems that uuid and userid hold the same value! We should check
+ # whether we should pass something different or if the events api
+ # really needs two different values.
+ return {'uuid': uuid, 'userid': userid}
+
+
+def emit(verb):
+ def _decorator(method):
+ def _decorated(self, *args, **kwargs):
+
+ # emit starting event
+ user_data = self._user_data
+ name = 'SOLEDAD_' + verb.upper() + '_KEYS'
+ event = getattr(events, name)
+ events.emit_async(event, user_data)
+
+ # run the method
+ result = method(self, *args, **kwargs)
+
+ # emit a finished event
+ name = 'SOLEDAD_DONE_' + verb.upper() + '_KEYS'
+ event = getattr(events, name)
+ events.emit_async(event, user_data)
+
+ return result
+ return _decorated
+ return _decorator
diff --git a/src/leap/soledad/client/api.py b/src/leap/soledad/client/api.py
new file mode 100644
index 00000000..c62b43f0
--- /dev/null
+++ b/src/leap/soledad/client/api.py
@@ -0,0 +1,848 @@
+# -*- coding: utf-8 -*-
+# api.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Soledad - Synchronization Of Locally Encrypted Data Among Devices.
+
+This module holds the public api for Soledad.
+
+Soledad is the part of LEAP that manages storage and synchronization of
+application data. It is built on top of U1DB reference Python API and
+implements (1) a SQLCipher backend for local storage in the client, (2) a
+SyncTarget that encrypts data before syncing, and (3) a CouchDB backend for
+remote storage in the server side.
+"""
+import binascii
+import errno
+import os
+import socket
+import ssl
+import uuid
+
+from itertools import chain
+import six.moves.http_client as httplib
+import six.moves.urllib.parse as urlparse
+from six import StringIO
+from collections import defaultdict
+
+from twisted.internet import defer
+from zope.interface import implementer
+
+from leap.common.config import get_path_prefix
+from leap.common.plugins import collect_plugins
+
+from leap.soledad.common import soledad_assert
+from leap.soledad.common import soledad_assert_type
+from leap.soledad.common.log import getLogger
+from leap.soledad.common.l2db.remote import http_client
+from leap.soledad.common.l2db.remote.ssl_match_hostname import match_hostname
+from leap.soledad.common.errors import DatabaseAccessError
+
+from . import events as soledad_events
+from . import interfaces as soledad_interfaces
+from ._crypto import SoledadCrypto
+from ._db import adbapi
+from ._db import blobs
+from ._db import sqlcipher
+from ._recovery_code import RecoveryCode
+from ._secrets import Secrets
+
+
+logger = getLogger(__name__)
+
+
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
+#
+# Constants
+#
+
+"""
+Path to the certificate file used to certify the SSL connection between
+Soledad client and server.
+"""
+SOLEDAD_CERT = None
+
+
+@implementer(soledad_interfaces.ILocalStorage,
+ soledad_interfaces.ISyncableStorage,
+ soledad_interfaces.ISecretsStorage)
+class Soledad(object):
+ """
+ Soledad provides encrypted data storage and sync.
+
+ A Soledad instance is used to store and retrieve data in a local encrypted
+ database and synchronize this database with Soledad server.
+
+ This class is also responsible for bootstrapping users' account by
+ creating cryptographic secrets and/or storing/fetching them on Soledad
+ server.
+ """
+
+ local_db_file_name = 'soledad.u1db'
+ secrets_file_name = "soledad.json"
+ default_prefix = os.path.join(get_path_prefix(), 'leap', 'soledad')
+
+ """
+ A dictionary that holds locks which avoid multiple sync attempts from the
+ same database replica. The dictionary indexes are the paths to each local
+ db, so we guarantee that only one sync happens for a local db at a time.
+ """
+ _sync_lock = defaultdict(defer.DeferredLock)
+
+ def __init__(self, uuid, passphrase, secrets_path, local_db_path,
+ server_url, cert_file, shared_db=None,
+ auth_token=None):
+ """
+ Initialize configuration, cryptographic keys and dbs.
+
+ :param uuid: User's uuid.
+ :type uuid: str
+
+ :param passphrase:
+ The passphrase for locking and unlocking encryption secrets for
+ local and remote storage.
+ :type passphrase: unicode
+
+ :param secrets_path:
+ Path for storing encrypted key used for symmetric encryption.
+ :type secrets_path: str
+
+ :param local_db_path: Path for local encrypted storage db.
+ :type local_db_path: str
+
+ :param server_url:
+ URL for Soledad server. This is used either to sync with the user's
+ remote db and to interact with the shared recovery database.
+ :type server_url: str
+
+ :param cert_file:
+ Path to the certificate of the ca used to validate the SSL
+ certificate used by the remote soledad server.
+ :type cert_file: str
+
+ :param shared_db:
+ The shared database.
+ :type shared_db: HTTPDatabase
+
+ :param auth_token:
+ Authorization token for accessing remote databases.
+ :type auth_token: str
+
+ :raise BootstrapSequenceError:
+ Raised when the secret initialization sequence (i.e. retrieval
+ from server or generation and storage on server) has failed for
+ some reason.
+ """
+ # store config params
+ self.uuid = uuid
+ self.passphrase = passphrase
+ self.secrets_path = secrets_path
+ self._local_db_path = local_db_path
+ self.server_url = server_url
+ self.shared_db = shared_db
+ self.token = auth_token
+
+ self._dbsyncer = None
+
+ # configure SSL certificate
+ global SOLEDAD_CERT
+ SOLEDAD_CERT = cert_file
+
+ self._init_config_with_defaults()
+ self._init_working_dirs()
+
+ self._recovery_code = RecoveryCode()
+ self._secrets = Secrets(self)
+ self._crypto = SoledadCrypto(self._secrets.remote_secret)
+ self._init_blobmanager()
+
+ try:
+ # initialize database access, trap any problems so we can shutdown
+ # smoothly.
+ self._init_u1db_sqlcipher_backend()
+ self._init_u1db_syncer()
+ except DatabaseAccessError:
+ # oops! something went wrong with backend initialization. We
+ # have to close any thread-related stuff we have already opened
+ # here, otherwise there might be zombie threads that may clog the
+ # reactor.
+ if hasattr(self, '_dbpool'):
+ self._dbpool.close()
+ raise
+
+ #
+ # initialization/destruction methods
+ #
+
+ def _init_config_with_defaults(self):
+ """
+ Initialize configuration using default values for missing params.
+ """
+ soledad_assert_type(self.passphrase, unicode)
+
+ def initialize(attr, val):
+ return ((getattr(self, attr, None) is None) and
+ setattr(self, attr, val))
+
+ initialize("_secrets_path", os.path.join(
+ self.default_prefix, self.secrets_file_name))
+ initialize("_local_db_path", os.path.join(
+ self.default_prefix, self.local_db_file_name))
+ # initialize server_url
+ soledad_assert(self.server_url is not None,
+ 'Missing URL for Soledad server.')
+
+ def _init_working_dirs(self):
+ """
+ Create work directories.
+
+ :raise OSError: in case file exists and is not a dir.
+ """
+ paths = map(lambda x: os.path.dirname(x), [
+ self._local_db_path, self._secrets_path])
+ for path in paths:
+ create_path_if_not_exists(path)
+
+ def _init_u1db_sqlcipher_backend(self):
+ """
+ Initialize the U1DB SQLCipher database for local storage.
+
+ Instantiates a modified twisted adbapi that will maintain a threadpool
+ with a u1db-sqclipher connection for each thread, and will return
+ deferreds for each u1db query.
+
+ Currently, Soledad uses the default SQLCipher cipher, i.e.
+ 'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key,
+ and internally the SQLCipherDatabase initialization uses the 'raw
+ PRAGMA key' format to handle the key to SQLCipher.
+ """
+ tohex = binascii.b2a_hex
+ # sqlcipher only accepts the hex version
+ key = tohex(self._secrets.local_key)
+
+ opts = sqlcipher.SQLCipherOptions(
+ self._local_db_path, key,
+ is_raw_key=True, create=True)
+ self._sqlcipher_opts = opts
+ self._dbpool = adbapi.getConnectionPool(opts)
+
+ def _init_u1db_syncer(self):
+ """
+ Initialize the U1DB synchronizer.
+ """
+ replica_uid = self._dbpool.replica_uid
+ self._dbsyncer = sqlcipher.SQLCipherU1DBSync(
+ self._sqlcipher_opts, self._crypto, replica_uid,
+ SOLEDAD_CERT)
+
+ def sync_stats(self):
+ sync_phase = 0
+ if getattr(self._dbsyncer, 'sync_phase', None):
+ sync_phase = self._dbsyncer.sync_phase[0]
+ sync_exchange_phase = 0
+ if getattr(self._dbsyncer, 'syncer', None):
+ if getattr(self._dbsyncer.syncer, 'sync_exchange_phase', None):
+ _p = self._dbsyncer.syncer.sync_exchange_phase[0]
+ sync_exchange_phase = _p
+ return sync_phase, sync_exchange_phase
+
+ def _init_blobmanager(self):
+ path = os.path.join(os.path.dirname(self._local_db_path), 'blobs')
+ url = urlparse.urljoin(self.server_url, 'blobs/%s' % uuid)
+ key = self._secrets.local_key
+ self.blobmanager = blobs.BlobManager(path, url, key, self.uuid,
+ self.token, SOLEDAD_CERT)
+
+ #
+ # Closing methods
+ #
+
+ def close(self):
+ """
+ Close underlying U1DB database.
+ """
+ logger.debug("closing soledad")
+ self._dbpool.close()
+ self.blobmanager.close()
+ if getattr(self, '_dbsyncer', None):
+ self._dbsyncer.close()
+
+ #
+ # ILocalStorage
+ #
+
+ def _defer(self, meth, *args, **kw):
+ """
+ Defer a method to be run on a U1DB connection pool.
+
+ :param meth: A method to defer to the U1DB connection pool.
+ :type meth: callable
+ :return: A deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._dbpool.runU1DBQuery(meth, *args, **kw)
+
+ def put_doc(self, doc):
+ """
+ Update a document.
+
+ If the document currently has conflicts, put will fail.
+ If the database specifies a maximum document size and the document
+ exceeds it, put will fail and raise a DocumentTooBig exception.
+
+ ============================== WARNING ==============================
+ This method converts the document's contents to unicode in-place. This
+ means that after calling `put_doc(doc)`, the contents of the
+ document, i.e. `doc.content`, might be different from before the
+ call.
+ ============================== WARNING ==============================
+
+ :param doc: A document with new content.
+ :type doc: leap.soledad.common.document.Document
+ :return: A deferred whose callback will be invoked with the new
+ revision identifier for the document. The document object will
+ also be updated.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ d = self._defer("put_doc", doc)
+ return d
+
+ def delete_doc(self, doc):
+ """
+ Mark a document as deleted.
+
+ Will abort if the current revision doesn't match doc.rev.
+ This will also set doc.content to None.
+
+ :param doc: A document to be deleted.
+ :type doc: leap.soledad.common.document.Document
+ :return: A deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ soledad_assert(doc is not None, "delete_doc doesn't accept None.")
+ return self._defer("delete_doc", doc)
+
+ def get_doc(self, doc_id, include_deleted=False):
+ """
+ Get the JSON string for the given document.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise asking for a deleted
+ document will return None.
+ :type include_deleted: bool
+ :return: A deferred whose callback will be invoked with a document
+ object.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer(
+ "get_doc", doc_id, include_deleted=include_deleted)
+
+ def get_docs(
+ self, doc_ids, check_for_conflicts=True, include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be returned instead of True/False.
+ :type check_for_conflicts: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted documents will not
+ be included in the results.
+ :type include_deleted: bool
+ :return: A deferred whose callback will be invoked with an iterable
+ giving the document object for each document id in matching
+ doc_ids order.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer(
+ "get_docs", doc_ids, check_for_conflicts=check_for_conflicts,
+ include_deleted=include_deleted)
+
+ def get_all_docs(self, include_deleted=False):
+ """
+ Get the JSON content for all documents in the database.
+
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted documents will not
+ be included in the results.
+ :type include_deleted: bool
+
+ :return: A deferred which, when fired, will pass the a tuple
+ containing (generation, [Document]) to the callback, with the
+ current generation of the database, followed by a list of all the
+ documents in the database.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("get_all_docs", include_deleted)
+
+ @defer.inlineCallbacks
+ def create_doc(self, content, doc_id=None):
+ """
+ Create a new document.
+
+ You can optionally specify the document identifier, but the document
+ must not already exist. See 'put_doc' if you want to override an
+ existing document.
+ If the database specifies a maximum document size and the document
+ exceeds it, create will fail and raise a DocumentTooBig exception.
+
+ :param content: A Python dictionary.
+ :type content: dict
+ :param doc_id: An optional identifier specifying the document id.
+ :type doc_id: str
+ :return: A deferred whose callback will be invoked with a document.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # TODO we probably should pass an optional "encoding" parameter to
+ # create_doc (and probably to put_doc too). There are cases (mail
+ # payloads for example) in which we already have the encoding in the
+ # headers, so we don't need to guess it.
+ doc = yield self._defer("create_doc", content, doc_id=doc_id)
+ doc.set_store(self)
+ defer.returnValue(doc)
+
+ def create_doc_from_json(self, json, doc_id=None):
+ """
+ Create a new document.
+
+ You can optionally specify the document identifier, but the document
+ must not already exist. See 'put_doc' if you want to override an
+ existing document.
+ If the database specifies a maximum document size and the document
+ exceeds it, create will fail and raise a DocumentTooBig exception.
+
+ :param json: The JSON document string
+ :type json: dict
+ :param doc_id: An optional identifier specifying the document id.
+ :type doc_id: str
+ :return: A deferred whose callback will be invoked with a document.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("create_doc_from_json", json, doc_id=doc_id)
+
+ def create_index(self, index_name, *index_expressions):
+ """
+ Create a named index, which can then be queried for future lookups.
+
+ Creating an index which already exists is not an error, and is cheap.
+ Creating an index which does not match the index_expressions of the
+ existing index is an error.
+ Creating an index will block until the expressions have been evaluated
+ and the index generated.
+
+ :param index_name: A unique name which can be used as a key prefix
+ :type index_name: str
+ :param index_expressions: index expressions defining the index
+ information.
+
+ Examples:
+
+ "fieldname", or "fieldname.subfieldname" to index alphabetically
+ sorted on the contents of a field.
+
+ "number(fieldname, width)", "lower(fieldname)"
+ :type index_expresions: list of str
+ :return: A deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("create_index", index_name, *index_expressions)
+
+ def delete_index(self, index_name):
+ """
+ Remove a named index.
+
+ :param index_name: The name of the index we are removing
+ :type index_name: str
+ :return: A deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("delete_index", index_name)
+
+ def list_indexes(self):
+ """
+ List the definitions of all known indexes.
+
+ :return: A deferred whose callback will be invoked with a list of
+ [('index-name', ['field', 'field2'])] definitions.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("list_indexes")
+
+ def get_from_index(self, index_name, *key_values):
+ """
+ Return documents that match the keys supplied.
+
+ You must supply exactly the same number of values as have been defined
+ in the index. It is possible to do a prefix match by using '*' to
+ indicate a wildcard match. You can only supply '*' to trailing entries,
+ (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.)
+ It is also possible to append a '*' to the last supplied value (eg
+ 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param key_values: values to match. eg, if you have
+ an index with 3 fields then you would have:
+ get_from_index(index_name, val1, val2, val3)
+ :type key_values: list
+ :return: A deferred whose callback will be invoked with a list of
+ [Document].
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("get_from_index", index_name, *key_values)
+
+ def get_count_from_index(self, index_name, *key_values):
+ """
+ Return the count for a given combination of index_name
+ and key values.
+
+ Extension method made from similar methods in u1db version 13.09
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param key_values: values to match. eg, if you have
+ an index with 3 fields then you would have:
+ get_from_index(index_name, val1, val2, val3)
+ :type key_values: tuple
+ :return: A deferred whose callback will be invoked with the count.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("get_count_from_index", index_name, *key_values)
+
+ def get_range_from_index(self, index_name, start_value, end_value):
+ """
+ Return documents that fall within the specified range.
+
+ Both ends of the range are inclusive. For both start_value and
+ end_value, one must supply exactly the same number of values as have
+ been defined in the index, or pass None. In case of a single column
+ index, a string is accepted as an alternative for a tuple with a single
+ value. It is possible to do a prefix match by using '*' to indicate
+ a wildcard match. You can only supply '*' to trailing entries, (eg
+ 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also
+ possible to append a '*' to the last supplied value (eg 'val*', '*',
+ '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param start_values: tuples of values that define the lower bound of
+ the range. eg, if you have an index with 3 fields then you would
+ have: (val1, val2, val3)
+ :type start_values: tuple
+ :param end_values: tuples of values that define the upper bound of the
+ range. eg, if you have an index with 3 fields then you would have:
+ (val1, val2, val3)
+ :type end_values: tuple
+ :return: A deferred whose callback will be invoked with a list of
+ [Document].
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ return self._defer(
+ "get_range_from_index", index_name, start_value, end_value)
+
+ def get_index_keys(self, index_name):
+ """
+ Return all keys under which documents are indexed in this index.
+
+ :param index_name: The index to query
+ :type index_name: str
+ :return: A deferred whose callback will be invoked with a list of
+ tuples of indexed keys.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("get_index_keys", index_name)
+
+ def get_doc_conflicts(self, doc_id):
+ """
+ Get the list of conflicts for the given document.
+
+ The order of the conflicts is such that the first entry is the value
+ that would be returned by "get_doc".
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :return: A deferred whose callback will be invoked with a list of the
+ Document entries that are conflicted.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("get_doc_conflicts", doc_id)
+
+ def resolve_doc(self, doc, conflicted_doc_revs):
+ """
+ Mark a document as no longer conflicted.
+
+ We take the list of revisions that the client knows about that it is
+ superseding. This may be a different list from the actual current
+ conflicts, in which case only those are removed as conflicted. This
+ may fail if the conflict list is significantly different from the
+ supplied information. (sync could have happened in the background from
+ the time you GET_DOC_CONFLICTS until the point where you RESOLVE)
+
+ :param doc: A Document with the new content to be inserted.
+ :type doc: Document
+ :param conflicted_doc_revs: A list of revisions that the new content
+ supersedes.
+ :type conflicted_doc_revs: list(str)
+ :return: A deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._defer("resolve_doc", doc, conflicted_doc_revs)
+
+ @property
+ def local_db_path(self):
+ return self._local_db_path
+
+ @property
+ def userid(self):
+ return self.uuid
+
+ #
+ # ISyncableStorage
+ #
+
+ def sync(self):
+ """
+ Synchronize documents with the server replica.
+
+ This method uses a lock to prevent multiple concurrent sync processes
+ over the same local db file.
+
+ :return: A deferred lock that will run the actual sync process when
+ the lock is acquired, and which will fire with with the local
+ generation before the synchronization was performed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # maybe bypass sync
+ # TODO: That's because bitmask may not provide us a token, but
+ # this should be handled on the caller side. Here, calling us without
+ # a token is a real error.
+ if not self.token:
+ generation = self._dbsyncer.get_generation()
+ return defer.succeed(generation)
+
+ d = self.sync_lock.run(
+ self._sync)
+ return d
+
+ def _sync(self):
+ """
+ Synchronize documents with the server replica.
+
+ :return: A deferred whose callback will be invoked with the local
+ generation before the synchronization was performed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ sync_url = urlparse.urljoin(self.server_url, 'user-%s' % self.uuid)
+ if not self._dbsyncer:
+ return
+ creds = {'token': {'uuid': self.uuid, 'token': self.token}}
+ d = self._dbsyncer.sync(sync_url, creds=creds)
+
+ def _sync_callback(local_gen):
+ self._last_received_docs = docs = self._dbsyncer.received_docs
+
+ # Post-Sync Hooks
+ if docs:
+ iface = soledad_interfaces.ISoledadPostSyncPlugin
+ suitable_plugins = collect_plugins(iface)
+ for plugin in suitable_plugins:
+ watched = plugin.watched_doc_types
+ r = [filter(
+ lambda s: s.startswith(preffix),
+ docs) for preffix in watched]
+ filtered = list(chain(*r))
+ plugin.process_received_docs(filtered)
+
+ return local_gen
+
+ def _sync_errback(failure):
+ s = StringIO()
+ failure.printDetailedTraceback(file=s)
+ msg = "got exception when syncing!\n" + s.getvalue()
+ logger.error(msg)
+ return failure
+
+ def _emit_done_data_sync(passthrough):
+ user_data = {'uuid': self.uuid, 'userid': self.userid}
+ soledad_events.emit_async(
+ soledad_events.SOLEDAD_DONE_DATA_SYNC, user_data)
+ return passthrough
+
+ d.addCallbacks(_sync_callback, _sync_errback)
+ d.addCallback(_emit_done_data_sync)
+ return d
+
+ @property
+ def sync_lock(self):
+ """
+ Class based lock to prevent concurrent syncs using the same local db
+ file.
+
+ :return: A shared lock based on this instance's db file path.
+ :rtype: DeferredLock
+ """
+ return self._sync_lock[self._local_db_path]
+
+ @property
+ def syncing(self):
+ """
+ Return wether Soledad is currently synchronizing with the server.
+
+ :return: Wether Soledad is currently synchronizing with the server.
+ :rtype: bool
+ """
+ return self.sync_lock.locked
+
+ #
+ # ISecretsStorage
+ #
+
+ @property
+ def secrets(self):
+ """
+ Return the secrets object.
+
+ :return: The secrets object.
+ :rtype: Secrets
+ """
+ return self._secrets
+
+ def change_passphrase(self, new_passphrase):
+ """
+ Change the passphrase that encrypts the storage secret.
+
+ :param new_passphrase: The new passphrase.
+ :type new_passphrase: unicode
+
+ :raise NoStorageSecret: Raised if there's no storage secret available.
+ """
+ self.passphrase = new_passphrase
+ self._secrets.store_secrets()
+
+ #
+ # Raw SQLCIPHER Queries
+ #
+
+ def raw_sqlcipher_query(self, *args, **kw):
+ """
+ Run a raw sqlcipher query in the local database, and return a deferred
+ that will be fired with the result.
+ """
+ return self._dbpool.runQuery(*args, **kw)
+
+ def raw_sqlcipher_operation(self, *args, **kw):
+ """
+ Run a raw sqlcipher operation in the local database, and return a
+ deferred that will be fired with None.
+ """
+ return self._dbpool.runOperation(*args, **kw)
+
+ #
+ # Service authentication
+ #
+
+ @defer.inlineCallbacks
+ def get_or_create_service_token(self, service):
+ """
+ Return the stored token for a given service, or generates and stores a
+ random one if it does not exist.
+
+ These tokens can be used to authenticate services.
+ """
+ # FIXME this could use the local sqlcipher database, to avoid
+ # problems with different replicas creating different tokens.
+
+ yield self.create_index('by-servicetoken', 'type', 'service')
+ docs = yield self._get_token_for_service(service)
+ if docs:
+ doc = docs[0]
+ defer.returnValue(doc.content['token'])
+ else:
+ token = str(uuid.uuid4()).replace('-', '')[-24:]
+ yield self._set_token_for_service(service, token)
+ defer.returnValue(token)
+
+ def _get_token_for_service(self, service):
+ return self.get_from_index('by-servicetoken', 'servicetoken', service)
+
+ def _set_token_for_service(self, service, token):
+ doc = {'type': 'servicetoken', 'service': service, 'token': token}
+ return self.create_doc(doc)
+
+ def create_recovery_code(self):
+ return self._recovery_code.generate()
+
+
+def create_path_if_not_exists(path):
+ try:
+ if not os.path.isdir(path):
+ logger.info('creating directory: %s.' % path)
+ os.makedirs(path)
+ except OSError as exc:
+ if exc.errno == errno.EEXIST and os.path.isdir(path):
+ pass
+ else:
+ raise
+
+# ----------------------------------------------------------------------------
+# Monkey patching u1db to be able to provide a custom SSL cert
+# ----------------------------------------------------------------------------
+
+
+# We need a more reasonable timeout (in seconds)
+SOLEDAD_TIMEOUT = 120
+
+
+class VerifiedHTTPSConnection(httplib.HTTPSConnection):
+ """
+ HTTPSConnection verifying server side certificates.
+ """
+ # derived from httplib.py
+
+ def connect(self):
+ """
+ Connect to a host on a given (SSL) port.
+ """
+ try:
+ source = self.source_address
+ sock = socket.create_connection((self.host, self.port),
+ SOLEDAD_TIMEOUT, source)
+ except AttributeError:
+ # source_address was introduced in 2.7
+ sock = socket.create_connection((self.host, self.port),
+ SOLEDAD_TIMEOUT)
+ if self._tunnel_host:
+ self.sock = sock
+ self._tunnel()
+
+ self.sock = ssl.wrap_socket(sock,
+ ca_certs=SOLEDAD_CERT,
+ cert_reqs=ssl.CERT_REQUIRED)
+ match_hostname(self.sock.getpeercert(), self.host)
+
+
+old__VerifiedHTTPSConnection = http_client._VerifiedHTTPSConnection
+http_client._VerifiedHTTPSConnection = VerifiedHTTPSConnection
diff --git a/src/leap/soledad/client/auth.py b/src/leap/soledad/client/auth.py
new file mode 100644
index 00000000..78e9bf1b
--- /dev/null
+++ b/src/leap/soledad/client/auth.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+# auth.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Methods for token-based authentication.
+
+These methods have to be included in all classes that extend HTTPClient so
+they can do token-based auth requests to the Soledad server.
+"""
+import base64
+
+from leap.soledad.common.l2db import errors
+
+
+class TokenBasedAuth(object):
+ """
+ Encapsulate token-auth methods for classes that inherit from
+ u1db.remote.http_client.HTTPClient.
+ """
+
+ def set_token_credentials(self, uuid, token):
+ """
+ Store given credentials so we can sign the request later.
+
+ :param uuid: The user's uuid.
+ :type uuid: str
+ :param token: The authentication token.
+ :type token: str
+ """
+ self._creds = {'token': (uuid, token)}
+
+ def _sign_request(self, method, url_query, params):
+ """
+ Return an authorization header to be included in the HTTP request, in
+ the form:
+
+ [('Authorization', 'Token <(base64 encoded) uuid:token>')]
+
+ :param method: The HTTP method.
+ :type method: str
+ :param url_query: The URL query string.
+ :type url_query: str
+ :param params: A list with encoded query parameters.
+ :type param: list
+
+ :return: The Authorization header.
+ :rtype: list of tuple
+ """
+ if 'token' in self._creds:
+ uuid, token = self._creds['token']
+ auth = '%s:%s' % (uuid, token)
+ b64_token = base64.b64encode(auth)
+ return [('Authorization', 'Token %s' % b64_token)]
+ else:
+ raise errors.UnknownAuthMethod(
+ 'Wrong credentials: %s' % self._creds)
diff --git a/src/leap/soledad/client/crypto.py b/src/leap/soledad/client/crypto.py
new file mode 100644
index 00000000..0f19c964
--- /dev/null
+++ b/src/leap/soledad/client/crypto.py
@@ -0,0 +1,448 @@
+# -*- coding: utf-8 -*-
+# crypto.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Cryptographic utilities for Soledad.
+"""
+import os
+import binascii
+import hmac
+import hashlib
+import json
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+
+from leap.soledad.common import soledad_assert
+from leap.soledad.common import soledad_assert_type
+from leap.soledad.common import crypto
+from leap.soledad.common.log import getLogger
+import warnings
+
+
+logger = getLogger(__name__)
+warnings.warn("'soledad.client.crypto' MODULE DEPRECATED",
+ DeprecationWarning, stacklevel=2)
+
+
+MAC_KEY_LENGTH = 64
+
+crypto_backend = default_backend()
+
+
+def encrypt_sym(data, key):
+ """
+ Encrypt data using AES-256 cipher in CTR mode.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param key: The key used to encrypt data (must be 256 bits long).
+ :type key: str
+
+ :return: A tuple with the initialization vector and the encrypted data.
+ :rtype: (long, str)
+ """
+ soledad_assert_type(key, str)
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s bits (must be 256 bits long).' %
+ (len(key) * 8))
+
+ iv = os.urandom(16)
+ cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend)
+ encryptor = cipher.encryptor()
+ ciphertext = encryptor.update(data) + encryptor.finalize()
+
+ return binascii.b2a_base64(iv), ciphertext
+
+
+def decrypt_sym(data, key, iv):
+ """
+ Decrypt some data previously encrypted using AES-256 cipher in CTR mode.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param key: The symmetric key used to decrypt data (must be 256 bits
+ long).
+ :type key: str
+ :param iv: The initialization vector.
+ :type iv: long
+
+ :return: The decrypted data.
+ :rtype: str
+ """
+ soledad_assert_type(key, str)
+ # assert params
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s (must be 256 bits long).' % len(key))
+ iv = binascii.a2b_base64(iv)
+ cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend)
+ decryptor = cipher.decryptor()
+ return decryptor.update(data) + decryptor.finalize()
+
+
+def doc_mac_key(doc_id, secret):
+ """
+ Generate a key for calculating a MAC for a document whose id is
+ C{doc_id}.
+
+ The key is derived using HMAC having sha256 as underlying hash
+ function. The key used for HMAC is the first MAC_KEY_LENGTH characters
+ of Soledad's storage secret. The HMAC message is C{doc_id}.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+
+ :param secret: The Soledad storage secret
+ :type secret: str
+
+ :return: The key.
+ :rtype: str
+ """
+ soledad_assert(secret is not None)
+ return hmac.new(
+ secret[:MAC_KEY_LENGTH],
+ doc_id,
+ hashlib.sha256).digest()
+
+
+class SoledadCrypto(object):
+ """
+ General cryptographic functionality encapsulated in a
+ object that can be passed along.
+ """
+ def __init__(self, secret):
+ """
+ Initialize the crypto object.
+
+ :param secret: The Soledad remote storage secret.
+ :type secret: str
+ """
+ self._secret = secret
+
+ def doc_mac_key(self, doc_id):
+ return doc_mac_key(doc_id, self._secret)
+
+ def doc_passphrase(self, doc_id):
+ """
+ Generate a passphrase for symmetric encryption of document's contents.
+
+ The password is derived using HMAC having sha256 as underlying hash
+ function. The key used for HMAC are the first
+ C{soledad.REMOTE_STORAGE_SECRET_LENGTH} bytes of Soledad's storage
+ secret stripped from the first MAC_KEY_LENGTH characters. The HMAC
+ message is C{doc_id}.
+
+ :param doc_id: The id of the document that will be encrypted using
+ this passphrase.
+ :type doc_id: str
+
+ :return: The passphrase.
+ :rtype: str
+ """
+ soledad_assert(self._secret is not None)
+ return hmac.new(
+ self._secret[MAC_KEY_LENGTH:],
+ doc_id,
+ hashlib.sha256).digest()
+
+ def encrypt_doc(self, doc):
+ """
+ Wrapper around encrypt_docstr that accepts the document as argument.
+
+ :param doc: the document.
+ :type doc: Document
+ """
+ key = self.doc_passphrase(doc.doc_id)
+
+ return encrypt_docstr(
+ doc.get_json(), doc.doc_id, doc.rev, key, self._secret)
+
+ def decrypt_doc(self, doc):
+ """
+ Wrapper around decrypt_doc_dict that accepts the document as argument.
+
+ :param doc: the document.
+ :type doc: Document
+
+ :return: json string with the decrypted document
+ :rtype: str
+ """
+ key = self.doc_passphrase(doc.doc_id)
+ return decrypt_doc_dict(
+ doc.content, doc.doc_id, doc.rev, key, self._secret)
+
+ @property
+ def secret(self):
+ return self._secret
+
+
+#
+# Crypto utilities for a Document.
+#
+
+def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv,
+ mac_method, secret):
+ """
+ Calculate a MAC for C{doc} using C{ciphertext}.
+
+ Current MAC method used is HMAC, with the following parameters:
+
+ * key: sha256(storage_secret, doc_id)
+ * msg: doc_id + doc_rev + ciphertext
+ * digestmod: sha256
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+ :param ciphertext: The content of the document.
+ :type ciphertext: str
+ :param enc_scheme: The encryption scheme.
+ :type enc_scheme: str
+ :param enc_method: The encryption method.
+ :type enc_method: str
+ :param enc_iv: The encryption initialization vector.
+ :type enc_iv: str
+ :param mac_method: The MAC method to use.
+ :type mac_method: str
+ :param secret: The Soledad storage secret
+ :type secret: str
+
+ :return: The calculated MAC.
+ :rtype: str
+
+ :raise crypto.UnknownMacMethodError: Raised when C{mac_method} is unknown.
+ """
+ try:
+ soledad_assert(mac_method == crypto.MacMethods.HMAC)
+ except AssertionError:
+ raise crypto.UnknownMacMethodError
+ template = "{doc_id}{doc_rev}{ciphertext}{enc_scheme}{enc_method}{enc_iv}"
+ content = template.format(
+ doc_id=doc_id,
+ doc_rev=doc_rev,
+ ciphertext=ciphertext,
+ enc_scheme=enc_scheme,
+ enc_method=enc_method,
+ enc_iv=enc_iv)
+ return hmac.new(
+ doc_mac_key(doc_id, secret),
+ content,
+ hashlib.sha256).digest()
+
+
+def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
+ """
+ Encrypt C{doc}'s content.
+
+ Encrypt doc's contents using AES-256 CTR mode and return a valid JSON
+ string representing the following:
+
+ {
+ crypto.ENC_JSON_KEY: '<encrypted doc JSON string>',
+ crypto.ENC_SCHEME_KEY: 'symkey',
+ crypto.ENC_METHOD_KEY: crypto.EncryptionMethods.AES_256_CTR,
+ crypto.ENC_IV_KEY: '<the initial value used to encrypt>',
+ MAC_KEY: '<mac>'
+ crypto.MAC_METHOD_KEY: 'hmac'
+ }
+
+ :param docstr: A representation of the document to be encrypted.
+ :type docstr: str or unicode.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret: The Soledad storage secret (used for MAC auth).
+ :type secret: str
+
+ :return: The JSON serialization of the dict representing the encrypted
+ content.
+ :rtype: str
+ """
+ enc_scheme = crypto.EncryptionSchemes.SYMKEY
+ enc_method = crypto.EncryptionMethods.AES_256_CTR
+ mac_method = crypto.MacMethods.HMAC
+ enc_iv, ciphertext = encrypt_sym(
+ str(docstr), # encryption/decryption routines expect str
+ key)
+ mac = binascii.b2a_hex( # store the mac as hex.
+ mac_doc(
+ doc_id,
+ doc_rev,
+ ciphertext,
+ enc_scheme,
+ enc_method,
+ enc_iv,
+ mac_method,
+ secret))
+ # Return a representation for the encrypted content. In the following, we
+ # convert binary data to hexadecimal representation so the JSON
+ # serialization does not complain about what it tries to serialize.
+ hex_ciphertext = binascii.b2a_hex(ciphertext)
+ logger.debug("encrypting doc: %s" % doc_id)
+ return json.dumps({
+ crypto.ENC_JSON_KEY: hex_ciphertext,
+ crypto.ENC_SCHEME_KEY: enc_scheme,
+ crypto.ENC_METHOD_KEY: enc_method,
+ crypto.ENC_IV_KEY: enc_iv,
+ crypto.MAC_KEY: mac,
+ crypto.MAC_METHOD_KEY: mac_method,
+ })
+
+
+def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method,
+ enc_iv, mac_method, secret, doc_mac):
+ """
+ Verify that C{doc_mac} is a correct MAC for the given document.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+ :param ciphertext: The content of the document.
+ :type ciphertext: str
+ :param enc_scheme: The encryption scheme.
+ :type enc_scheme: str
+ :param enc_method: The encryption method.
+ :type enc_method: str
+ :param enc_iv: The encryption initialization vector.
+ :type enc_iv: str
+ :param mac_method: The MAC method to use.
+ :type mac_method: str
+ :param secret: The Soledad storage secret
+ :type secret: str
+ :param doc_mac: The MAC to be verified against.
+ :type doc_mac: str
+
+ :raise crypto.UnknownMacMethodError: Raised when C{mac_method} is unknown.
+ :raise crypto.WrongMacError: Raised when MAC could not be verified.
+ """
+ calculated_mac = mac_doc(
+ doc_id,
+ doc_rev,
+ ciphertext,
+ enc_scheme,
+ enc_method,
+ enc_iv,
+ mac_method,
+ secret)
+ # we compare mac's hashes to avoid possible timing attacks that might
+ # exploit python's builtin comparison operator behaviour, which fails
+ # immediatelly when non-matching bytes are found.
+ doc_mac_hash = hashlib.sha256(
+ binascii.a2b_hex( # the mac is stored as hex
+ doc_mac)).digest()
+ calculated_mac_hash = hashlib.sha256(calculated_mac).digest()
+
+ if doc_mac_hash != calculated_mac_hash:
+ logger.warn("wrong MAC while decrypting doc...")
+ raise crypto.WrongMacError("Could not authenticate document's "
+ "contents.")
+
+
+def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
+ """
+ Decrypt a symmetrically encrypted C{doc}'s content.
+
+ Return the JSON string representation of the document's decrypted content.
+
+ The passed doc_dict argument should have the following structure:
+
+ {
+ crypto.ENC_JSON_KEY: '<enc_blob>',
+ crypto.ENC_SCHEME_KEY: '<enc_scheme>',
+ crypto.ENC_METHOD_KEY: '<enc_method>',
+ crypto.ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
+ MAC_KEY: '<mac>'
+ crypto.MAC_METHOD_KEY: 'hmac'
+ }
+
+ C{enc_blob} is the encryption of the JSON serialization of the document's
+ content. For now Soledad just deals with documents whose C{enc_scheme} is
+ crypto.EncryptionSchemes.SYMKEY and C{enc_method} is
+ crypto.EncryptionMethods.AES_256_CTR.
+
+ :param doc_dict: The content of the document to be decrypted.
+ :type doc_dict: dict
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret: The Soledad storage secret.
+ :type secret: str
+
+ :return: The JSON serialization of the decrypted content.
+ :rtype: str
+
+ :raise UnknownEncryptionMethodError: Raised when trying to decrypt from an
+ unknown encryption method.
+ """
+ # assert document dictionary structure
+ expected_keys = set([
+ crypto.ENC_JSON_KEY,
+ crypto.ENC_SCHEME_KEY,
+ crypto.ENC_METHOD_KEY,
+ crypto.ENC_IV_KEY,
+ crypto.MAC_KEY,
+ crypto.MAC_METHOD_KEY,
+ ])
+ soledad_assert(expected_keys.issubset(set(doc_dict.keys())))
+
+ ciphertext = binascii.a2b_hex(doc_dict[crypto.ENC_JSON_KEY])
+ enc_scheme = doc_dict[crypto.ENC_SCHEME_KEY]
+ enc_method = doc_dict[crypto.ENC_METHOD_KEY]
+ enc_iv = doc_dict[crypto.ENC_IV_KEY]
+ doc_mac = doc_dict[crypto.MAC_KEY]
+ mac_method = doc_dict[crypto.MAC_METHOD_KEY]
+
+ soledad_assert(enc_scheme == crypto.EncryptionSchemes.SYMKEY)
+
+ _verify_doc_mac(
+ doc_id, doc_rev, ciphertext, enc_scheme, enc_method,
+ enc_iv, mac_method, secret, doc_mac)
+
+ return decrypt_sym(ciphertext, key, enc_iv)
+
+
+def is_symmetrically_encrypted(doc):
+ """
+ Return True if the document was symmetrically encrypted.
+
+ :param doc: The document to check.
+ :type doc: Document
+
+ :rtype: bool
+ """
+ if doc.content and crypto.ENC_SCHEME_KEY in doc.content:
+ if doc.content[crypto.ENC_SCHEME_KEY] \
+ == crypto.EncryptionSchemes.SYMKEY:
+ return True
+ return False
diff --git a/src/leap/soledad/client/events.py b/src/leap/soledad/client/events.py
new file mode 100644
index 00000000..058be59c
--- /dev/null
+++ b/src/leap/soledad/client/events.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+# signal.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Signaling functions.
+"""
+
+from leap.common.events import emit_async
+from leap.common.events import catalog
+
+
+SOLEDAD_CREATING_KEYS = catalog.SOLEDAD_CREATING_KEYS
+SOLEDAD_DONE_CREATING_KEYS = catalog.SOLEDAD_DONE_CREATING_KEYS
+SOLEDAD_DOWNLOADING_KEYS = catalog.SOLEDAD_DOWNLOADING_KEYS
+SOLEDAD_DONE_DOWNLOADING_KEYS = \
+ catalog.SOLEDAD_DONE_DOWNLOADING_KEYS
+SOLEDAD_UPLOADING_KEYS = catalog.SOLEDAD_UPLOADING_KEYS
+SOLEDAD_DONE_UPLOADING_KEYS = \
+ catalog.SOLEDAD_DONE_UPLOADING_KEYS
+SOLEDAD_NEW_DATA_TO_SYNC = catalog.SOLEDAD_NEW_DATA_TO_SYNC
+SOLEDAD_DONE_DATA_SYNC = catalog.SOLEDAD_DONE_DATA_SYNC
+SOLEDAD_SYNC_SEND_STATUS = catalog.SOLEDAD_SYNC_SEND_STATUS
+SOLEDAD_SYNC_RECEIVE_STATUS = catalog.SOLEDAD_SYNC_RECEIVE_STATUS
+
+
+__all__ = [
+ "catalog",
+ "emit_async",
+ "SOLEDAD_CREATING_KEYS",
+ "SOLEDAD_DONE_CREATING_KEYS",
+ "SOLEDAD_DOWNLOADING_KEYS",
+ "SOLEDAD_DONE_DOWNLOADING_KEYS",
+ "SOLEDAD_UPLOADING_KEYS",
+ "SOLEDAD_DONE_UPLOADING_KEYS",
+ "SOLEDAD_NEW_DATA_TO_SYNC",
+ "SOLEDAD_DONE_DATA_SYNC",
+ "SOLEDAD_SYNC_SEND_STATUS",
+ "SOLEDAD_SYNC_RECEIVE_STATUS",
+]
diff --git a/src/leap/soledad/client/examples/README b/src/leap/soledad/client/examples/README
new file mode 100644
index 00000000..3aed8377
--- /dev/null
+++ b/src/leap/soledad/client/examples/README
@@ -0,0 +1,4 @@
+Right now, you can find here both an example of use
+and the benchmarking scripts.
+TODO move benchmark scripts to root scripts/ folder,
+and leave here only a minimal example.
diff --git a/src/leap/soledad/client/examples/benchmarks/.gitignore b/src/leap/soledad/client/examples/benchmarks/.gitignore
new file mode 100644
index 00000000..2211df63
--- /dev/null
+++ b/src/leap/soledad/client/examples/benchmarks/.gitignore
@@ -0,0 +1 @@
+*.txt
diff --git a/src/leap/soledad/client/examples/benchmarks/get_sample.sh b/src/leap/soledad/client/examples/benchmarks/get_sample.sh
new file mode 100755
index 00000000..1995eee1
--- /dev/null
+++ b/src/leap/soledad/client/examples/benchmarks/get_sample.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+mkdir tmp
+wget http://www.gutenberg.org/cache/epub/101/pg101.txt -O hacker_crackdown.txt
diff --git a/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
new file mode 100644
index 00000000..f9349758
--- /dev/null
+++ b/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# measure_index_times.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Measure u1db retrieval times for different u1db index situations.
+"""
+from __future__ import print_function
+from functools import partial
+import datetime
+import hashlib
+import os
+import sys
+
+from twisted.internet import defer, reactor
+
+from leap.soledad.common import l2db
+from leap.soledad.client import adbapi
+from leap.soledad.client._db.sqlcipher import SQLCipherOptions
+
+
+folder = os.environ.get("TMPDIR", "tmp")
+numdocs = int(os.environ.get("DOCS", "1000"))
+silent = os.environ.get("SILENT", False)
+tmpdb = os.path.join(folder, "test.soledad")
+
+
+sample_file = os.environ.get("SAMPLE", "hacker_crackdown.txt")
+sample_path = os.path.join(os.curdir, sample_file)
+
+try:
+ with open(sample_file) as f:
+ SAMPLE = f.readlines()
+except Exception:
+ print("[!] Problem opening sample file. Did you download "
+ "the sample, or correctly set 'SAMPLE' env var?")
+ sys.exit(1)
+
+if numdocs > len(SAMPLE):
+ print("[!] Sorry! The requested DOCS number is larger than "
+ "the num of lines in our sample file")
+ sys.exit(1)
+
+
+def debug(*args):
+ if not silent:
+ print(*args)
+
+
+debug("[+] db path:", tmpdb)
+debug("[+] num docs", numdocs)
+
+if os.path.isfile(tmpdb):
+ debug("[+] Removing existing db file...")
+ os.remove(tmpdb)
+
+start_time = datetime.datetime.now()
+
+opts = SQLCipherOptions(tmpdb, "secret", create=True)
+dbpool = adbapi.getConnectionPool(opts)
+
+
+def createDoc(doc):
+ return dbpool.runU1DBQuery("create_doc", doc)
+
+
+db_indexes = {
+ 'by-chash': ['chash'],
+ 'by-number': ['number']}
+
+
+def create_indexes(_):
+ deferreds = []
+ for index, definition in db_indexes.items():
+ d = dbpool.runU1DBQuery("create_index", index, *definition)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+
+
+class TimeWitness(object):
+ def __init__(self, init_time):
+ self.init_time = init_time
+
+ def get_time_count(self):
+ return datetime.datetime.now() - self.init_time
+
+
+def get_from_index(_):
+ init_time = datetime.datetime.now()
+ debug("GETTING FROM INDEX...", init_time)
+
+ def printValue(res, time):
+ print("RESULT->", res)
+ print("Index Query Took: ", time.get_time_count())
+ return res
+
+ d = dbpool.runU1DBQuery(
+ "get_from_index", "by-chash",
+ # "1150c7f10fabce0a57ce13071349fc5064f15bdb0cc1bf2852f74ef3f103aff5")
+ # XXX this is line 89 from the hacker crackdown...
+ # Should accept any other optional hash as an enviroment variable.
+ "57793320d4997a673fc7062652da0596c36a4e9fbe31310d2281e67d56d82469")
+ d.addCallback(printValue, TimeWitness(init_time))
+ return d
+
+
+def getAllDocs():
+ return dbpool.runU1DBQuery("get_all_docs")
+
+
+def errBack(e):
+ debug("[!] ERROR FOUND!!!")
+ e.printTraceback()
+ reactor.stop()
+
+
+def countDocs(_):
+ debug("counting docs...")
+ d = getAllDocs()
+ d.addCallbacks(printResult, errBack)
+ d.addCallbacks(allDone, errBack)
+ return d
+
+
+def printResult(r, **kwargs):
+ if kwargs:
+ debug(*kwargs.values())
+ elif isinstance(r, l2db.Document):
+ debug(r.doc_id, r.content['number'])
+ else:
+ len_results = len(r[1])
+ debug("GOT %s results" % len(r[1]))
+
+ if len_results == numdocs:
+ debug("ALL GOOD")
+ else:
+ debug("[!] MISSING DOCS!!!!!")
+ raise ValueError("We didn't expect this result len")
+
+
+def allDone(_):
+ debug("ALL DONE!")
+
+ end_time = datetime.datetime.now()
+ print((end_time - start_time).total_seconds())
+ reactor.stop()
+
+
+def insert_docs(_):
+ deferreds = []
+ for i in range(numdocs):
+ payload = SAMPLE[i]
+ chash = hashlib.sha256(payload).hexdigest()
+ doc = {"number": i, "payload": payload, 'chash': chash}
+ d = createDoc(doc)
+ d.addCallbacks(partial(printResult, i=i, chash=chash, payload=payload),
+ lambda e: e.printTraceback())
+ deferreds.append(d)
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+
+d = create_indexes(None)
+d.addCallback(insert_docs)
+d.addCallback(get_from_index)
+d.addCallback(countDocs)
+
+reactor.run()
diff --git a/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
new file mode 100644
index 00000000..4f273c64
--- /dev/null
+++ b/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# measure_index_times.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Measure u1db retrieval times for different u1db index situations.
+"""
+from __future__ import print_function
+from functools import partial
+import datetime
+import hashlib
+import os
+import sys
+
+from twisted.internet import defer, reactor
+
+from leap.soledad.client import adbapi
+from leap.soledad.client._db.sqlcipher import SQLCipherOptions
+from leap.soledad.common import l2db
+
+
+folder = os.environ.get("TMPDIR", "tmp")
+numdocs = int(os.environ.get("DOCS", "1000"))
+silent = os.environ.get("SILENT", False)
+tmpdb = os.path.join(folder, "test.soledad")
+
+
+sample_file = os.environ.get("SAMPLE", "hacker_crackdown.txt")
+sample_path = os.path.join(os.curdir, sample_file)
+
+try:
+ with open(sample_file) as f:
+ SAMPLE = f.readlines()
+except Exception:
+ print("[!] Problem opening sample file. Did you download "
+ "the sample, or correctly set 'SAMPLE' env var?")
+ sys.exit(1)
+
+if numdocs > len(SAMPLE):
+ print("[!] Sorry! The requested DOCS number is larger than "
+ "the num of lines in our sample file")
+ sys.exit(1)
+
+
+def debug(*args):
+ if not silent:
+ print(*args)
+
+
+debug("[+] db path:", tmpdb)
+debug("[+] num docs", numdocs)
+
+if os.path.isfile(tmpdb):
+ debug("[+] Removing existing db file...")
+ os.remove(tmpdb)
+
+start_time = datetime.datetime.now()
+
+opts = SQLCipherOptions(tmpdb, "secret", create=True)
+dbpool = adbapi.getConnectionPool(opts)
+
+
+def createDoc(doc, doc_id):
+ return dbpool.runU1DBQuery("create_doc", doc, doc_id=doc_id)
+
+
+db_indexes = {
+ 'by-chash': ['chash'],
+ 'by-number': ['number']}
+
+
+def create_indexes(_):
+ deferreds = []
+ for index, definition in db_indexes.items():
+ d = dbpool.runU1DBQuery("create_index", index, *definition)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+
+
+class TimeWitness(object):
+ def __init__(self, init_time):
+ self.init_time = init_time
+
+ def get_time_count(self):
+ return datetime.datetime.now() - self.init_time
+
+
+def get_from_index(_):
+ init_time = datetime.datetime.now()
+ debug("GETTING FROM INDEX...", init_time)
+
+ def printValue(res, time):
+ print("RESULT->", res)
+ print("Index Query Took: ", time.get_time_count())
+ return res
+
+ d = dbpool.runU1DBQuery(
+ "get_doc",
+ # "1150c7f10fabce0a57ce13071349fc5064f15bdb0cc1bf2852f74ef3f103aff5")
+ # XXX this is line 89 from the hacker crackdown...
+ # Should accept any other optional hash as an enviroment variable.
+ "57793320d4997a673fc7062652da0596c36a4e9fbe31310d2281e67d56d82469")
+ d.addCallback(printValue, TimeWitness(init_time))
+ return d
+
+
+def getAllDocs():
+ return dbpool.runU1DBQuery("get_all_docs")
+
+
+def errBack(e):
+ debug("[!] ERROR FOUND!!!")
+ e.printTraceback()
+ reactor.stop()
+
+
+def countDocs(_):
+ debug("counting docs...")
+ d = getAllDocs()
+ d.addCallbacks(printResult, errBack)
+ d.addCallbacks(allDone, errBack)
+ return d
+
+
+def printResult(r, **kwargs):
+ if kwargs:
+ debug(*kwargs.values())
+ elif isinstance(r, l2db.Document):
+ debug(r.doc_id, r.content['number'])
+ else:
+ len_results = len(r[1])
+ debug("GOT %s results" % len(r[1]))
+
+ if len_results == numdocs:
+ debug("ALL GOOD")
+ else:
+ debug("[!] MISSING DOCS!!!!!")
+ raise ValueError("We didn't expect this result len")
+
+
+def allDone(_):
+ debug("ALL DONE!")
+
+ end_time = datetime.datetime.now()
+ print((end_time - start_time).total_seconds())
+ reactor.stop()
+
+
+def insert_docs(_):
+ deferreds = []
+ for i in range(numdocs):
+ payload = SAMPLE[i]
+ chash = hashlib.sha256(payload).hexdigest()
+ doc = {"number": i, "payload": payload, 'chash': chash}
+ d = createDoc(doc, doc_id=chash)
+ d.addCallbacks(partial(printResult, i=i, chash=chash, payload=payload),
+ lambda e: e.printTraceback())
+ deferreds.append(d)
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+
+d = create_indexes(None)
+d.addCallback(insert_docs)
+d.addCallback(get_from_index)
+d.addCallback(countDocs)
+
+reactor.run()
diff --git a/src/leap/soledad/client/examples/compare.txt b/src/leap/soledad/client/examples/compare.txt
new file mode 100644
index 00000000..19a1325a
--- /dev/null
+++ b/src/leap/soledad/client/examples/compare.txt
@@ -0,0 +1,8 @@
+TIMES=100 TMPDIR=/media/sdb5/leap python use_adbapi.py 1.34s user 0.16s system 53% cpu 2.832 total
+TIMES=100 TMPDIR=/media/sdb5/leap python use_api.py 1.22s user 0.14s system 62% cpu 2.181 total
+
+TIMES=1000 TMPDIR=/media/sdb5/leap python use_api.py 2.18s user 0.34s system 27% cpu 9.213 total
+TIMES=1000 TMPDIR=/media/sdb5/leap python use_adbapi.py 2.40s user 0.34s system 39% cpu 7.004 total
+
+TIMES=5000 TMPDIR=/media/sdb5/leap python use_api.py 6.63s user 1.27s system 13% cpu 57.882 total
+TIMES=5000 TMPDIR=/media/sdb5/leap python use_adbapi.py 6.84s user 1.26s system 36% cpu 22.367 total
diff --git a/src/leap/soledad/client/examples/manifest.phk b/src/leap/soledad/client/examples/manifest.phk
new file mode 100644
index 00000000..2c86c07d
--- /dev/null
+++ b/src/leap/soledad/client/examples/manifest.phk
@@ -0,0 +1,50 @@
+The Hacker's Manifesto
+
+The Hacker's Manifesto
+by: The Mentor
+
+Another one got caught today, it's all over the papers. "Teenager
+Arrested in Computer Crime Scandal", "Hacker Arrested after Bank
+Tampering." "Damn kids. They're all alike." But did you, in your
+three-piece psychology and 1950's technobrain, ever take a look behind
+the eyes of the hacker? Did you ever wonder what made him tick, what
+forces shaped him, what may have molded him? I am a hacker, enter my
+world. Mine is a world that begins with school. I'm smarter than most of
+the other kids, this crap they teach us bores me. "Damn underachiever.
+They're all alike." I'm in junior high or high school. I've listened to
+teachers explain for the fifteenth time how to reduce a fraction. I
+understand it. "No, Ms. Smith, I didn't show my work. I did it in
+my head." "Damn kid. Probably copied it. They're all alike." I made a
+discovery today. I found a computer. Wait a second, this is cool. It does
+what I want it to. If it makes a mistake, it's because I screwed it up.
+Not because it doesn't like me, or feels threatened by me, or thinks I'm
+a smart ass, or doesn't like teaching and shouldn't be here. Damn kid.
+All he does is play games. They're all alike. And then it happened... a
+door opened to a world... rushing through the phone line like heroin
+through an addict's veins, an electronic pulse is sent out, a refuge from
+the day-to-day incompetencies is sought... a board is found. "This is
+it... this is where I belong..." I know everyone here... even if I've
+never met them, never talked to them, may never hear from them again... I
+know you all... Damn kid. Tying up the phone line again. They're all
+alike... You bet your ass we're all alike... we've been spoon-fed baby
+food at school when we hungered for steak... the bits of meat that you
+did let slip through were pre-chewed and tasteless. We've been dominated
+by sadists, or ignored by the apathetic. The few that had something to
+teach found us willing pupils, but those few are like drops of water in
+the desert. This is our world now... the world of the electron and the
+switch, the beauty of the baud. We make use of a service already existing
+without paying for what could be dirt-cheap if it wasn't run by
+profiteering gluttons, and you call us criminals. We explore... and you
+call us criminals. We seek after knowledge... and you call us criminals.
+We exist without skin color, without nationality, without religious
+bias... and you call us criminals. You build atomic bombs, you wage wars,
+you murder, cheat, and lie to us and try to make us believe it's for our
+own good, yet we're the criminals. Yes, I am a criminal. My crime is that
+of curiosity. My crime is that of judging people by what they say and
+think, not what they look like. My crime is that of outsmarting you,
+something that you will never forgive me for. I am a hacker, and this is
+my manifesto. You may stop this individual, but you can't stop us all...
+after all, we're all alike.
+
+This was the last published file written by The Mentor. Shortly after
+releasing it, he was busted by the FBI. The Mentor, sadly missed.
diff --git a/src/leap/soledad/client/examples/plot-async-db.py b/src/leap/soledad/client/examples/plot-async-db.py
new file mode 100644
index 00000000..018a1a1d
--- /dev/null
+++ b/src/leap/soledad/client/examples/plot-async-db.py
@@ -0,0 +1,45 @@
+import csv
+from matplotlib import pyplot as plt
+
+FILE = "bench.csv"
+
+# config the plot
+plt.xlabel('number of inserts')
+plt.ylabel('time (seconds)')
+plt.title('SQLCipher parallelization')
+
+kwargs = {
+ 'linewidth': 1.0,
+ 'linestyle': '-',
+}
+
+series = (('sync', 'r'),
+ ('async', 'g'))
+
+data = {'mark': [],
+ 'sync': [],
+ 'async': []}
+
+with open(FILE, 'rb') as csvfile:
+ series_reader = csv.reader(csvfile, delimiter=',')
+ for m, s, a in series_reader:
+ data['mark'].append(int(m))
+ data['sync'].append(float(s))
+ data['async'].append(float(a))
+
+xmax = max(data['mark'])
+xmin = min(data['mark'])
+ymax = max(data['sync'] + data['async'])
+ymin = min(data['sync'] + data['async'])
+
+for run in series:
+ name = run[0]
+ color = run[1]
+ plt.plot(data['mark'], data[name], label=name, color=color, **kwargs)
+
+plt.axes().annotate("", xy=(xmax, ymax))
+plt.axes().annotate("", xy=(xmin, ymin))
+
+plt.grid()
+plt.legend()
+plt.show()
diff --git a/src/leap/soledad/client/examples/run_benchmark.py b/src/leap/soledad/client/examples/run_benchmark.py
new file mode 100644
index 00000000..ddedf433
--- /dev/null
+++ b/src/leap/soledad/client/examples/run_benchmark.py
@@ -0,0 +1,30 @@
+"""
+Run a mini-benchmark between regular api and dbapi
+"""
+import commands
+import os
+import time
+
+TMPDIR = os.environ.get("TMPDIR", "/tmp")
+CSVFILE = 'bench.csv'
+
+cmd = "SILENT=1 TIMES={times} TMPDIR={tmpdir} python ./use_{version}api.py"
+
+
+def parse_time(r):
+ return r.split('\n')[-1]
+
+
+with open(CSVFILE, 'w') as log:
+
+ for times in range(0, 10000, 500):
+ cmd1 = cmd.format(times=times, tmpdir=TMPDIR, version="")
+ sync_time = parse_time(commands.getoutput(cmd1))
+
+ cmd2 = cmd.format(times=times, tmpdir=TMPDIR, version="adb")
+ async_time = parse_time(commands.getoutput(cmd2))
+
+ print times, sync_time, async_time
+ log.write("%s, %s, %s\n" % (times, sync_time, async_time))
+ log.flush()
+ time.sleep(2)
diff --git a/src/leap/soledad/client/examples/soledad_sync.py b/src/leap/soledad/client/examples/soledad_sync.py
new file mode 100644
index 00000000..3aed10eb
--- /dev/null
+++ b/src/leap/soledad/client/examples/soledad_sync.py
@@ -0,0 +1,63 @@
+from leap.bitmask.config.providerconfig import ProviderConfig
+from leap.bitmask.crypto.srpauth import SRPAuth
+from leap.soledad.client import Soledad
+from twisted.internet import reactor
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+
+# EDIT THIS --------------------------------------------
+user = u"USERNAME"
+uuid = u"USERUUID"
+_pass = u"USERPASS"
+server_url = "https://soledad.server.example.org:2323"
+# EDIT THIS --------------------------------------------
+
+secrets_path = "/tmp/%s.secrets" % uuid
+local_db_path = "/tmp/%s.soledad" % uuid
+cert_file = "/tmp/cacert.pem"
+provider_config = '/tmp/cdev.json'
+
+
+provider = ProviderConfig()
+provider.load(provider_config)
+
+soledad = None
+
+
+def printStuff(r):
+ print r
+
+
+def printErr(err):
+ logging.exception(err.value)
+
+
+def init_soledad(_):
+ token = srpauth.get_token()
+ print "token", token
+
+ global soledad
+ soledad = Soledad(uuid, _pass, secrets_path, local_db_path,
+ server_url, cert_file,
+ auth_token=token)
+
+ def getall(_):
+ d = soledad.get_all_docs()
+ return d
+
+ d1 = soledad.create_doc({"test": 42})
+ d1.addCallback(getall)
+ d1.addCallbacks(printStuff, printErr)
+
+ d2 = soledad.sync()
+ d2.addCallbacks(printStuff, printErr)
+ d2.addBoth(lambda r: reactor.stop())
+
+
+srpauth = SRPAuth(provider)
+
+d = srpauth.authenticate(user, _pass)
+d.addCallbacks(init_soledad, printErr)
+
+reactor.run()
diff --git a/src/leap/soledad/client/examples/use_adbapi.py b/src/leap/soledad/client/examples/use_adbapi.py
new file mode 100644
index 00000000..ddb1eaae
--- /dev/null
+++ b/src/leap/soledad/client/examples/use_adbapi.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+# use_adbapi.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Example of use of the asynchronous soledad api.
+"""
+from __future__ import print_function
+import datetime
+import os
+
+from twisted.internet import defer, reactor
+
+from leap.soledad.client import adbapi
+from leap.soledad.client._db.sqlcipher import SQLCipherOptions
+from leap.soledad.common import l2db
+
+
+folder = os.environ.get("TMPDIR", "tmp")
+times = int(os.environ.get("TIMES", "1000"))
+silent = os.environ.get("SILENT", False)
+
+tmpdb = os.path.join(folder, "test.soledad")
+
+
+def debug(*args):
+ if not silent:
+ print(*args)
+
+
+debug("[+] db path:", tmpdb)
+debug("[+] times", times)
+
+if os.path.isfile(tmpdb):
+ debug("[+] Removing existing db file...")
+ os.remove(tmpdb)
+
+start_time = datetime.datetime.now()
+
+opts = SQLCipherOptions(tmpdb, "secret", create=True)
+dbpool = adbapi.getConnectionPool(opts)
+
+
+def createDoc(doc):
+ return dbpool.runU1DBQuery("create_doc", doc)
+
+
+def getAllDocs():
+ return dbpool.runU1DBQuery("get_all_docs")
+
+
+def countDocs(_):
+ debug("counting docs...")
+ d = getAllDocs()
+ d.addCallbacks(printResult, lambda e: e.printTraceback())
+ d.addBoth(allDone)
+
+
+def printResult(r):
+ if isinstance(r, l2db.Document):
+ debug(r.doc_id, r.content['number'])
+ else:
+ len_results = len(r[1])
+ debug("GOT %s results" % len(r[1]))
+
+ if len_results == times:
+ debug("ALL GOOD")
+ else:
+ raise ValueError("We didn't expect this result len")
+
+
+def allDone(_):
+ debug("ALL DONE!")
+ if silent:
+ end_time = datetime.datetime.now()
+ print((end_time - start_time).total_seconds())
+ reactor.stop()
+
+
+deferreds = []
+payload = open('manifest.phk').read()
+
+for i in range(times):
+ doc = {"number": i, "payload": payload}
+ d = createDoc(doc)
+ d.addCallbacks(printResult, lambda e: e.printTraceback())
+ deferreds.append(d)
+
+
+all_done = defer.gatherResults(deferreds, consumeErrors=True)
+all_done.addCallback(countDocs)
+
+reactor.run()
diff --git a/src/leap/soledad/client/examples/use_api.py b/src/leap/soledad/client/examples/use_api.py
new file mode 100644
index 00000000..db77c4b3
--- /dev/null
+++ b/src/leap/soledad/client/examples/use_api.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+# use_api.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Example of use of the soledad api.
+"""
+from __future__ import print_function
+import datetime
+import os
+
+from leap.soledad.client import sqlcipher
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+
+
+folder = os.environ.get("TMPDIR", "tmp")
+times = int(os.environ.get("TIMES", "1000"))
+silent = os.environ.get("SILENT", False)
+
+tmpdb = os.path.join(folder, "test.soledad")
+
+
+def debug(*args):
+ if not silent:
+ print(*args)
+
+
+debug("[+] db path:", tmpdb)
+debug("[+] times", times)
+
+if os.path.isfile(tmpdb):
+ debug("[+] Removing existing db file...")
+ os.remove(tmpdb)
+
+start_time = datetime.datetime.now()
+
+opts = SQLCipherOptions(tmpdb, "secret", create=True)
+db = sqlcipher.SQLCipherDatabase(opts)
+
+
+def allDone():
+ debug("ALL DONE!")
+
+
+payload = open('manifest.phk').read()
+
+for i in range(times):
+ doc = {"number": i, "payload": payload}
+ d = db.create_doc(doc)
+ debug(d.doc_id, d.content['number'])
+
+debug("Count", len(db.get_all_docs()[1]))
+if silent:
+ end_time = datetime.datetime.now()
+ print((end_time - start_time).total_seconds())
+
+allDone()
diff --git a/src/leap/soledad/client/http_target/__init__.py b/src/leap/soledad/client/http_target/__init__.py
new file mode 100644
index 00000000..b67d03f6
--- /dev/null
+++ b/src/leap/soledad/client/http_target/__init__.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A U1DB backend for encrypting data before sending to server and decrypting
+after receiving.
+"""
+
+
+import os
+
+from twisted.web.client import Agent
+from twisted.internet import reactor
+
+from leap.common.certs import get_compatible_ssl_context_factory
+from leap.soledad.common.log import getLogger
+from leap.soledad.client.http_target.send import HTTPDocSender
+from leap.soledad.client.http_target.api import SyncTargetAPI
+from leap.soledad.client.http_target.fetch import HTTPDocFetcher
+from leap.soledad.client import crypto as old_crypto
+
+
+logger = getLogger(__name__)
+
+
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
+class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
+
+ """
+ A SyncTarget that encrypts data before sending and decrypts data after
+ receiving.
+
+ Normally encryption will have been written to the sync database upon
+ document modification. The sync database is also used to write temporarily
+ the parsed documents that the remote send us, before being decrypted and
+ written to the main database.
+ """
+ def __init__(self, url, source_replica_uid, creds, crypto, cert_file):
+ """
+ Initialize the sync target.
+
+ :param url: The server sync url.
+ :type url: str
+ :param source_replica_uid: The source replica uid which we use when
+ deferring decryption.
+ :type source_replica_uid: str
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: creds
+ :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+ document contents when syncing.
+ :type crypto: soledad._crypto.SoledadCrypto
+ :param cert_file: Path to the certificate of the ca used to validate
+ the SSL certificate used by the remote soledad
+ server.
+ :type cert_file: str
+ """
+ if url.endswith("/"):
+ url = url[:-1]
+ self._url = str(url) + "/sync-from/" + str(source_replica_uid)
+ self.source_replica_uid = source_replica_uid
+ self._auth_header = None
+ self._uuid = None
+ self.set_creds(creds)
+ self._crypto = crypto
+ # TODO: DEPRECATED CRYPTO
+ self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret)
+ self._insert_doc_cb = None
+
+ # Twisted default Agent with our own ssl context factory
+ factory = get_compatible_ssl_context_factory(cert_file)
+ self._http = Agent(reactor, factory)
+
+ if DO_STATS:
+ self.sync_exchange_phase = [0]
diff --git a/src/leap/soledad/client/http_target/api.py b/src/leap/soledad/client/http_target/api.py
new file mode 100644
index 00000000..c68185c6
--- /dev/null
+++ b/src/leap/soledad/client/http_target/api.py
@@ -0,0 +1,248 @@
+# -*- coding: utf-8 -*-
+# api.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import json
+import base64
+
+from six import StringIO
+from uuid import uuid4
+
+from twisted.internet import defer
+from twisted.web.http_headers import Headers
+from twisted.web.client import FileBodyProducer
+
+from leap.soledad.client.http_target.support import readBody
+from leap.soledad.common.errors import InvalidAuthTokenError
+from leap.soledad.common.l2db.errors import HTTPError
+from leap.soledad.common.l2db import SyncTarget
+
+
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
+class SyncTargetAPI(SyncTarget):
+ """
+ Declares public methods and implements u1db.SyncTarget.
+ """
+
+ @property
+ def uuid(self):
+ return self._uuid
+
+ def set_creds(self, creds):
+ """
+ Update credentials.
+
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: dict
+ """
+ uuid = creds['token']['uuid']
+ token = creds['token']['token']
+ self._uuid = uuid
+ auth = '%s:%s' % (uuid, token)
+ b64_token = base64.b64encode(auth)
+ self._auth_header = {'Authorization': ['Token %s' % b64_token]}
+
+ @property
+ def _base_header(self):
+ return self._auth_header.copy() if self._auth_header else {}
+
+ def _http_request(self, url, method='GET', body=None, headers=None,
+ content_type=None, body_reader=readBody,
+ body_producer=None):
+ headers = headers or self._base_header
+ if content_type:
+ headers.update({'content-type': [content_type]})
+ if not body_producer and body:
+ body = FileBodyProducer(StringIO(body))
+ elif body_producer:
+ # Upload case, check send.py
+ body = body_producer(body)
+ d = self._http.request(
+ method, url, headers=Headers(headers), bodyProducer=body)
+ d.addCallback(body_reader)
+ d.addErrback(_unauth_to_invalid_token_error)
+ return d
+
+ @defer.inlineCallbacks
+ def get_sync_info(self, source_replica_uid):
+ """
+ Return information about known state of remote database.
+
+ Return the replica_uid and the current database generation of the
+ remote database, and its last-seen database generation for the client
+ replica.
+
+ :param source_replica_uid: The client-size replica uid.
+ :type source_replica_uid: str
+
+ :return: A deferred which fires with (target_replica_uid,
+ target_replica_generation, target_trans_id,
+ source_replica_last_known_generation,
+ source_replica_last_known_transaction_id)
+ :rtype: twisted.internet.defer.Deferred
+ """
+ raw = yield self._http_request(self._url)
+ res = json.loads(raw)
+ defer.returnValue((
+ res['target_replica_uid'],
+ res['target_replica_generation'],
+ res['target_replica_transaction_id'],
+ res['source_replica_generation'],
+ res['source_transaction_id']
+ ))
+
+ def record_sync_info(
+ self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ """
+ Record tip information for another replica.
+
+ After sync_exchange has been processed, the caller will have
+ received new content from this replica. This call allows the
+ source replica instigating the sync to inform us what their
+ generation became after applying the documents we returned.
+
+ This is used to allow future sync operations to not need to repeat data
+ that we just talked about. It also means that if this is called at the
+ wrong time, there can be database records that will never be
+ synchronized.
+
+ :param source_replica_uid: The identifier for the source replica.
+ :type source_replica_uid: str
+ :param source_replica_generation: The database generation for the
+ source replica.
+ :type source_replica_generation: int
+ :param source_replica_transaction_id: The transaction id associated
+ with the source replica
+ generation.
+ :type source_replica_transaction_id: str
+
+ :return: A deferred which fires with the result of the query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ data = json.dumps({
+ 'generation': source_replica_generation,
+ 'transaction_id': source_replica_transaction_id
+ })
+ return self._http_request(
+ self._url,
+ method='PUT',
+ body=data,
+ content_type='application/json')
+
+ @defer.inlineCallbacks
+ def sync_exchange(self, docs_by_generation, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ insert_doc_cb, ensure_callback=None,
+ sync_id=None):
+ """
+ Find out which documents the remote database does not know about,
+ encrypt and send them. After that, receive documents from the remote
+ database.
+
+ :param docs_by_generations: A list of (doc_id, generation, trans_id)
+ of local documents that were changed since
+ the last local generation the remote
+ replica knows about.
+ :type docs_by_generations: list of tuples
+
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+
+ :param ensure_callback: A callback that ensures we know the target
+ replica uid if the target replica was just
+ created.
+ :type ensure_callback: function
+
+ :return: A deferred which fires with the new generation and
+ transaction id of the target replica.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # ---------- phase 1: send docs to server ----------------------------
+ if DO_STATS:
+ self.sync_exchange_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ self._ensure_callback = ensure_callback
+
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+
+ # save a reference to the callback so we can use it after decrypting
+ self._insert_doc_cb = insert_doc_cb
+
+ gen_after_send, trans_id_after_send = yield self._send_docs(
+ docs_by_generation,
+ last_known_generation,
+ last_known_trans_id,
+ sync_id)
+
+ # ---------- phase 2: receive docs -----------------------------------
+ if DO_STATS:
+ self.sync_exchange_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ cur_target_gen, cur_target_trans_id = yield self._receive_docs(
+ last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id)
+
+ # update gen and trans id info in case we just sent and did not
+ # receive docs.
+ if gen_after_send is not None and gen_after_send > cur_target_gen:
+ cur_target_gen = gen_after_send
+ cur_target_trans_id = trans_id_after_send
+
+ # ---------- phase 3: sync exchange is over --------------------------
+ if DO_STATS:
+ self.sync_exchange_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ defer.returnValue([cur_target_gen, cur_target_trans_id])
+
+
+def _unauth_to_invalid_token_error(failure):
+ """
+ An errback to translate unauthorized errors to our own invalid token
+ class.
+
+ :param failure: The original failure.
+ :type failure: twisted.python.failure.Failure
+
+ :return: Either the original failure or an invalid auth token error.
+ :rtype: twisted.python.failure.Failure
+ """
+ failure.trap(HTTPError)
+ if failure.value.status == 401:
+ raise InvalidAuthTokenError
+ return failure
diff --git a/src/leap/soledad/client/http_target/fetch.py b/src/leap/soledad/client/http_target/fetch.py
new file mode 100644
index 00000000..9d456830
--- /dev/null
+++ b/src/leap/soledad/client/http_target/fetch.py
@@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+# fetch.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+from twisted.internet import defer
+from twisted.internet import threads
+
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import emit_async
+from leap.soledad.client.http_target.support import RequestBody
+from leap.soledad.common.log import getLogger
+from leap.soledad.client._crypto import is_symmetrically_encrypted
+from leap.soledad.common.l2db import errors
+from leap.soledad.client import crypto as old_crypto
+
+from .._document import Document
+from . import fetch_protocol
+
+logger = getLogger(__name__)
+
+
+class HTTPDocFetcher(object):
+ """
+ Handles Document fetching from Soledad server, using HTTP as transport.
+ Steps:
+ * Prepares metadata by asking server for one document
+ * Fetch the total on response and prepare to ask all remaining
+ * (async) Documents will come encrypted.
+ So we parse, decrypt and insert locally as they arrive.
+ """
+
+ # The uuid of the local replica.
+ # Any class inheriting from this one should provide a meaningful attribute
+ # if the sync status event is meant to be used somewhere else.
+
+ uuid = 'undefined'
+ userid = 'undefined'
+
+ @defer.inlineCallbacks
+ def _receive_docs(self, last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id):
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+ # Acts as a queue, ensuring line order on async processing
+ # as `self._insert_doc_cb` cant be run concurrently or out of order.
+ # DeferredSemaphore solves the concurrency and its implementation uses
+ # a queue, solving the ordering.
+ # FIXME: Find a proper solution to avoid surprises on Twisted changes
+ self.semaphore = defer.DeferredSemaphore(1)
+
+ metadata = yield self._fetch_all(
+ last_known_generation, last_known_trans_id,
+ sync_id)
+ number_of_changes, ngen, ntrans = self._parse_metadata(metadata)
+
+ # wait for pending inserts
+ yield self.semaphore.acquire()
+
+ if ngen:
+ new_generation = ngen
+ new_transaction_id = ntrans
+
+ defer.returnValue([new_generation, new_transaction_id])
+
+ def _fetch_all(self, last_known_generation,
+ last_known_trans_id, sync_id):
+ # add remote replica metadata to the request
+ body = RequestBody(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ self._received_docs = 0
+ # build a stream reader with _doc_parser as a callback
+ body_reader = fetch_protocol.build_body_reader(self._doc_parser)
+ # start download stream
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=str(body),
+ content_type='application/x-soledad-sync-get',
+ body_reader=body_reader)
+
+ @defer.inlineCallbacks
+ def _doc_parser(self, doc_info, content, total):
+ """
+ Insert a received document into the local replica, decrypting
+ if necessary. The case where it's not decrypted is when a doc gets
+ inserted from Server side with a GPG encrypted content.
+
+ :param doc_info: Dictionary representing Document information.
+ :type doc_info: dict
+ :param content: The Document's content.
+ :type idx: str
+ :param total: The total number of operations.
+ :type total: int
+ """
+ yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
+ total)
+
+ @defer.inlineCallbacks
+ def __atomic_doc_parse(self, doc_info, content, total):
+ doc = Document(doc_info['id'], doc_info['rev'], content)
+ if is_symmetrically_encrypted(content):
+ content = (yield self._crypto.decrypt_doc(doc)).getvalue()
+ elif old_crypto.is_symmetrically_encrypted(doc):
+ content = self._deprecated_crypto.decrypt_doc(doc)
+ doc.set_json(content)
+
+ # TODO insert blobs here on the blob backend
+ # FIXME: This is wrong. Using the very same SQLite connection object
+ # from multiple threads is dangerous. We should bring the dbpool here
+ # or find an alternative. Deferring to a thread only helps releasing
+ # the reactor for other tasks as this is an IO intensive call.
+ yield threads.deferToThread(self._insert_doc_cb,
+ doc, doc_info['gen'], doc_info['trans_id'])
+ self._received_docs += 1
+ user_data = {'uuid': self.uuid, 'userid': self.userid}
+ _emit_receive_status(user_data, self._received_docs, total=total)
+
+ def _parse_metadata(self, metadata):
+ """
+ Parse the response from the server containing the sync metadata.
+
+ :param response: Metadata as string
+ :type response: str
+
+ :return: (number_of_changes, new_gen, new_trans_id)
+ :rtype: tuple
+ """
+ try:
+ metadata = json.loads(metadata)
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ return (metadata['number_of_changes'], metadata['new_generation'],
+ metadata['new_transaction_id'])
+ except (ValueError, KeyError):
+ raise errors.BrokenSyncStream('Metadata parsing failed')
+
+
+def _emit_receive_status(user_data, received_docs, total):
+ content = {'received': received_docs, 'total': total}
+ emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content)
+
+ if received_docs % 20 == 0:
+ msg = "%d/%d" % (received_docs, total)
+ logger.debug("Sync receive status: %s" % msg)
diff --git a/src/leap/soledad/client/http_target/fetch_protocol.py b/src/leap/soledad/client/http_target/fetch_protocol.py
new file mode 100644
index 00000000..851eb3a1
--- /dev/null
+++ b/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -0,0 +1,157 @@
+# -*- coding: utf-8 -*-
+# fetch_protocol.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+from functools import partial
+from six import StringIO
+from twisted.web._newclient import ResponseDone
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import utils
+from leap.soledad.common.log import getLogger
+from .support import ReadBodyProtocol
+from .support import readBody
+
+logger = getLogger(__name__)
+
+
+class DocStreamReceiver(ReadBodyProtocol):
+ """
+ A protocol implementation that can parse incoming data from server based
+ on a line format specified on u1db implementation. Except that we split doc
+ attributes from content to ease parsing and increment throughput for larger
+ documents.
+ [\r\n
+ {metadata},\r\n
+ {doc_info},\r\n
+ {content},\r\n
+ ...
+ {doc_info},\r\n
+ {content},\r\n
+ ]
+ """
+
+ def __init__(self, response, deferred, doc_reader):
+ self.deferred = deferred
+ self.status = response.code if response else None
+ self.message = response.phrase if response else None
+ self.headers = response.headers if response else {}
+ self.delimiter = '\r\n'
+ self.metadata = ''
+ self._doc_reader = doc_reader
+ self.reset()
+
+ def reset(self):
+ self._line = 0
+ self._buffer = StringIO()
+ self._properly_finished = False
+
+ def connectionLost(self, reason):
+ """
+ Deliver the accumulated response bytes to the waiting L{Deferred}, if
+ the response body has been completely received without error.
+ """
+ if self.deferred.called:
+ return
+ try:
+ if reason.check(ResponseDone):
+ self.dataBuffer = self.metadata
+ else:
+ self.dataBuffer = self.finish()
+ except errors.BrokenSyncStream as e:
+ return self.deferred.errback(e)
+ return ReadBodyProtocol.connectionLost(self, reason)
+
+ def consumeBufferLines(self):
+ """
+ Consumes lines from buffer and rewind it, writing remaining data
+ that didn't formed a line back into buffer.
+ """
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.seek(0)
+ lines = content.split(self.delimiter)
+ self._buffer.write(lines.pop(-1))
+ return lines
+
+ def dataReceived(self, data):
+ """
+ Buffer incoming data until a line breaks comes in. We check only
+ the incoming data for efficiency.
+ """
+ self._buffer.write(data)
+ if '\n' not in data:
+ return
+ lines = self.consumeBufferLines()
+ while lines:
+ line, _ = utils.check_and_strip_comma(lines.pop(0))
+ self.lineReceived(line)
+ self._line += 1
+
+ def lineReceived(self, line):
+ """
+ Protocol implementation.
+ 0: [\r\n
+ 1: {metadata},\r\n
+ (even): {doc_info},\r\n
+ (odd): {data},\r\n
+ (last): ]
+ """
+ if self._properly_finished:
+ raise errors.BrokenSyncStream("Reading a finished stream")
+ if ']' == line:
+ self._properly_finished = True
+ elif self._line == 0:
+ if line is not '[':
+ raise errors.BrokenSyncStream("Invalid start")
+ elif self._line == 1:
+ self.metadata = line
+ if 'error' in self.metadata:
+ raise errors.BrokenSyncStream("Error from server: %s" % line)
+ self.total = json.loads(line).get('number_of_changes', -1)
+ elif (self._line % 2) == 0:
+ self.current_doc = json.loads(line)
+ if 'error' in self.current_doc:
+ raise errors.BrokenSyncStream("Error from server: %s" % line)
+ else:
+ d = self._doc_reader(
+ self.current_doc, line.strip() or None, self.total)
+ d.addErrback(self.deferred.errback)
+
+ def finish(self):
+ """
+ Checks that ']' came and stream was properly closed.
+ """
+ if not self._properly_finished:
+ raise errors.BrokenSyncStream('Stream not properly closed')
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.close()
+ return content
+
+
+def build_body_reader(doc_reader):
+ """
+ Get the documents from a sync stream and call doc_reader on each
+ doc received.
+
+ @param doc_reader: Function to be called for processing an incoming doc.
+ Will be called with doc metadata (dict parsed from 1st line) and doc
+ content (string)
+ @type doc_reader: function
+
+ @return: A function that can be called by the http Agent to create and
+ configure the proper protocol.
+ """
+ protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader)
+ return partial(readBody, protocolClass=protocolClass)
diff --git a/src/leap/soledad/client/http_target/send.py b/src/leap/soledad/client/http_target/send.py
new file mode 100644
index 00000000..2b286ec5
--- /dev/null
+++ b/src/leap/soledad/client/http_target/send.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+# send.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+
+from twisted.internet import defer
+
+from leap.soledad.common.log import getLogger
+from leap.soledad.client.events import emit_async
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.http_target.support import RequestBody
+from .send_protocol import DocStreamProducer
+
+logger = getLogger(__name__)
+
+
+class HTTPDocSender(object):
+ """
+ Handles Document uploading from Soledad server, using HTTP as transport.
+ They need to be encrypted and metadata prepared before sending.
+ """
+
+ # The uuid of the local replica.
+ # Any class inheriting from this one should provide a meaningful attribute
+ # if the sync status event is meant to be used somewhere else.
+
+ uuid = 'undefined'
+ userid = 'undefined'
+
+ @defer.inlineCallbacks
+ def _send_docs(self, docs_by_generation, last_known_generation,
+ last_known_trans_id, sync_id):
+
+ if not docs_by_generation:
+ defer.returnValue([None, None])
+
+ # add remote replica metadata to the request
+ body = RequestBody(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ result = yield self._send_batch(body, docs_by_generation)
+ response_dict = json.loads(result)[0]
+ gen_after_send = response_dict['new_generation']
+ trans_id_after_send = response_dict['new_transaction_id']
+ defer.returnValue([gen_after_send, trans_id_after_send])
+
+ @defer.inlineCallbacks
+ def _send_batch(self, body, docs):
+ total, calls = len(docs), []
+ for i, entry in enumerate(docs):
+ calls.append((self._prepare_one_doc,
+ entry, body, i + 1, total))
+ result = yield self._send_request(body, calls)
+ _emit_send_status(self.uuid, body.consumed, total)
+
+ defer.returnValue(result)
+
+ def _send_request(self, body, calls):
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=(body, calls),
+ content_type='application/x-soledad-sync-put',
+ body_producer=DocStreamProducer)
+
+ @defer.inlineCallbacks
+ def _prepare_one_doc(self, entry, body, idx, total):
+ get_doc_call, gen, trans_id = entry
+ doc, content = yield self._encrypt_doc(get_doc_call)
+ body.insert_info(
+ id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
+ trans_id=trans_id, number_of_docs=total,
+ doc_idx=idx)
+ _emit_send_status(self.uuid, body.consumed, total)
+
+ @defer.inlineCallbacks
+ def _encrypt_doc(self, get_doc_call):
+ f, args, kwargs = get_doc_call
+ doc = yield f(*args, **kwargs)
+ if doc.is_tombstone():
+ defer.returnValue((doc, None))
+ else:
+ content = yield self._crypto.encrypt_doc(doc)
+ defer.returnValue((doc, content))
+
+
+def _emit_send_status(user_data, idx, total):
+ content = {'sent': idx, 'total': total}
+ emit_async(SOLEDAD_SYNC_SEND_STATUS, user_data, content)
+
+ msg = "%d/%d" % (idx, total)
+ logger.debug("Sync send status: %s" % msg)
diff --git a/src/leap/soledad/client/http_target/send_protocol.py b/src/leap/soledad/client/http_target/send_protocol.py
new file mode 100644
index 00000000..4941aa34
--- /dev/null
+++ b/src/leap/soledad/client/http_target/send_protocol.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# send_protocol.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from zope.interface import implementer
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.web.iweb import IBodyProducer
+from twisted.web.iweb import UNKNOWN_LENGTH
+
+
+@implementer(IBodyProducer)
+class DocStreamProducer(object):
+ """
+ A producer that writes the body of a request to a consumer.
+ """
+
+ def __init__(self, producer):
+ """
+ Initialize the string produer.
+
+ :param producer: A RequestBody instance and a list of producer calls
+ :type producer: (.support.RequestBody, [(function, *args)])
+ """
+ self.body, self.producer = producer
+ self.length = UNKNOWN_LENGTH
+ self.pause = False
+ self.stop = False
+
+ @defer.inlineCallbacks
+ def startProducing(self, consumer):
+ """
+ Write the body to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A Deferred that fires when production ends.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ while self.producer and not self.stop:
+ if self.pause:
+ yield self.sleep(0.001)
+ continue
+ call = self.producer.pop(0)
+ fun, args = call[0], call[1:]
+ yield fun(*args)
+ consumer.write(self.body.pop(1, leave_open=True))
+ consumer.write(self.body.pop(0)) # close stream
+
+ def sleep(self, secs):
+ d = defer.Deferred()
+ reactor.callLater(secs, d.callback, None)
+ return d
+
+ def pauseProducing(self):
+ self.pause = True
+
+ def stopProducing(self):
+ self.stop = True
+
+ def resumeProducing(self):
+ self.pause = False
diff --git a/src/leap/soledad/client/http_target/support.py b/src/leap/soledad/client/http_target/support.py
new file mode 100644
index 00000000..d8d8e420
--- /dev/null
+++ b/src/leap/soledad/client/http_target/support.py
@@ -0,0 +1,220 @@
+# -*- coding: utf-8 -*-
+# support.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import warnings
+import json
+
+from twisted.internet import defer
+from twisted.web.client import _ReadBodyProtocol
+from twisted.web.client import PartialDownloadError
+from twisted.web._newclient import ResponseDone
+from twisted.web._newclient import PotentialDataLoss
+
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import http_errors
+
+# we want to make sure that HTTP errors will raise appropriate u1db errors,
+# that is, fire errbacks with the appropriate failures, in the context of
+# twisted. Because of that, we redefine the http body reader used by the HTTP
+# client below.
+
+
+class ReadBodyProtocol(_ReadBodyProtocol):
+ """
+ From original Twisted implementation, focused on adding our error
+ handling and ensuring that the proper u1db error is raised.
+ """
+
+ def __init__(self, response, deferred):
+ """
+ Initialize the protocol, additionally storing the response headers.
+ """
+ _ReadBodyProtocol.__init__(
+ self, response.code, response.phrase, deferred)
+ self.headers = response.headers
+
+ # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
+ def _error(self, respdic):
+ descr = respdic.get("error")
+ exc_cls = errors.wire_description_to_exc.get(descr)
+ if exc_cls is not None:
+ message = respdic.get("message")
+ self.deferred.errback(exc_cls(message))
+ else:
+ self.deferred.errback(
+ errors.HTTPError(self.status, respdic, self.headers))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ def connectionLost(self, reason):
+ """
+ Deliver the accumulated response bytes to the waiting L{Deferred}, if
+ the response body has been completely received without error.
+ """
+ if reason.check(ResponseDone):
+
+ body = b''.join(self.dataBuffer)
+
+ # ---8<--- snippet from u1db.remote.http_client
+ if self.status in (200, 201):
+ self.deferred.callback(body)
+ elif self.status in http_errors.ERROR_STATUSES:
+ try:
+ respdic = json.loads(body)
+ except ValueError:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ else:
+ self._error(respdic)
+ # special cases
+ elif self.status == 503:
+ self.deferred.errback(errors.Unavailable(body, self.headers))
+ else:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ elif reason.check(PotentialDataLoss):
+ self.deferred.errback(
+ PartialDownloadError(self.status, self.message,
+ b''.join(self.dataBuffer)))
+ else:
+ self.deferred.errback(reason)
+
+
+def readBody(response, protocolClass=ReadBodyProtocol):
+ """
+ Get the body of an L{IResponse} and return it as a byte string.
+
+ This is a helper function for clients that don't want to incrementally
+ receive the body of an HTTP response.
+
+ @param response: The HTTP response for which the body will be read.
+ @type response: L{IResponse} provider
+
+ @return: A L{Deferred} which will fire with the body of the response.
+ Cancelling it will close the connection to the server immediately.
+ """
+ def cancel(deferred):
+ """
+ Cancel a L{readBody} call, close the connection to the HTTP server
+ immediately, if it is still open.
+
+ @param deferred: The cancelled L{defer.Deferred}.
+ """
+ abort = getAbort()
+ if abort is not None:
+ abort()
+
+ d = defer.Deferred(cancel)
+ protocol = protocolClass(response, d)
+
+ def getAbort():
+ return getattr(protocol.transport, 'abortConnection', None)
+
+ response.deliverBody(protocol)
+
+ if protocol.transport is not None and getAbort() is None:
+ warnings.warn(
+ 'Using readBody with a transport that does not have an '
+ 'abortConnection method',
+ category=DeprecationWarning,
+ stacklevel=2)
+
+ return d
+
+
+class RequestBody(object):
+ """
+ This class is a helper to generate send and fetch requests.
+ The expected format is something like:
+ [
+ {headers},
+ {entry1},
+ {...},
+ {entryN},
+ ]
+ """
+
+ def __init__(self, **header_dict):
+ """
+ Creates a new RequestBody holding header information.
+
+ :param header_dict: A dictionary with the headers.
+ :type header_dict: dict
+ """
+ self.headers = header_dict
+ self.entries = []
+ self.consumed = 0
+
+ def insert_info(self, **entry_dict):
+ """
+ Dumps an entry into JSON format and add it to entries list.
+ Adds 'content' key on a new line if it's present.
+
+ :param entry_dict: Entry as a dictionary
+ :type entry_dict: dict
+ """
+ content = ''
+ if 'content' in entry_dict:
+ content = ',\r\n' + (entry_dict['content'] or '')
+ entry = json.dumps(entry_dict) + content
+ self.entries.append(entry)
+
+ def pop(self, amount=10, leave_open=False):
+ """
+ Removes entries and returns it formatted and ready
+ to be sent.
+
+ :param amount: number of entries to pop and format
+ :type amount: int
+
+ :param leave_open: flag to skip stream closing
+ :type amount: bool
+
+ :return: formatted body ready to be sent
+ :rtype: str
+ """
+ start = self.consumed == 0
+ amount = min([len(self.entries), amount])
+ entries = [self.entries.pop(0) for i in xrange(amount)]
+ self.consumed += amount
+ end = len(self.entries) == 0 if not leave_open else False
+ return self.entries_to_str(entries, start, end)
+
+ def __str__(self):
+ return self.pop(len(self.entries))
+
+ def __len__(self):
+ return len(self.entries)
+
+ def entries_to_str(self, entries=None, start=True, end=True):
+ """
+ Format a list of entries into the body format expected
+ by the server.
+
+ :param entries: entries to format
+ :type entries: list
+
+ :return: formatted body ready to be sent
+ :rtype: str
+ """
+ data = ''
+ if start:
+ data = '[\r\n' + json.dumps(self.headers)
+ data += ''.join(',\r\n' + entry for entry in entries)
+ if end:
+ data += '\r\n]'
+ return data
diff --git a/src/leap/soledad/client/interfaces.py b/src/leap/soledad/client/interfaces.py
new file mode 100644
index 00000000..0600449f
--- /dev/null
+++ b/src/leap/soledad/client/interfaces.py
@@ -0,0 +1,368 @@
+# -*- coding: utf-8 -*-
+# interfaces.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Interfaces used by the Soledad Client.
+"""
+from zope.interface import Interface, Attribute
+
+#
+# Plugins
+#
+
+
+class ISoledadPostSyncPlugin(Interface):
+ """
+ I implement the minimal methods and attributes for a plugin that can be
+ called after a soledad synchronization has ended.
+ """
+
+ def process_received_docs(self, doc_id_list):
+ """
+ Do something with the passed list of doc_ids received after the last
+ sync.
+
+ :param doc_id_list: a list of strings for the received doc_ids
+ """
+
+ watched_doc_types = Attribute("""
+ a tuple of the watched doc types for this plugin. So far, the
+ `doc-types` convention is just the preffix of the doc_id, which is
+ basically its first character, followed by a dash. So, for instance,
+ `M-` is used for meta-docs in mail, and `F-` is used for flag-docs in
+ mail. For now there's no central register of all the doc-types
+ used.""")
+
+
+#
+# Soledad storage
+#
+
+class ILocalStorage(Interface):
+ """
+ I implement core methods for the u1db local storage of documents and
+ indexes.
+ """
+ local_db_path = Attribute(
+ "The path for the local database replica")
+ local_db_file_name = Attribute(
+ "The name of the local SQLCipher U1DB database file")
+ uuid = Attribute("The user uuid")
+ default_prefix = Attribute(
+ "Prefix for default values for path")
+
+ def put_doc(self, doc):
+ """
+ Update a document in the local encrypted database.
+
+ :param doc: the document to update
+ :type doc: Document
+
+ :return:
+ a deferred that will fire with the new revision identifier for
+ the document
+ :rtype: Deferred
+ """
+
+ def delete_doc(self, doc):
+ """
+ Delete a document from the local encrypted database.
+
+ :param doc: the document to delete
+ :type doc: Document
+
+ :return:
+ a deferred that will fire with ...
+ :rtype: Deferred
+ """
+
+ def get_doc(self, doc_id, include_deleted=False):
+ """
+ Retrieve a document from the local encrypted database.
+
+ :param doc_id: the unique document identifier
+ :type doc_id: str
+ :param include_deleted:
+ if True, deleted documents will be returned with empty content;
+ otherwise asking for a deleted document will return None
+ :type include_deleted: bool
+
+ :return:
+ A deferred that will fire with the document object, containing a
+ Document, or None if it could not be found
+ :rtype: Deferred
+ """
+
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the content for many documents.
+
+ :param doc_ids: a list of document identifiers
+ :type doc_ids: list
+ :param check_for_conflicts: if set False, then the conflict check will
+ be skipped, and 'None' will be returned instead of True/False
+ :type check_for_conflicts: bool
+
+ :return:
+ A deferred that will fire with an iterable giving the Document
+ object for each document id in matching doc_ids order.
+ :rtype: Deferred
+ """
+
+ def get_all_docs(self, include_deleted=False):
+ """
+ Get the JSON content for all documents in the database.
+
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return:
+ A deferred that will fire with (generation, [Document]): that is,
+ the current generation of the database, followed by a list of all
+ the documents in the database.
+ :rtype: Deferred
+ """
+
+ def create_doc(self, content, doc_id=None):
+ """
+ Create a new document in the local encrypted database.
+
+ :param content: the contents of the new document
+ :type content: dict
+ :param doc_id: an optional identifier specifying the document id
+ :type doc_id: str
+
+ :return:
+ A deferred tht will fire with the new document (Document
+ instance).
+ :rtype: Deferred
+ """
+
+ def create_doc_from_json(self, json, doc_id=None):
+ """
+ Create a new document.
+
+ You can optionally specify the document identifier, but the document
+ must not already exist. See 'put_doc' if you want to override an
+ existing document.
+ If the database specifies a maximum document size and the document
+ exceeds it, create will fail and raise a DocumentTooBig exception.
+
+ :param json: The JSON document string
+ :type json: str
+ :param doc_id: An optional identifier specifying the document id.
+ :type doc_id:
+ :return:
+ A deferred that will fire with the new document (A Document
+ instance)
+ :rtype: Deferred
+ """
+
+ def create_index(self, index_name, *index_expressions):
+ """
+ Create an named index, which can then be queried for future lookups.
+ Creating an index which already exists is not an error, and is cheap.
+ Creating an index which does not match the index_expressions of the
+ existing index is an error.
+ Creating an index will block until the expressions have been evaluated
+ and the index generated.
+
+ :param index_name: A unique name which can be used as a key prefix
+ :type index_name: str
+ :param index_expressions:
+ index expressions defining the index information.
+ :type index_expressions: dict
+
+ Examples:
+
+ "fieldname", or "fieldname.subfieldname" to index alphabetically
+ sorted on the contents of a field.
+
+ "number(fieldname, width)", "lower(fieldname)"
+ """
+
+ def delete_index(self, index_name):
+ """
+ Remove a named index.
+
+ :param index_name: The name of the index we are removing
+ :type index_name: str
+ """
+
+ def list_indexes(self):
+ """
+ List the definitions of all known indexes.
+
+ :return: A list of [('index-name', ['field', 'field2'])] definitions.
+ :rtype: Deferred
+ """
+
+ def get_from_index(self, index_name, *key_values):
+ """
+ Return documents that match the keys supplied.
+
+ You must supply exactly the same number of values as have been defined
+ in the index. It is possible to do a prefix match by using '*' to
+ indicate a wildcard match. You can only supply '*' to trailing entries,
+ (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.)
+ It is also possible to append a '*' to the last supplied value (eg
+ 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param key_values: values to match. eg, if you have
+ an index with 3 fields then you would have:
+ get_from_index(index_name, val1, val2, val3)
+ :type key_values: tuple
+ :return: List of [Document]
+ :rtype: list
+ """
+
+ def get_count_from_index(self, index_name, *key_values):
+ """
+ Return the count of the documents that match the keys and
+ values supplied.
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param key_values: values to match. eg, if you have
+ an index with 3 fields then you would have:
+ get_from_index(index_name, val1, val2, val3)
+ :type key_values: tuple
+ :return: count.
+ :rtype: int
+ """
+
+ def get_range_from_index(self, index_name, start_value, end_value):
+ """
+ Return documents that fall within the specified range.
+
+ Both ends of the range are inclusive. For both start_value and
+ end_value, one must supply exactly the same number of values as have
+ been defined in the index, or pass None. In case of a single column
+ index, a string is accepted as an alternative for a tuple with a single
+ value. It is possible to do a prefix match by using '*' to indicate
+ a wildcard match. You can only supply '*' to trailing entries, (eg
+ 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also
+ possible to append a '*' to the last supplied value (eg 'val*', '*',
+ '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param start_values: tuples of values that define the lower bound of
+ the range. eg, if you have an index with 3 fields then you would
+ have: (val1, val2, val3)
+ :type start_values: tuple
+ :param end_values: tuples of values that define the upper bound of the
+ range. eg, if you have an index with 3 fields then you would have:
+ (val1, val2, val3)
+ :type end_values: tuple
+ :return: A deferred that will fire with a list of [Document]
+ :rtype: Deferred
+ """
+
+ def get_index_keys(self, index_name):
+ """
+ Return all keys under which documents are indexed in this index.
+
+ :param index_name: The index to query
+ :type index_name: str
+ :return:
+ A deferred that will fire with a list of tuples of indexed keys.
+ :rtype: Deferred
+ """
+
+ def get_doc_conflicts(self, doc_id):
+ """
+ Get the list of conflicts for the given document.
+
+ :param doc_id: the document id
+ :type doc_id: str
+
+ :return:
+ A deferred that will fire with a list of the document entries that
+ are conflicted.
+ :rtype: Deferred
+ """
+
+ def resolve_doc(self, doc, conflicted_doc_revs):
+ """
+ Mark a document as no longer conflicted.
+
+ :param doc: a document with the new content to be inserted.
+ :type doc: Document
+ :param conflicted_doc_revs:
+ A deferred that will fire with a list of revisions that the new
+ content supersedes.
+ :type conflicted_doc_revs: list
+ """
+
+
+class ISyncableStorage(Interface):
+ """
+ I implement methods to synchronize with a remote replica.
+ """
+ replica_uid = Attribute("The uid of the local replica")
+ syncing = Attribute(
+ "Property, True if the syncer is syncing.")
+ token = Attribute("The authentication Token.")
+
+ def sync(self):
+ """
+ Synchronize the local encrypted replica with a remote replica.
+
+ This method blocks until a syncing lock is acquired, so there are no
+ attempts of concurrent syncs from the same client replica.
+
+ :param url: the url of the target replica to sync with
+ :type url: str
+
+ :return:
+ A deferred that will fire with the local generation before the
+ synchronisation was performed.
+ :rtype: str
+ """
+
+ def stop_sync(self):
+ """
+ Stop the current syncing process.
+ """
+
+
+class ISecretsStorage(Interface):
+ """
+ I implement methods needed for initializing and accessing secrets, that are
+ synced against the Shared Recovery Database.
+ """
+ secrets_file_name = Attribute(
+ "The name of the file where the storage secrets will be stored")
+
+ # XXX this used internally from secrets, so it might be good to preserve
+ # as a public boundary with other components.
+
+ # We should also probably document its interface.
+ secrets = Attribute("A SoledadSecrets object containing access to secrets")
+
+ def change_passphrase(self, new_passphrase):
+ """
+ Change the passphrase that encrypts the storage secret.
+
+ :param new_passphrase: The new passphrase.
+ :type new_passphrase: unicode
+
+ :raise NoStorageSecret: Raised if there's no storage secret available.
+ """
diff --git a/src/leap/soledad/client/shared_db.py b/src/leap/soledad/client/shared_db.py
new file mode 100644
index 00000000..4f70c74b
--- /dev/null
+++ b/src/leap/soledad/client/shared_db.py
@@ -0,0 +1,134 @@
+# -*- coding: utf-8 -*-
+# shared_db.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A shared database for storing/retrieving encrypted key material.
+"""
+from leap.soledad.common.l2db.remote.http_database import HTTPDatabase
+
+from leap.soledad.client.auth import TokenBasedAuth
+
+
+# ----------------------------------------------------------------------------
+# Soledad shared database
+# ----------------------------------------------------------------------------
+
+# TODO could have a hierarchy of soledad exceptions.
+
+
+class NoTokenForAuth(Exception):
+ """
+ No token was found for token-based authentication.
+ """
+
+
+class Unauthorized(Exception):
+ """
+ User does not have authorization to perform task.
+ """
+
+
+class ImproperlyConfiguredError(Exception):
+ """
+ Wrong parameters in the database configuration.
+ """
+
+
+class SoledadSharedDatabase(HTTPDatabase, TokenBasedAuth):
+ """
+ This is a shared recovery database that enables users to store their
+ encryption secrets in the server and retrieve them afterwards.
+ """
+ # TODO: prevent client from messing with the shared DB.
+ # TODO: define and document API.
+
+ #
+ # Token auth methods.
+ #
+
+ def set_token_credentials(self, uuid, token):
+ """
+ Store given credentials so we can sign the request later.
+
+ :param uuid: The user's uuid.
+ :type uuid: str
+ :param token: The authentication token.
+ :type token: str
+ """
+ TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+ def _sign_request(self, method, url_query, params):
+ """
+ Return an authorization header to be included in the HTTP request.
+
+ :param method: The HTTP method.
+ :type method: str
+ :param url_query: The URL query string.
+ :type url_query: str
+ :param params: A list with encoded query parameters.
+ :type param: list
+
+ :return: The Authorization header.
+ :rtype: list of tuple
+ """
+ return TokenBasedAuth._sign_request(self, method, url_query, params)
+
+ #
+ # Modified HTTPDatabase methods.
+ #
+
+ @staticmethod
+ def open_database(url, creds=None):
+ """
+ Open a Soledad shared database.
+
+ :param url: URL of the remote database.
+ :type url: str
+ :param creds: A tuple containing the authentication method and
+ credentials.
+ :type creds: tuple
+
+ :return: The shared database in the given url.
+ :rtype: SoledadSharedDatabase
+ """
+ db = SoledadSharedDatabase(url, creds=creds)
+ return db
+
+ @staticmethod
+ def delete_database(url):
+ """
+ Dummy method that prevents from deleting shared database.
+
+ :raise: This will always raise an Unauthorized exception.
+
+ :param url: The database URL.
+ :type url: str
+ """
+ raise Unauthorized("Can't delete shared database.")
+
+ def __init__(self, url, document_factory=None, creds=None):
+ """
+ Initialize database with auth token and encryption powers.
+
+ :param url: URL of the remote database.
+ :type url: str
+ :param document_factory: A factory for U1BD documents.
+ :type document_factory: u1db.Document
+ :param creds: A tuple containing the authentication method and
+ credentials.
+ :type creds: tuple
+ """
+ HTTPDatabase.__init__(self, url, document_factory, creds)
diff --git a/src/leap/soledad/client/sync.py b/src/leap/soledad/client/sync.py
new file mode 100644
index 00000000..2a927189
--- /dev/null
+++ b/src/leap/soledad/client/sync.py
@@ -0,0 +1,231 @@
+# -*- coding: utf-8 -*-
+# sync.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Soledad synchronization utilities.
+"""
+import os
+
+from twisted.internet import defer
+
+from leap.soledad.common.log import getLogger
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.sync import Synchronizer
+from leap.soledad.common.errors import BackendNotReadyError
+
+
+logger = getLogger(__name__)
+
+
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
+class SoledadSynchronizer(Synchronizer):
+ """
+ Collect the state around synchronizing 2 U1DB replicas.
+
+ Synchronization is bi-directional, in that new items in the source are sent
+ to the target, and new items in the target are returned to the source.
+ However, it still recognizes that one side is initiating the request. Also,
+ at the moment, conflicts are only created in the source.
+
+ Also modified to allow for interrupting the synchronization process.
+ """
+ received_docs = []
+
+ def __init__(self, *args, **kwargs):
+ Synchronizer.__init__(self, *args, **kwargs)
+ if DO_STATS:
+ self.sync_phase = [0]
+ self.sync_exchange_phase = None
+
+ @defer.inlineCallbacks
+ def sync(self):
+ """
+ Synchronize documents between source and target.
+
+ :return: A deferred which will fire after the sync has finished with
+ the local generation before the synchronization was performed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ sync_target = self.sync_target
+ self.received_docs = []
+
+ # ---------- phase 1: get sync info from server ----------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ self.sync_exchange_phase = self.sync_target.sync_exchange_phase
+ # --------------------------------------------------------------------
+
+ # get target identifier, its current generation,
+ # and its last-seen database generation for this source
+ ensure_callback = None
+ try:
+ (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id) = yield \
+ sync_target.get_sync_info(self.source._replica_uid)
+ except (errors.DatabaseDoesNotExist, BackendNotReadyError) as e:
+ logger.warn("Database isn't ready on server. Will be created.")
+ logger.warn("Reason: %s" % e.__class__)
+ self.target_replica_uid = None
+ target_gen, target_trans_id = 0, ''
+ target_my_gen, target_my_trans_id = 0, ''
+
+ logger.debug("target replica uid: %s" % self.target_replica_uid)
+ logger.debug("target generation: %d" % target_gen)
+ logger.debug("target trans id: %s" % target_trans_id)
+ logger.debug("target my gen: %d" % target_my_gen)
+ logger.debug("target my trans_id: %s" % target_my_trans_id)
+ logger.debug("source replica_uid: %s" % self.source._replica_uid)
+
+ # make sure we'll have access to target replica uid once it exists
+ if self.target_replica_uid is None:
+
+ def ensure_callback(replica_uid):
+ self.target_replica_uid = replica_uid
+
+ # make sure we're not syncing one replica with itself
+ if self.target_replica_uid == self.source._replica_uid:
+ raise errors.InvalidReplicaUID
+
+ # validate the info the target has about the source replica
+ self.source.validate_gen_and_trans_id(
+ target_my_gen, target_my_trans_id)
+
+ # ---------- phase 2: what's changed ---------------------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ # what's changed since that generation and this current gen
+ my_gen, _, changes = self.source.whats_changed(target_my_gen)
+ logger.debug("there are %d documents to send" % len(changes))
+
+ # get source last-seen database generation for the target
+ if self.target_replica_uid is None:
+ target_last_known_gen, target_last_known_trans_id = 0, ''
+ else:
+ target_last_known_gen, target_last_known_trans_id = \
+ self.source._get_replica_gen_and_trans_id(
+ self.target_replica_uid)
+ logger.debug(
+ "last known target gen: %d" % target_last_known_gen)
+ logger.debug(
+ "last known target trans_id: %s" % target_last_known_trans_id)
+
+ # validate transaction ids
+ if not changes and target_last_known_gen == target_gen:
+ if target_trans_id != target_last_known_trans_id:
+ raise errors.InvalidTransactionId
+ defer.returnValue(my_gen)
+
+ # ---------- phase 3: sync exchange ----------------------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ docs_by_generation = self._docs_by_gen_from_changes(changes)
+
+ # exchange documents and try to insert the returned ones with
+ # the target, return target synced-up-to gen.
+ new_gen, new_trans_id = yield sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback)
+ ids_sent = [doc_id for doc_id, _, _ in changes]
+ logger.debug("target gen after sync: %d" % new_gen)
+ logger.debug("target trans_id after sync: %s" % new_trans_id)
+ if hasattr(self.source, 'commit'): # sqlcipher backend speed up
+ self.source.commit() # insert it all in a single transaction
+ info = {
+ "target_replica_uid": self.target_replica_uid,
+ "new_gen": new_gen,
+ "new_trans_id": new_trans_id,
+ "my_gen": my_gen
+ }
+ self._syncing_info = info
+
+ # ---------- phase 4: complete sync ----------------------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ yield self.complete_sync()
+
+ _, _, changes = self.source.whats_changed(target_my_gen)
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes]
+
+ just_received = list(set(changed_doc_ids) - set(ids_sent))
+ self.received_docs = just_received
+
+ # ---------- phase 5: sync is over -----------------------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ defer.returnValue(my_gen)
+
+ def _docs_by_gen_from_changes(self, changes):
+ docs_by_generation = []
+ kwargs = {'include_deleted': True}
+ for doc_id, gen, trans in changes:
+ get_doc = (self.source.get_doc, (doc_id,), kwargs)
+ docs_by_generation.append((get_doc, gen, trans))
+ return docs_by_generation
+
+ def complete_sync(self):
+ """
+ Last stage of the synchronization:
+ (a) record last known generation and transaction uid for the remote
+ replica, and
+ (b) make target aware of our current reached generation.
+
+ :return: A deferred which will fire when the sync has been completed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ logger.debug("completing deferred last step in sync...")
+
+ # record target synced-up-to generation including applying what we
+ # sent
+ info = self._syncing_info
+ self.source._set_replica_gen_and_trans_id(
+ info["target_replica_uid"], info["new_gen"], info["new_trans_id"])
+
+ # if gapless record current reached generation with target
+ return self._record_sync_info_with_the_target(info["my_gen"])
+
+ def _record_sync_info_with_the_target(self, start_generation):
+ """
+ Store local replica metadata in server.
+
+ :param start_generation: The local generation when the sync was
+ started.
+ :type start_generation: int
+
+ :return: A deferred which will fire when the operation has been
+ completed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ cur_gen, trans_id = self.source._get_generation_info()
+ if (cur_gen == start_generation + self.num_inserted and
+ self.num_inserted > 0):
+ return self.sync_target.record_sync_info(
+ self.source._replica_uid, cur_gen, trans_id)
+ return defer.succeed(None)