From 222c229d29b9b8e6ed72897b1c96d23f0d8de80e Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 2 Jul 2014 12:22:25 -0300 Subject: Split sync_exchange into many requests (#5517). --- client/src/leap/soledad/client/target.py | 1604 ++++++++++++++------ common/src/leap/soledad/common/couch.py | 45 +- .../leap/soledad/common/ddocs/syncs/updates/put.js | 118 +- server/src/leap/soledad/server/sync.py | 25 +- 4 files changed, 1342 insertions(+), 450 deletions(-) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 968545b6..28edd027 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -14,458 +14,469 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . + + """ A U1DB backend for encrypting data before sending to server and decrypting after receiving. """ -import binascii + + import cStringIO import gzip -import hashlib -import hmac import logging +import re import urllib import threading +import urlparse -import simplejson as json +from collections import defaultdict from time import sleep from uuid import uuid4 +from contextlib import contextmanager -from u1db.remote import utils, http_errors -from u1db.errors import BrokenSyncStream +import simplejson as json +from taskthread import TimerTask from u1db import errors +from u1db.remote import utils, http_errors from u1db.remote.http_target import HTTPSyncTarget -from u1db.remote.http_client import _encode_query_parameter - +from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase +from zope.proxy import ProxyBase +from zope.proxy import sameProxiedObjects, setProxiedObject from leap.soledad.common import soledad_assert -from leap.soledad.common.crypto import ( - EncryptionSchemes, - UnknownEncryptionScheme, - MacMethods, - UnknownMacMethod, - WrongMac, - ENC_JSON_KEY, - ENC_SCHEME_KEY, - ENC_METHOD_KEY, - ENC_IV_KEY, - MAC_KEY, - MAC_METHOD_KEY, -) from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import ( - EncryptionMethods, - UnknownEncryptionMethod, -) -from leap.soledad.client.events import ( - SOLEDAD_SYNC_SEND_STATUS, - SOLEDAD_SYNC_RECEIVE_STATUS, - signal, -) +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_doc, decrypt_doc +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import signal -logger = logging.getLogger(__name__) -# -# Exceptions -# +logger = logging.getLogger(__name__) -class DocumentNotEncrypted(Exception): +def _gunzip(data): """ - Raised for failures in document encryption. + Uncompress data that is gzipped. + + :param data: gzipped data + :type data: basestring """ - pass + buffer = cStringIO.StringIO() + buffer.write(data) + buffer.seek(0) + try: + data = gzip.GzipFile(mode='r', fileobj=buffer).read() + except Exception: + logger.warning("Error while decrypting gzipped data") + buffer.close() + return data -# -# Crypto utilities for a SoledadDocument. -# +class PendingReceivedDocsSyncError(Exception): + pass -def mac_doc(crypto, doc_id, doc_rev, ciphertext, mac_method): +class DocumentSyncerThread(threading.Thread): """ - Calculate a MAC for C{doc} using C{ciphertext}. - - Current MAC method used is HMAC, with the following parameters: - - * key: sha256(storage_secret, doc_id) - * msg: doc_id + doc_rev + ciphertext - * digestmod: sha256 - - :param crypto: A SoledadCryto instance used to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc_id: The id of the document. - :type doc_id: str - :param doc_rev: The revision of the document. - :type doc_rev: str - :param ciphertext: The content of the document. - :type ciphertext: str - :param mac_method: The MAC method to use. - :type mac_method: str - - :return: The calculated MAC. - :rtype: str + A thread that knowns how to either send or receive a document during the + sync process. """ - if mac_method == MacMethods.HMAC: - return hmac.new( - crypto.doc_mac_key(doc_id), - str(doc_id) + str(doc_rev) + ciphertext, - hashlib.sha256).digest() - # raise if we do not know how to handle this MAC method - raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) + def __init__(self, doc_syncer, release_method, failed_method, + idx, total, last_request_lock=None, last_callback_lock=None): + """ + Initialize a new syncer thread. + + :param doc_syncer: A document syncer. + :type doc_syncer: HTTPDocumentSyncer + :param release_method: A method to be called when finished running. + :type release_method: callable(DocumentSyncerThread) + :param failed_method: A method to be called when we failed. + :type failed_method: callable(DocumentSyncerThread) + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param last_request_lock: A lock to wait for before actually performing + the request. + :type last_request_lock: threading.Lock + :param last_callback_lock: A lock to wait for before actually running + the success callback. + :type last_callback_lock: threading.Lock + """ + threading.Thread.__init__(self) + self._doc_syncer = doc_syncer + self._release_method = release_method + self._failed_method = failed_method + self._idx = idx + self._total = total + self._last_request_lock = last_request_lock + self._last_callback_lock = last_callback_lock + self._response = None + self._exception = None + self._result = None + self._success = False + # a lock so we can signal when we're finished + self._request_lock = threading.Lock() + self._request_lock.acquire() + self._callback_lock = threading.Lock() + self._callback_lock.acquire() + # make thread interruptable + self._stopped = None + self._stop_lock = threading.Lock() -def encrypt_doc(crypto, doc): - """ - Encrypt C{doc}'s content. - - Encrypt doc's contents using AES-256 CTR mode and return a valid JSON - string representing the following: - - { - ENC_JSON_KEY: '', - ENC_SCHEME_KEY: 'symkey', - ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, - ENC_IV_KEY: '', - MAC_KEY: '' - MAC_METHOD_KEY: 'hmac' - } - - :param crypto: A SoledadCryto instance used to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :return: The JSON serialization of the dict representing the encrypted - content. - :rtype: str - """ - soledad_assert(doc.is_tombstone() is False) - # encrypt content using AES-256 CTR mode - iv, ciphertext = crypto.encrypt_sym( - str(doc.get_json()), # encryption/decryption routines expect str - crypto.doc_passphrase(doc.doc_id), - method=EncryptionMethods.AES_256_CTR) - # Return a representation for the encrypted content. In the following, we - # convert binary data to hexadecimal representation so the JSON - # serialization does not complain about what it tries to serialize. - hex_ciphertext = binascii.b2a_hex(ciphertext) - return json.dumps({ - ENC_JSON_KEY: hex_ciphertext, - ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, - ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, - ENC_IV_KEY: iv, - # store the mac as hex. - MAC_KEY: binascii.b2a_hex( - mac_doc( - crypto, doc.doc_id, doc.rev, - ciphertext, - MacMethods.HMAC)), - MAC_METHOD_KEY: MacMethods.HMAC, - }) - - -def decrypt_doc(crypto, doc): - """ - Decrypt C{doc}'s content. + def run(self): + """ + Run the HTTP request and store results. - Return the JSON string representation of the document's decrypted content. + This method will block and wait for an eventual previous operation to + finish before actually performing the request. It also traps any + exception and register any failure with the request. + """ + with self._stop_lock: + if self._stopped is None: + self._stopped = False + else: + return + + # eventually wait for the previous thread to finish + if self._last_request_lock is not None: + self._last_request_lock.acquire() + + # bail out in case we've been interrupted + if self.stopped is True: + return + + try: + self._response = self._doc_syncer.do_request() + self._request_lock.release() + + # run success callback + if self._doc_syncer.success_callback is not None: + + # eventually wait for callback lock release + if self._last_callback_lock is not None: + self._last_callback_lock.acquire() + + # bail out in case we've been interrupted + if self._stopped is True: + return + + self._result = self._doc_syncer.success_callback( + self._idx, self._total, self._response) + self._success = True + doc_syncer = self._doc_syncer + self._release_method(self, doc_syncer) + self._doc_syncer = None + # let next thread executed its callback + self._callback_lock.release() + + # trap any exception and signal failure + except Exception as e: + self._exception = e + self._success = False + # run failure callback + if self._doc_syncer.failure_callback is not None: + + # eventually wait for callback lock release + if self._last_callback_lock is not None: + self._last_callback_lock.acquire() + + # bail out in case we've been interrupted + if self.stopped is True: + return + + self._doc_syncer.failure_callback( + self._idx, self._total, self._exception) + + self._failed_method(self) + # we do not release the callback lock here because we + # failed and so we don't want other threads to succeed. - The content of the document should have the following structure: + @property + def doc_syncer(self): + return self._doc_syncer - { - ENC_JSON_KEY: '', - ENC_SCHEME_KEY: '', - ENC_METHOD_KEY: '', - ENC_IV_KEY: '', # (optional) - MAC_KEY: '' - MAC_METHOD_KEY: 'hmac' - } + @property + def response(self): + return self._response - C{enc_blob} is the encryption of the JSON serialization of the document's - content. For now Soledad just deals with documents whose C{enc_scheme} is - EncryptionSchemes.SYMKEY and C{enc_method} is - EncryptionMethods.AES_256_CTR. + @property + def exception(self): + return self._exception - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc: The document to be decrypted. - :type doc: SoledadDocument + @property + def callback_lock(self): + return self._callback_lock - :return: The JSON serialization of the decrypted content. - :rtype: str - """ - soledad_assert(doc.is_tombstone() is False) - soledad_assert(ENC_JSON_KEY in doc.content) - soledad_assert(ENC_SCHEME_KEY in doc.content) - soledad_assert(ENC_METHOD_KEY in doc.content) - soledad_assert(MAC_KEY in doc.content) - soledad_assert(MAC_METHOD_KEY in doc.content) - # verify MAC - ciphertext = binascii.a2b_hex( # content is stored as hex. - doc.content[ENC_JSON_KEY]) - mac = mac_doc( - crypto, doc.doc_id, doc.rev, - ciphertext, - doc.content[MAC_METHOD_KEY]) - # we compare mac's hashes to avoid possible timing attacks that might - # exploit python's builtin comparison operator behaviour, which fails - # immediatelly when non-matching bytes are found. - doc_mac_hash = hashlib.sha256( - binascii.a2b_hex( # the mac is stored as hex - doc.content[MAC_KEY])).digest() - calculated_mac_hash = hashlib.sha256(mac).digest() - if doc_mac_hash != calculated_mac_hash: - raise WrongMac('Could not authenticate document\'s contents.') - # decrypt doc's content - enc_scheme = doc.content[ENC_SCHEME_KEY] - plainjson = None - if enc_scheme == EncryptionSchemes.SYMKEY: - enc_method = doc.content[ENC_METHOD_KEY] - if enc_method == EncryptionMethods.AES_256_CTR: - soledad_assert(ENC_IV_KEY in doc.content) - plainjson = crypto.decrypt_sym( - ciphertext, - crypto.doc_passphrase(doc.doc_id), - method=enc_method, - iv=doc.content[ENC_IV_KEY]) - else: - raise UnknownEncryptionMethod(enc_method) - else: - raise UnknownEncryptionScheme(enc_scheme) - return plainjson + @property + def request_lock(self): + return self._request_lock + @property + def success(self): + return self._success -def _gunzip(data): - """ - Uncompress data that is gzipped. + def stop(self): + with self._stop_lock: + self._stopped = True - :param data: gzipped data - :type data: basestring - """ - buffer = cStringIO.StringIO() - buffer.write(data) - buffer.seek(0) - try: - data = gzip.GzipFile(mode='r', fileobj=buffer).read() - except Exception: - logger.warning("Error while decrypting gzipped data") - buffer.close() - return data + @property + def stopped(self): + with self._stop_lock: + return self._stopped + @property + def result(self): + return self._result -# -# SoledadSyncTarget -# -class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): +class DocumentSyncerPool(object): """ - A SyncTarget that encrypts data before sending and decrypts data after - receiving. + A pool of reusable document syncers. """ - # - # Token auth methods. - # + POOL_SIZE = 10 + """ + The maximum amount of syncer threads running at the same time. + """ - def set_token_credentials(self, uuid, token): + def __init__(self, raw_url, raw_creds, query_string, headers, + ensure_callback): """ - Store given credentials so we can sign the request later. + Initialize the document syncer pool. + + :param raw_url: The complete raw URL for the HTTP request. + :type raw_url: str + :param raw_creds: The credentials for the HTTP request. + :type raw_creds: dict + :param query_string: The query string for the HTTP request. + :type query_string: str + :param headers: The headers for the HTTP request. + :type headers: dict + :param ensure_callback: A callback to ensure we have the correct + target_replica_uid, if it was just created. + :type ensure_callback: callable - :param uuid: The user's uuid. - :type uuid: str - :param token: The authentication token. - :type token: str """ - TokenBasedAuth.set_token_credentials(self, uuid, token) - - def _sign_request(self, method, url_query, params): + # save syncer params + self._raw_url = raw_url + self._raw_creds = raw_creds + self._query_string = query_string + self._headers = headers + self._ensure_callback = ensure_callback + # pool attributes + self._failures = False + self._semaphore_pool = threading.BoundedSemaphore( + DocumentSyncerPool.POOL_SIZE) + self._pool_access_lock = threading.Lock() + self._doc_syncers = [] + self._threads = [] + + def new_syncer_thread(self, idx, total, last_request_lock=None, + last_callback_lock=None): """ - Return an authorization header to be included in the HTTP request. + Yield a new document syncer thread. + + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param last_request_lock: A lock to wait for before actually performing + the request. + :type last_request_lock: threading.Lock + :param last_callback_lock: A lock to wait for before actually running + the success callback. + :type last_callback_lock: threading.Lock + """ + t = None + # wait for available threads + self._semaphore_pool.acquire() + with self._pool_access_lock: + if self._failures is True: + return None + # get a syncer + doc_syncer = self._get_syncer() + # we rely on DocumentSyncerThread.run() to release the lock using + # self.release_syncer so we can launch a new thread. + t = DocumentSyncerThread( + doc_syncer, self.release_syncer, self.cancel_threads, + idx, total, + last_request_lock=last_request_lock, + last_callback_lock=last_callback_lock) + self._threads.append(t) + return t + + def _failed(self): + with self._pool_access_lock: + self._failures = True - :param method: The HTTP method. - :type method: str - :param url_query: The URL query string. - :type url_query: str - :param params: A list with encoded query parameters. - :type param: list + @property + def failures(self): + return self._failures - :return: The Authorization header. - :rtype: list of tuple + def _get_syncer(self): """ - return TokenBasedAuth._sign_request(self, method, url_query, params) - - # - # Modified HTTPSyncTarget methods. - # + Get a document syncer from the pool. - @staticmethod - def connect(url, crypto=None): - return SoledadSyncTarget(url, crypto=crypto) + This method will create a new syncer whenever there is no syncer + available in the pool. - def __init__(self, url, creds=None, crypto=None): + :return: A syncer. + :rtype: HTTPDocumentSyncer """ - Initialize the SoledadSyncTarget. - - :param url: The url of the target replica to sync with. - :type url: str - :param creds: optional dictionary giving credentials. - to authorize the operation with the server. - :type creds: dict - :param soledad: An instance of Soledad so we can encrypt/decrypt - document contents when syncing. - :type soledad: soledad.Soledad + syncer = None + # get an available syncer or create a new one + try: + syncer = self._doc_syncers.pop() + except IndexError: + syncer = HTTPDocumentSyncer( + self._raw_url, self._raw_creds, self._query_string, + self._headers, self._ensure_callback) + return syncer + + def release_syncer(self, syncer_thread, doc_syncer): """ - HTTPSyncTarget.__init__(self, url, creds) - self._crypto = crypto - self._stopped = True - self._stop_lock = threading.Lock() + Return a syncer to the pool after use and check for any failures. - def _init_post_request(self, url, action, headers, content_length): + :param syncer: The syncer to be returned to the pool. + :type syncer: HTTPDocumentSyncer """ - Initiate a syncing POST request. + with self._pool_access_lock: + self._doc_syncers.append(doc_syncer) + if syncer_thread.success is True: + self._threads.remove(syncer_thread) + self._semaphore_pool.release() - :param url: The syncing URL. - :type url: str - :param action: The syncing action, either 'get' or 'receive'. - :type action: str - :param headers: The initial headers to be sent on this request. - :type headers: dict - :param content_length: The content-length of the request. - :type content_length: int + def cancel_threads(self, calling_thread): """ - self._conn.putrequest('POST', url) - self._conn.putheader( - 'content-type', 'application/x-soledad-sync-%s' % action) - for header_name, header_value in headers: - self._conn.putheader(header_name, header_value) - self._conn.putheader('accept-encoding', 'gzip') - self._conn.putheader('content-length', str(content_length)) - self._conn.endheaders() - - def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, - headers, return_doc_cb, ensure_callback, sync_id): + Stop all threads in the pool. """ - Fetch sync documents from the remote database and insert them in the - local database. + stopped = [] + # stop all threads + logger.warning("Soledad sync: cancelling sync threads.") + with self._pool_access_lock: + self._failures = True + while self._threads: + t = self._threads.pop(0) + t.stop() + self._doc_syncers.append(t.doc_syncer) + stopped.append(t) + # release locks and join + while stopped: + t = stopped.pop(0) + t.request_lock.acquire(False) # just in case + t.request_lock.release() + t.callback_lock.acquire(False) # just in case + t.callback_lock.release() + if t is not calling_thread and t.is_alive(): + t.join() + + def cleanup(self): + """ + Close and remove any syncers from the pool. + """ + with self._pool_access_lock: + while self._doc_syncers: + syncer = self._doc_syncers.pop() + syncer.close() + del syncer - If an incoming document's encryption scheme is equal to - EncryptionSchemes.SYMKEY, then this method will decrypt it with - Soledad's symmetric key. - :param url: The syncing URL. - :type url: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - :param headers: The headers of the HTTP request. +class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): + + def __init__(self, raw_url, creds, query_string, headers, ensure_callback): + """ + Initialize the client. + + :param raw_url: The raw URL of the target HTTP server. + :type raw_url: str + :param creds: Authentication credentials. + :type creds: dict + :param query_string: The query string for the HTTP request. + :type query_string: str + :param headers: The headers for the HTTP request. :type headers: dict - :param return_doc_cb: A callback to insert docs from target. - :type return_doc_cb: callable :param ensure_callback: A callback to ensure we have the correct target_replica_uid, if it was just created. :type ensure_callback: callable + """ + HTTPClientBase.__init__(self, raw_url, creds=creds) + # info needed to perform the request + self._query_string = query_string + self._headers = headers + self._ensure_callback = ensure_callback + # the actual request method + self._request_method = None + self._success_callback = None + self._failure_callback = None + # storage of info returned by the request + self._success = None + self._exception = None + + def _reset(self): + """ + Reset this document syncer so we can reuse it. + """ + self._request_method = None + self._success_callback = None + self._failure_callback = None + self._request_method = None + self._success = None + self._exception = None + + def set_request_method(self, method, *args, **kwargs): + """ + Set the actual method to perform the request. - :raise BrokenSyncStream: If C{data} is malformed. + :param method: Either 'get' or 'put'. + :type method: str + :param args: Arguments for the request method. + :type args: list + :param kwargs: Keyworded arguments for the request method. + :type kwargs: dict + """ + self._reset() + # resolve request method + if method is 'get': + self._request_method = self._get_doc + elif method is 'put': + self._request_method = self._put_doc + else: + raise Exception + # store request method args + self._args = args + self._kwargs = kwargs - :return: A dictionary representing the first line of the response got - from remote replica. - :rtype: list of str - """ - - def _post_get_doc(received): - """ - Get a sync document from server by means of a POST request. - - :param received: The number of documents already received in the - current sync session. - :type received: int - """ - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=ensure_callback is not None) - # inform server of how many documents have already been received - size += self._prepare( - ',', entries, received=received) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request(url, 'get', headers, size) - # get document - for entry in entries: - self._conn.send(entry) - return self._response() - - number_of_changes = None - received = 0 + def set_success_callback(self, callback): + self._success_callback = callback - new_generation = last_known_generation - new_transaction_id = last_known_trans_id - while number_of_changes is None or received < number_of_changes: - # bail out if sync process was interrupted - if self.stopped is True: - return last_known_generation, last_known_trans_id - # try to fetch one document from target - data, _ = _post_get_doc(received) - # decode incoming stream - parts = data.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise BrokenSyncStream - data = parts[1:-1] - # decode metadata - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - try: - metadata = json.loads(line) - soledad_assert('number_of_changes' in metadata) - soledad_assert('new_generation' in metadata) - soledad_assert('new_transaction_id' in metadata) - number_of_changes = metadata['number_of_changes'] - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - except json.JSONDecodeError, AssertionError: - raise BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if ensure_callback and 'replica_uid' in metadata: - ensure_callback(metadata['replica_uid']) - # bail out if there are no documents to be received - if number_of_changes == 0: - break - # decrypt incoming document and insert into local database - entry = None - try: - entry = json.loads(data[1]) - except IndexError: - raise BrokenSyncStream - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # if arriving content was symmetrically encrypted, we decrypt - # it. - doc = SoledadDocument( - entry['id'], entry['rev'], entry['content']) - if doc.content and ENC_SCHEME_KEY in doc.content: - if doc.content[ENC_SCHEME_KEY] == \ - EncryptionSchemes.SYMKEY: - doc.set_json(decrypt_doc(self._crypto, doc)) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- - return_doc_cb(doc, entry['gen'], entry['trans_id']) - received += 1 - signal( - SOLEDAD_SYNC_RECEIVE_STATUS, - "%d/%d" % - (received, number_of_changes)) - return new_generation, new_transaction_id + def set_failure_callback(self, callback): + self._failure_callback = callback + + @property + def success_callback(self): + return self._success_callback + + @property + def failure_callback(self): + return self._failure_callback + + def do_request(self): + """ + Actually perform the request. + + :return: The body and headers of the response. + :rtype: tuple + """ + self._ensure_connection() + args = self._args + kwargs = self._kwargs + return self._request_method(*args, **kwargs) def _request(self, method, url_parts, params=None, body=None, content_type=None): @@ -482,6 +493,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type body: str :param content-type: The content-type of the request. :type content-type: str + + :return: The body and headers of the response. + :rtype: tuple + + :raise errors.Unavailable: Raised after a number of unsuccesful + request attempts. + :raise Exception: Raised for any other exception ocurring during the + request. """ self._ensure_connection() @@ -566,14 +585,485 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type entries: list :param dic: The data to be included in this entry. :type dic: dict + + :return: The size of the prepared entry. + :rtype: int """ entry = comma + '\r\n' + json.dumps(dic) entries.append(entry) return len(entry) - def sync_exchange(self, docs_by_generations, source_replica_uid, - last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None): + def _init_post_request(self, action, content_length): + """ + Initiate a syncing POST request. + + :param url: The syncing URL. + :type url: str + :param action: The syncing action, either 'get' or 'receive'. + :type action: str + :param headers: The initial headers to be sent on this request. + :type headers: dict + :param content_length: The content-length of the request. + :type content_length: int + """ + self._conn.putrequest('POST', self._query_string) + self._conn.putheader( + 'content-type', 'application/x-soledad-sync-%s' % action) + for header_name, header_value in self._headers: + self._conn.putheader(header_name, header_value) + self._conn.putheader('accept-encoding', 'gzip') + self._conn.putheader('content-length', str(content_length)) + self._conn.endheaders() + + def _get_doc(self, received, sync_id, last_known_generation, + last_known_trans_id): + """ + Get a sync document from server by means of a POST request. + + :param received: The number of documents already received in the + current sync session. + :type received: int + :param sync_id: The id for the current sync session. + :type sync_id: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :return: The body and headers of the response. + :rtype: tuple + """ + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # inform server of how many documents have already been received + size += self._prepare( + ',', entries, received=received) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request('get', size) + # get document + for entry in entries: + self._conn.send(entry) + return self._response() + + def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, + id, rev, content, gen, trans_id, number_of_docs): + """ + Put a sync document on server by means of a POST request. + + :param sync_id: The id for the current sync session. + :type sync_id: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + :param id: The document id. + :type id: str + :param rev: The document revision. + :type rev: str + :param content: The serialized document content. + :type content: str + :param gen: The generation of the modification of the document. + :type gen: int + :param trans_id: The transaction id of the modification of the + document. + :type trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + + :return: The body and headers of the response. + :rtype: tuple + """ + # prepare to send the document + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # add the document to the request + size += self._prepare( + ',', entries, + id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, + number_of_docs=number_of_docs) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request('put', size) + # send document + for entry in entries: + self._conn.send(entry) + return self._response() + + def _sign_request(self, method, url_query, params): + """ + Return an authorization header to be included in the HTTP request. + + :param method: The HTTP method. + :type method: str + :param url_query: The URL query string. + :type url_query: str + :param params: A list with encoded query parameters. + :type param: list + + :return: The Authorization header. + :rtype: list of tuple + """ + return TokenBasedAuth._sign_request(self, method, url_query, params) + + def set_token_credentials(self, uuid, token): + """ + Store given credentials so we can sign the request later. + + :param uuid: The user's uuid. + :type uuid: str + :param token: The authentication token. + :type token: str + """ + TokenBasedAuth.set_token_credentials(self, uuid, token) + + +class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ + + # will later keep a reference to the insert-doc callback + # passed to sync_exchange + _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) + + """ + Period of recurrence of the periodic decrypting task, in seconds. + """ + DECRYPT_TASK_PERIOD = 0.5 + + # + # Modified HTTPSyncTarget methods. + # + + def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, + sync_db=None, sync_db_write_lock=None): + """ + Initialize the SoledadSyncTarget. + + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param url: The url of the target replica to sync with. + :type url: str + :param creds: Optional dictionary giving credentials. + to authorize the operation with the server. + :type creds: dict + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad.crypto.SoledadCrypto + :param sync_db: Optional. handler for the db with the symmetric + encryption of the syncing documents. If + None, encryption will be done in-place, + instead of retreiving it from the dedicated + database. + :type sync_db: Sqlite handler + :param sync_db_write_lock: a write lock for controlling concurrent + access to the sync_db + :type sync_db_write_lock: threading.Lock + """ + HTTPSyncTarget.__init__(self, url, creds) + self._raw_url = url + self._raw_creds = creds + self._crypto = crypto + self._stopped = True + self._stop_lock = threading.Lock() + self._sync_exchange_lock = threading.Lock() + self.source_replica_uid = source_replica_uid + self._defer_decryption = False + + self._sync_db = None + self._sync_decr_pool = None + self._sync_watcher = None + + if sync_db and sync_db_write_lock is not None: + self._sync_db = sync_db + self._decryption_callback = None + self._sync_db_write_lock = sync_db_write_lock + + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, self._sync_db, + self._sync_db_write_lock, + insert_doc_cb=self._insert_doc_cb) + self._sync_decr_pool.set_source_replica_uid( + self.source_replica_uid) + self._sync_watcher = TimerTask( + self._decrypt_syncing_received_docs, + delay=self.DECRYPT_TASK_PERIOD) + + def _get_replica_uid(self, url): + """ + Return replica uid from the url, or None. + + :param url: the replica url + :type url: str + """ + replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) + return replica_uid_match[0] if len(replica_uid_match) > 0 else None + + def close(self): + """ + Cleanly close pool of workers. + """ + if self._sync_watcher is not None: + self._sync_watcher.stop() + self._sync_watcher.shutdown() + if self._sync_decr_pool is not None: + self._sync_decr_pool.close() + + @staticmethod + def connect(url, source_replica_uid=None, crypto=None): + return SoledadSyncTarget( + url, source_replica_uid=source_replica_uid, crypto=crypto) + + def _parse_received_doc_response(self, response): + """ + Parse the response from the server containing the received document. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + """ + data, _ = response + # decode incoming stream + parts = data.splitlines() + if not parts or parts[0] != '[' or parts[-1] != ']': + raise errors.BrokenSyncStream + data = parts[1:-1] + # decode metadata + line, comma = utils.check_and_strip_comma(data[0]) + metadata = None + try: + metadata = json.loads(line) + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + number_of_changes = metadata['number_of_changes'] + except json.JSONDecodeError, KeyError: + raise errors.BrokenSyncStream + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + # parse incoming document info + doc_id = None + rev = None + content = None + gen = None + trans_id = None + if number_of_changes > 0: + try: + entry = json.loads(data[1]) + doc_id = entry['id'] + rev = entry['rev'] + content = entry['content'] + gen = entry['gen'] + trans_id = entry['trans_id'] + except IndexError, KeyError: + raise errors.BrokenSyncStream + return new_generation, new_transaction_id, number_of_changes, \ + doc_id, rev, content, gen, trans_id + + def _insert_received_doc(self, idx, total, response): + """ + Insert a received document into the local replica. + + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param response: The body and headers of the response. + :type response: tuple(str, dict) + """ + new_generation, new_transaction_id, number_of_changes, doc_id, \ + rev, content, gen, trans_id = \ + self._parse_received_doc_response(response) + if doc_id is not None: + # decrypt incoming document and insert into local database + # ------------------------------------------------------------- + # symmetric decryption of document's contents + # ------------------------------------------------------------- + # If arriving content was symmetrically encrypted, we decrypt it. + # We do it inline if defer_decryption flag is False or no sync_db + # was defined, otherwise we defer it writing it to the received + # docs table. + doc = SoledadDocument(doc_id, rev, content) + if is_symmetrically_encrypted(doc): + if self._queue_for_decrypt: + self._save_encrypted_received_doc( + doc, gen, trans_id, idx, total) + else: + # defer_decryption is False or no-sync-db fallback + doc.set_json(decrypt_doc(self._crypto, doc)) + self._return_doc_cb(doc, gen, trans_id) + else: + # not symmetrically encrypted doc, insert it directly + # or save it in the decrypted stage. + if self._queue_for_decrypt: + self._save_received_doc(doc, gen, trans_id, idx, total) + else: + self._return_doc_cb(doc, gen, trans_id) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + msg = "%d/%d" % (idx + 1, total) + signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg) + logger.debug("Soledad sync receive status: %s" % msg) + return number_of_changes, new_generation, new_transaction_id + + def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, + headers, return_doc_cb, ensure_callback, sync_id, + syncer_pool, defer_decryption=False): + """ + Fetch sync documents from the remote database and insert them in the + local database. + + If an incoming document's encryption scheme is equal to + EncryptionSchemes.SYMKEY, then this method will decrypt it with + Soledad's symmetric key. + + :param url: The syncing URL. + :type url: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + :param headers: The headers of the HTTP request. + :type headers: dict + :param return_doc_cb: A callback to insert docs from target. + :type return_doc_cb: callable + :param ensure_callback: A callback to ensure we have the correct + target_replica_uid, if it was just created. + :type ensure_callback: callable + :param sync_id: The id for the current sync session. + :type sync_id: str + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :raise BrokenSyncStream: If `data` is malformed. + + :return: A dictionary representing the first line of the response got + from remote replica. + :rtype: dict + """ + # we keep a reference to the callback in case we defer the decryption + self._return_doc_cb = return_doc_cb + self._queue_for_decrypt = defer_decryption \ + and self._sync_db is not None + + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + + if self._queue_for_decrypt: + logger.debug( + "Soledad sync: will queue received docs for decrypting.") + + idx = 0 + number_of_changes = 1 + + first_request = True + last_callback_lock = None + threads = [] + + # get incoming documents + while idx < number_of_changes: + # bail out if sync process was interrupted + if self.stopped is True: + break + + # bail out if any thread failed + if syncer_pool.failures is True: + #syncer_pool.cancel_threads() + self.stop() + break + + # launch a thread to fetch one document from target + t = syncer_pool.new_syncer_thread( + idx, number_of_changes, + last_callback_lock=last_callback_lock) + + # bail out if any thread failed + if t is None: + self.stop() + break + + t.doc_syncer.set_request_method( + 'get', idx, sync_id, last_known_generation, + last_known_trans_id) + t.doc_syncer.set_success_callback(self._insert_received_doc) + + def _failure_callback(idx, total, exception): + _failure_msg = "Soledad sync: error while getting document " \ + "%d/%d: %s" \ + % (idx + 1, total, exception) + logger.warning("%s" % _failure_msg) + logger.warning("Soledad sync: failing gracefully, will " + "recover on next sync.") + + t.doc_syncer.set_failure_callback(_failure_callback) + threads.append(t) + t.start() + last_callback_lock = t.callback_lock + idx += 1 + + # if this is the first request, wait to update the number of + # changes + if first_request is True: + t.join() + if t.success: + number_of_changes, _, _ = t.result + first_request = False + + if syncer_pool.failures is True: + self.stop() + + # make sure all threads finished and we have up-to-date info + last_successful_thread = None + while threads: + # check if there are failures + t = threads.pop(0) + t.join() + if t.success: + last_successful_thread = t + + # get information about last successful thread + if last_successful_thread is not None: + body, _ = last_successful_thread.response + parsed_body = json.loads(body) + metadata = parsed_body[0] + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + + return new_generation, new_transaction_id + + def sync_exchange(self, docs_by_generations, + source_replica_uid, last_known_generation, + last_known_trans_id, return_doc_cb, + ensure_callback=None, defer_decryption=True, + sync_id=None): """ Find out which documents the remote database does not know about, encrypt and send them. @@ -586,24 +1076,55 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): the last local generation the remote replica knows about. :type docs_by_generations: list of tuples + :param source_replica_uid: The uid of the source replica. :type source_replica_uid: str + :param last_known_generation: Target's last known generation. :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. :type last_known_trans_id: str + :param return_doc_cb: A callback for inserting received documents from - target. + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. :type return_doc_cb: function + :param ensure_callback: A callback that ensures we know the target - replica uid if the target replica was just created. + replica uid if the target replica was just + created. :type ensure_callback: function + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + :return: The new generation and transaction id of the target replica. :rtype: tuple """ + self._ensure_callback = ensure_callback + if defer_decryption: + if self._sync_watcher is None: + logger.warning( + "Soledad syncer: can't defer decryption, falling back to " + "normal syncing mode.") + defer_decryption = False + else: + self._sync_exchange_lock.acquire() + self._defer_decryption = True self.start() - sync_id = str(uuid4()) + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + # let the decrypter pool access the passed callback to insert docs + setProxiedObject(self._insert_doc_cb[source_replica_uid], + return_doc_cb) + + if not self.clear_to_sync(): + raise PendingReceivedDocsSyncError self._ensure_connection() if self._trace_hook: # for tests @@ -611,78 +1132,140 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) headers = self._sign_request('POST', url, {}) - def _post_put_doc(headers, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id, sync_id): - """ - Put a sync document on server by means of a POST request. - - :param received: How many documents have already been received in - this sync session. - :type received: int - """ - # prepare to send the document - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=ensure_callback is not None) - # add the document to the request - size += self._prepare( - ',', entries, - id=id, rev=rev, content=content, gen=gen, trans_id=trans_id) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request(url, 'put', headers, size) - # send document - for entry in entries: - self._conn.send(entry) - data, _ = self._response() - data = json.loads(data) - return data[0]['new_generation'], data[0]['new_transaction_id'] - cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id # send docs + msg = "%d/%d" % (0, len(docs_by_generations)) + signal(SOLEDAD_SYNC_SEND_STATUS, msg) + logger.debug("Soledad sync send status: %s" % msg) + + defer_encryption = self._sync_db is not None + syncer_pool = DocumentSyncerPool( + self._raw_url, self._raw_creds, url, headers, ensure_callback) + threads = [] + last_request_lock = None + last_callback_lock = None sent = 0 - signal( - SOLEDAD_SYNC_SEND_STATUS, - "%d/%d" % (0, len(docs_by_generations))) + total = len(docs_by_generations) + + synced = [] + number_of_docs = len(docs_by_generations) + for doc, gen, trans_id in docs_by_generations: # allow for interrupting the sync process if self.stopped is True: break + + # bail out if any thread failed + if syncer_pool.failures is True: + self.stop() + break + # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue + # ------------------------------------------------------------- # symmetric encryption of document's contents # ------------------------------------------------------------- doc_json = doc.get_json() if not doc.is_tombstone(): - doc_json = encrypt_doc(self._crypto, doc) + if not defer_encryption: + # fallback case, for tests + doc_json = encrypt_doc(self._crypto, doc) + else: + try: + doc_json = self.get_encrypted_doc_from_db( + doc.doc_id, doc.rev) + except Exception as exc: + logger.error("Error while getting " + "encrypted doc from db") + logger.exception(exc) + continue + if doc_json is None: + # Not marked as tombstone, but we got nothing + # from the sync db. As it is not encrypted yet, we + # force inline encryption. + # TODO: implement a queue to deal with these cases. + doc_json = encrypt_doc(self._crypto, doc) # ------------------------------------------------------------- # end of symmetric encryption # ------------------------------------------------------------- - cur_target_gen, cur_target_trans_id = _post_put_doc( - headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, - rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id, - sync_id=sync_id) + t = syncer_pool.new_syncer_thread( + sent + 1, total, last_request_lock=None, + last_callback_lock=last_callback_lock) + + # bail out if any thread failed + if t is None: + self.stop() + break + + # set the request method + t.doc_syncer.set_request_method( + 'put', sync_id, cur_target_gen, cur_target_trans_id, + id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs) + # set the success calback + + def _success_callback(idx, total, response): + _success_msg = "Soledad sync send status: %d/%d" \ + % (idx, total) + signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) + logger.debug(_success_msg) + + t.doc_syncer.set_success_callback(_success_callback) + + # set the failure callback + def _failure_callback(idx, total, exception): + _failure_msg = "Soledad sync: error while sending document " \ + "%d/%d: %s" % (idx, total, exception) + logger.warning("%s" % _failure_msg) + logger.warning("Soledad sync: failing gracefully, will " + "recover on next sync.") + + t.doc_syncer.set_failure_callback(_failure_callback) + + # save thread and append + t.start() + threads.append((t, doc)) + last_request_lock = t.request_lock + last_callback_lock = t.callback_lock sent += 1 - signal( - SOLEDAD_SYNC_SEND_STATUS, - "%d/%d" % (sent, len(docs_by_generations))) + + # make sure all threads finished and we have up-to-date info + while threads: + # check if there are failures + if syncer_pool.failures is True: + #syncer_pool.cancel_threads() + self.stop() + t, doc = threads.pop(0) + t.join() + if t.success: + synced.append((doc.doc_id, doc.rev)) + + if defer_decryption: + self._sync_watcher.start() # get docs from target cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback, sync_id) + return_doc_cb, ensure_callback, sync_id, syncer_pool, + defer_decryption=defer_decryption) + + # delete documents from the sync database + if defer_encryption: + self.delete_encrypted_docs_from_db(synced) + + # wait for deferred decryption to finish + if defer_decryption: + while self.clear_to_sync() is False: + sleep(self.DECRYPT_TASK_PERIOD) + self._sync_exchange_lock.release() + self._sync_watcher.stop() + + syncer_pool.cleanup() self.stop() return cur_target_gen, cur_target_trans_id @@ -714,3 +1297,164 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ with self._stop_lock: return self._stopped is True + + def get_encrypted_doc_from_db(self, doc_id, doc_rev): + """ + Retrieve encrypted document from the database of encrypted docs for + sync. + + :param doc_id: The Document id. + :type doc_id: str + + :param doc_rev: The document revision + :type doc_rev: str + """ + encr = SyncEncrypterPool + c = self._sync_db.cursor() + sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) + c.execute(sql, (doc_id, doc_rev)) + res = c.fetchall() + if len(res) != 0: + return res[0][0] + + def delete_encrypted_docs_from_db(self, docs_ids): + """ + Delete several encrypted documents from the database of symmetrically + encrypted docs to sync. + + :param docs_ids: an iterable with (doc_id, doc_rev) for all documents + to be deleted. + :type docs_ids: any iterable of tuples of str + """ + if docs_ids: + encr = SyncEncrypterPool + c = self._sync_db.cursor() + for doc_id, doc_rev in docs_ids: + sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) + c.execute(sql, (doc_id, doc_rev)) + self._sync_db.commit() + + def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): + """ + Save a symmetrically encrypted incoming document into the received + docs table in the sync db. A decryption task will pick it up + from here in turn. + + :param doc: The document to save. + :type doc: SoledadDocument + :param gen: The generation. + :type gen: str + :param trans_id: Transacion id. + :type gen: str + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + """ + logger.debug( + "Enqueueing doc for decryption: %d/%d." + % (idx + 1, total)) + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id) + + def _save_received_doc(self, doc, gen, trans_id, idx, total): + """ + Save any incoming document into the received docs table in the sync db. + + :param doc: The document to save. + :type doc: SoledadDocument + :param gen: The generation. + :type gen: str + :param trans_id: Transacion id. + :type gen: str + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + """ + logger.debug( + "Enqueueing doc, no decryption needed: %d/%d." + % (idx + 1, total)) + self._sync_decr_pool.insert_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id) + + # + # Symmetric decryption of syncing docs + # + + def clear_to_sync(self): + """ + Return True if sync can proceed (ie, the received db table is empty). + :rtype: bool + """ + if self._sync_decr_pool is not None: + return self._sync_decr_pool.count_received_encrypted_docs() == 0 + else: + return True + + def set_decryption_callback(self, cb): + """ + Set callback to be called when the decryption finishes. + + :param cb: The callback to be set. + :type cb: callable + """ + self._decryption_callback = cb + + def has_decryption_callback(self): + """ + Return True if there is a decryption callback set. + :rtype: bool + """ + return self._decryption_callback is not None + + def has_syncdb(self): + """ + Return True if we have an initialized syncdb. + """ + return self._sync_db is not None + + def _decrypt_syncing_received_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + Called periodically from TimerTask self._sync_watcher. + """ + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + return + + decrypter = self._sync_decr_pool + decrypter.decrypt_received_docs() + done = decrypter.process_decrypted() + + def _sign_request(self, method, url_query, params): + """ + Return an authorization header to be included in the HTTP request. + + :param method: The HTTP method. + :type method: str + :param url_query: The URL query string. + :type url_query: str + :param params: A list with encoded query parameters. + :type param: list + + :return: The Authorization header. + :rtype: list of tuple + """ + return TokenBasedAuth._sign_request(self, method, url_query, params) + + def set_token_credentials(self, uuid, token): + """ + Store given credentials so we can sign the request later. + + :param uuid: The user's uuid. + :type uuid: str + :param token: The authentication token. + :type token: str + """ + TokenBasedAuth.set_token_credentials(self, uuid, token) diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index b51b32f3..c0adfc70 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1106,7 +1106,8 @@ class CouchDatabase(CommonBackend): ) def _set_replica_gen_and_trans_id(self, other_replica_uid, - other_generation, other_transaction_id): + other_generation, other_transaction_id, + number_of_docs=None, sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1122,12 +1123,19 @@ class CouchDatabase(CommonBackend): :param other_transaction_id: The transaction id associated with the generation. :type other_transaction_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param sync_id: The id of the current sync session. + :type sync_id: str """ self._do_set_replica_gen_and_trans_id( - other_replica_uid, other_generation, other_transaction_id) + other_replica_uid, other_generation, other_transaction_id, + number_of_docs=number_of_docs, sync_id=sync_id) def _do_set_replica_gen_and_trans_id( - self, other_replica_uid, other_generation, other_transaction_id): + self, other_replica_uid, other_generation, other_transaction_id, + number_of_docs=None, sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1143,6 +1151,11 @@ class CouchDatabase(CommonBackend): :param other_transaction_id: The transaction id associated with the generation. :type other_transaction_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param sync_id: The id of the current sync session. + :type sync_id: str :raise MissingDesignDocError: Raised when tried to access a missing design document. @@ -1163,12 +1176,17 @@ class CouchDatabase(CommonBackend): res = self._database.resource(*ddoc_path) try: with CouchDatabase.update_handler_lock[self._get_replica_uid()]: + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + } + if sync_id is not None: + body['sync_id'] = sync_id + if number_of_docs is not None: + body['number_of_docs'] = number_of_docs res.put_json( - body={ - 'other_replica_uid': other_replica_uid, - 'other_generation': other_generation, - 'other_transaction_id': other_transaction_id, - }, + body=body, headers={'content-type': 'application/json'}) except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) @@ -1306,7 +1324,8 @@ class CouchDatabase(CommonBackend): doc.set_conflicts(cur_doc.get_conflicts()) def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, - replica_trans_id=''): + replica_trans_id='', number_of_docs=None, + sync_id=None): """ Insert/update document into the database with a given revision. @@ -1339,6 +1358,11 @@ class CouchDatabase(CommonBackend): :param replica_trans_id: The transaction_id associated with the generation. :type replica_trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param sync_id: The id of the current sync session. + :type sync_id: str :return: (state, at_gen) - If we don't have doc_id already, or if doc_rev supersedes the existing document revision, then the @@ -1398,7 +1422,8 @@ class CouchDatabase(CommonBackend): self._force_doc_sync_conflict(doc) if replica_uid is not None and replica_gen is not None: self._set_replica_gen_and_trans_id( - replica_uid, replica_gen, replica_trans_id) + replica_uid, replica_gen, replica_trans_id, + number_of_docs=number_of_docs, sync_id=sync_id) # update info old_doc.rev = doc.rev if doc.is_tombstone(): diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js index 722f695a..d754faaa 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -1,22 +1,128 @@ +/** + * The u1db_sync_log document stores both the actual sync log and a list of + * pending updates to the log, in case we receive incoming documents out of + * the correct order (i.e. if there are parallel PUTs during the sync + * process). + * + * The structure of the document is the following: + * + * { + * 'syncs': [ + * ['', , ''], + * ... + * ], + * 'pending': { + * 'other_replica_uid': { + * 'sync_id': '', + * 'log': [[, ''], ...] + * }, + * ... + * } + * } + * + * The update function below does the following: + * + * 0. If we do not receive a sync_id, we just update the 'syncs' list with + * the incoming info about the source replica state. + * + * 1. Otherwise, if the incoming sync_id differs from current stored + * sync_id, then we assume that the previous sync session for that source + * replica was interrupted and discard all pending data. + * + * 2. Then we append incoming info as pending data for that source replica + * and current sync_id, and sort the pending data by generation. + * + * 3. Then we go through pending data and find the most recent generation + * that we can use to update the actual sync log. + * + * 4. Finally, we insert the most up to date information into the sync log. + */ function(doc, req){ + + // create the document if it doesn't exist if (!doc) { doc = {} doc['_id'] = 'u1db_sync_log'; doc['syncs'] = []; } - body = JSON.parse(req.body); + + // get and validate incoming info + var body = JSON.parse(req.body); + var other_replica_uid = body['other_replica_uid']; + var other_generation = parseInt(body['other_generation']); + var other_transaction_id = body['other_transaction_id'] + var sync_id = body['sync_id']; + var number_of_docs = body['number_of_docs']; + if (number_of_docs != null) + number_of_docs = parseInt(number_of_docs); + + if (other_replica_uid == null + || other_generation == null + || other_transaction_id == null) + return [null, 'invalid data']; + + // create slot for pending logs + if (doc['pending'] == null) + doc['pending'] = {}; + + // these are the values that will be actually inserted + var current_gen = other_generation; + var current_trans_id = other_transaction_id; + + /*------------ Wait for end of sync session before storing ------------*/ + + // we just try to obtain pending log if we received a sync_id + if (sync_id != null) { + + // create slot for current source and sync_id pending log + if (doc['pending'][other_replica_uid] == null + || doc['pending'][other_replica_uid]['sync_id'] != sync_id) { + doc['pending'][other_replica_uid] = { + 'sync_id': sync_id, + 'log': [], + } + } + + // append incoming data to pending log + doc['pending'][other_replica_uid]['log'].push([ + other_generation, + other_transaction_id + ]) + + // leave the sync log untouched if we still did not receive all docs + if (doc['pending'][other_replica_uid]['log'].length < number_of_docs) + return [doc, 'ok']; + + // otherwise, sort pending log according to generation + doc['pending'][other_replica_uid]['log'].sort(function(a, b) { + return a[0] - b[0]; + }); + + // get most up-to-date information from pending log + pending = doc['pending'][other_replica_uid]['log'].pop() + current_gen = pending[0]; + current_trans_id = pending[1]; + + // and remove all pending data from that replica + delete doc['pending'][other_replica_uid] + } + + /*--------------- Store source replica info on sync log ---------------*/ + // remove outdated info doc['syncs'] = doc['syncs'].filter( function (entry) { - return entry[0] != body['other_replica_uid']; + return entry[0] != other_replica_uid; } ); - // store u1db rev + + // store in log doc['syncs'].push([ - body['other_replica_uid'], - body['other_generation'], - body['other_transaction_id'] + other_replica_uid, + current_gen, + current_trans_id ]); + return [doc, 'ok']; } diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index c6928aaa..3a1881fc 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -210,6 +210,8 @@ class SyncExchange(sync.SyncExchange): :param last_known_generation: The last target replica generation the source replica knows about. :type last_known_generation: int + :param sync_id: The id of the current sync session. + :type sync_id: str """ self._db = db self.source_replica_uid = source_replica_uid @@ -284,7 +286,8 @@ class SyncExchange(sync.SyncExchange): doc = self._db.get_doc(changed_doc_id, include_deleted=True) return_doc_cb(doc, gen, trans_id) - def insert_doc_from_source(self, doc, source_gen, trans_id): + def insert_doc_from_source(self, doc, source_gen, trans_id, + number_of_docs=None, sync_id=None): """Try to insert synced document from source. Conflicting documents are not inserted but will be sent over @@ -302,10 +305,16 @@ class SyncExchange(sync.SyncExchange): :type source_gen: int :param trans_id: The transaction id of that document change. :type trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param sync_id: The id of the current sync session. + :type sync_id: str """ state, at_gen = self._db._put_doc_if_newer( doc, save_conflict=False, replica_uid=self.source_replica_uid, - replica_gen=source_gen, replica_trans_id=trans_id) + replica_gen=source_gen, replica_trans_id=trans_id, + number_of_docs=number_of_docs, sync_id=sync_id) if state == 'inserted': self._sync_state.put_seen_id(doc.doc_id, at_gen) elif state == 'converged': @@ -340,6 +349,8 @@ class SyncResource(http_app.SyncResource): :param last_known_trans_id: The last server replica transaction_id the client knows about. :type last_known_trans_id: str + :param sync_id: The id of the current sync session. + :type sync_id: str :param ensure: Whether the server replica should be created if it does not already exist. :type ensure: bool @@ -355,9 +366,10 @@ class SyncResource(http_app.SyncResource): # get a sync exchange object self.sync_exch = self.sync_exchange_class( db, self.source_replica_uid, last_known_generation, sync_id) + self._sync_id = sync_id @http_app.http_method(content_as_args=True) - def post_put(self, id, rev, content, gen, trans_id): + def post_put(self, id, rev, content, gen, trans_id, number_of_docs): """ Put one incoming document into the server replica. @@ -373,9 +385,14 @@ class SyncResource(http_app.SyncResource): :param trans_id: The source replica transaction id corresponding to the revision of the incoming document. :type trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int """ doc = Document(id, rev, content) - self.sync_exch.insert_doc_from_source(doc, gen, trans_id) + self.sync_exch.insert_doc_from_source( + doc, gen, trans_id, number_of_docs=number_of_docs, + sync_id=self._sync_id) @http_app.http_method(received=int, content_as_args=True) def post_get(self, received): -- cgit v1.2.3