author     drebs <drebs@leap.se>    2014-07-02 12:22:25 -0300
committer  drebs <drebs@leap.se>    2014-07-02 12:41:02 -0300
commit     222c229d29b9b8e6ed72897b1c96d23f0d8de80e
tree       081527150a859909290d70dccadd3218346de908
parent     c3870a4315acf30893679e4cd11c990a4338e47b
Split sync_exchange into many requests (#5517).
 client/src/leap/soledad/client/target.py                  | 1604
 common/src/leap/soledad/common/couch.py                   |   45
 common/src/leap/soledad/common/ddocs/syncs/updates/put.js |  118
 server/src/leap/soledad/server/sync.py                    |   25
4 files changed, 1342 insertions, 450 deletions
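
The patch replaces the single monolithic sync_exchange POST with one small POST per document ('put' for outgoing docs, 'get' for incoming ones), each carrying the session metadata (sync_id, last known generation and transaction id) so the server can track the session across requests. As a rough illustration of the request body assembled by the new _put_doc/_prepare helpers (the function name below is hypothetical; the field names are the ones used in the patch):

    import json

    def build_sync_put_body(sync_id, last_known_generation, last_known_trans_id,
                            doc_id, doc_rev, content, gen, trans_id,
                            number_of_docs, ensure=False):
        # first entry: remote replica metadata, repeated on every request
        metadata = json.dumps({
            'last_known_generation': last_known_generation,
            'last_known_trans_id': last_known_trans_id,
            'sync_id': sync_id,
            'ensure': ensure,
        })
        # second entry: the single document sent in this request
        doc_entry = json.dumps({
            'id': doc_id, 'rev': doc_rev, 'content': content,
            'gen': gen, 'trans_id': trans_id,
            'number_of_docs': number_of_docs,
        })
        # _prepare() joins entries with a comma and CRLF inside a JSON array
        return '[' + '\r\n' + metadata + ',' + '\r\n' + doc_entry + '\r\n]'

    # the body is POSTed to .../sync-from/<source_replica_uid> with
    # content-type application/x-soledad-sync-put
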
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 968545b6..28edd027 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -14,458 +14,469 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
"""
A U1DB backend for encrypting data before sending to server and decrypting
after receiving.
"""
-import binascii
+
+
import cStringIO
import gzip
-import hashlib
-import hmac
import logging
+import re
import urllib
import threading
+import urlparse
-import simplejson as json
+from collections import defaultdict
from time import sleep
from uuid import uuid4
+from contextlib import contextmanager
-from u1db.remote import utils, http_errors
-from u1db.errors import BrokenSyncStream
+import simplejson as json
+from taskthread import TimerTask
from u1db import errors
+from u1db.remote import utils, http_errors
from u1db.remote.http_target import HTTPSyncTarget
-from u1db.remote.http_client import _encode_query_parameter
-
+from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
+from zope.proxy import ProxyBase
+from zope.proxy import sameProxiedObjects, setProxiedObject
from leap.soledad.common import soledad_assert
-from leap.soledad.common.crypto import (
- EncryptionSchemes,
- UnknownEncryptionScheme,
- MacMethods,
- UnknownMacMethod,
- WrongMac,
- ENC_JSON_KEY,
- ENC_SCHEME_KEY,
- ENC_METHOD_KEY,
- ENC_IV_KEY,
- MAC_KEY,
- MAC_METHOD_KEY,
-)
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
-from leap.soledad.client.crypto import (
- EncryptionMethods,
- UnknownEncryptionMethod,
-)
-from leap.soledad.client.events import (
- SOLEDAD_SYNC_SEND_STATUS,
- SOLEDAD_SYNC_RECEIVE_STATUS,
- signal,
-)
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
+from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import signal
-logger = logging.getLogger(__name__)
-#
-# Exceptions
-#
+logger = logging.getLogger(__name__)
-class DocumentNotEncrypted(Exception):
+def _gunzip(data):
"""
- Raised for failures in document encryption.
+ Uncompress data that is gzipped.
+
+ :param data: gzipped data
+ :type data: basestring
"""
- pass
+ buffer = cStringIO.StringIO()
+ buffer.write(data)
+ buffer.seek(0)
+ try:
+ data = gzip.GzipFile(mode='r', fileobj=buffer).read()
+ except Exception:
+        logger.warning("Error while decompressing gzipped data")
+ buffer.close()
+ return data
-#
-# Crypto utilities for a SoledadDocument.
-#
+class PendingReceivedDocsSyncError(Exception):
+ pass
-def mac_doc(crypto, doc_id, doc_rev, ciphertext, mac_method):
+class DocumentSyncerThread(threading.Thread):
"""
- Calculate a MAC for C{doc} using C{ciphertext}.
-
- Current MAC method used is HMAC, with the following parameters:
-
- * key: sha256(storage_secret, doc_id)
- * msg: doc_id + doc_rev + ciphertext
- * digestmod: sha256
-
- :param crypto: A SoledadCryto instance used to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc_id: The id of the document.
- :type doc_id: str
- :param doc_rev: The revision of the document.
- :type doc_rev: str
- :param ciphertext: The content of the document.
- :type ciphertext: str
- :param mac_method: The MAC method to use.
- :type mac_method: str
-
- :return: The calculated MAC.
- :rtype: str
+    A thread that knows how to either send or receive a document during the
+ sync process.
"""
- if mac_method == MacMethods.HMAC:
- return hmac.new(
- crypto.doc_mac_key(doc_id),
- str(doc_id) + str(doc_rev) + ciphertext,
- hashlib.sha256).digest()
- # raise if we do not know how to handle this MAC method
- raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method)
+ def __init__(self, doc_syncer, release_method, failed_method,
+ idx, total, last_request_lock=None, last_callback_lock=None):
+ """
+ Initialize a new syncer thread.
+
+ :param doc_syncer: A document syncer.
+ :type doc_syncer: HTTPDocumentSyncer
+ :param release_method: A method to be called when finished running.
+ :type release_method: callable(DocumentSyncerThread)
+ :param failed_method: A method to be called when we failed.
+ :type failed_method: callable(DocumentSyncerThread)
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param last_request_lock: A lock to wait for before actually performing
+ the request.
+ :type last_request_lock: threading.Lock
+ :param last_callback_lock: A lock to wait for before actually running
+ the success callback.
+ :type last_callback_lock: threading.Lock
+ """
+ threading.Thread.__init__(self)
+ self._doc_syncer = doc_syncer
+ self._release_method = release_method
+ self._failed_method = failed_method
+ self._idx = idx
+ self._total = total
+ self._last_request_lock = last_request_lock
+ self._last_callback_lock = last_callback_lock
+ self._response = None
+ self._exception = None
+ self._result = None
+ self._success = False
+ # a lock so we can signal when we're finished
+ self._request_lock = threading.Lock()
+ self._request_lock.acquire()
+ self._callback_lock = threading.Lock()
+ self._callback_lock.acquire()
+ # make thread interruptable
+ self._stopped = None
+ self._stop_lock = threading.Lock()
-def encrypt_doc(crypto, doc):
- """
- Encrypt C{doc}'s content.
-
- Encrypt doc's contents using AES-256 CTR mode and return a valid JSON
- string representing the following:
-
- {
- ENC_JSON_KEY: '<encrypted doc JSON string>',
- ENC_SCHEME_KEY: 'symkey',
- ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
- ENC_IV_KEY: '<the initial value used to encrypt>',
- MAC_KEY: '<mac>'
- MAC_METHOD_KEY: 'hmac'
- }
-
- :param crypto: A SoledadCryto instance used to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
-
- :return: The JSON serialization of the dict representing the encrypted
- content.
- :rtype: str
- """
- soledad_assert(doc.is_tombstone() is False)
- # encrypt content using AES-256 CTR mode
- iv, ciphertext = crypto.encrypt_sym(
- str(doc.get_json()), # encryption/decryption routines expect str
- crypto.doc_passphrase(doc.doc_id),
- method=EncryptionMethods.AES_256_CTR)
- # Return a representation for the encrypted content. In the following, we
- # convert binary data to hexadecimal representation so the JSON
- # serialization does not complain about what it tries to serialize.
- hex_ciphertext = binascii.b2a_hex(ciphertext)
- return json.dumps({
- ENC_JSON_KEY: hex_ciphertext,
- ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY,
- ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
- ENC_IV_KEY: iv,
- # store the mac as hex.
- MAC_KEY: binascii.b2a_hex(
- mac_doc(
- crypto, doc.doc_id, doc.rev,
- ciphertext,
- MacMethods.HMAC)),
- MAC_METHOD_KEY: MacMethods.HMAC,
- })
-
-
-def decrypt_doc(crypto, doc):
- """
- Decrypt C{doc}'s content.
+ def run(self):
+ """
+ Run the HTTP request and store results.
- Return the JSON string representation of the document's decrypted content.
+        This method will block and wait for any previous operation to
+        finish before actually performing the request. It also traps any
+        exception and registers any failure with the request.
+ """
+ with self._stop_lock:
+ if self._stopped is None:
+ self._stopped = False
+ else:
+ return
+
+ # eventually wait for the previous thread to finish
+ if self._last_request_lock is not None:
+ self._last_request_lock.acquire()
+
+ # bail out in case we've been interrupted
+ if self.stopped is True:
+ return
+
+ try:
+ self._response = self._doc_syncer.do_request()
+ self._request_lock.release()
+
+ # run success callback
+ if self._doc_syncer.success_callback is not None:
+
+ # eventually wait for callback lock release
+ if self._last_callback_lock is not None:
+ self._last_callback_lock.acquire()
+
+ # bail out in case we've been interrupted
+ if self._stopped is True:
+ return
+
+ self._result = self._doc_syncer.success_callback(
+ self._idx, self._total, self._response)
+ self._success = True
+ doc_syncer = self._doc_syncer
+ self._release_method(self, doc_syncer)
+ self._doc_syncer = None
+                # let the next thread execute its callback
+ self._callback_lock.release()
+
+ # trap any exception and signal failure
+ except Exception as e:
+ self._exception = e
+ self._success = False
+ # run failure callback
+ if self._doc_syncer.failure_callback is not None:
+
+ # eventually wait for callback lock release
+ if self._last_callback_lock is not None:
+ self._last_callback_lock.acquire()
+
+ # bail out in case we've been interrupted
+ if self.stopped is True:
+ return
+
+ self._doc_syncer.failure_callback(
+ self._idx, self._total, self._exception)
+
+ self._failed_method(self)
+ # we do not release the callback lock here because we
+ # failed and so we don't want other threads to succeed.
- The content of the document should have the following structure:
+ @property
+ def doc_syncer(self):
+ return self._doc_syncer
- {
- ENC_JSON_KEY: '<enc_blob>',
- ENC_SCHEME_KEY: '<enc_scheme>',
- ENC_METHOD_KEY: '<enc_method>',
- ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
- MAC_KEY: '<mac>'
- MAC_METHOD_KEY: 'hmac'
- }
+ @property
+ def response(self):
+ return self._response
- C{enc_blob} is the encryption of the JSON serialization of the document's
- content. For now Soledad just deals with documents whose C{enc_scheme} is
- EncryptionSchemes.SYMKEY and C{enc_method} is
- EncryptionMethods.AES_256_CTR.
+ @property
+ def exception(self):
+ return self._exception
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc: The document to be decrypted.
- :type doc: SoledadDocument
+ @property
+ def callback_lock(self):
+ return self._callback_lock
- :return: The JSON serialization of the decrypted content.
- :rtype: str
- """
- soledad_assert(doc.is_tombstone() is False)
- soledad_assert(ENC_JSON_KEY in doc.content)
- soledad_assert(ENC_SCHEME_KEY in doc.content)
- soledad_assert(ENC_METHOD_KEY in doc.content)
- soledad_assert(MAC_KEY in doc.content)
- soledad_assert(MAC_METHOD_KEY in doc.content)
- # verify MAC
- ciphertext = binascii.a2b_hex( # content is stored as hex.
- doc.content[ENC_JSON_KEY])
- mac = mac_doc(
- crypto, doc.doc_id, doc.rev,
- ciphertext,
- doc.content[MAC_METHOD_KEY])
- # we compare mac's hashes to avoid possible timing attacks that might
- # exploit python's builtin comparison operator behaviour, which fails
- # immediatelly when non-matching bytes are found.
- doc_mac_hash = hashlib.sha256(
- binascii.a2b_hex( # the mac is stored as hex
- doc.content[MAC_KEY])).digest()
- calculated_mac_hash = hashlib.sha256(mac).digest()
- if doc_mac_hash != calculated_mac_hash:
- raise WrongMac('Could not authenticate document\'s contents.')
- # decrypt doc's content
- enc_scheme = doc.content[ENC_SCHEME_KEY]
- plainjson = None
- if enc_scheme == EncryptionSchemes.SYMKEY:
- enc_method = doc.content[ENC_METHOD_KEY]
- if enc_method == EncryptionMethods.AES_256_CTR:
- soledad_assert(ENC_IV_KEY in doc.content)
- plainjson = crypto.decrypt_sym(
- ciphertext,
- crypto.doc_passphrase(doc.doc_id),
- method=enc_method,
- iv=doc.content[ENC_IV_KEY])
- else:
- raise UnknownEncryptionMethod(enc_method)
- else:
- raise UnknownEncryptionScheme(enc_scheme)
- return plainjson
+ @property
+ def request_lock(self):
+ return self._request_lock
+ @property
+ def success(self):
+ return self._success
-def _gunzip(data):
- """
- Uncompress data that is gzipped.
+ def stop(self):
+ with self._stop_lock:
+ self._stopped = True
- :param data: gzipped data
- :type data: basestring
- """
- buffer = cStringIO.StringIO()
- buffer.write(data)
- buffer.seek(0)
- try:
- data = gzip.GzipFile(mode='r', fileobj=buffer).read()
- except Exception:
- logger.warning("Error while decrypting gzipped data")
- buffer.close()
- return data
+ @property
+ def stopped(self):
+ with self._stop_lock:
+ return self._stopped
+ @property
+ def result(self):
+ return self._result
-#
-# SoledadSyncTarget
-#
-class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
+class DocumentSyncerPool(object):
"""
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
+ A pool of reusable document syncers.
"""
- #
- # Token auth methods.
- #
+ POOL_SIZE = 10
+ """
+ The maximum amount of syncer threads running at the same time.
+ """
- def set_token_credentials(self, uuid, token):
+ def __init__(self, raw_url, raw_creds, query_string, headers,
+ ensure_callback):
"""
- Store given credentials so we can sign the request later.
+ Initialize the document syncer pool.
+
+ :param raw_url: The complete raw URL for the HTTP request.
+ :type raw_url: str
+ :param raw_creds: The credentials for the HTTP request.
+ :type raw_creds: dict
+ :param query_string: The query string for the HTTP request.
+ :type query_string: str
+ :param headers: The headers for the HTTP request.
+ :type headers: dict
+ :param ensure_callback: A callback to ensure we have the correct
+ target_replica_uid, if it was just created.
+ :type ensure_callback: callable
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
"""
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _sign_request(self, method, url_query, params):
+ # save syncer params
+ self._raw_url = raw_url
+ self._raw_creds = raw_creds
+ self._query_string = query_string
+ self._headers = headers
+ self._ensure_callback = ensure_callback
+ # pool attributes
+ self._failures = False
+ self._semaphore_pool = threading.BoundedSemaphore(
+ DocumentSyncerPool.POOL_SIZE)
+ self._pool_access_lock = threading.Lock()
+ self._doc_syncers = []
+ self._threads = []
+
+ def new_syncer_thread(self, idx, total, last_request_lock=None,
+ last_callback_lock=None):
"""
- Return an authorization header to be included in the HTTP request.
+ Yield a new document syncer thread.
+
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param last_request_lock: A lock to wait for before actually performing
+ the request.
+ :type last_request_lock: threading.Lock
+ :param last_callback_lock: A lock to wait for before actually running
+ the success callback.
+ :type last_callback_lock: threading.Lock
+ """
+ t = None
+ # wait for available threads
+ self._semaphore_pool.acquire()
+ with self._pool_access_lock:
+ if self._failures is True:
+ return None
+ # get a syncer
+ doc_syncer = self._get_syncer()
+ # we rely on DocumentSyncerThread.run() to release the lock using
+ # self.release_syncer so we can launch a new thread.
+ t = DocumentSyncerThread(
+ doc_syncer, self.release_syncer, self.cancel_threads,
+ idx, total,
+ last_request_lock=last_request_lock,
+ last_callback_lock=last_callback_lock)
+ self._threads.append(t)
+ return t
+
+ def _failed(self):
+ with self._pool_access_lock:
+ self._failures = True
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type param: list
+ @property
+ def failures(self):
+ return self._failures
- :return: The Authorization header.
- :rtype: list of tuple
+ def _get_syncer(self):
"""
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- #
- # Modified HTTPSyncTarget methods.
- #
+ Get a document syncer from the pool.
- @staticmethod
- def connect(url, crypto=None):
- return SoledadSyncTarget(url, crypto=crypto)
+ This method will create a new syncer whenever there is no syncer
+ available in the pool.
- def __init__(self, url, creds=None, crypto=None):
+ :return: A syncer.
+ :rtype: HTTPDocumentSyncer
"""
- Initialize the SoledadSyncTarget.
-
- :param url: The url of the target replica to sync with.
- :type url: str
- :param creds: optional dictionary giving credentials.
- to authorize the operation with the server.
- :type creds: dict
- :param soledad: An instance of Soledad so we can encrypt/decrypt
- document contents when syncing.
- :type soledad: soledad.Soledad
+ syncer = None
+ # get an available syncer or create a new one
+ try:
+ syncer = self._doc_syncers.pop()
+ except IndexError:
+ syncer = HTTPDocumentSyncer(
+ self._raw_url, self._raw_creds, self._query_string,
+ self._headers, self._ensure_callback)
+ return syncer
+
+ def release_syncer(self, syncer_thread, doc_syncer):
"""
- HTTPSyncTarget.__init__(self, url, creds)
- self._crypto = crypto
- self._stopped = True
- self._stop_lock = threading.Lock()
+ Return a syncer to the pool after use and check for any failures.
- def _init_post_request(self, url, action, headers, content_length):
+ :param syncer: The syncer to be returned to the pool.
+ :type syncer: HTTPDocumentSyncer
"""
- Initiate a syncing POST request.
+ with self._pool_access_lock:
+ self._doc_syncers.append(doc_syncer)
+ if syncer_thread.success is True:
+ self._threads.remove(syncer_thread)
+ self._semaphore_pool.release()
- :param url: The syncing URL.
- :type url: str
- :param action: The syncing action, either 'get' or 'receive'.
- :type action: str
- :param headers: The initial headers to be sent on this request.
- :type headers: dict
- :param content_length: The content-length of the request.
- :type content_length: int
+ def cancel_threads(self, calling_thread):
"""
- self._conn.putrequest('POST', url)
- self._conn.putheader(
- 'content-type', 'application/x-soledad-sync-%s' % action)
- for header_name, header_value in headers:
- self._conn.putheader(header_name, header_value)
- self._conn.putheader('accept-encoding', 'gzip')
- self._conn.putheader('content-length', str(content_length))
- self._conn.endheaders()
-
- def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback, sync_id):
+ Stop all threads in the pool.
"""
- Fetch sync documents from the remote database and insert them in the
- local database.
+ stopped = []
+ # stop all threads
+ logger.warning("Soledad sync: cancelling sync threads.")
+ with self._pool_access_lock:
+ self._failures = True
+ while self._threads:
+ t = self._threads.pop(0)
+ t.stop()
+ self._doc_syncers.append(t.doc_syncer)
+ stopped.append(t)
+ # release locks and join
+ while stopped:
+ t = stopped.pop(0)
+ t.request_lock.acquire(False) # just in case
+ t.request_lock.release()
+ t.callback_lock.acquire(False) # just in case
+ t.callback_lock.release()
+ if t is not calling_thread and t.is_alive():
+ t.join()
+
+ def cleanup(self):
+ """
+ Close and remove any syncers from the pool.
+ """
+ with self._pool_access_lock:
+ while self._doc_syncers:
+ syncer = self._doc_syncers.pop()
+ syncer.close()
+ del syncer
- If an incoming document's encryption scheme is equal to
- EncryptionSchemes.SYMKEY, then this method will decrypt it with
- Soledad's symmetric key.
- :param url: The syncing URL.
- :type url: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param headers: The headers of the HTTP request.
+class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
+
+ def __init__(self, raw_url, creds, query_string, headers, ensure_callback):
+ """
+ Initialize the client.
+
+ :param raw_url: The raw URL of the target HTTP server.
+ :type raw_url: str
+ :param creds: Authentication credentials.
+ :type creds: dict
+ :param query_string: The query string for the HTTP request.
+ :type query_string: str
+ :param headers: The headers for the HTTP request.
:type headers: dict
- :param return_doc_cb: A callback to insert docs from target.
- :type return_doc_cb: callable
:param ensure_callback: A callback to ensure we have the correct
target_replica_uid, if it was just created.
:type ensure_callback: callable
+ """
+ HTTPClientBase.__init__(self, raw_url, creds=creds)
+ # info needed to perform the request
+ self._query_string = query_string
+ self._headers = headers
+ self._ensure_callback = ensure_callback
+ # the actual request method
+ self._request_method = None
+ self._success_callback = None
+ self._failure_callback = None
+ # storage of info returned by the request
+ self._success = None
+ self._exception = None
+
+ def _reset(self):
+ """
+ Reset this document syncer so we can reuse it.
+ """
+ self._request_method = None
+ self._success_callback = None
+ self._failure_callback = None
+ self._request_method = None
+ self._success = None
+ self._exception = None
+
+ def set_request_method(self, method, *args, **kwargs):
+ """
+ Set the actual method to perform the request.
- :raise BrokenSyncStream: If C{data} is malformed.
+ :param method: Either 'get' or 'put'.
+ :type method: str
+ :param args: Arguments for the request method.
+ :type args: list
+ :param kwargs: Keyworded arguments for the request method.
+ :type kwargs: dict
+ """
+ self._reset()
+ # resolve request method
+ if method is 'get':
+ self._request_method = self._get_doc
+ elif method is 'put':
+ self._request_method = self._put_doc
+ else:
+ raise Exception
+ # store request method args
+ self._args = args
+ self._kwargs = kwargs
- :return: A dictionary representing the first line of the response got
- from remote replica.
- :rtype: list of str
- """
-
- def _post_get_doc(received):
- """
- Get a sync document from server by means of a POST request.
-
- :param received: The number of documents already received in the
- current sync session.
- :type received: int
- """
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=ensure_callback is not None)
- # inform server of how many documents have already been received
- size += self._prepare(
- ',', entries, received=received)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request(url, 'get', headers, size)
- # get document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- number_of_changes = None
- received = 0
+ def set_success_callback(self, callback):
+ self._success_callback = callback
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
- while number_of_changes is None or received < number_of_changes:
- # bail out if sync process was interrupted
- if self.stopped is True:
- return last_known_generation, last_known_trans_id
- # try to fetch one document from target
- data, _ = _post_get_doc(received)
- # decode incoming stream
- parts = data.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- try:
- metadata = json.loads(line)
- soledad_assert('number_of_changes' in metadata)
- soledad_assert('new_generation' in metadata)
- soledad_assert('new_transaction_id' in metadata)
- number_of_changes = metadata['number_of_changes']
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- except json.JSONDecodeError, AssertionError:
- raise BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if ensure_callback and 'replica_uid' in metadata:
- ensure_callback(metadata['replica_uid'])
- # bail out if there are no documents to be received
- if number_of_changes == 0:
- break
- # decrypt incoming document and insert into local database
- entry = None
- try:
- entry = json.loads(data[1])
- except IndexError:
- raise BrokenSyncStream
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # if arriving content was symmetrically encrypted, we decrypt
- # it.
- doc = SoledadDocument(
- entry['id'], entry['rev'], entry['content'])
- if doc.content and ENC_SCHEME_KEY in doc.content:
- if doc.content[ENC_SCHEME_KEY] == \
- EncryptionSchemes.SYMKEY:
- doc.set_json(decrypt_doc(self._crypto, doc))
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- return_doc_cb(doc, entry['gen'], entry['trans_id'])
- received += 1
- signal(
- SOLEDAD_SYNC_RECEIVE_STATUS,
- "%d/%d" %
- (received, number_of_changes))
- return new_generation, new_transaction_id
+ def set_failure_callback(self, callback):
+ self._failure_callback = callback
+
+ @property
+ def success_callback(self):
+ return self._success_callback
+
+ @property
+ def failure_callback(self):
+ return self._failure_callback
+
+ def do_request(self):
+ """
+ Actually perform the request.
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+ """
+ self._ensure_connection()
+ args = self._args
+ kwargs = self._kwargs
+ return self._request_method(*args, **kwargs)
def _request(self, method, url_parts, params=None, body=None,
content_type=None):
@@ -482,6 +493,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type body: str
:param content-type: The content-type of the request.
:type content-type: str
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+
+        :raise errors.Unavailable: Raised after a number of unsuccessful
+ request attempts.
+        :raise Exception: Raised for any other exception occurring during the
+ request.
"""
self._ensure_connection()
@@ -566,14 +585,485 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type entries: list
:param dic: The data to be included in this entry.
:type dic: dict
+
+ :return: The size of the prepared entry.
+ :rtype: int
"""
entry = comma + '\r\n' + json.dumps(dic)
entries.append(entry)
return len(entry)
- def sync_exchange(self, docs_by_generations, source_replica_uid,
- last_known_generation, last_known_trans_id,
- return_doc_cb, ensure_callback=None):
+ def _init_post_request(self, action, content_length):
+ """
+ Initiate a syncing POST request.
+
+ :param url: The syncing URL.
+ :type url: str
+ :param action: The syncing action, either 'get' or 'receive'.
+ :type action: str
+ :param headers: The initial headers to be sent on this request.
+ :type headers: dict
+ :param content_length: The content-length of the request.
+ :type content_length: int
+ """
+ self._conn.putrequest('POST', self._query_string)
+ self._conn.putheader(
+ 'content-type', 'application/x-soledad-sync-%s' % action)
+ for header_name, header_value in self._headers:
+ self._conn.putheader(header_name, header_value)
+ self._conn.putheader('accept-encoding', 'gzip')
+ self._conn.putheader('content-length', str(content_length))
+ self._conn.endheaders()
+
+ def _get_doc(self, received, sync_id, last_known_generation,
+ last_known_trans_id):
+ """
+ Get a sync document from server by means of a POST request.
+
+ :param received: The number of documents already received in the
+ current sync session.
+ :type received: int
+ :param sync_id: The id for the current sync session.
+ :type sync_id: str
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+ """
+ entries = ['[']
+ size = 1
+ # add remote replica metadata to the request
+ size += self._prepare(
+ '', entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # inform server of how many documents have already been received
+ size += self._prepare(
+ ',', entries, received=received)
+ entries.append('\r\n]')
+ size += len(entries[-1])
+ # send headers
+ self._init_post_request('get', size)
+ # get document
+ for entry in entries:
+ self._conn.send(entry)
+ return self._response()
+
+ def _put_doc(self, sync_id, last_known_generation, last_known_trans_id,
+ id, rev, content, gen, trans_id, number_of_docs):
+ """
+ Put a sync document on server by means of a POST request.
+
+ :param sync_id: The id for the current sync session.
+ :type sync_id: str
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+ :param id: The document id.
+ :type id: str
+ :param rev: The document revision.
+ :type rev: str
+ :param content: The serialized document content.
+ :type content: str
+ :param gen: The generation of the modification of the document.
+ :type gen: int
+ :param trans_id: The transaction id of the modification of the
+ document.
+ :type trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+ """
+ # prepare to send the document
+ entries = ['[']
+ size = 1
+ # add remote replica metadata to the request
+ size += self._prepare(
+ '', entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # add the document to the request
+ size += self._prepare(
+ ',', entries,
+ id=id, rev=rev, content=content, gen=gen, trans_id=trans_id,
+ number_of_docs=number_of_docs)
+ entries.append('\r\n]')
+ size += len(entries[-1])
+ # send headers
+ self._init_post_request('put', size)
+ # send document
+ for entry in entries:
+ self._conn.send(entry)
+ return self._response()
+
+ def _sign_request(self, method, url_query, params):
+ """
+ Return an authorization header to be included in the HTTP request.
+
+ :param method: The HTTP method.
+ :type method: str
+ :param url_query: The URL query string.
+ :type url_query: str
+ :param params: A list with encoded query parameters.
+ :type param: list
+
+ :return: The Authorization header.
+ :rtype: list of tuple
+ """
+ return TokenBasedAuth._sign_request(self, method, url_query, params)
+
+ def set_token_credentials(self, uuid, token):
+ """
+ Store given credentials so we can sign the request later.
+
+ :param uuid: The user's uuid.
+ :type uuid: str
+ :param token: The authentication token.
+ :type token: str
+ """
+ TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+
+class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
+ """
+ A SyncTarget that encrypts data before sending and decrypts data after
+ receiving.
+
+    Normally encryption will have been written to the sync database upon
+    document modification. The sync database is also used to temporarily
+    store the parsed documents that the remote sends us, before they are
+    decrypted and written to the main database.
+ """
+
+ # will later keep a reference to the insert-doc callback
+ # passed to sync_exchange
+ _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
+
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_TASK_PERIOD = 0.5
+
+ #
+ # Modified HTTPSyncTarget methods.
+ #
+
+ def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
+ sync_db=None, sync_db_write_lock=None):
+ """
+ Initialize the SoledadSyncTarget.
+
+ :param source_replica_uid: The source replica uid which we use when
+ deferring decryption.
+ :type source_replica_uid: str
+ :param url: The url of the target replica to sync with.
+ :type url: str
+ :param creds: Optional dictionary giving credentials.
+ to authorize the operation with the server.
+ :type creds: dict
+ :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+ document contents when syncing.
+ :type crypto: soledad.crypto.SoledadCrypto
+        :param sync_db: Optional handler for the db with the symmetric
+                        encryption of the syncing documents. If None,
+                        encryption will be done in-place instead of being
+                        retrieved from the dedicated database.
+ :type sync_db: Sqlite handler
+ :param sync_db_write_lock: a write lock for controlling concurrent
+ access to the sync_db
+ :type sync_db_write_lock: threading.Lock
+ """
+ HTTPSyncTarget.__init__(self, url, creds)
+ self._raw_url = url
+ self._raw_creds = creds
+ self._crypto = crypto
+ self._stopped = True
+ self._stop_lock = threading.Lock()
+ self._sync_exchange_lock = threading.Lock()
+ self.source_replica_uid = source_replica_uid
+ self._defer_decryption = False
+
+ self._sync_db = None
+ self._sync_decr_pool = None
+ self._sync_watcher = None
+
+ if sync_db and sync_db_write_lock is not None:
+ self._sync_db = sync_db
+ self._decryption_callback = None
+ self._sync_db_write_lock = sync_db_write_lock
+
+ # initialize syncing queue decryption pool
+ self._sync_decr_pool = SyncDecrypterPool(
+ self._crypto, self._sync_db,
+ self._sync_db_write_lock,
+ insert_doc_cb=self._insert_doc_cb)
+ self._sync_decr_pool.set_source_replica_uid(
+ self.source_replica_uid)
+ self._sync_watcher = TimerTask(
+ self._decrypt_syncing_received_docs,
+ delay=self.DECRYPT_TASK_PERIOD)
+
+ def _get_replica_uid(self, url):
+ """
+ Return replica uid from the url, or None.
+
+ :param url: the replica url
+ :type url: str
+ """
+ replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url)
+ return replica_uid_match[0] if len(replica_uid_match) > 0 else None
+
+ def close(self):
+ """
+ Cleanly close pool of workers.
+ """
+ if self._sync_watcher is not None:
+ self._sync_watcher.stop()
+ self._sync_watcher.shutdown()
+ if self._sync_decr_pool is not None:
+ self._sync_decr_pool.close()
+
+ @staticmethod
+ def connect(url, source_replica_uid=None, crypto=None):
+ return SoledadSyncTarget(
+ url, source_replica_uid=source_replica_uid, crypto=crypto)
+
+ def _parse_received_doc_response(self, response):
+ """
+ Parse the response from the server containing the received document.
+
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+ """
+ data, _ = response
+ # decode incoming stream
+ parts = data.splitlines()
+ if not parts or parts[0] != '[' or parts[-1] != ']':
+ raise errors.BrokenSyncStream
+ data = parts[1:-1]
+ # decode metadata
+ line, comma = utils.check_and_strip_comma(data[0])
+ metadata = None
+ try:
+ metadata = json.loads(line)
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ number_of_changes = metadata['number_of_changes']
+ except json.JSONDecodeError, KeyError:
+ raise errors.BrokenSyncStream
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ # parse incoming document info
+ doc_id = None
+ rev = None
+ content = None
+ gen = None
+ trans_id = None
+ if number_of_changes > 0:
+ try:
+ entry = json.loads(data[1])
+ doc_id = entry['id']
+ rev = entry['rev']
+ content = entry['content']
+ gen = entry['gen']
+ trans_id = entry['trans_id']
+ except IndexError, KeyError:
+ raise errors.BrokenSyncStream
+ return new_generation, new_transaction_id, number_of_changes, \
+ doc_id, rev, content, gen, trans_id
+
+ def _insert_received_doc(self, idx, total, response):
+ """
+ Insert a received document into the local replica.
+
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+ """
+ new_generation, new_transaction_id, number_of_changes, doc_id, \
+ rev, content, gen, trans_id = \
+ self._parse_received_doc_response(response)
+ if doc_id is not None:
+ # decrypt incoming document and insert into local database
+ # -------------------------------------------------------------
+ # symmetric decryption of document's contents
+ # -------------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt it.
+ # We do it inline if defer_decryption flag is False or no sync_db
+ # was defined, otherwise we defer it writing it to the received
+ # docs table.
+ doc = SoledadDocument(doc_id, rev, content)
+ if is_symmetrically_encrypted(doc):
+ if self._queue_for_decrypt:
+ self._save_encrypted_received_doc(
+ doc, gen, trans_id, idx, total)
+ else:
+ # defer_decryption is False or no-sync-db fallback
+ doc.set_json(decrypt_doc(self._crypto, doc))
+ self._return_doc_cb(doc, gen, trans_id)
+ else:
+ # not symmetrically encrypted doc, insert it directly
+ # or save it in the decrypted stage.
+ if self._queue_for_decrypt:
+ self._save_received_doc(doc, gen, trans_id, idx, total)
+ else:
+ self._return_doc_cb(doc, gen, trans_id)
+ # -------------------------------------------------------------
+ # end of symmetric decryption
+ # -------------------------------------------------------------
+ msg = "%d/%d" % (idx + 1, total)
+ signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
+ logger.debug("Soledad sync receive status: %s" % msg)
+ return number_of_changes, new_generation, new_transaction_id
+
+ def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
+ headers, return_doc_cb, ensure_callback, sync_id,
+ syncer_pool, defer_decryption=False):
+ """
+ Fetch sync documents from the remote database and insert them in the
+ local database.
+
+ If an incoming document's encryption scheme is equal to
+ EncryptionSchemes.SYMKEY, then this method will decrypt it with
+ Soledad's symmetric key.
+
+ :param url: The syncing URL.
+ :type url: str
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+ :param headers: The headers of the HTTP request.
+ :type headers: dict
+ :param return_doc_cb: A callback to insert docs from target.
+ :type return_doc_cb: callable
+ :param ensure_callback: A callback to ensure we have the correct
+ target_replica_uid, if it was just created.
+ :type ensure_callback: callable
+ :param sync_id: The id for the current sync session.
+ :type sync_id: str
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :raise BrokenSyncStream: If `data` is malformed.
+
+ :return: A dictionary representing the first line of the response got
+ from remote replica.
+ :rtype: dict
+ """
+ # we keep a reference to the callback in case we defer the decryption
+ self._return_doc_cb = return_doc_cb
+ self._queue_for_decrypt = defer_decryption \
+ and self._sync_db is not None
+
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+
+ if self._queue_for_decrypt:
+ logger.debug(
+ "Soledad sync: will queue received docs for decrypting.")
+
+ idx = 0
+ number_of_changes = 1
+
+ first_request = True
+ last_callback_lock = None
+ threads = []
+
+ # get incoming documents
+ while idx < number_of_changes:
+ # bail out if sync process was interrupted
+ if self.stopped is True:
+ break
+
+ # bail out if any thread failed
+ if syncer_pool.failures is True:
+ #syncer_pool.cancel_threads()
+ self.stop()
+ break
+
+ # launch a thread to fetch one document from target
+ t = syncer_pool.new_syncer_thread(
+ idx, number_of_changes,
+ last_callback_lock=last_callback_lock)
+
+ # bail out if any thread failed
+ if t is None:
+ self.stop()
+ break
+
+ t.doc_syncer.set_request_method(
+ 'get', idx, sync_id, last_known_generation,
+ last_known_trans_id)
+ t.doc_syncer.set_success_callback(self._insert_received_doc)
+
+ def _failure_callback(idx, total, exception):
+ _failure_msg = "Soledad sync: error while getting document " \
+ "%d/%d: %s" \
+ % (idx + 1, total, exception)
+ logger.warning("%s" % _failure_msg)
+ logger.warning("Soledad sync: failing gracefully, will "
+ "recover on next sync.")
+
+ t.doc_syncer.set_failure_callback(_failure_callback)
+ threads.append(t)
+ t.start()
+ last_callback_lock = t.callback_lock
+ idx += 1
+
+ # if this is the first request, wait to update the number of
+ # changes
+ if first_request is True:
+ t.join()
+ if t.success:
+ number_of_changes, _, _ = t.result
+ first_request = False
+
+ if syncer_pool.failures is True:
+ self.stop()
+
+ # make sure all threads finished and we have up-to-date info
+ last_successful_thread = None
+ while threads:
+ # check if there are failures
+ t = threads.pop(0)
+ t.join()
+ if t.success:
+ last_successful_thread = t
+
+ # get information about last successful thread
+ if last_successful_thread is not None:
+ body, _ = last_successful_thread.response
+ parsed_body = json.loads(body)
+ metadata = parsed_body[0]
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+
+ return new_generation, new_transaction_id
+
+ def sync_exchange(self, docs_by_generations,
+ source_replica_uid, last_known_generation,
+ last_known_trans_id, return_doc_cb,
+ ensure_callback=None, defer_decryption=True,
+ sync_id=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them.
@@ -586,24 +1076,55 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
the last local generation the remote
replica knows about.
:type docs_by_generations: list of tuples
+
:param source_replica_uid: The uid of the source replica.
:type source_replica_uid: str
+
:param last_known_generation: Target's last known generation.
:type last_known_generation: int
+
:param last_known_trans_id: Target's last known transaction id.
:type last_known_trans_id: str
+
:param return_doc_cb: A callback for inserting received documents from
- target.
+                              target. If not overridden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
:type return_doc_cb: function
+
:param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just created.
+ replica uid if the target replica was just
+ created.
:type ensure_callback: function
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
+ self._ensure_callback = ensure_callback
+ if defer_decryption:
+ if self._sync_watcher is None:
+ logger.warning(
+ "Soledad syncer: can't defer decryption, falling back to "
+ "normal syncing mode.")
+ defer_decryption = False
+ else:
+ self._sync_exchange_lock.acquire()
+ self._defer_decryption = True
self.start()
- sync_id = str(uuid4())
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+ # let the decrypter pool access the passed callback to insert docs
+ setProxiedObject(self._insert_doc_cb[source_replica_uid],
+ return_doc_cb)
+
+ if not self.clear_to_sync():
+ raise PendingReceivedDocsSyncError
self._ensure_connection()
if self._trace_hook: # for tests
@@ -611,78 +1132,140 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
headers = self._sign_request('POST', url, {})
- def _post_put_doc(headers, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id, sync_id):
- """
- Put a sync document on server by means of a POST request.
-
- :param received: How many documents have already been received in
- this sync session.
- :type received: int
- """
- # prepare to send the document
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=ensure_callback is not None)
- # add the document to the request
- size += self._prepare(
- ',', entries,
- id=id, rev=rev, content=content, gen=gen, trans_id=trans_id)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request(url, 'put', headers, size)
- # send document
- for entry in entries:
- self._conn.send(entry)
- data, _ = self._response()
- data = json.loads(data)
- return data[0]['new_generation'], data[0]['new_transaction_id']
-
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
# send docs
+ msg = "%d/%d" % (0, len(docs_by_generations))
+ signal(SOLEDAD_SYNC_SEND_STATUS, msg)
+ logger.debug("Soledad sync send status: %s" % msg)
+
+ defer_encryption = self._sync_db is not None
+ syncer_pool = DocumentSyncerPool(
+ self._raw_url, self._raw_creds, url, headers, ensure_callback)
+ threads = []
+ last_request_lock = None
+ last_callback_lock = None
sent = 0
- signal(
- SOLEDAD_SYNC_SEND_STATUS,
- "%d/%d" % (0, len(docs_by_generations)))
+ total = len(docs_by_generations)
+
+ synced = []
+ number_of_docs = len(docs_by_generations)
+
for doc, gen, trans_id in docs_by_generations:
# allow for interrupting the sync process
if self.stopped is True:
break
+
+ # bail out if any thread failed
+ if syncer_pool.failures is True:
+ self.stop()
+ break
+
# skip non-syncable docs
if isinstance(doc, SoledadDocument) and not doc.syncable:
continue
+
# -------------------------------------------------------------
# symmetric encryption of document's contents
# -------------------------------------------------------------
doc_json = doc.get_json()
if not doc.is_tombstone():
- doc_json = encrypt_doc(self._crypto, doc)
+ if not defer_encryption:
+ # fallback case, for tests
+ doc_json = encrypt_doc(self._crypto, doc)
+ else:
+ try:
+ doc_json = self.get_encrypted_doc_from_db(
+ doc.doc_id, doc.rev)
+ except Exception as exc:
+ logger.error("Error while getting "
+ "encrypted doc from db")
+ logger.exception(exc)
+ continue
+ if doc_json is None:
+ # Not marked as tombstone, but we got nothing
+ # from the sync db. As it is not encrypted yet, we
+ # force inline encryption.
+ # TODO: implement a queue to deal with these cases.
+ doc_json = encrypt_doc(self._crypto, doc)
# -------------------------------------------------------------
# end of symmetric encryption
# -------------------------------------------------------------
- cur_target_gen, cur_target_trans_id = _post_put_doc(
- headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
- rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id,
- sync_id=sync_id)
+ t = syncer_pool.new_syncer_thread(
+ sent + 1, total, last_request_lock=None,
+ last_callback_lock=last_callback_lock)
+
+ # bail out if any thread failed
+ if t is None:
+ self.stop()
+ break
+
+ # set the request method
+ t.doc_syncer.set_request_method(
+ 'put', sync_id, cur_target_gen, cur_target_trans_id,
+ id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
+ trans_id=trans_id, number_of_docs=number_of_docs)
+            # set the success callback
+
+ def _success_callback(idx, total, response):
+ _success_msg = "Soledad sync send status: %d/%d" \
+ % (idx, total)
+ signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
+ logger.debug(_success_msg)
+
+ t.doc_syncer.set_success_callback(_success_callback)
+
+ # set the failure callback
+ def _failure_callback(idx, total, exception):
+ _failure_msg = "Soledad sync: error while sending document " \
+ "%d/%d: %s" % (idx, total, exception)
+ logger.warning("%s" % _failure_msg)
+ logger.warning("Soledad sync: failing gracefully, will "
+ "recover on next sync.")
+
+ t.doc_syncer.set_failure_callback(_failure_callback)
+
+ # save thread and append
+ t.start()
+ threads.append((t, doc))
+ last_request_lock = t.request_lock
+ last_callback_lock = t.callback_lock
sent += 1
- signal(
- SOLEDAD_SYNC_SEND_STATUS,
- "%d/%d" % (sent, len(docs_by_generations)))
+
+ # make sure all threads finished and we have up-to-date info
+ while threads:
+ # check if there are failures
+ if syncer_pool.failures is True:
+ #syncer_pool.cancel_threads()
+ self.stop()
+ t, doc = threads.pop(0)
+ t.join()
+ if t.success:
+ synced.append((doc.doc_id, doc.rev))
+
+ if defer_decryption:
+ self._sync_watcher.start()
# get docs from target
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id)
+ return_doc_cb, ensure_callback, sync_id, syncer_pool,
+ defer_decryption=defer_decryption)
+
+ # delete documents from the sync database
+ if defer_encryption:
+ self.delete_encrypted_docs_from_db(synced)
+
+ # wait for deferred decryption to finish
+ if defer_decryption:
+ while self.clear_to_sync() is False:
+ sleep(self.DECRYPT_TASK_PERIOD)
+ self._sync_exchange_lock.release()
+ self._sync_watcher.stop()
+
+ syncer_pool.cleanup()
self.stop()
return cur_target_gen, cur_target_trans_id
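
The deferred-decryption path above stages symmetrically encrypted incoming documents in the sync database and relies on a periodic watcher (taskthread.TimerTask, started before fetching and stopped once clear_to_sync() reports the staging table empty) to run _decrypt_syncing_received_docs every DECRYPT_TASK_PERIOD seconds. A minimal stand-in for such a periodic runner, using only the standard library (the real TimerTask API may differ), could look like:

    import threading

    class PeriodicTask(object):
        """Call `task` every `delay` seconds until stop() is called."""

        def __init__(self, task, delay):
            self._task = task
            self._delay = delay
            self._finished = threading.Event()
            self._thread = None

        def _loop(self):
            # Event.wait() returns False on timeout, True once stop() sets it
            while not self._finished.wait(self._delay):
                self._task()

        def start(self):
            self._thread = threading.Thread(target=self._loop)
            self._thread.daemon = True
            self._thread.start()

        def stop(self):
            self._finished.set()
            if self._thread is not None:
                self._thread.join()
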
@@ -714,3 +1297,164 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
with self._stop_lock:
return self._stopped is True
+
+ def get_encrypted_doc_from_db(self, doc_id, doc_rev):
+ """
+ Retrieve encrypted document from the database of encrypted docs for
+ sync.
+
+ :param doc_id: The Document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision
+ :type doc_rev: str
+ """
+ encr = SyncEncrypterPool
+ c = self._sync_db.cursor()
+ sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
+ c.execute(sql, (doc_id, doc_rev))
+ res = c.fetchall()
+ if len(res) != 0:
+ return res[0][0]
+
+ def delete_encrypted_docs_from_db(self, docs_ids):
+ """
+ Delete several encrypted documents from the database of symmetrically
+ encrypted docs to sync.
+
+ :param docs_ids: an iterable with (doc_id, doc_rev) for all documents
+ to be deleted.
+ :type docs_ids: any iterable of tuples of str
+ """
+ if docs_ids:
+ encr = SyncEncrypterPool
+ c = self._sync_db.cursor()
+ for doc_id, doc_rev in docs_ids:
+ sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
+ c.execute(sql, (doc_id, doc_rev))
+ self._sync_db.commit()
+
+ def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
+ """
+ Save a symmetrically encrypted incoming document into the received
+ docs table in the sync db. A decryption task will pick it up
+ from here in turn.
+
+ :param doc: The document to save.
+ :type doc: SoledadDocument
+ :param gen: The generation.
+ :type gen: str
+        :param trans_id: Transaction id.
+        :type trans_id: str
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ """
+ logger.debug(
+ "Enqueueing doc for decryption: %d/%d."
+ % (idx + 1, total))
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id)
+
+ def _save_received_doc(self, doc, gen, trans_id, idx, total):
+ """
+ Save any incoming document into the received docs table in the sync db.
+
+ :param doc: The document to save.
+ :type doc: SoledadDocument
+ :param gen: The generation.
+ :type gen: str
+        :param trans_id: Transaction id.
+        :type trans_id: str
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ """
+ logger.debug(
+ "Enqueueing doc, no decryption needed: %d/%d."
+ % (idx + 1, total))
+ self._sync_decr_pool.insert_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id)
+
+ #
+ # Symmetric decryption of syncing docs
+ #
+
+ def clear_to_sync(self):
+ """
+ Return True if sync can proceed (ie, the received db table is empty).
+ :rtype: bool
+ """
+ if self._sync_decr_pool is not None:
+ return self._sync_decr_pool.count_received_encrypted_docs() == 0
+ else:
+ return True
+
+ def set_decryption_callback(self, cb):
+ """
+ Set callback to be called when the decryption finishes.
+
+ :param cb: The callback to be set.
+ :type cb: callable
+ """
+ self._decryption_callback = cb
+
+ def has_decryption_callback(self):
+ """
+ Return True if there is a decryption callback set.
+ :rtype: bool
+ """
+ return self._decryption_callback is not None
+
+ def has_syncdb(self):
+ """
+ Return True if we have an initialized syncdb.
+ """
+ return self._sync_db is not None
+
+ def _decrypt_syncing_received_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ Called periodically from TimerTask self._sync_watcher.
+ """
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ return
+
+ decrypter = self._sync_decr_pool
+ decrypter.decrypt_received_docs()
+ done = decrypter.process_decrypted()
+
+ def _sign_request(self, method, url_query, params):
+ """
+ Return an authorization header to be included in the HTTP request.
+
+ :param method: The HTTP method.
+ :type method: str
+ :param url_query: The URL query string.
+ :type url_query: str
+ :param params: A list with encoded query parameters.
+ :type param: list
+
+ :return: The Authorization header.
+ :rtype: list of tuple
+ """
+ return TokenBasedAuth._sign_request(self, method, url_query, params)
+
+ def set_token_credentials(self, uuid, token):
+ """
+ Store given credentials so we can sign the request later.
+
+ :param uuid: The user's uuid.
+ :type uuid: str
+ :param token: The authentication token.
+ :type token: str
+ """
+ TokenBasedAuth.set_token_credentials(self, uuid, token)
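
Requests in the new client code run concurrently (bounded by DocumentSyncerPool.POOL_SIZE), but each DocumentSyncerThread waits on the previous thread's callback lock, so success callbacks still fire in submission order. A stripped-down sketch of that lock-chaining pattern (names here are illustrative, not the patch's API):

    import threading

    def run_ordered(tasks, max_workers=10):
        """Run (task, callback) pairs concurrently, but fire the callbacks
        in submission order by chaining one lock per task."""
        semaphore = threading.BoundedSemaphore(max_workers)
        threads = []
        last_callback_lock = None

        def worker(task, callback, own_lock, previous_lock):
            try:
                result = task()
                if previous_lock is not None:
                    previous_lock.acquire()  # wait for the previous callback
                callback(result)
            finally:
                own_lock.release()           # let the next callback run
                semaphore.release()

        for task, callback in tasks:
            semaphore.acquire()
            own_lock = threading.Lock()
            own_lock.acquire()  # held until this task's callback has run
            t = threading.Thread(
                target=worker,
                args=(task, callback, own_lock, last_callback_lock))
            threads.append(t)
            t.start()
            last_callback_lock = own_lock

        for t in threads:
            t.join()
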
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index b51b32f3..c0adfc70 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -1106,7 +1106,8 @@ class CouchDatabase(CommonBackend):
)
def _set_replica_gen_and_trans_id(self, other_replica_uid,
- other_generation, other_transaction_id):
+ other_generation, other_transaction_id,
+ number_of_docs=None, sync_id=None):
"""
Set the last-known generation and transaction id for the other
database replica.
@@ -1122,12 +1123,19 @@ class CouchDatabase(CommonBackend):
:param other_transaction_id: The transaction id associated with the
generation.
:type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
"""
self._do_set_replica_gen_and_trans_id(
- other_replica_uid, other_generation, other_transaction_id)
+ other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=number_of_docs, sync_id=sync_id)
def _do_set_replica_gen_and_trans_id(
- self, other_replica_uid, other_generation, other_transaction_id):
+ self, other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=None, sync_id=None):
"""
Set the last-known generation and transaction id for the other
database replica.
@@ -1143,6 +1151,11 @@ class CouchDatabase(CommonBackend):
:param other_transaction_id: The transaction id associated with the
generation.
:type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
:raise MissingDesignDocError: Raised when tried to access a missing
design document.
@@ -1163,12 +1176,17 @@ class CouchDatabase(CommonBackend):
res = self._database.resource(*ddoc_path)
try:
with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
+ body={
+ 'other_replica_uid': other_replica_uid,
+ 'other_generation': other_generation,
+ 'other_transaction_id': other_transaction_id,
+ }
+ if sync_id is not None:
+ body['sync_id'] = sync_id
+ if number_of_docs is not None:
+ body['number_of_docs'] = number_of_docs
res.put_json(
- body={
- 'other_replica_uid': other_replica_uid,
- 'other_generation': other_generation,
- 'other_transaction_id': other_transaction_id,
- },
+ body=body,
headers={'content-type': 'application/json'})
except ResourceNotFound as e:
raise_missing_design_doc_error(e, ddoc_path)
@@ -1306,7 +1324,8 @@ class CouchDatabase(CommonBackend):
doc.set_conflicts(cur_doc.get_conflicts())
def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
- replica_trans_id=''):
+ replica_trans_id='', number_of_docs=None,
+ sync_id=None):
"""
Insert/update document into the database with a given revision.
@@ -1339,6 +1358,11 @@ class CouchDatabase(CommonBackend):
:param replica_trans_id: The transaction_id associated with the
generation.
:type replica_trans_id: str
+ :param number_of_docs: The total number of documents sent in this sync
+ session.
+ :type number_of_docs: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
:return: (state, at_gen) - If we don't have doc_id already, or if
doc_rev supersedes the existing document revision, then the
@@ -1398,7 +1422,8 @@ class CouchDatabase(CommonBackend):
self._force_doc_sync_conflict(doc)
if replica_uid is not None and replica_gen is not None:
self._set_replica_gen_and_trans_id(
- replica_uid, replica_gen, replica_trans_id)
+ replica_uid, replica_gen, replica_trans_id,
+ number_of_docs=number_of_docs, sync_id=sync_id)
# update info
old_doc.rev = doc.rev
if doc.is_tombstone():
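The net effect of the couch.py changes above is that, when the optional parameters are known, the 'syncs/put' update handler receives a richer JSON body. A hypothetical illustration of the two shapes that body can take (values invented):

    # Body sent by _do_set_replica_gen_and_trans_id without the new
    # parameters (the previous behaviour, still valid):
    body_minimal = {
        'other_replica_uid': 'source-replica-1',
        'other_generation': 42,
        'other_transaction_id': 'T-42',
    }

    # Body sent when the sync session id and the batch size are known,
    # which lets the design document buffer out-of-order updates:
    body_full = dict(body_minimal, sync_id='S1', number_of_docs=3)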
diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
index 722f695a..d754faaa 100644
--- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
+++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
@@ -1,22 +1,128 @@
+/**
+ * The u1db_sync_log document stores both the actual sync log and a list of
+ * pending updates to the log, in case we receive incoming documents out of
+ * the correct order (i.e. if there are parallel PUTs during the sync
+ * process).
+ *
+ * The structure of the document is the following:
+ *
+ * {
+ * 'syncs': [
+ * ['<replica_uid>', <gen>, '<trans_id>'],
+ * ...
+ * ],
+ * 'pending': {
+ * '<other_replica_uid>': {
+ * 'sync_id': '<sync_id>',
+ * 'log': [[<gen>, '<trans_id>'], ...]
+ * },
+ * ...
+ * }
+ * }
+ *
+ * The update function below does the following:
+ *
+ * 0. If we do not receive a sync_id, we just update the 'syncs' list with
+ * the incoming info about the source replica state.
+ *
+ * 1. Otherwise, if the incoming sync_id differs from the currently stored
+ * sync_id, then we assume that the previous sync session for that source
+ * replica was interrupted and discard all pending data.
+ *
+ * 2. Then we append incoming info as pending data for that source replica
+ * and current sync_id, and sort the pending data by generation.
+ *
+ * 3. Then we go through pending data and find the most recent generation
+ * that we can use to update the actual sync log.
+ *
+ * 4. Finally, we insert the most up-to-date information into the sync log.
+ */
function(doc, req){
+
+ // create the document if it doesn't exist
if (!doc) {
doc = {}
doc['_id'] = 'u1db_sync_log';
doc['syncs'] = [];
}
- body = JSON.parse(req.body);
+
+ // get and validate incoming info
+ var body = JSON.parse(req.body);
+ var other_replica_uid = body['other_replica_uid'];
+ var other_generation = parseInt(body['other_generation']);
+ var other_transaction_id = body['other_transaction_id'];
+ var sync_id = body['sync_id'];
+ var number_of_docs = body['number_of_docs'];
+ if (number_of_docs != null)
+ number_of_docs = parseInt(number_of_docs);
+
+ if (other_replica_uid == null
+ || other_generation == null
+ || other_transaction_id == null)
+ return [null, 'invalid data'];
+
+ // create slot for pending logs
+ if (doc['pending'] == null)
+ doc['pending'] = {};
+
+ // these are the values that will be actually inserted
+ var current_gen = other_generation;
+ var current_trans_id = other_transaction_id;
+
+ /*------------ Wait for end of sync session before storing ------------*/
+
+ // we only use the pending log if we received a sync_id
+ if (sync_id != null) {
+
+ // create slot for current source and sync_id pending log
+ if (doc['pending'][other_replica_uid] == null
+ || doc['pending'][other_replica_uid]['sync_id'] != sync_id) {
+ doc['pending'][other_replica_uid] = {
+ 'sync_id': sync_id,
+ 'log': [],
+ }
+ }
+
+ // append incoming data to pending log
+ doc['pending'][other_replica_uid]['log'].push([
+ other_generation,
+ other_transaction_id
+ ])
+
+ // leave the sync log untouched if we have not yet received all docs
+ if (doc['pending'][other_replica_uid]['log'].length < number_of_docs)
+ return [doc, 'ok'];
+
+ // otherwise, sort pending log according to generation
+ doc['pending'][other_replica_uid]['log'].sort(function(a, b) {
+ return a[0] - b[0];
+ });
+
+ // get most up-to-date information from pending log
+ var pending = doc['pending'][other_replica_uid]['log'].pop();
+ current_gen = pending[0];
+ current_trans_id = pending[1];
+
+ // and remove all pending data from that replica
+ delete doc['pending'][other_replica_uid]
+ }
+
+ /*--------------- Store source replica info on sync log ---------------*/
+
// remove outdated info
doc['syncs'] = doc['syncs'].filter(
function (entry) {
- return entry[0] != body['other_replica_uid'];
+ return entry[0] != other_replica_uid;
}
);
- // store u1db rev
+
+ // store in log
doc['syncs'].push([
- body['other_replica_uid'],
- body['other_generation'],
- body['other_transaction_id']
+ other_replica_uid,
+ current_gen,
+ current_trans_id
]);
+
return [doc, 'ok'];
}
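To make the update handler's buffering concrete, here is a hypothetical evolution of the u1db_sync_log document during a three-document sync session where the documents arrive out of order (replica uid, generations and transaction ids are invented; the Python dicts mirror the JSON stored in CouchDB):

    # After the first two puts (gens 7 and 6, sync_id 'S1',
    # number_of_docs=3): the 'syncs' list is left untouched and the
    # incoming info accumulates under 'pending'.
    state_after_two_puts = {
        '_id': 'u1db_sync_log',
        'syncs': [['source-uid', 5, 'T-old']],
        'pending': {
            'source-uid': {
                'sync_id': 'S1',
                'log': [[7, 'T-7'], [6, 'T-6']],
            },
        },
    }

    # After the third put (gen 8): the pending log reaches
    # number_of_docs, is sorted by generation, and its highest entry
    # replaces the old 'syncs' entry; the pending data for that
    # replica is then discarded.
    state_after_third_put = {
        '_id': 'u1db_sync_log',
        'syncs': [['source-uid', 8, 'T-8']],
        'pending': {},
    }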
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index c6928aaa..3a1881fc 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -210,6 +210,8 @@ class SyncExchange(sync.SyncExchange):
:param last_known_generation: The last target replica generation the
source replica knows about.
:type last_known_generation: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
"""
self._db = db
self.source_replica_uid = source_replica_uid
@@ -284,7 +286,8 @@ class SyncExchange(sync.SyncExchange):
doc = self._db.get_doc(changed_doc_id, include_deleted=True)
return_doc_cb(doc, gen, trans_id)
- def insert_doc_from_source(self, doc, source_gen, trans_id):
+ def insert_doc_from_source(self, doc, source_gen, trans_id,
+ number_of_docs=None, sync_id=None):
"""Try to insert synced document from source.
Conflicting documents are not inserted but will be sent over
@@ -302,10 +305,16 @@ class SyncExchange(sync.SyncExchange):
:type source_gen: int
:param trans_id: The transaction id of that document change.
:type trans_id: str
+ :param number_of_docs: The total number of documents sent in this sync
+ session.
+ :type number_of_docs: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
"""
state, at_gen = self._db._put_doc_if_newer(
doc, save_conflict=False, replica_uid=self.source_replica_uid,
- replica_gen=source_gen, replica_trans_id=trans_id)
+ replica_gen=source_gen, replica_trans_id=trans_id,
+ number_of_docs=number_of_docs, sync_id=sync_id)
if state == 'inserted':
self._sync_state.put_seen_id(doc.doc_id, at_gen)
elif state == 'converged':
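Putting the server-side pieces together: sync_id is received once with the sync arguments and remembered by the resource, while number_of_docs arrives with every document, and both are forwarded down to the backend, as the post_args and post_put hunks below show. A rough, runnable sketch with stand-in classes (names and values invented; the real SyncResource and SyncExchange behave analogously):

    # Stand-ins for the real classes, just to show how sync_id (sent once)
    # and number_of_docs (sent per document) travel down to the backend.
    calls = []

    class FakeExchange(object):
        def insert_doc_from_source(self, doc, gen, trans_id,
                                    number_of_docs=None, sync_id=None):
            calls.append((doc, gen, trans_id, number_of_docs, sync_id))

    class FakeSyncResource(object):
        def post_args(self, last_known_generation, last_known_trans_id=None,
                      sync_id=None, ensure=False):
            self.sync_exch = FakeExchange()
            self._sync_id = sync_id          # remembered for later puts

        def post_put(self, id, rev, content, gen, trans_id, number_of_docs):
            doc = (id, rev, content)         # stands in for Document(...)
            self.sync_exch.insert_doc_from_source(
                doc, gen, trans_id, number_of_docs=number_of_docs,
                sync_id=self._sync_id)

    resource = FakeSyncResource()
    resource.post_args(10, 'T-10', sync_id='S1')
    resource.post_put('doc-1', '1-a', '{"k": 1}', 11, 'T-11', 2)
    resource.post_put('doc-2', '1-b', '{"k": 2}', 12, 'T-12', 2)
    # calls now holds two entries, each carrying number_of_docs=2 and
    # sync_id='S1', mirroring what reaches _put_doc_if_newer.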
@@ -340,6 +349,8 @@ class SyncResource(http_app.SyncResource):
:param last_known_trans_id: The last server replica transaction_id the
client knows about.
:type last_known_trans_id: str
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
:param ensure: Whether the server replica should be created if it does
not already exist.
:type ensure: bool
@@ -355,9 +366,10 @@ class SyncResource(http_app.SyncResource):
# get a sync exchange object
self.sync_exch = self.sync_exchange_class(
db, self.source_replica_uid, last_known_generation, sync_id)
+ self._sync_id = sync_id
@http_app.http_method(content_as_args=True)
- def post_put(self, id, rev, content, gen, trans_id):
+ def post_put(self, id, rev, content, gen, trans_id, number_of_docs):
"""
Put one incoming document into the server replica.
@@ -373,9 +385,14 @@ class SyncResource(http_app.SyncResource):
:param trans_id: The source replica transaction id corresponding to
the revision of the incoming document.
:type trans_id: str
+ :param number_of_docs: The total number of documents sent in this sync
+ session.
+ :type number_of_docs: int
"""
doc = Document(id, rev, content)
- self.sync_exch.insert_doc_from_source(doc, gen, trans_id)
+ self.sync_exch.insert_doc_from_source(
+ doc, gen, trans_id, number_of_docs=number_of_docs,
+ sync_id=self._sync_id)
@http_app.http_method(received=int, content_as_args=True)
def post_get(self, received):