-rw-r--r--  client/src/leap/soledad/client/target.py                   1604
-rw-r--r--  common/src/leap/soledad/common/couch.py                       45
-rw-r--r--  common/src/leap/soledad/common/ddocs/syncs/updates/put.js   118
-rw-r--r--  server/src/leap/soledad/server/sync.py                        25
4 files changed, 1342 insertions, 450 deletions
| diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 968545b6..28edd027 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -14,458 +14,469 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. + +  """  A U1DB backend for encrypting data before sending to server and decrypting  after receiving.  """ -import binascii + +  import cStringIO  import gzip -import hashlib -import hmac  import logging +import re  import urllib  import threading +import urlparse -import simplejson as json +from collections import defaultdict  from time import sleep  from uuid import uuid4 +from contextlib import contextmanager -from u1db.remote import utils, http_errors -from u1db.errors import BrokenSyncStream +import simplejson as json +from taskthread import TimerTask  from u1db import errors +from u1db.remote import utils, http_errors  from u1db.remote.http_target import HTTPSyncTarget -from u1db.remote.http_client import _encode_query_parameter - +from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase +from zope.proxy import ProxyBase +from zope.proxy import sameProxiedObjects, setProxiedObject  from leap.soledad.common import soledad_assert -from leap.soledad.common.crypto import ( -    EncryptionSchemes, -    UnknownEncryptionScheme, -    MacMethods, -    UnknownMacMethod, -    WrongMac, -    ENC_JSON_KEY, -    ENC_SCHEME_KEY, -    ENC_METHOD_KEY, -    ENC_IV_KEY, -    MAC_KEY, -    MAC_METHOD_KEY, -)  from leap.soledad.common.document import SoledadDocument  from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import ( -    EncryptionMethods, -    UnknownEncryptionMethod, -) -from leap.soledad.client.events import ( -    SOLEDAD_SYNC_SEND_STATUS, -    SOLEDAD_SYNC_RECEIVE_STATUS, -    signal, -) +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_doc, decrypt_doc +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import signal -logger = logging.getLogger(__name__) -# -# Exceptions -# +logger = logging.getLogger(__name__) -class DocumentNotEncrypted(Exception): +def _gunzip(data):      """ -    Raised for failures in document encryption. +    Uncompress data that is gzipped. + +    :param data: gzipped data +    :type data: basestring      """ -    pass +    buffer = cStringIO.StringIO() +    buffer.write(data) +    buffer.seek(0) +    try: +        data = gzip.GzipFile(mode='r', fileobj=buffer).read() +    except Exception: +        logger.warning("Error while decrypting gzipped data") +    buffer.close() +    return data -# -# Crypto utilities for a SoledadDocument. -# +class PendingReceivedDocsSyncError(Exception): +    pass -def mac_doc(crypto, doc_id, doc_rev, ciphertext, mac_method): +class DocumentSyncerThread(threading.Thread):      """ -    Calculate a MAC for C{doc} using C{ciphertext}. - -    Current MAC method used is HMAC, with the following parameters: - -        * key: sha256(storage_secret, doc_id) -        * msg: doc_id + doc_rev + ciphertext -        * digestmod: sha256 - -    :param crypto: A SoledadCryto instance used to perform the encryption. 
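As a quick sanity check, here is a standalone round-trip for the _gunzip helper added above (Python 2, matching this module; assumes _gunzip is in scope):

    import cStringIO
    import gzip

    # compress a payload the way a gzipped response body would be
    buffer = cStringIO.StringIO()
    gzipfile = gzip.GzipFile(mode='w', fileobj=buffer)
    gzipfile.write('hello sync stream')
    gzipfile.close()  # close to flush the gzip trailer

    # _gunzip (defined in this diff) restores the original bytes
    assert _gunzip(buffer.getvalue()) == 'hello sync stream'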
-    :type crypto: leap.soledad.crypto.SoledadCrypto -    :param doc_id: The id of the document. -    :type doc_id: str -    :param doc_rev: The revision of the document. -    :type doc_rev: str -    :param ciphertext: The content of the document. -    :type ciphertext: str -    :param mac_method: The MAC method to use. -    :type mac_method: str - -    :return: The calculated MAC. -    :rtype: str +    A thread that knowns how to either send or receive a document during the +    sync process.      """ -    if mac_method == MacMethods.HMAC: -        return hmac.new( -            crypto.doc_mac_key(doc_id), -            str(doc_id) + str(doc_rev) + ciphertext, -            hashlib.sha256).digest() -    # raise if we do not know how to handle this MAC method -    raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) +    def __init__(self, doc_syncer, release_method, failed_method, +            idx, total, last_request_lock=None, last_callback_lock=None): +        """ +        Initialize a new syncer thread. + +        :param doc_syncer: A document syncer. +        :type doc_syncer: HTTPDocumentSyncer +        :param release_method: A method to be called when finished running. +        :type release_method: callable(DocumentSyncerThread) +        :param failed_method: A method to be called when we failed. +        :type failed_method: callable(DocumentSyncerThread) +        :param idx: The index count of the current operation. +        :type idx: int +        :param total: The total number of operations. +        :type total: int +        :param last_request_lock: A lock to wait for before actually performing +                                  the request. +        :type last_request_lock: threading.Lock +        :param last_callback_lock: A lock to wait for before actually running +                                  the success callback. +        :type last_callback_lock: threading.Lock +        """ +        threading.Thread.__init__(self) +        self._doc_syncer = doc_syncer +        self._release_method = release_method +        self._failed_method = failed_method +        self._idx = idx +        self._total = total +        self._last_request_lock = last_request_lock +        self._last_callback_lock = last_callback_lock +        self._response = None +        self._exception = None +        self._result = None +        self._success = False +        # a lock so we can signal when we're finished +        self._request_lock = threading.Lock() +        self._request_lock.acquire() +        self._callback_lock = threading.Lock() +        self._callback_lock.acquire() +        # make thread interruptable +        self._stopped = None +        self._stop_lock = threading.Lock() -def encrypt_doc(crypto, doc): -    """ -    Encrypt C{doc}'s content. - -    Encrypt doc's contents using AES-256 CTR mode and return a valid JSON -    string representing the following: - -        { -            ENC_JSON_KEY: '<encrypted doc JSON string>', -            ENC_SCHEME_KEY: 'symkey', -            ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, -            ENC_IV_KEY: '<the initial value used to encrypt>', -            MAC_KEY: '<mac>' -            MAC_METHOD_KEY: 'hmac' -        } - -    :param crypto: A SoledadCryto instance used to perform the encryption. -    :type crypto: leap.soledad.crypto.SoledadCrypto -    :param doc: The document with contents to be encrypted. -    :type doc: SoledadDocument - -    :return: The JSON serialization of the dict representing the encrypted -        content. 
-    :rtype: str -    """ -    soledad_assert(doc.is_tombstone() is False) -    # encrypt content using AES-256 CTR mode -    iv, ciphertext = crypto.encrypt_sym( -        str(doc.get_json()),  # encryption/decryption routines expect str -        crypto.doc_passphrase(doc.doc_id), -        method=EncryptionMethods.AES_256_CTR) -    # Return a representation for the encrypted content. In the following, we -    # convert binary data to hexadecimal representation so the JSON -    # serialization does not complain about what it tries to serialize. -    hex_ciphertext = binascii.b2a_hex(ciphertext) -    return json.dumps({ -        ENC_JSON_KEY: hex_ciphertext, -        ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, -        ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, -        ENC_IV_KEY: iv, -        # store the mac as hex. -        MAC_KEY: binascii.b2a_hex( -            mac_doc( -                crypto, doc.doc_id, doc.rev, -                ciphertext, -                MacMethods.HMAC)), -        MAC_METHOD_KEY: MacMethods.HMAC, -    }) - - -def decrypt_doc(crypto, doc): -    """ -    Decrypt C{doc}'s content. +    def run(self): +        """ +        Run the HTTP request and store results. -    Return the JSON string representation of the document's decrypted content. +        This method will block and wait for an eventual previous operation to +        finish before actually performing the request. It also traps any +        exception and register any failure with the request. +        """ +        with self._stop_lock: +            if self._stopped is None: +                self._stopped = False +            else: +                return + +        # eventually wait for the previous thread to finish +        if self._last_request_lock is not None: +            self._last_request_lock.acquire() + +        # bail out in case we've been interrupted +        if self.stopped is True: +            return + +        try: +            self._response = self._doc_syncer.do_request() +            self._request_lock.release() + +            # run success callback +            if self._doc_syncer.success_callback is not None: + +                # eventually wait for callback lock release +                if self._last_callback_lock is not None: +                    self._last_callback_lock.acquire() + +                # bail out in case we've been interrupted +                if self._stopped is True: +                    return + +                self._result = self._doc_syncer.success_callback( +                    self._idx, self._total, self._response) +                self._success = True +                doc_syncer = self._doc_syncer +                self._release_method(self, doc_syncer) +                self._doc_syncer = None +                # let next thread executed its callback +                self._callback_lock.release() + +        # trap any exception and signal failure +        except Exception as e: +            self._exception = e +            self._success = False +            # run failure callback +            if self._doc_syncer.failure_callback is not None: + +                # eventually wait for callback lock release +                if self._last_callback_lock is not None: +                    self._last_callback_lock.acquire() + +                # bail out in case we've been interrupted +                if self.stopped is True: +                    return + +                self._doc_syncer.failure_callback( +                    self._idx, self._total, self._exception) + +  
              self._failed_method(self) +                # we do not release the callback lock here because we +                # failed and so we don't want other threads to succeed. -    The content of the document should have the following structure: +    @property +    def doc_syncer(self): +        return self._doc_syncer -        { -            ENC_JSON_KEY: '<enc_blob>', -            ENC_SCHEME_KEY: '<enc_scheme>', -            ENC_METHOD_KEY: '<enc_method>', -            ENC_IV_KEY: '<initial value used to encrypt>',  # (optional) -            MAC_KEY: '<mac>' -            MAC_METHOD_KEY: 'hmac' -        } +    @property +    def response(self): +        return self._response -    C{enc_blob} is the encryption of the JSON serialization of the document's -    content. For now Soledad just deals with documents whose C{enc_scheme} is -    EncryptionSchemes.SYMKEY and C{enc_method} is -    EncryptionMethods.AES_256_CTR. +    @property +    def exception(self): +        return self._exception -    :param crypto: A SoledadCryto instance to perform the encryption. -    :type crypto: leap.soledad.crypto.SoledadCrypto -    :param doc: The document to be decrypted. -    :type doc: SoledadDocument +    @property +    def callback_lock(self): +        return self._callback_lock -    :return: The JSON serialization of the decrypted content. -    :rtype: str -    """ -    soledad_assert(doc.is_tombstone() is False) -    soledad_assert(ENC_JSON_KEY in doc.content) -    soledad_assert(ENC_SCHEME_KEY in doc.content) -    soledad_assert(ENC_METHOD_KEY in doc.content) -    soledad_assert(MAC_KEY in doc.content) -    soledad_assert(MAC_METHOD_KEY in doc.content) -    # verify MAC -    ciphertext = binascii.a2b_hex(  # content is stored as hex. -        doc.content[ENC_JSON_KEY]) -    mac = mac_doc( -        crypto, doc.doc_id, doc.rev, -        ciphertext, -        doc.content[MAC_METHOD_KEY]) -    # we compare mac's hashes to avoid possible timing attacks that might -    # exploit python's builtin comparison operator behaviour, which fails -    # immediatelly when non-matching bytes are found. -    doc_mac_hash = hashlib.sha256( -        binascii.a2b_hex(  # the mac is stored as hex -            doc.content[MAC_KEY])).digest() -    calculated_mac_hash = hashlib.sha256(mac).digest() -    if doc_mac_hash != calculated_mac_hash: -        raise WrongMac('Could not authenticate document\'s contents.') -    # decrypt doc's content -    enc_scheme = doc.content[ENC_SCHEME_KEY] -    plainjson = None -    if enc_scheme == EncryptionSchemes.SYMKEY: -        enc_method = doc.content[ENC_METHOD_KEY] -        if enc_method == EncryptionMethods.AES_256_CTR: -            soledad_assert(ENC_IV_KEY in doc.content) -            plainjson = crypto.decrypt_sym( -                ciphertext, -                crypto.doc_passphrase(doc.doc_id), -                method=enc_method, -                iv=doc.content[ENC_IV_KEY]) -        else: -            raise UnknownEncryptionMethod(enc_method) -    else: -        raise UnknownEncryptionScheme(enc_scheme) -    return plainjson +    @property +    def request_lock(self): +        return self._request_lock +    @property +    def success(self): +        return self._success -def _gunzip(data): -    """ -    Uncompress data that is gzipped. 
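The request_lock/callback_lock pair set up in __init__ is what keeps per-document callbacks ordered while the HTTP requests themselves run in parallel. A minimal standalone sketch of that lock chaining, using plain threading and none of the Soledad classes:

    import threading

    def make_worker(idx, my_lock, prev_lock, results):
        def run():
            # parallel HTTP work would happen here, unordered
            if prev_lock is not None:
                prev_lock.acquire()      # wait until predecessor's callback ran
            results.append(idx)          # the ordered "success callback"
            my_lock.release()            # unblock the next thread's callback
        return run

    results, threads, prev = [], [], None
    for i in range(5):
        lock = threading.Lock()
        lock.acquire()                   # held until this thread's callback runs
        threads.append(
            threading.Thread(target=make_worker(i, lock, prev, results)))
        prev = lock
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert results == [0, 1, 2, 3, 4]    # callbacks ran strictly in order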
+    def stop(self): +        with self._stop_lock: +            self._stopped = True -    :param data: gzipped data -    :type data: basestring -    """ -    buffer = cStringIO.StringIO() -    buffer.write(data) -    buffer.seek(0) -    try: -        data = gzip.GzipFile(mode='r', fileobj=buffer).read() -    except Exception: -        logger.warning("Error while decrypting gzipped data") -    buffer.close() -    return data +    @property +    def stopped(self): +        with self._stop_lock: +            return self._stopped +    @property +    def result(self): +        return self._result -# -# SoledadSyncTarget -# -class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): +class DocumentSyncerPool(object):      """ -    A SyncTarget that encrypts data before sending and decrypts data after -    receiving. +    A pool of reusable document syncers.      """ -    # -    # Token auth methods. -    # +    POOL_SIZE = 10 +    """ +    The maximum amount of syncer threads running at the same time. +    """ -    def set_token_credentials(self, uuid, token): +    def __init__(self, raw_url, raw_creds, query_string, headers, +            ensure_callback):          """ -        Store given credentials so we can sign the request later. +        Initialize the document syncer pool. + +        :param raw_url: The complete raw URL for the HTTP request. +        :type raw_url: str +        :param raw_creds: The credentials for the HTTP request. +        :type raw_creds: dict +        :param query_string: The query string for the HTTP request. +        :type query_string: str +        :param headers: The headers for the HTTP request. +        :type headers: dict +        :param ensure_callback: A callback to ensure we have the correct +                                target_replica_uid, if it was just created. +        :type ensure_callback: callable -        :param uuid: The user's uuid. -        :type uuid: str -        :param token: The authentication token. -        :type token: str          """ -        TokenBasedAuth.set_token_credentials(self, uuid, token) - -    def _sign_request(self, method, url_query, params): +        # save syncer params +        self._raw_url = raw_url +        self._raw_creds = raw_creds +        self._query_string = query_string +        self._headers = headers +        self._ensure_callback = ensure_callback +        # pool attributes +        self._failures = False +        self._semaphore_pool = threading.BoundedSemaphore( +            DocumentSyncerPool.POOL_SIZE) +        self._pool_access_lock = threading.Lock() +        self._doc_syncers = [] +        self._threads = [] + +    def new_syncer_thread(self, idx, total, last_request_lock=None, +            last_callback_lock=None):          """ -        Return an authorization header to be included in the HTTP request. +        Yield a new document syncer thread. + +        :param idx: The index count of the current operation. +        :type idx: int +        :param total: The total number of operations. +        :type total: int +        :param last_request_lock: A lock to wait for before actually performing +                                  the request. +        :type last_request_lock: threading.Lock +        :param last_callback_lock: A lock to wait for before actually running +                                   the success callback. 
+        :type last_callback_lock: threading.Lock +        """ +        t = None +        # wait for available threads +        self._semaphore_pool.acquire() +        with self._pool_access_lock: +            if self._failures is True: +                return None +            # get a syncer +            doc_syncer = self._get_syncer() +            # we rely on DocumentSyncerThread.run() to release the lock using +            # self.release_syncer so we can launch a new thread. +            t = DocumentSyncerThread( +                doc_syncer, self.release_syncer, self.cancel_threads, +                idx, total, +                last_request_lock=last_request_lock, +                last_callback_lock=last_callback_lock) +            self._threads.append(t) +            return t + +    def _failed(self): +        with self._pool_access_lock: +            self._failures = True -        :param method: The HTTP method. -        :type method: str -        :param url_query: The URL query string. -        :type url_query: str -        :param params: A list with encoded query parameters. -        :type param: list +    @property +    def failures(self): +        return self._failures -        :return: The Authorization header. -        :rtype: list of tuple +    def _get_syncer(self):          """ -        return TokenBasedAuth._sign_request(self, method, url_query, params) - -    # -    # Modified HTTPSyncTarget methods. -    # +        Get a document syncer from the pool. -    @staticmethod -    def connect(url, crypto=None): -        return SoledadSyncTarget(url, crypto=crypto) +        This method will create a new syncer whenever there is no syncer +        available in the pool. -    def __init__(self, url, creds=None, crypto=None): +        :return: A syncer. +        :rtype: HTTPDocumentSyncer          """ -        Initialize the SoledadSyncTarget. - -        :param url: The url of the target replica to sync with. -        :type url: str -        :param creds: optional dictionary giving credentials. -            to authorize the operation with the server. -        :type creds: dict -        :param soledad: An instance of Soledad so we can encrypt/decrypt -            document contents when syncing. -        :type soledad: soledad.Soledad +        syncer = None +        # get an available syncer or create a new one +        try: +            syncer = self._doc_syncers.pop() +        except IndexError: +            syncer = HTTPDocumentSyncer( +                self._raw_url, self._raw_creds, self._query_string, +                self._headers, self._ensure_callback) +        return syncer + +    def release_syncer(self, syncer_thread, doc_syncer):          """ -        HTTPSyncTarget.__init__(self, url, creds) -        self._crypto = crypto -        self._stopped = True -        self._stop_lock = threading.Lock() +        Return a syncer to the pool after use and check for any failures. -    def _init_post_request(self, url, action, headers, content_length): +        :param syncer: The syncer to be returned to the pool. +        :type syncer: HTTPDocumentSyncer          """ -        Initiate a syncing POST request. +        with self._pool_access_lock: +            self._doc_syncers.append(doc_syncer) +            if syncer_thread.success is True: +                self._threads.remove(syncer_thread) +            self._semaphore_pool.release() -        :param url: The syncing URL. -        :type url: str -        :param action: The syncing action, either 'get' or 'receive'. 
-        :type action: str -        :param headers: The initial headers to be sent on this request. -        :type headers: dict -        :param content_length: The content-length of the request. -        :type content_length: int +    def cancel_threads(self, calling_thread):          """ -        self._conn.putrequest('POST', url) -        self._conn.putheader( -            'content-type', 'application/x-soledad-sync-%s' % action) -        for header_name, header_value in headers: -            self._conn.putheader(header_name, header_value) -        self._conn.putheader('accept-encoding', 'gzip') -        self._conn.putheader('content-length', str(content_length)) -        self._conn.endheaders() - -    def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, -                         headers, return_doc_cb, ensure_callback, sync_id): +        Stop all threads in the pool.          """ -        Fetch sync documents from the remote database and insert them in the -        local database. +        stopped = [] +        # stop all threads +        logger.warning("Soledad sync: cancelling sync threads.") +        with self._pool_access_lock: +            self._failures = True +            while self._threads: +                t = self._threads.pop(0) +                t.stop() +                self._doc_syncers.append(t.doc_syncer) +                stopped.append(t) +        # release locks and join +        while stopped: +            t = stopped.pop(0) +            t.request_lock.acquire(False)   # just in case +            t.request_lock.release() +            t.callback_lock.acquire(False)  # just in case +            t.callback_lock.release() +            if t is not calling_thread and t.is_alive(): +                t.join() + +    def cleanup(self): +        """ +        Close and remove any syncers from the pool. +        """ +        with self._pool_access_lock: +            while self._doc_syncers: +                syncer = self._doc_syncers.pop() +                syncer.close() +                del syncer -        If an incoming document's encryption scheme is equal to -        EncryptionSchemes.SYMKEY, then this method will decrypt it with -        Soledad's symmetric key. -        :param url: The syncing URL. -        :type url: str -        :param last_known_generation: Target's last known generation. -        :type last_known_generation: int -        :param last_known_trans_id: Target's last known transaction id. -        :type last_known_trans_id: str -        :param headers: The headers of the HTTP request. +class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): + +    def __init__(self, raw_url, creds, query_string, headers, ensure_callback): +        """ +        Initialize the client. + +        :param raw_url: The raw URL of the target HTTP server. +        :type raw_url: str +        :param creds: Authentication credentials. +        :type creds: dict +        :param query_string: The query string for the HTTP request. +        :type query_string: str +        :param headers: The headers for the HTTP request.          :type headers: dict -        :param return_doc_cb: A callback to insert docs from target. -        :type return_doc_cb: callable          :param ensure_callback: A callback to ensure we have the correct                                  target_replica_uid, if it was just created.          
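DocumentSyncerPool combines two independent mechanisms: a BoundedSemaphore that caps in-flight syncers at POOL_SIZE, and a free list that recycles syncer objects across requests. A stripped-down sketch of the pattern (illustrative names, not the actual class):

    import threading

    class TinyPool(object):
        SIZE = 10  # mirrors DocumentSyncerPool.POOL_SIZE

        def __init__(self, factory):
            self._factory = factory
            self._free = []
            self._sem = threading.BoundedSemaphore(self.SIZE)
            self._lock = threading.Lock()

        def acquire(self):
            self._sem.acquire()  # blocks while SIZE objects are in flight
            with self._lock:
                return self._free.pop() if self._free else self._factory()

        def release(self, obj):
            with self._lock:
                self._free.append(obj)  # recycle instead of rebuilding
            self._sem.release()

    pool = TinyPool(factory=dict)
    syncer = pool.acquire()  # ... perform one request with it ...
    pool.release(syncer)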
+        :type ensure_callback: callable
+        """
+        HTTPClientBase.__init__(self, raw_url, creds=creds)
+        # info needed to perform the request
+        self._query_string = query_string
+        self._headers = headers
+        self._ensure_callback = ensure_callback
+        # the actual request method
+        self._request_method = None
+        self._success_callback = None
+        self._failure_callback = None
+        # storage of info returned by the request
+        self._success = None
+        self._exception = None
+
+    def _reset(self):
+        """
+        Reset this document syncer so we can reuse it.
+        """
+        self._request_method = None
+        self._success_callback = None
+        self._failure_callback = None
+        self._success = None
+        self._exception = None
+
+    def set_request_method(self, method, *args, **kwargs):
+        """
+        Set the actual method to perform the request.
-        :raise BrokenSyncStream: If C{data} is malformed.
+        :param method: Either 'get' or 'put'.
+        :type method: str
+        :param args: Arguments for the request method.
+        :type args: list
+        :param kwargs: Keyword arguments for the request method.
+        :type kwargs: dict
+        """
+        self._reset()
+        # resolve request method
+        if method == 'get':
+            self._request_method = self._get_doc
+        elif method == 'put':
+            self._request_method = self._put_doc
+        else:
+            raise ValueError("unknown request method: %s" % method)
+        # store request method args
+        self._args = args
+        self._kwargs = kwargs
-        :return: A dictionary representing the first line of the response got
-            from remote replica.
-        :rtype: list of str
-        """
-
-        def _post_get_doc(received):
-            """
-            Get a sync document from server by means of a POST request.
-
-            :param received: The number of documents already received in the
-                             current sync session.
-            :type received: int -            """ -            entries = ['['] -            size = 1 -            # add remote replica metadata to the request -            size += self._prepare( -                '', entries, -                last_known_generation=last_known_generation, -                last_known_trans_id=last_known_trans_id, -                sync_id=sync_id, -                ensure=ensure_callback is not None) -            # inform server of how many documents have already been received -            size += self._prepare( -                ',', entries, received=received) -            entries.append('\r\n]') -            size += len(entries[-1]) -            # send headers -            self._init_post_request(url, 'get', headers, size) -            # get document -            for entry in entries: -                self._conn.send(entry) -            return self._response() - -        number_of_changes = None -        received = 0 +    def set_success_callback(self, callback): +        self._success_callback = callback -        new_generation = last_known_generation -        new_transaction_id = last_known_trans_id -        while number_of_changes is None or received < number_of_changes: -            # bail out if sync process was interrupted -            if self.stopped is True: -                return last_known_generation, last_known_trans_id -            # try to fetch one document from target -            data, _ = _post_get_doc(received) -            # decode incoming stream -            parts = data.splitlines() -            if not parts or parts[0] != '[' or parts[-1] != ']': -                raise BrokenSyncStream -            data = parts[1:-1] -            # decode metadata -            line, comma = utils.check_and_strip_comma(data[0]) -            metadata = None -            try: -                metadata = json.loads(line) -                soledad_assert('number_of_changes' in metadata) -                soledad_assert('new_generation' in metadata) -                soledad_assert('new_transaction_id' in metadata) -                number_of_changes = metadata['number_of_changes'] -                new_generation = metadata['new_generation'] -                new_transaction_id = metadata['new_transaction_id'] -            except json.JSONDecodeError, AssertionError: -                raise BrokenSyncStream -            # make sure we have replica_uid from fresh new dbs -            if ensure_callback and 'replica_uid' in metadata: -                ensure_callback(metadata['replica_uid']) -            # bail out if there are no documents to be received -            if number_of_changes == 0: -                break -            # decrypt incoming document and insert into local database -            entry = None -            try: -                entry = json.loads(data[1]) -            except IndexError: -                raise BrokenSyncStream -            # ------------------------------------------------------------- -            # symmetric decryption of document's contents -            # ------------------------------------------------------------- -            # if arriving content was symmetrically encrypted, we decrypt -            # it. 
-            doc = SoledadDocument(
-                entry['id'], entry['rev'], entry['content'])
-            if doc.content and ENC_SCHEME_KEY in doc.content:
-                if doc.content[ENC_SCHEME_KEY] == \
-                        EncryptionSchemes.SYMKEY:
-                    doc.set_json(decrypt_doc(self._crypto, doc))
-            # -------------------------------------------------------------
-            # end of symmetric decryption
-            # -------------------------------------------------------------
-            return_doc_cb(doc, entry['gen'], entry['trans_id'])
-            received += 1
-            signal(
-                SOLEDAD_SYNC_RECEIVE_STATUS,
-                "%d/%d" %
-                (received, number_of_changes))
-        return new_generation, new_transaction_id
+    def set_failure_callback(self, callback):
+        self._failure_callback = callback
+
+    @property
+    def success_callback(self):
+        return self._success_callback
+
+    @property
+    def failure_callback(self):
+        return self._failure_callback
+
+    def do_request(self):
+        """
+        Actually perform the request.
+
+        :return: The body and headers of the response.
+        :rtype: tuple
+        """
+        self._ensure_connection()
+        args = self._args
+        kwargs = self._kwargs
+        return self._request_method(*args, **kwargs)

     def _request(self, method, url_parts, params=None, body=None,
                  content_type=None):
@@ -482,6 +493,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         :type body: str
         :param content-type: The content-type of the request.
         :type content-type: str
+
+        :return: The body and headers of the response.
+        :rtype: tuple
+
+        :raise errors.Unavailable: Raised after a number of unsuccessful
+                                   request attempts.
+        :raise Exception: Raised for any other exception occurring during the
+                          request.
         """
         self._ensure_connection()
@@ -566,14 +585,485 @@
         :type entries: list
         :param dic: The data to be included in this entry.
         :type dic: dict
+
+        :return: The size of the prepared entry.
+        :rtype: int
         """
         entry = comma + '\r\n' + json.dumps(dic)
         entries.append(entry)
         return len(entry)
-    def sync_exchange(self, docs_by_generations, source_replica_uid,
-                      last_known_generation, last_known_trans_id,
-                      return_doc_cb, ensure_callback=None):
+    def _init_post_request(self, action, content_length):
+        """
+        Initiate a syncing POST request.
+
+        :param action: The syncing action, either 'get' or 'put'.
+        :type action: str
+        :param content_length: The content-length of the request.
+        :type content_length: int +        """ +        self._conn.putrequest('POST', self._query_string) +        self._conn.putheader( +            'content-type', 'application/x-soledad-sync-%s' % action) +        for header_name, header_value in self._headers: +            self._conn.putheader(header_name, header_value) +        self._conn.putheader('accept-encoding', 'gzip') +        self._conn.putheader('content-length', str(content_length)) +        self._conn.endheaders() + +    def _get_doc(self, received, sync_id, last_known_generation, +            last_known_trans_id): +        """ +        Get a sync document from server by means of a POST request. + +        :param received: The number of documents already received in the +                         current sync session. +        :type received: int +        :param sync_id: The id for the current sync session. +        :type sync_id: str +        :param last_known_generation: Target's last known generation. +        :type last_known_generation: int +        :param last_known_trans_id: Target's last known transaction id. +        :type last_known_trans_id: str + +        :return: The body and headers of the response. +        :rtype: tuple +        """ +        entries = ['['] +        size = 1 +        # add remote replica metadata to the request +        size += self._prepare( +            '', entries, +            last_known_generation=last_known_generation, +            last_known_trans_id=last_known_trans_id, +            sync_id=sync_id, +            ensure=self._ensure_callback is not None) +        # inform server of how many documents have already been received +        size += self._prepare( +            ',', entries, received=received) +        entries.append('\r\n]') +        size += len(entries[-1]) +        # send headers +        self._init_post_request('get', size) +        # get document +        for entry in entries: +            self._conn.send(entry) +        return self._response() + +    def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, +            id, rev, content, gen, trans_id, number_of_docs): +        """ +        Put a sync document on server by means of a POST request. + +        :param sync_id: The id for the current sync session. +        :type sync_id: str +        :param last_known_generation: Target's last known generation. +        :type last_known_generation: int +        :param last_known_trans_id: Target's last known transaction id. +        :type last_known_trans_id: str +        :param id: The document id. +        :type id: str +        :param rev: The document revision. +        :type rev: str +        :param content: The serialized document content. +        :type content: str +        :param gen: The generation of the modification of the document. +        :type gen: int +        :param trans_id: The transaction id of the modification of the +                         document. +        :type trans_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. +        :type number_of_docs: int + +        :return: The body and headers of the response. 
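_get_doc above (and _put_doc below) frame their POST bodies with the same _prepare helper, so the content-length is known before anything is written to the socket. A standalone sketch of the envelope, with made-up field values:

    import simplejson as json

    def prepare(comma, entries, **dic):
        # mirrors the _prepare method in this diff
        entry = comma + '\r\n' + json.dumps(dic)
        entries.append(entry)
        return len(entry)

    entries = ['[']
    size = 1
    size += prepare('', entries, last_known_generation=0,
                    last_known_trans_id='', sync_id='sync-1', ensure=False)
    size += prepare(',', entries, received=0)
    entries.append('\r\n]')
    size += len(entries[-1])

    body = ''.join(entries)
    assert size == len(body)   # content-length known before sending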
+        :rtype: tuple +        """ +        # prepare to send the document +        entries = ['['] +        size = 1 +        # add remote replica metadata to the request +        size += self._prepare( +            '', entries, +            last_known_generation=last_known_generation, +            last_known_trans_id=last_known_trans_id, +            sync_id=sync_id, +            ensure=self._ensure_callback is not None) +        # add the document to the request +        size += self._prepare( +            ',', entries, +            id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, +            number_of_docs=number_of_docs) +        entries.append('\r\n]') +        size += len(entries[-1]) +        # send headers +        self._init_post_request('put', size) +        # send document +        for entry in entries: +            self._conn.send(entry) +        return self._response() + +    def _sign_request(self, method, url_query, params): +        """ +        Return an authorization header to be included in the HTTP request. + +        :param method: The HTTP method. +        :type method: str +        :param url_query: The URL query string. +        :type url_query: str +        :param params: A list with encoded query parameters. +        :type param: list + +        :return: The Authorization header. +        :rtype: list of tuple +        """ +        return TokenBasedAuth._sign_request(self, method, url_query, params) + +    def set_token_credentials(self, uuid, token): +        """ +        Store given credentials so we can sign the request later. + +        :param uuid: The user's uuid. +        :type uuid: str +        :param token: The authentication token. +        :type token: str +        """ +        TokenBasedAuth.set_token_credentials(self, uuid, token) + + +class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): +    """ +    A SyncTarget that encrypts data before sending and decrypts data after +    receiving. + +    Normally encryption will have been written to the sync database upon +    document modification. The sync database is also used to write temporarily +    the parsed documents that the remote send us, before being decrypted and +    written to the main database. +    """ + +    # will later keep a reference to the insert-doc callback +    # passed to sync_exchange +    _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) + +    """ +    Period of recurrence of the periodic decrypting task, in seconds. +    """ +    DECRYPT_TASK_PERIOD = 0.5 + +    # +    # Modified HTTPSyncTarget methods. +    # + +    def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, +            sync_db=None, sync_db_write_lock=None): +        """ +        Initialize the SoledadSyncTarget. + +        :param source_replica_uid: The source replica uid which we use when +                                   deferring decryption. +        :type source_replica_uid: str +        :param url: The url of the target replica to sync with. +        :type url: str +        :param creds: Optional dictionary giving credentials. +                      to authorize the operation with the server. +        :type creds: dict +        :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt +                        document contents when syncing. +        :type crypto: soledad.crypto.SoledadCrypto +        :param sync_db: Optional. handler for the db with the symmetric +                        encryption of the syncing documents. 
If +                        None, encryption will be done in-place, +                        instead of retreiving it from the dedicated +                        database. +        :type sync_db: Sqlite handler +        :param sync_db_write_lock: a write lock for controlling concurrent +                                   access to the sync_db +        :type sync_db_write_lock: threading.Lock +        """ +        HTTPSyncTarget.__init__(self, url, creds) +        self._raw_url = url +        self._raw_creds = creds +        self._crypto = crypto +        self._stopped = True +        self._stop_lock = threading.Lock() +        self._sync_exchange_lock = threading.Lock() +        self.source_replica_uid = source_replica_uid +        self._defer_decryption = False + +        self._sync_db = None +        self._sync_decr_pool = None +        self._sync_watcher = None + +        if sync_db and sync_db_write_lock is not None: +            self._sync_db = sync_db +            self._decryption_callback = None +            self._sync_db_write_lock = sync_db_write_lock + +            # initialize syncing queue decryption pool +            self._sync_decr_pool = SyncDecrypterPool( +                self._crypto, self._sync_db, +                self._sync_db_write_lock, +                insert_doc_cb=self._insert_doc_cb) +            self._sync_decr_pool.set_source_replica_uid( +                self.source_replica_uid) +            self._sync_watcher = TimerTask( +                self._decrypt_syncing_received_docs, +                delay=self.DECRYPT_TASK_PERIOD) + +    def _get_replica_uid(self, url): +        """ +        Return replica uid from the url, or None. + +        :param url: the replica url +        :type url: str +        """ +        replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) +        return replica_uid_match[0] if len(replica_uid_match) > 0 else None + +    def close(self): +        """ +        Cleanly close pool of workers. +        """ +        if self._sync_watcher is not None: +            self._sync_watcher.stop() +            self._sync_watcher.shutdown() +        if self._sync_decr_pool is not None: +            self._sync_decr_pool.close() + +    @staticmethod +    def connect(url, source_replica_uid=None, crypto=None): +        return SoledadSyncTarget( +            url, source_replica_uid=source_replica_uid, crypto=crypto) + +    def _parse_received_doc_response(self, response): +        """ +        Parse the response from the server containing the received document. + +        :param response: The body and headers of the response. 
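The deferred-decryption machinery hangs off the TimerTask created above: every DECRYPT_TASK_PERIOD seconds it fires _decrypt_syncing_received_docs until the sync finishes. A sketch of that lifecycle, using only the TimerTask calls that appear in this diff (the callback body is a hypothetical stand-in):

    from taskthread import TimerTask

    def decrypt_pending_docs():
        # hypothetical stand-in for _decrypt_syncing_received_docs:
        # pop encrypted docs from the sync db and decrypt them
        pass

    watcher = TimerTask(decrypt_pending_docs, delay=0.5)
    watcher.start()     # begun when documents start arriving
    # ... sync exchange runs; received docs are queued and decrypted ...
    watcher.stop()      # paused once the queue drains
    watcher.shutdown()  # released for good in close()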
+        :type response: tuple(str, dict)
+        """
+        data, _ = response
+        # decode incoming stream
+        parts = data.splitlines()
+        if not parts or parts[0] != '[' or parts[-1] != ']':
+            raise errors.BrokenSyncStream
+        data = parts[1:-1]
+        # decode metadata
+        line, comma = utils.check_and_strip_comma(data[0])
+        metadata = None
+        try:
+            metadata = json.loads(line)
+            new_generation = metadata['new_generation']
+            new_transaction_id = metadata['new_transaction_id']
+            number_of_changes = metadata['number_of_changes']
+        except (json.JSONDecodeError, KeyError):
+            raise errors.BrokenSyncStream
+        # make sure we have replica_uid from fresh new dbs
+        if self._ensure_callback and 'replica_uid' in metadata:
+            self._ensure_callback(metadata['replica_uid'])
+        # parse incoming document info
+        doc_id = None
+        rev = None
+        content = None
+        gen = None
+        trans_id = None
+        if number_of_changes > 0:
+            try:
+                entry = json.loads(data[1])
+                doc_id = entry['id']
+                rev = entry['rev']
+                content = entry['content']
+                gen = entry['gen']
+                trans_id = entry['trans_id']
+            except (IndexError, KeyError):
+                raise errors.BrokenSyncStream
+        return new_generation, new_transaction_id, number_of_changes, \
+            doc_id, rev, content, gen, trans_id
+
+    def _insert_received_doc(self, idx, total, response):
+        """
+        Insert a received document into the local replica.
+
+        :param idx: The index count of the current operation.
+        :type idx: int
+        :param total: The total number of operations.
+        :type total: int
+        :param response: The body and headers of the response.
+        :type response: tuple(str, dict)
+        """
+        new_generation, new_transaction_id, number_of_changes, doc_id, \
+            rev, content, gen, trans_id = \
+                self._parse_received_doc_response(response)
+        if doc_id is not None:
+            # decrypt incoming document and insert into local database
+            # -------------------------------------------------------------
+            # symmetric decryption of document's contents
+            # -------------------------------------------------------------
+            # If arriving content was symmetrically encrypted, we decrypt it.
+            # We do it inline if the defer_decryption flag is False or no
+            # sync_db was defined; otherwise we defer it, writing it to the
+            # received docs table.
+            doc = SoledadDocument(doc_id, rev, content)
+            if is_symmetrically_encrypted(doc):
+                if self._queue_for_decrypt:
+                    self._save_encrypted_received_doc(
+                        doc, gen, trans_id, idx, total)
+                else:
+                    # defer_decryption is False or no-sync-db fallback
+                    doc.set_json(decrypt_doc(self._crypto, doc))
+                    self._return_doc_cb(doc, gen, trans_id)
+            else:
+                # not symmetrically encrypted doc, insert it directly
+                # or save it in the decrypted stage.
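For reference, a hand-written sample of the response stream that _parse_received_doc_response consumes (the queue-or-insert branch continues below); rstrip(',') stands in for utils.check_and_strip_comma:

    import simplejson as json

    stream = ('[\r\n'
              '{"new_generation": 12, "new_transaction_id": "T-890",'
              ' "number_of_changes": 1},\r\n'
              '{"id": "doc-1", "rev": "1-ab", "content": "{}",'
              ' "gen": 12, "trans_id": "T-890"}\r\n'
              ']')

    parts = stream.splitlines()  # ['[', '{...},', '{...}', ']']
    assert parts[0] == '[' and parts[-1] == ']'
    metadata = json.loads(parts[1].rstrip(','))
    entry = json.loads(parts[2])
    assert metadata['number_of_changes'] == 1
    assert entry['id'] == 'doc-1'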
+                if self._queue_for_decrypt: +                    self._save_received_doc(doc, gen, trans_id, idx, total) +                else: +                    self._return_doc_cb(doc, gen, trans_id) +            # ------------------------------------------------------------- +            # end of symmetric decryption +            # ------------------------------------------------------------- +        msg = "%d/%d" % (idx + 1, total) +        signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg) +        logger.debug("Soledad sync receive status: %s" % msg) +        return number_of_changes, new_generation, new_transaction_id + +    def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, +                         headers, return_doc_cb, ensure_callback, sync_id, +                         syncer_pool, defer_decryption=False): +        """ +        Fetch sync documents from the remote database and insert them in the +        local database. + +        If an incoming document's encryption scheme is equal to +        EncryptionSchemes.SYMKEY, then this method will decrypt it with +        Soledad's symmetric key. + +        :param url: The syncing URL. +        :type url: str +        :param last_known_generation: Target's last known generation. +        :type last_known_generation: int +        :param last_known_trans_id: Target's last known transaction id. +        :type last_known_trans_id: str +        :param headers: The headers of the HTTP request. +        :type headers: dict +        :param return_doc_cb: A callback to insert docs from target. +        :type return_doc_cb: callable +        :param ensure_callback: A callback to ensure we have the correct +                                target_replica_uid, if it was just created. +        :type ensure_callback: callable +        :param sync_id: The id for the current sync session. +        :type sync_id: str +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool + +        :raise BrokenSyncStream: If `data` is malformed. + +        :return: A dictionary representing the first line of the response got +                 from remote replica. 
+        :rtype: dict +        """ +        # we keep a reference to the callback in case we defer the decryption +        self._return_doc_cb = return_doc_cb +        self._queue_for_decrypt = defer_decryption \ +            and self._sync_db is not None + +        new_generation = last_known_generation +        new_transaction_id = last_known_trans_id + +        if self._queue_for_decrypt: +            logger.debug( +                "Soledad sync: will queue received docs for decrypting.") + +        idx = 0 +        number_of_changes = 1 + +        first_request = True +        last_callback_lock = None +        threads = [] + +        # get incoming documents +        while idx < number_of_changes: +            # bail out if sync process was interrupted +            if self.stopped is True: +                break + +            # bail out if any thread failed +            if syncer_pool.failures is True: +                #syncer_pool.cancel_threads() +                self.stop() +                break + +            # launch a thread to fetch one document from target +            t = syncer_pool.new_syncer_thread( +                idx, number_of_changes, +                last_callback_lock=last_callback_lock) + +            # bail out if any thread failed +            if t is None: +                self.stop() +                break + +            t.doc_syncer.set_request_method( +                'get', idx, sync_id, last_known_generation, +                last_known_trans_id) +            t.doc_syncer.set_success_callback(self._insert_received_doc) + +            def _failure_callback(idx, total, exception): +                _failure_msg = "Soledad sync: error while getting document " \ +                    "%d/%d: %s" \ +                    % (idx + 1, total, exception) +                logger.warning("%s" % _failure_msg) +                logger.warning("Soledad sync: failing gracefully, will " +                               "recover on next sync.") + +            t.doc_syncer.set_failure_callback(_failure_callback) +            threads.append(t) +            t.start() +            last_callback_lock = t.callback_lock +            idx += 1 + +            # if this is the first request, wait to update the number of +            # changes +            if first_request is True: +                t.join() +                if t.success: +                    number_of_changes, _, _ = t.result +                first_request = False + +        if syncer_pool.failures is True: +            self.stop() + +        # make sure all threads finished and we have up-to-date info +        last_successful_thread = None +        while threads: +            # check if there are failures +            t = threads.pop(0) +            t.join() +            if t.success: +                last_successful_thread = t + +        # get information about last successful thread +        if last_successful_thread is not None: +            body, _ = last_successful_thread.response +            parsed_body = json.loads(body) +            metadata = parsed_body[0] +            new_generation = metadata['new_generation'] +            new_transaction_id = metadata['new_transaction_id'] + +        return new_generation, new_transaction_id + +    def sync_exchange(self, docs_by_generations, +                      source_replica_uid, last_known_generation, +                      last_known_trans_id, return_doc_cb, +                      ensure_callback=None, defer_decryption=True, +                      sync_id=None):       
   """          Find out which documents the remote database does not know about,          encrypt and send them. @@ -586,24 +1076,55 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                                      the last local generation the remote                                      replica knows about.          :type docs_by_generations: list of tuples +          :param source_replica_uid: The uid of the source replica.          :type source_replica_uid: str +          :param last_known_generation: Target's last known generation.          :type last_known_generation: int +          :param last_known_trans_id: Target's last known transaction id.          :type last_known_trans_id: str +          :param return_doc_cb: A callback for inserting received documents from -            target. +                              target. If not overriden, this will call u1db +                              insert_doc_from_target in synchronizer, which +                              implements the TAKE OTHER semantics.          :type return_doc_cb: function +          :param ensure_callback: A callback that ensures we know the target -            replica uid if the target replica was just created. +                                replica uid if the target replica was just +                                created.          :type ensure_callback: function +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool +          :return: The new generation and transaction id of the target replica.          :rtype: tuple          """ +        self._ensure_callback = ensure_callback +        if defer_decryption: +            if self._sync_watcher is None: +                logger.warning( +                    "Soledad syncer: can't defer decryption, falling back to " +                    "normal syncing mode.") +                defer_decryption = False +            else: +                self._sync_exchange_lock.acquire() +                self._defer_decryption = True          self.start() -        sync_id = str(uuid4()) +        if sync_id is None: +            sync_id = str(uuid4()) +        self.source_replica_uid = source_replica_uid +        # let the decrypter pool access the passed callback to insert docs +        setProxiedObject(self._insert_doc_cb[source_replica_uid], +                         return_doc_cb) + +        if not self.clear_to_sync(): +            raise PendingReceivedDocsSyncError          self._ensure_connection()          if self._trace_hook:  # for tests @@ -611,78 +1132,140 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)          headers = self._sign_request('POST', url, {}) -        def _post_put_doc(headers, last_known_generation, last_known_trans_id, -                          id, rev, content, gen, trans_id, sync_id): -            """ -            Put a sync document on server by means of a POST request. - -            :param received: How many documents have already been received in -                             this sync session. 
-            :type received: int -            """ -            # prepare to send the document -            entries = ['['] -            size = 1 -            # add remote replica metadata to the request -            size += self._prepare( -                '', entries, -                last_known_generation=last_known_generation, -                last_known_trans_id=last_known_trans_id, -                sync_id=sync_id, -                ensure=ensure_callback is not None) -            # add the document to the request -            size += self._prepare( -                ',', entries, -                id=id, rev=rev, content=content, gen=gen, trans_id=trans_id) -            entries.append('\r\n]') -            size += len(entries[-1]) -            # send headers -            self._init_post_request(url, 'put', headers, size) -            # send document -            for entry in entries: -                self._conn.send(entry) -            data, _ = self._response() -            data = json.loads(data) -            return data[0]['new_generation'], data[0]['new_transaction_id'] -          cur_target_gen = last_known_generation          cur_target_trans_id = last_known_trans_id          # send docs +        msg = "%d/%d" % (0, len(docs_by_generations)) +        signal(SOLEDAD_SYNC_SEND_STATUS, msg) +        logger.debug("Soledad sync send status: %s" % msg) + +        defer_encryption = self._sync_db is not None +        syncer_pool = DocumentSyncerPool( +            self._raw_url, self._raw_creds, url, headers, ensure_callback) +        threads = [] +        last_request_lock = None +        last_callback_lock = None          sent = 0 -        signal( -            SOLEDAD_SYNC_SEND_STATUS, -            "%d/%d" % (0, len(docs_by_generations))) +        total = len(docs_by_generations) + +        synced = [] +        number_of_docs = len(docs_by_generations) +          for doc, gen, trans_id in docs_by_generations:              # allow for interrupting the sync process              if self.stopped is True:                  break + +            # bail out if any thread failed +            if syncer_pool.failures is True: +                self.stop() +                break +              # skip non-syncable docs              if isinstance(doc, SoledadDocument) and not doc.syncable:                  continue +              # -------------------------------------------------------------              # symmetric encryption of document's contents              # -------------------------------------------------------------              doc_json = doc.get_json()              if not doc.is_tombstone(): -                doc_json = encrypt_doc(self._crypto, doc) +                if not defer_encryption: +                    # fallback case, for tests +                    doc_json = encrypt_doc(self._crypto, doc) +                else: +                    try: +                        doc_json = self.get_encrypted_doc_from_db( +                            doc.doc_id, doc.rev) +                    except Exception as exc: +                        logger.error("Error while getting " +                                     "encrypted doc from db") +                        logger.exception(exc) +                        continue +                    if doc_json is None: +                        # Not marked as tombstone, but we got nothing +                        # from the sync db. As it is not encrypted yet, we +                        # force inline encryption. 
+                        # TODO: implement a queue to deal with these cases.
+                        doc_json = encrypt_doc(self._crypto, doc)
             # -------------------------------------------------------------
             # end of symmetric encryption
             # -------------------------------------------------------------
-            cur_target_gen, cur_target_trans_id = _post_put_doc(
-                headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
-                rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id,
-                sync_id=sync_id)
+            t = syncer_pool.new_syncer_thread(
+                sent + 1, total, last_request_lock=None,
+                last_callback_lock=last_callback_lock)
+
+            # bail out if any thread failed
+            if t is None:
+                self.stop()
+                break
+
+            # set the request method
+            t.doc_syncer.set_request_method(
+                'put', sync_id, cur_target_gen, cur_target_trans_id,
+                id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
+                trans_id=trans_id, number_of_docs=number_of_docs)
+            # set the success callback
+
+            def _success_callback(idx, total, response):
+                _success_msg = "Soledad sync send status: %d/%d" \
+                               % (idx, total)
+                signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
+                logger.debug(_success_msg)
+
+            t.doc_syncer.set_success_callback(_success_callback)
+
+            # set the failure callback
+            def _failure_callback(idx, total, exception):
+                _failure_msg = "Soledad sync: error while sending document " \
+                               "%d/%d: %s" % (idx, total, exception)
+                logger.warning(_failure_msg)
+                logger.warning("Soledad sync: failing gracefully, will "
+                               "recover on next sync.")
+
+            t.doc_syncer.set_failure_callback(_failure_callback)
+
+            # start the thread and keep track of it
+            t.start()
+            threads.append((t, doc))
+            last_request_lock = t.request_lock
+            last_callback_lock = t.callback_lock
             sent += 1
-            signal(
-                SOLEDAD_SYNC_SEND_STATUS,
-                "%d/%d" % (sent, len(docs_by_generations)))
+
+        # make sure all threads finished and we have up-to-date info
+        while threads:
+            # check if there are failures
+            if syncer_pool.failures is True:
+                # syncer_pool.cancel_threads()
+                self.stop()
+            t, doc = threads.pop(0)
+            t.join()
+            if t.success:
+                synced.append((doc.doc_id, doc.rev))
+
+        if defer_decryption:
+            self._sync_watcher.start()
         # get docs from target
         cur_target_gen, cur_target_trans_id = self._get_remote_docs(
             url,
             last_known_generation, last_known_trans_id, headers,
-            return_doc_cb, ensure_callback, sync_id)
+            return_doc_cb, ensure_callback, sync_id, syncer_pool,
+            defer_decryption=defer_decryption)
+
+        # delete documents from the sync database
+        if defer_encryption:
+            self.delete_encrypted_docs_from_db(synced)
+
+        # wait for deferred decryption to finish
+        if defer_decryption:
+            while self.clear_to_sync() is False:
+                sleep(self.DECRYPT_TASK_PERIOD)
+            self._sync_exchange_lock.release()
+            self._sync_watcher.stop()
+
+        syncer_pool.cleanup()
         self.stop()
         return cur_target_gen, cur_target_trans_id
@@ -714,3 +1297,164 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         """
         with self._stop_lock:
             return self._stopped is True
+
+    def get_encrypted_doc_from_db(self, doc_id, doc_rev):
+        """
+        Retrieve encrypted document from the database of encrypted docs for
+        sync.
+
+        :param doc_id: The document id.
+        :type doc_id: str
+
+        :param doc_rev: The document revision.
+        :type doc_rev: str
+
+        :return: The encrypted document content, or None if it is not in the
+                 sync database.
+        :rtype: str or None
+        """
+        encr = SyncEncrypterPool
+        c = self._sync_db.cursor()
+        sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
+            encr.TABLE_NAME,))
+        c.execute(sql, (doc_id, doc_rev))
+        res = c.fetchall()
+        if len(res) != 0:
+            return res[0][0]
+
+    def delete_encrypted_docs_from_db(self, docs_ids):
+        """
+        Delete several encrypted documents from the database of symmetrically
+        encrypted docs to sync.
+
+        :param docs_ids: an iterable with (doc_id, doc_rev) for all documents
+                         to be deleted.
+        :type docs_ids: any iterable of tuples of str
+        """
+        if docs_ids:
+            encr = SyncEncrypterPool
+            c = self._sync_db.cursor()
+            for doc_id, doc_rev in docs_ids:
+                sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
+                    encr.TABLE_NAME,))
+                c.execute(sql, (doc_id, doc_rev))
+            self._sync_db.commit()
+
+    def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
+        """
+        Save a symmetrically encrypted incoming document into the received
+        docs table in the sync db. A decryption task will pick it up
+        from here in turn.
+
+        :param doc: The document to save.
+        :type doc: SoledadDocument
+        :param gen: The generation.
+        :type gen: str
+        :param trans_id: The transaction id.
+        :type trans_id: str
+        :param idx: The index count of the current operation.
+        :type idx: int
+        :param total: The total number of operations.
+        :type total: int
+        """
+        logger.debug(
+            "Enqueueing doc for decryption: %d/%d."
+            % (idx + 1, total))
+        self._sync_decr_pool.insert_encrypted_received_doc(
+            doc.doc_id, doc.rev, doc.content, gen, trans_id)
+
+    def _save_received_doc(self, doc, gen, trans_id, idx, total):
+        """
+        Save any incoming document into the received docs table in the sync
+        db.
+
+        :param doc: The document to save.
+        :type doc: SoledadDocument
+        :param gen: The generation.
+        :type gen: str
+        :param trans_id: The transaction id.
+        :type trans_id: str
+        :param idx: The index count of the current operation.
+        :type idx: int
+        :param total: The total number of operations.
+        :type total: int
+        """
+        logger.debug(
+            "Enqueueing doc, no decryption needed: %d/%d."
+            % (idx + 1, total))
+        self._sync_decr_pool.insert_received_doc(
+            doc.doc_id, doc.rev, doc.content, gen, trans_id)
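The two staging helpers above reduce to plain SQLite statements against the table kept by SyncEncrypterPool. A minimal, self-contained sketch of that round trip, assuming a hypothetical table name docs_tosync (the real name is SyncEncrypterPool.TABLE_NAME, which this diff does not show) and the doc_id/rev/content columns implied by the queries:

    import sqlite3

    # Staging table for documents that were encrypted ahead of sync time.
    db = sqlite3.connect(':memory:')
    db.execute(
        "CREATE TABLE docs_tosync (doc_id TEXT, rev TEXT, content TEXT)")
    db.execute("INSERT INTO docs_tosync VALUES (?, ?, ?)",
               ('doc-1', 'rev-1', '{"ciphertext": "..."}'))

    # What get_encrypted_doc_from_db() does: fetch the pre-encrypted payload.
    row = db.execute(
        "SELECT content FROM docs_tosync WHERE doc_id=? AND rev=?",
        ('doc-1', 'rev-1')).fetchone()
    doc_json = row[0] if row else None  # None triggers inline encryption

    # What delete_encrypted_docs_from_db() does after a successful send.
    db.executemany("DELETE FROM docs_tosync WHERE doc_id=? AND rev=?",
                   [('doc-1', 'rev-1')])
    db.commit()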
+
+    #
+    # Symmetric decryption of syncing docs
+    #
+
+    def clear_to_sync(self):
+        """
+        Return True if sync can proceed (i.e., the received docs table is
+        empty).
+
+        :rtype: bool
+        """
+        if self._sync_decr_pool is not None:
+            return self._sync_decr_pool.count_received_encrypted_docs() == 0
+        else:
+            return True
+
+    def set_decryption_callback(self, cb):
+        """
+        Set callback to be called when the decryption finishes.
+
+        :param cb: The callback to be set.
+        :type cb: callable
+        """
+        self._decryption_callback = cb
+
+    def has_decryption_callback(self):
+        """
+        Return True if there is a decryption callback set.
+
+        :rtype: bool
+        """
+        return self._decryption_callback is not None
+
+    def has_syncdb(self):
+        """
+        Return True if we have an initialized syncdb.
+
+        :rtype: bool
+        """
+        return self._sync_db is not None
+
+    def _decrypt_syncing_received_docs(self):
+        """
+        Decrypt the documents received from the remote replica and insert
+        them into the local one.
+
+        Called periodically from the TimerTask self._sync_watcher.
+        """
+        if sameProxiedObjects(
+                self._insert_doc_cb.get(self.source_replica_uid),
+                None):
+            return
+
+        decrypter = self._sync_decr_pool
+        decrypter.decrypt_received_docs()
+        decrypter.process_decrypted()
+
+    def _sign_request(self, method, url_query, params):
+        """
+        Return an authorization header to be included in the HTTP request.
+
+        :param method: The HTTP method.
+        :type method: str
+        :param url_query: The URL query string.
+        :type url_query: str
+        :param params: A list with encoded query parameters.
+        :type params: list
+
+        :return: The Authorization header.
+        :rtype: list of tuple
+        """
+        return TokenBasedAuth._sign_request(self, method, url_query, params)
+
+    def set_token_credentials(self, uuid, token):
+        """
+        Store given credentials so we can sign the request later.
+
+        :param uuid: The user's uuid.
+        :type uuid: str
+        :param token: The authentication token.
+        :type token: str
+        """
+        TokenBasedAuth.set_token_credentials(self, uuid, token)
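The deferred-decryption machinery above hinges on two pieces: a periodic watcher (_sync_watcher, a taskthread TimerTask) that keeps calling _decrypt_syncing_received_docs(), and clear_to_sync(), which holds the next sync until the received docs table drains. A stdlib-only sketch of that watcher pattern, for illustration only (PeriodicWatcher is a made-up stand-in, not the taskthread API):

    import threading

    class PeriodicWatcher(object):
        """Stand-in for the TimerTask used as _sync_watcher: call `task`
        every `period` seconds until stop() is called."""

        def __init__(self, task, period):
            self._task = task
            self._period = period
            self._stopped = threading.Event()
            self._thread = None

        def start(self):
            self._thread = threading.Thread(target=self._run)
            self._thread.daemon = True
            self._thread.start()

        def _run(self):
            # Event.wait() returns False on timeout, True once stop() is set.
            while not self._stopped.wait(self._period):
                self._task()  # e.g. _decrypt_syncing_received_docs

        def stop(self):
            self._stopped.set()
            if self._thread is not None:
                self._thread.join()

    # usage, mirroring the sync target's attributes:
    # watcher = PeriodicWatcher(target._decrypt_syncing_received_docs,
    #                           target.DECRYPT_TASK_PERIOD)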
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index b51b32f3..c0adfc70 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -1106,7 +1106,8 @@ class CouchDatabase(CommonBackend):
         )

     def _set_replica_gen_and_trans_id(self, other_replica_uid,
-                                      other_generation, other_transaction_id):
+                                      other_generation, other_transaction_id,
+                                      number_of_docs=None, sync_id=None):
         """
         Set the last-known generation and transaction id for the other
         database replica.
@@ -1122,12 +1123,19 @@ class CouchDatabase(CommonBackend):
         :param other_transaction_id: The transaction id associated with the
             generation.
         :type other_transaction_id: str
+        :param number_of_docs: The total number of documents sent in this
+                               sync session.
+        :type number_of_docs: int
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str
         """
         self._do_set_replica_gen_and_trans_id(
-            other_replica_uid, other_generation, other_transaction_id)
+            other_replica_uid, other_generation, other_transaction_id,
+            number_of_docs=number_of_docs, sync_id=sync_id)

     def _do_set_replica_gen_and_trans_id(
-            self, other_replica_uid, other_generation, other_transaction_id):
+            self, other_replica_uid, other_generation, other_transaction_id,
+            number_of_docs=None, sync_id=None):
         """
         Set the last-known generation and transaction id for the other
         database replica.
@@ -1143,6 +1151,11 @@ class CouchDatabase(CommonBackend):
         :param other_transaction_id: The transaction id associated with the
                                      generation.
         :type other_transaction_id: str
+        :param number_of_docs: The total number of documents sent in this
+                               sync session.
+        :type number_of_docs: int
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str

         :raise MissingDesignDocError: Raised when tried to access a missing
                                       design document.
@@ -1163,12 +1176,17 @@ class CouchDatabase(CommonBackend):
         res = self._database.resource(*ddoc_path)
         try:
             with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
+                body = {
+                    'other_replica_uid': other_replica_uid,
+                    'other_generation': other_generation,
+                    'other_transaction_id': other_transaction_id,
+                }
+                if sync_id is not None:
+                    body['sync_id'] = sync_id
+                if number_of_docs is not None:
+                    body['number_of_docs'] = number_of_docs
                 res.put_json(
-                    body={
-                        'other_replica_uid': other_replica_uid,
-                        'other_generation': other_generation,
-                        'other_transaction_id': other_transaction_id,
-                    },
+                    body=body,
                     headers={'content-type': 'application/json'})
         except ResourceNotFound as e:
             raise_missing_design_doc_error(e, ddoc_path)
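On the wire, the put_json() call above is a single HTTP PUT against the 'put' update function of the 'syncs' design document (the put.js shown further down). A hedged reconstruction using the requests library; the host and database name are invented, and the handler path is inferred from the ddocs/syncs/updates/put.js layout:

    import requests

    # PUT /<db>/_design/syncs/_update/put/u1db_sync_log
    url = ('http://localhost:5984/user-db'
           '/_design/syncs/_update/put/u1db_sync_log')
    body = {
        'other_replica_uid': 'source-replica',
        'other_generation': 42,
        'other_transaction_id': 'T-abc',
        # present only while a multi-document sync session is in flight:
        'sync_id': 'sync-1',
        'number_of_docs': 10,
    }
    resp = requests.put(url, json=body,
                        headers={'content-type': 'application/json'})
    # the update function answers with its second return value: 'ok'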
@@ -1306,7 +1324,8 @@ class CouchDatabase(CommonBackend):
             doc.set_conflicts(cur_doc.get_conflicts())

     def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
-                          replica_trans_id=''):
+                          replica_trans_id='', number_of_docs=None,
+                          sync_id=None):
         """
         Insert/update document into the database with a given revision.
@@ -1339,6 +1358,11 @@ class CouchDatabase(CommonBackend):
         :param replica_trans_id: The transaction_id associated with the
                                  generation.
         :type replica_trans_id: str
+        :param number_of_docs: The total number of documents sent in this
+                               sync session.
+        :type number_of_docs: int
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str

         :return: (state, at_gen) -  If we don't have doc_id already, or if
                  doc_rev supersedes the existing document revision, then the
@@ -1398,7 +1422,8 @@ class CouchDatabase(CommonBackend):
                 self._force_doc_sync_conflict(doc)
         if replica_uid is not None and replica_gen is not None:
             self._set_replica_gen_and_trans_id(
-                replica_uid, replica_gen, replica_trans_id)
+                replica_uid, replica_gen, replica_trans_id,
+                number_of_docs=number_of_docs, sync_id=sync_id)
         # update info
         old_doc.rev = doc.rev
         if doc.is_tombstone():
diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
index 722f695a..d754faaa 100644
--- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
+++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
@@ -1,22 +1,128 @@
+/**
+ * The u1db_sync_log document stores both the actual sync log and a list of
+ * pending updates to the log, in case we receive incoming documents out of
+ * the correct order (i.e. if there are parallel PUTs during the sync
+ * process).
+ *
+ * The structure of the document is the following:
+ *
+ *     {
+ *         'syncs': [
+ *             ['<replica_uid>', <gen>, '<trans_id>'],
+ *             ...
+ *         ],
+ *         'pending': {
+ *             'other_replica_uid': {
+ *                 'sync_id': '<sync_id>',
+ *                 'log': [[<gen>, '<trans_id>'], ...]
+ *             },
+ *             ...
+ *         }
+ *     }
+ *
+ * The update function below does the following:
+ *
+ *   0. If we do not receive a sync_id, we just update the 'syncs' list with
+ *      the incoming info about the source replica state.
+ *
+ *   1. Otherwise, if the incoming sync_id differs from the currently stored
+ *      sync_id, then we assume that the previous sync session for that source
+ *      replica was interrupted and discard all pending data.
+ *
+ *   2. Then we append incoming info as pending data for that source replica
+ *      and current sync_id, and sort the pending data by generation.
+ *
+ *   3. Then we go through pending data and find the most recent generation
+ *      that we can use to update the actual sync log.
+ *
+ *   4. Finally, we insert the most up-to-date information into the sync log.
+ */
 function(doc, req){
+
+    // create the document if it doesn't exist
     if (!doc) {
         doc = {}
         doc['_id'] = 'u1db_sync_log';
         doc['syncs'] = [];
     }
-    body = JSON.parse(req.body);
+
+    // get and validate incoming info
+    var body = JSON.parse(req.body);
+    var other_replica_uid = body['other_replica_uid'];
+    var other_generation = parseInt(body['other_generation']);
+    var other_transaction_id = body['other_transaction_id'];
+    var sync_id = body['sync_id'];
+    var number_of_docs = body['number_of_docs'];
+    if (number_of_docs != null)
+        number_of_docs = parseInt(number_of_docs);
+
+    if (other_replica_uid == null
+            || other_generation == null
+            || other_transaction_id == null)
+        return [null, 'invalid data'];
+
+    // create slot for pending logs
+    if (doc['pending'] == null)
+        doc['pending'] = {};
+
+    // these are the values that will actually be inserted
+    var current_gen = other_generation;
+    var current_trans_id = other_transaction_id;
+
+    /*------------ Wait for end of sync session before storing ------------*/
+
+    // we only try to obtain the pending log if we received a sync_id
+    if (sync_id != null) {
+
+        // create slot for current source and sync_id pending log
+        if (doc['pending'][other_replica_uid] == null
+                || doc['pending'][other_replica_uid]['sync_id'] != sync_id) {
+            doc['pending'][other_replica_uid] = {
+                'sync_id': sync_id,
+                'log': []
+            };
+        }
+
+        // append incoming data to pending log
+        doc['pending'][other_replica_uid]['log'].push([
+            other_generation,
+            other_transaction_id
+        ]);
+
+        // leave the sync log untouched if we still did not receive all docs
+        if (doc['pending'][other_replica_uid]['log'].length < number_of_docs)
+            return [doc, 'ok'];
+
+        // otherwise, sort pending log according to generation
+        doc['pending'][other_replica_uid]['log'].sort(function(a, b) {
+            return a[0] - b[0];
+        });
+
+        // get most up-to-date information from pending log
+        var pending = doc['pending'][other_replica_uid]['log'].pop();
+        current_gen = pending[0];
+        current_trans_id = pending[1];
+
+        // and remove all pending data from that replica
+        delete doc['pending'][other_replica_uid];
+    }
+
+    /*--------------- Store source replica info on sync log ---------------*/
+
     // remove outdated info
     doc['syncs'] = doc['syncs'].filter(
         function (entry) {
-            return entry[0] != body['other_replica_uid'];
+            return entry[0] != other_replica_uid;
         }
     );
-    // store u1db rev
+
+    // store in log
     doc['syncs'].push([
-        body['other_replica_uid'],
-        body['other_generation'],
-        body['other_transaction_id']
+        other_replica_uid,
+        current_gen,
+        current_trans_id
     ]);
+
     return [doc, 'ok'];
 }
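The heart of the update function is the buffer-then-pick-latest step: entries for one sync_id accumulate in the pending log, and only once all number_of_docs entries have arrived does the highest generation get promoted into 'syncs'. The same logic in a few lines of Python, as a hypothetical helper mirroring the JS:

    def resolve_pending(pending_log, number_of_docs, incoming):
        """Buffer (gen, trans_id) pairs for one sync session; once all
        docs arrived, return the entry with the highest generation."""
        pending_log.append(incoming)
        if len(pending_log) < number_of_docs:
            return None  # still waiting; the sync log stays untouched
        pending_log.sort(key=lambda entry: entry[0])
        return pending_log.pop()  # highest generation wins

    # a sync session of 3 documents whose PUTs arrive out of order:
    log = []
    assert resolve_pending(log, 3, (7, 'T-c')) is None
    assert resolve_pending(log, 3, (5, 'T-a')) is None
    assert resolve_pending(log, 3, (6, 'T-b')) == (7, 'T-c')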
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index c6928aaa..3a1881fc 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -210,6 +210,8 @@ class SyncExchange(sync.SyncExchange):
         :param last_known_generation: The last target replica generation the
                                       source replica knows about.
         :type last_known_generation: int
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str
         """
         self._db = db
         self.source_replica_uid = source_replica_uid
@@ -284,7 +286,8 @@ class SyncExchange(sync.SyncExchange):
             doc = self._db.get_doc(changed_doc_id, include_deleted=True)
             return_doc_cb(doc, gen, trans_id)

-    def insert_doc_from_source(self, doc, source_gen, trans_id):
+    def insert_doc_from_source(self, doc, source_gen, trans_id,
+                               number_of_docs=None, sync_id=None):
         """Try to insert synced document from source.

         Conflicting documents are not inserted but will be sent over
@@ -302,10 +305,16 @@ class SyncExchange(sync.SyncExchange):
         :type source_gen: int
         :param trans_id: The transaction id of that document change.
         :type trans_id: str
+        :param number_of_docs: The total number of documents sent in this
+                               sync session.
+        :type number_of_docs: int
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str
         """
         state, at_gen = self._db._put_doc_if_newer(
             doc, save_conflict=False, replica_uid=self.source_replica_uid,
-            replica_gen=source_gen, replica_trans_id=trans_id)
+            replica_gen=source_gen, replica_trans_id=trans_id,
+            number_of_docs=number_of_docs, sync_id=sync_id)
         if state == 'inserted':
             self._sync_state.put_seen_id(doc.doc_id, at_gen)
         elif state == 'converged':
@@ -340,6 +349,8 @@ class SyncResource(http_app.SyncResource):
         :param last_known_trans_id: The last server replica transaction_id the
                                     client knows about.
         :type last_known_trans_id: str
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str
         :param ensure: Whether the server replica should be created if it does
                        not already exist.
         :type ensure: bool
@@ -355,9 +366,10 @@ class SyncResource(http_app.SyncResource):
         # get a sync exchange object
         self.sync_exch = self.sync_exchange_class(
             db, self.source_replica_uid, last_known_generation, sync_id)
+        self._sync_id = sync_id

     @http_app.http_method(content_as_args=True)
-    def post_put(self, id, rev, content, gen, trans_id):
+    def post_put(self, id, rev, content, gen, trans_id, number_of_docs):
         """
         Put one incoming document into the server replica.
@@ -373,9 +385,14 @@ class SyncResource(http_app.SyncResource):
         :param trans_id: The source replica transaction id corresponding to
                          the revision of the incoming document.
         :type trans_id: str
+        :param number_of_docs: The total number of documents sent in this
+                               sync session.
+        :type number_of_docs: int
         """
         doc = Document(id, rev, content)
-        self.sync_exch.insert_doc_from_source(doc, gen, trans_id)
+        self.sync_exch.insert_doc_from_source(
+            doc, gen, trans_id, number_of_docs=number_of_docs,
+            sync_id=self._sync_id)

     @http_app.http_method(received=int, content_as_args=True)
     def post_get(self, received):
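End to end, every document sent during a sync session is one POST whose JSON-array body carries a metadata entry followed by the document entry; number_of_docs rides along with the document and is threaded from post_put() through insert_doc_from_source() and _put_doc_if_newer() down to the CouchDB update handler above. A hypothetical reconstruction of one such body (field names taken from the code in this diff, values invented, framing approximated from the removed _post_put_doc helper):

    import json

    entries = [
        # first entry: the source replica's view of the target
        {'last_known_generation': 12, 'last_known_trans_id': 'T-target',
         'sync_id': 'sync-1', 'ensure': False},
        # second entry: the document itself plus the session document count
        {'id': 'doc-1', 'rev': 'replica:1', 'content': '{"key": "value"}',
         'gen': 7, 'trans_id': 'T-source', 'number_of_docs': 10},
    ]
    body = '[\r\n' + ',\r\n'.join(json.dumps(e) for e in entries) + '\r\n]'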
