summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/target.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r--client/src/leap/soledad/client/target.py1517
1 files changed, 0 insertions, 1517 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
deleted file mode 100644
index 986bd991..00000000
--- a/client/src/leap/soledad/client/target.py
+++ /dev/null
@@ -1,1517 +0,0 @@
-# -*- coding: utf-8 -*-
-# target.py
-# Copyright (C) 2013, 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-A U1DB backend for encrypting data before sending to server and decrypting
-after receiving.
-"""
-import cStringIO
-import gzip
-import logging
-import re
-import urllib
-import threading
-
-from collections import defaultdict
-from time import sleep
-from uuid import uuid4
-
-import simplejson as json
-
-from u1db import errors
-from u1db.remote import utils, http_errors
-from u1db.remote.http_target import HTTPSyncTarget
-from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
-from zope.proxy import ProxyBase
-from zope.proxy import sameProxiedObjects, setProxiedObject
-
-from twisted.internet.task import LoopingCall
-
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.client.auth import TokenBasedAuth
-from leap.soledad.client.crypto import is_symmetrically_encrypted
-from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
-from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
-from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
-from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
-from leap.soledad.client.events import signal
-
-
-logger = logging.getLogger(__name__)
-
-
-def _gunzip(data):
- """
- Uncompress data that is gzipped.
-
- :param data: gzipped data
- :type data: basestring
- """
- buffer = cStringIO.StringIO()
- buffer.write(data)
- buffer.seek(0)
- try:
- data = gzip.GzipFile(mode='r', fileobj=buffer).read()
- except Exception:
- logger.warning("Error while decrypting gzipped data")
- buffer.close()
- return data
-
-
-class PendingReceivedDocsSyncError(Exception):
- pass
-
-
-class DocumentSyncerThread(threading.Thread):
- """
- A thread that knowns how to either send or receive a document during the
- sync process.
- """
-
- def __init__(self, doc_syncer, release_method, failed_method,
- idx, total, last_request_lock=None, last_callback_lock=None):
- """
- Initialize a new syncer thread.
-
- :param doc_syncer: A document syncer.
- :type doc_syncer: HTTPDocumentSyncer
- :param release_method: A method to be called when finished running.
- :type release_method: callable(DocumentSyncerThread)
- :param failed_method: A method to be called when we failed.
- :type failed_method: callable(DocumentSyncerThread)
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param last_request_lock: A lock to wait for before actually performing
- the request.
- :type last_request_lock: threading.Lock
- :param last_callback_lock: A lock to wait for before actually running
- the success callback.
- :type last_callback_lock: threading.Lock
- """
- threading.Thread.__init__(self)
- self._doc_syncer = doc_syncer
- self._release_method = release_method
- self._failed_method = failed_method
- self._idx = idx
- self._total = total
- self._last_request_lock = last_request_lock
- self._last_callback_lock = last_callback_lock
- self._response = None
- self._exception = None
- self._result = None
- self._success = False
- # a lock so we can signal when we're finished
- self._request_lock = threading.Lock()
- self._request_lock.acquire()
- self._callback_lock = threading.Lock()
- self._callback_lock.acquire()
- # make thread interruptable
- self._stopped = None
- self._stop_lock = threading.Lock()
-
- def run(self):
- """
- Run the HTTP request and store results.
-
- This method will block and wait for an eventual previous operation to
- finish before actually performing the request. It also traps any
- exception and register any failure with the request.
- """
- with self._stop_lock:
- if self._stopped is None:
- self._stopped = False
- else:
- return
-
- # eventually wait for the previous thread to finish
- if self._last_request_lock is not None:
- self._last_request_lock.acquire()
-
- # bail out in case we've been interrupted
- if self.stopped is True:
- return
-
- try:
- self._response = self._doc_syncer.do_request()
- self._request_lock.release()
-
- # run success callback
- if self._doc_syncer.success_callback is not None:
-
- # eventually wait for callback lock release
- if self._last_callback_lock is not None:
- self._last_callback_lock.acquire()
-
- # bail out in case we've been interrupted
- if self._stopped is True:
- return
-
- self._result = self._doc_syncer.success_callback(
- self._idx, self._total, self._response)
- self._success = True
- doc_syncer = self._doc_syncer
- self._release_method(self, doc_syncer)
- self._doc_syncer = None
- # let next thread executed its callback
- self._callback_lock.release()
-
- # trap any exception and signal failure
- except Exception as e:
- self._exception = e
- self._success = False
- # run failure callback
- if self._doc_syncer.failure_callback is not None:
-
- # eventually wait for callback lock release
- if self._last_callback_lock is not None:
- self._last_callback_lock.acquire()
-
- # bail out in case we've been interrupted
- if self.stopped is True:
- return
-
- self._doc_syncer.failure_callback(
- self._idx, self._total, self._exception)
-
- self._failed_method()
- # we do not release the callback lock here because we
- # failed and so we don't want other threads to succeed.
-
- @property
- def doc_syncer(self):
- return self._doc_syncer
-
- @property
- def response(self):
- return self._response
-
- @property
- def exception(self):
- return self._exception
-
- @property
- def callback_lock(self):
- return self._callback_lock
-
- @property
- def request_lock(self):
- return self._request_lock
-
- @property
- def success(self):
- return self._success
-
- def stop(self):
- with self._stop_lock:
- self._stopped = True
-
- @property
- def stopped(self):
- with self._stop_lock:
- return self._stopped
-
- @property
- def result(self):
- return self._result
-
-
-class DocumentSyncerPool(object):
- """
- A pool of reusable document syncers.
- """
-
- POOL_SIZE = 10
- """
- The maximum amount of syncer threads running at the same time.
- """
-
- def __init__(self, raw_url, raw_creds, query_string, headers,
- ensure_callback, stop_method):
- """
- Initialize the document syncer pool.
-
- :param raw_url: The complete raw URL for the HTTP request.
- :type raw_url: str
- :param raw_creds: The credentials for the HTTP request.
- :type raw_creds: dict
- :param query_string: The query string for the HTTP request.
- :type query_string: str
- :param headers: The headers for the HTTP request.
- :type headers: dict
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
-
- """
- # save syncer params
- self._raw_url = raw_url
- self._raw_creds = raw_creds
- self._query_string = query_string
- self._headers = headers
- self._ensure_callback = ensure_callback
- self._stop_method = stop_method
- # pool attributes
- self._failures = False
- self._semaphore_pool = threading.BoundedSemaphore(
- DocumentSyncerPool.POOL_SIZE)
- self._pool_access_lock = threading.Lock()
- self._doc_syncers = []
- self._threads = []
-
- def new_syncer_thread(self, idx, total, last_request_lock=None,
- last_callback_lock=None):
- """
- Yield a new document syncer thread.
-
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param last_request_lock: A lock to wait for before actually performing
- the request.
- :type last_request_lock: threading.Lock
- :param last_callback_lock: A lock to wait for before actually running
- the success callback.
- :type last_callback_lock: threading.Lock
- """
- t = None
- # wait for available threads
- self._semaphore_pool.acquire()
- with self._pool_access_lock:
- if self._failures is True:
- return None
- # get a syncer
- doc_syncer = self._get_syncer()
- # we rely on DocumentSyncerThread.run() to release the lock using
- # self.release_syncer so we can launch a new thread.
- t = DocumentSyncerThread(
- doc_syncer, self.release_syncer, self.cancel_threads,
- idx, total,
- last_request_lock=last_request_lock,
- last_callback_lock=last_callback_lock)
- self._threads.append(t)
- return t
-
- def _failed(self):
- with self._pool_access_lock:
- self._failures = True
-
- @property
- def failures(self):
- return self._failures
-
- def _get_syncer(self):
- """
- Get a document syncer from the pool.
-
- This method will create a new syncer whenever there is no syncer
- available in the pool.
-
- :return: A syncer.
- :rtype: HTTPDocumentSyncer
- """
- syncer = None
- # get an available syncer or create a new one
- try:
- syncer = self._doc_syncers.pop()
- except IndexError:
- syncer = HTTPDocumentSyncer(
- self._raw_url, self._raw_creds, self._query_string,
- self._headers, self._ensure_callback)
- return syncer
-
- def release_syncer(self, syncer_thread, doc_syncer):
- """
- Return a syncer to the pool after use and check for any failures.
-
- :param syncer: The syncer to be returned to the pool.
- :type syncer: HTTPDocumentSyncer
- """
- with self._pool_access_lock:
- self._doc_syncers.append(doc_syncer)
- if syncer_thread.success is True:
- self._threads.remove(syncer_thread)
- self._semaphore_pool.release()
-
- def cancel_threads(self):
- """
- Stop all threads in the pool.
- """
- # stop sync
- self._stop_method()
- stopped = []
- # stop all threads
- logger.warning("Soledad sync: cancelling sync threads...")
- with self._pool_access_lock:
- self._failures = True
- while self._threads:
- t = self._threads.pop(0)
- t.stop()
- self._doc_syncers.append(t.doc_syncer)
- stopped.append(t)
- # release locks and join
- while stopped:
- t = stopped.pop(0)
- t.request_lock.acquire(False) # just in case
- t.request_lock.release()
- t.callback_lock.acquire(False) # just in case
- t.callback_lock.release()
- # release any blocking semaphores
- for i in xrange(DocumentSyncerPool.POOL_SIZE):
- try:
- self._semaphore_pool.release()
- except ValueError:
- break
- logger.warning("Soledad sync: cancelled sync threads.")
-
- def cleanup(self):
- """
- Close and remove any syncers from the pool.
- """
- with self._pool_access_lock:
- while self._doc_syncers:
- syncer = self._doc_syncers.pop()
- syncer.close()
- del syncer
-
-
-class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
-
- def __init__(self, raw_url, creds, query_string, headers, ensure_callback):
- """
- Initialize the client.
-
- :param raw_url: The raw URL of the target HTTP server.
- :type raw_url: str
- :param creds: Authentication credentials.
- :type creds: dict
- :param query_string: The query string for the HTTP request.
- :type query_string: str
- :param headers: The headers for the HTTP request.
- :type headers: dict
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
- """
- HTTPClientBase.__init__(self, raw_url, creds=creds)
- # info needed to perform the request
- self._query_string = query_string
- self._headers = headers
- self._ensure_callback = ensure_callback
- # the actual request method
- self._request_method = None
- self._success_callback = None
- self._failure_callback = None
-
- def _reset(self):
- """
- Reset this document syncer so we can reuse it.
- """
- self._request_method = None
- self._success_callback = None
- self._failure_callback = None
- self._request_method = None
-
- def set_request_method(self, method, *args, **kwargs):
- """
- Set the actual method to perform the request.
-
- :param method: Either 'get' or 'put'.
- :type method: str
- :param args: Arguments for the request method.
- :type args: list
- :param kwargs: Keyworded arguments for the request method.
- :type kwargs: dict
- """
- self._reset()
- # resolve request method
- if method is 'get':
- self._request_method = self._get_doc
- elif method is 'put':
- self._request_method = self._put_doc
- else:
- raise Exception
- # store request method args
- self._args = args
- self._kwargs = kwargs
-
- def set_success_callback(self, callback):
- self._success_callback = callback
-
- def set_failure_callback(self, callback):
- self._failure_callback = callback
-
- @property
- def success_callback(self):
- return self._success_callback
-
- @property
- def failure_callback(self):
- return self._failure_callback
-
- def do_request(self):
- """
- Actually perform the request.
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- self._ensure_connection()
- args = self._args
- kwargs = self._kwargs
- return self._request_method(*args, **kwargs)
-
- def _request(self, method, url_parts, params=None, body=None,
- content_type=None):
- """
- Perform an HTTP request.
-
- :param method: The HTTP request method.
- :type method: str
- :param url_parts: A list representing the request path.
- :type url_parts: list
- :param params: Parameters for the URL query string.
- :type params: dict
- :param body: The body of the request.
- :type body: str
- :param content-type: The content-type of the request.
- :type content-type: str
-
- :return: The body and headers of the response.
- :rtype: tuple
-
- :raise errors.Unavailable: Raised after a number of unsuccesful
- request attempts.
- :raise Exception: Raised for any other exception ocurring during the
- request.
- """
-
- self._ensure_connection()
- unquoted_url = url_query = self._url.path
- if url_parts:
- if not url_query.endswith('/'):
- url_query += '/'
- unquoted_url = url_query
- url_query += '/'.join(urllib.quote(part, safe='')
- for part in url_parts)
- # oauth performs its own quoting
- unquoted_url += '/'.join(url_parts)
- encoded_params = {}
- if params:
- for key, value in params.items():
- key = unicode(key).encode('utf-8')
- encoded_params[key] = _encode_query_parameter(value)
- url_query += ('?' + urllib.urlencode(encoded_params))
- if body is not None and not isinstance(body, basestring):
- body = json.dumps(body)
- content_type = 'application/json'
- headers = {}
- if content_type:
- headers['content-type'] = content_type
-
- # Patched: We would like to receive gzip pretty please
- # ----------------------------------------------------
- headers['accept-encoding'] = "gzip"
- # ----------------------------------------------------
-
- headers.update(
- self._sign_request(method, unquoted_url, encoded_params))
-
- for delay in self._delays:
- try:
- self._conn.request(method, url_query, body, headers)
- return self._response()
- except errors.Unavailable, e:
- sleep(delay)
- raise e
-
- def _response(self):
- """
- Return the response of the (possibly gzipped) HTTP request.
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- resp = self._conn.getresponse()
- body = resp.read()
- headers = dict(resp.getheaders())
-
- # Patched: We would like to decode gzip
- # ----------------------------------------------------
- encoding = headers.get('content-encoding', '')
- if "gzip" in encoding:
- body = _gunzip(body)
- # ----------------------------------------------------
-
- if resp.status in (200, 201):
- return body, headers
- elif resp.status in http_errors.ERROR_STATUSES:
- try:
- respdic = json.loads(body)
- except ValueError:
- pass
- else:
- self._error(respdic)
- # special case
- if resp.status == 503:
- raise errors.Unavailable(body, headers)
- raise errors.HTTPError(resp.status, body, headers)
-
- def _prepare(self, comma, entries, **dic):
- """
- Prepare an entry to be sent through a syncing POST request.
-
- :param comma: A string to be prepended to the current entry.
- :type comma: str
- :param entries: A list of entries accumulated to be sent on the
- request.
- :type entries: list
- :param dic: The data to be included in this entry.
- :type dic: dict
-
- :return: The size of the prepared entry.
- :rtype: int
- """
- entry = comma + '\r\n' + json.dumps(dic)
- entries.append(entry)
- return len(entry)
-
- def _init_post_request(self, action, content_length):
- """
- Initiate a syncing POST request.
-
- :param url: The syncing URL.
- :type url: str
- :param action: The syncing action, either 'get' or 'receive'.
- :type action: str
- :param headers: The initial headers to be sent on this request.
- :type headers: dict
- :param content_length: The content-length of the request.
- :type content_length: int
- """
- self._conn.putrequest('POST', self._query_string)
- self._conn.putheader(
- 'content-type', 'application/x-soledad-sync-%s' % action)
- for header_name, header_value in self._headers:
- self._conn.putheader(header_name, header_value)
- self._conn.putheader('accept-encoding', 'gzip')
- self._conn.putheader('content-length', str(content_length))
- self._conn.endheaders()
-
- def _get_doc(self, received, sync_id, last_known_generation,
- last_known_trans_id):
- """
- Get a sync document from server by means of a POST request.
-
- :param received: The number of documents already received in the
- current sync session.
- :type received: int
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- # inform server of how many documents have already been received
- size += self._prepare(
- ',', entries, received=received)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request('get', size)
- # get document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- def _put_doc(self, sync_id, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id, number_of_docs, doc_idx):
- """
- Put a sync document on server by means of a POST request.
-
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param id: The document id.
- :type id: str
- :param rev: The document revision.
- :type rev: str
- :param content: The serialized document content.
- :type content: str
- :param gen: The generation of the modification of the document.
- :type gen: int
- :param trans_id: The transaction id of the modification of the
- document.
- :type trans_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- # prepare to send the document
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- # add the document to the request
- size += self._prepare(
- ',', entries,
- id=id, rev=rev, content=content, gen=gen, trans_id=trans_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request('put', size)
- # send document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- def _sign_request(self, method, url_query, params):
- """
- Return an authorization header to be included in the HTTP request.
-
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type param: list
-
- :return: The Authorization header.
- :rtype: list of tuple
- """
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- def set_token_credentials(self, uuid, token):
- """
- Store given credentials so we can sign the request later.
-
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
- """
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
-
-class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
- """
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
-
- Normally encryption will have been written to the sync database upon
- document modification. The sync database is also used to write temporarily
- the parsed documents that the remote send us, before being decrypted and
- written to the main database.
- """
-
- # will later keep a reference to the insert-doc callback
- # passed to sync_exchange
- _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
-
- """
- Period of recurrence of the periodic decrypting task, in seconds.
- """
- DECRYPT_LOOP_PERIOD = 0.5
-
- #
- # Modified HTTPSyncTarget methods.
- #
-
- def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None, sync_db_write_lock=None):
- """
- Initialize the SoledadSyncTarget.
-
- :param source_replica_uid: The source replica uid which we use when
- deferring decryption.
- :type source_replica_uid: str
- :param url: The url of the target replica to sync with.
- :type url: str
- :param creds: Optional dictionary giving credentials.
- to authorize the operation with the server.
- :type creds: dict
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
- :param sync_db: Optional. handler for the db with the symmetric
- encryption of the syncing documents. If
- None, encryption will be done in-place,
- instead of retreiving it from the dedicated
- database.
- :type sync_db: Sqlite handler
- :param sync_db_write_lock: a write lock for controlling concurrent
- access to the sync_db
- :type sync_db_write_lock: threading.Lock
- """
- HTTPSyncTarget.__init__(self, url, creds)
- self._raw_url = url
- self._raw_creds = creds
- self._crypto = crypto
- self._stopped = True
- self._stop_lock = threading.Lock()
- self._sync_exchange_lock = threading.Lock()
- self.source_replica_uid = source_replica_uid
- self._defer_decryption = False
- self._syncer_pool = None
-
- # deferred decryption attributes
- self._sync_db = None
- self._sync_db_write_lock = None
- self._decryption_callback = None
- self._sync_decr_pool = None
- self._sync_loop = None
- if sync_db and sync_db_write_lock is not None:
- self._sync_db = sync_db
- self._sync_db_write_lock = sync_db_write_lock
-
- def _setup_sync_decr_pool(self):
- """
- Set up the SyncDecrypterPool for deferred decryption.
- """
- if self._sync_decr_pool is None:
- # initialize syncing queue decryption pool
- self._sync_decr_pool = SyncDecrypterPool(
- self._crypto, self._sync_db,
- self._sync_db_write_lock,
- insert_doc_cb=self._insert_doc_cb)
- self._sync_decr_pool.set_source_replica_uid(
- self.source_replica_uid)
-
- def _teardown_sync_decr_pool(self):
- """
- Tear down the SyncDecrypterPool.
- """
- if self._sync_decr_pool is not None:
- self._sync_decr_pool.close()
- self._sync_decr_pool = None
-
- def _setup_sync_loop(self):
- """
- Set up the sync loop for deferred decryption.
- """
- if self._sync_loop is None:
- self._sync_loop = LoopingCall(
- self._decrypt_syncing_received_docs)
- self._sync_loop.start(self.DECRYPT_LOOP_PERIOD)
-
- def _teardown_sync_loop(self):
- """
- Tear down the sync loop.
- """
- if self._sync_loop is not None:
- self._sync_loop.stop()
- self._sync_loop = None
-
- def _get_replica_uid(self, url):
- """
- Return replica uid from the url, or None.
-
- :param url: the replica url
- :type url: str
- """
- replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url)
- return replica_uid_match[0] if len(replica_uid_match) > 0 else None
-
- @staticmethod
- def connect(url, source_replica_uid=None, crypto=None):
- return SoledadSyncTarget(
- url, source_replica_uid=source_replica_uid, crypto=crypto)
-
- def _parse_received_doc_response(self, response):
- """
- Parse the response from the server containing the received document.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- """
- data, _ = response
- # decode incoming stream
- parts = data.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise errors.BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- try:
- metadata = json.loads(line)
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- number_of_changes = metadata['number_of_changes']
- except (json.JSONDecodeError, KeyError):
- raise errors.BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if self._ensure_callback and 'replica_uid' in metadata:
- self._ensure_callback(metadata['replica_uid'])
- # parse incoming document info
- doc_id = None
- rev = None
- content = None
- gen = None
- trans_id = None
- if number_of_changes > 0:
- try:
- entry = json.loads(data[1])
- doc_id = entry['id']
- rev = entry['rev']
- content = entry['content']
- gen = entry['gen']
- trans_id = entry['trans_id']
- except (IndexError, KeyError):
- raise errors.BrokenSyncStream
- return new_generation, new_transaction_id, number_of_changes, \
- doc_id, rev, content, gen, trans_id
-
- def _insert_received_doc(self, idx, total, response):
- """
- Insert a received document into the local replica.
-
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- """
- new_generation, new_transaction_id, number_of_changes, doc_id, \
- rev, content, gen, trans_id = \
- self._parse_received_doc_response(response)
- if doc_id is not None:
- # decrypt incoming document and insert into local database
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # If arriving content was symmetrically encrypted, we decrypt it.
- # We do it inline if defer_decryption flag is False or no sync_db
- # was defined, otherwise we defer it writing it to the received
- # docs table.
- doc = SoledadDocument(doc_id, rev, content)
- if is_symmetrically_encrypted(doc):
- if self._queue_for_decrypt:
- self._save_encrypted_received_doc(
- doc, gen, trans_id, idx, total)
- else:
- # defer_decryption is False or no-sync-db fallback
- doc.set_json(decrypt_doc(self._crypto, doc))
- self._return_doc_cb(doc, gen, trans_id)
- else:
- # not symmetrically encrypted doc, insert it directly
- # or save it in the decrypted stage.
- if self._queue_for_decrypt:
- self._save_received_doc(doc, gen, trans_id, idx, total)
- else:
- self._return_doc_cb(doc, gen, trans_id)
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- msg = "%d/%d" % (idx + 1, total)
- signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
- logger.debug("Soledad sync receive status: %s" % msg)
- return number_of_changes, new_generation, new_transaction_id
-
- def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback, sync_id,
- defer_decryption=False):
- """
- Fetch sync documents from the remote database and insert them in the
- local database.
-
- If an incoming document's encryption scheme is equal to
- EncryptionSchemes.SYMKEY, then this method will decrypt it with
- Soledad's symmetric key.
-
- :param url: The syncing URL.
- :type url: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param headers: The headers of the HTTP request.
- :type headers: dict
- :param return_doc_cb: A callback to insert docs from target.
- :type return_doc_cb: callable
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :raise BrokenSyncStream: If `data` is malformed.
-
- :return: A dictionary representing the first line of the response got
- from remote replica.
- :rtype: dict
- """
- # we keep a reference to the callback in case we defer the decryption
- self._return_doc_cb = return_doc_cb
- self._queue_for_decrypt = defer_decryption \
- and self._sync_db is not None
-
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
-
- if self._queue_for_decrypt:
- logger.debug(
- "Soledad sync: will queue received docs for decrypting.")
-
- idx = 0
- number_of_changes = 1
-
- first_request = True
- last_callback_lock = None
- threads = []
-
- # get incoming documents
- while idx < number_of_changes:
- # bail out if sync process was interrupted
- if self.stopped is True:
- break
-
- # launch a thread to fetch one document from target
- t = self._syncer_pool.new_syncer_thread(
- idx, number_of_changes,
- last_callback_lock=last_callback_lock)
-
- # bail out if any thread failed
- if t is None:
- self.stop()
- break
-
- t.doc_syncer.set_request_method(
- 'get', idx, sync_id, last_known_generation,
- last_known_trans_id)
- t.doc_syncer.set_success_callback(self._insert_received_doc)
-
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while getting document " \
- "%d/%d: %s" \
- % (idx + 1, total, exception)
- logger.warning("%s" % _failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
-
- t.doc_syncer.set_failure_callback(_failure_callback)
- threads.append(t)
- t.start()
- last_callback_lock = t.callback_lock
- idx += 1
-
- # if this is the first request, wait to update the number of
- # changes
- if first_request is True:
- t.join()
- if t.success:
- number_of_changes, _, _ = t.result
- else:
- raise t.exception
- first_request = False
-
- # make sure all threads finished and we have up-to-date info
- last_successful_thread = None
- while threads:
- # check if there are failures
- t = threads.pop(0)
- t.join()
- if t.success:
- last_successful_thread = t
- else:
- raise t.exception
-
- # get information about last successful thread
- if last_successful_thread is not None:
- body, _ = last_successful_thread.response
- parsed_body = json.loads(body)
- # get current target gen and trans id in case no documents were
- # transferred
- if len(parsed_body) == 1:
- metadata = parsed_body[0]
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- # get current target gen and trans id from last transferred
- # document
- else:
- doc_data = parsed_body[1]
- new_generation = doc_data['gen']
- new_transaction_id = doc_data['trans_id']
-
- return new_generation, new_transaction_id
-
- def sync_exchange(self, docs_by_generations,
- source_replica_uid, last_known_generation,
- last_known_trans_id, return_doc_cb,
- ensure_callback=None, defer_decryption=True,
- sync_id=None):
- """
- Find out which documents the remote database does not know about,
- encrypt and send them.
-
- This does the same as the parent's method but encrypts content before
- syncing.
-
- :param docs_by_generations: A list of (doc_id, generation, trans_id)
- of local documents that were changed since
- the last local generation the remote
- replica knows about.
- :type docs_by_generations: list of tuples
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
-
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
-
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :param return_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type return_doc_cb: function
-
- :param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just
- created.
- :type ensure_callback: function
-
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :return: The new generation and transaction id of the target replica.
- :rtype: tuple
- """
- self._ensure_callback = ensure_callback
-
- if defer_decryption and self._sync_db is not None:
- self._sync_exchange_lock.acquire()
- self._setup_sync_decr_pool()
- self._setup_sync_loop()
- self._defer_decryption = True
- else:
- # fall back
- defer_decryption = False
-
- self.start()
-
- if sync_id is None:
- sync_id = str(uuid4())
- self.source_replica_uid = source_replica_uid
- # let the decrypter pool access the passed callback to insert docs
- setProxiedObject(self._insert_doc_cb[source_replica_uid],
- return_doc_cb)
-
- # empty the database before starting a new sync
- if defer_decryption is True and not self.clear_to_sync():
- self._sync_decr_pool.empty()
-
- self._ensure_connection()
- if self._trace_hook: # for tests
- self._trace_hook('sync_exchange')
- url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
- headers = self._sign_request('POST', url, {})
-
- cur_target_gen = last_known_generation
- cur_target_trans_id = last_known_trans_id
-
- # send docs
- msg = "%d/%d" % (0, len(docs_by_generations))
- signal(SOLEDAD_SYNC_SEND_STATUS, msg)
- logger.debug("Soledad sync send status: %s" % msg)
-
- defer_encryption = self._sync_db is not None
- self._syncer_pool = DocumentSyncerPool(
- self._raw_url, self._raw_creds, url, headers, ensure_callback,
- self.stop_syncer)
- threads = []
- last_callback_lock = None
- sent = 0
- total = len(docs_by_generations)
-
- synced = []
- number_of_docs = len(docs_by_generations)
-
- last_request_lock = None
- for doc, gen, trans_id in docs_by_generations:
- # allow for interrupting the sync process
- if self.stopped is True:
- break
-
- # skip non-syncable docs
- if isinstance(doc, SoledadDocument) and not doc.syncable:
- continue
-
- # -------------------------------------------------------------
- # symmetric encryption of document's contents
- # -------------------------------------------------------------
- doc_json = doc.get_json()
- if not doc.is_tombstone():
- if not defer_encryption:
- # fallback case, for tests
- doc_json = encrypt_doc(self._crypto, doc)
- else:
- try:
- doc_json = self.get_encrypted_doc_from_db(
- doc.doc_id, doc.rev)
- except Exception as exc:
- logger.error("Error while getting "
- "encrypted doc from db")
- logger.exception(exc)
- continue
- if doc_json is None:
- # Not marked as tombstone, but we got nothing
- # from the sync db. As it is not encrypted yet, we
- # force inline encryption.
- # TODO: implement a queue to deal with these cases.
- doc_json = encrypt_doc(self._crypto, doc)
- # -------------------------------------------------------------
- # end of symmetric encryption
- # -------------------------------------------------------------
- t = self._syncer_pool.new_syncer_thread(
- sent + 1, total, last_request_lock=last_request_lock,
- last_callback_lock=last_callback_lock)
-
- # bail out if any thread failed
- if t is None:
- self.stop()
- break
-
- # set the request method
- t.doc_syncer.set_request_method(
- 'put', sync_id, cur_target_gen, cur_target_trans_id,
- id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs,
- doc_idx=sent + 1)
- # set the success calback
-
- def _success_callback(idx, total, response):
- _success_msg = "Soledad sync send status: %d/%d" \
- % (idx, total)
- signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
- logger.debug(_success_msg)
-
- t.doc_syncer.set_success_callback(_success_callback)
-
- # set the failure callback
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while sending document " \
- "%d/%d: %s" % (idx, total, exception)
- logger.warning("%s" % _failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
-
- t.doc_syncer.set_failure_callback(_failure_callback)
-
- # save thread and append
- t.start()
- threads.append((t, doc))
-
- # update lock references so they can be used in next call to
- # syncer_pool.new_syncer_thread() above
- last_callback_lock = t.callback_lock
- last_request_lock = t.request_lock
-
- sent += 1
-
- # make sure all threads finished and we have up-to-date info
- last_successful_thread = None
- while threads:
- # check if there are failures
- t, doc = threads.pop(0)
- t.join()
- if t.success:
- synced.append((doc.doc_id, doc.rev))
- last_successful_thread = t
- else:
- raise t.exception
-
- # delete documents from the sync database
- if defer_encryption:
- self.delete_encrypted_docs_from_db(synced)
-
- # get target gen and trans_id after docs
- gen_after_send = None
- trans_id_after_send = None
- if last_successful_thread is not None:
- response_dict = json.loads(last_successful_thread.response[0])[0]
- gen_after_send = response_dict['new_generation']
- trans_id_after_send = response_dict['new_transaction_id']
-
- # get docs from target
- if self.stopped is False:
- cur_target_gen, cur_target_trans_id = self._get_remote_docs(
- url,
- last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id,
- defer_decryption=defer_decryption)
-
- self._syncer_pool.cleanup()
-
- # decrypt docs in case of deferred decryption
- if defer_decryption:
- while not self.clear_to_sync():
- sleep(self.DECRYPT_LOOP_PERIOD)
- self._teardown_sync_loop()
- self._teardown_sync_decr_pool()
- self._sync_exchange_lock.release()
-
- # update gen and trans id info in case we just sent and did not
- # receive docs.
- if gen_after_send is not None and gen_after_send > cur_target_gen:
- cur_target_gen = gen_after_send
- cur_target_trans_id = trans_id_after_send
-
- self.stop()
- self._syncer_pool = None
- return cur_target_gen, cur_target_trans_id
-
- def start(self):
- """
- Mark current sync session as running.
- """
- with self._stop_lock:
- self._stopped = False
-
-
- def stop_syncer(self):
- with self._stop_lock:
- self._stopped = True
-
- def stop(self):
- """
- Mark current sync session as stopped.
-
- This will eventually interrupt the sync_exchange() method and return
- enough information to the synchronizer so the sync session can be
- recovered afterwards.
- """
- self.stop_syncer()
- if self._syncer_pool:
- self._syncer_pool.cancel_threads()
-
- @property
- def stopped(self):
- """
- Return whether this sync session is stopped.
-
- :return: Whether this sync session is stopped.
- :rtype: bool
- """
- with self._stop_lock:
- return self._stopped is True
-
- def get_encrypted_doc_from_db(self, doc_id, doc_rev):
- """
- Retrieve encrypted document from the database of encrypted docs for
- sync.
-
- :param doc_id: The Document id.
- :type doc_id: str
-
- :param doc_rev: The document revision
- :type doc_rev: str
- """
- encr = SyncEncrypterPool
- sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- res = self._fetchall(sql, (doc_id, doc_rev))
- if res:
- val = res.pop()
- return val[0]
- else:
- # no doc found
- return None
-
- def delete_encrypted_docs_from_db(self, docs_ids):
- """
- Delete several encrypted documents from the database of symmetrically
- encrypted docs to sync.
-
- :param docs_ids: an iterable with (doc_id, doc_rev) for all documents
- to be deleted.
- :type docs_ids: any iterable of tuples of str
- """
- if docs_ids:
- encr = SyncEncrypterPool
- for doc_id, doc_rev in docs_ids:
- sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- self._sync_db.execute(sql, (doc_id, doc_rev))
-
- def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
- """
- Save a symmetrically encrypted incoming document into the received
- docs table in the sync db. A decryption task will pick it up
- from here in turn.
-
- :param doc: The document to save.
- :type doc: SoledadDocument
- :param gen: The generation.
- :type gen: str
- :param trans_id: Transacion id.
- :type gen: str
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- logger.debug(
- "Enqueueing doc for decryption: %d/%d."
- % (idx + 1, total))
- self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
-
- def _save_received_doc(self, doc, gen, trans_id, idx, total):
- """
- Save any incoming document into the received docs table in the sync db.
-
- :param doc: The document to save.
- :type doc: SoledadDocument
- :param gen: The generation.
- :type gen: str
- :param trans_id: Transacion id.
- :type gen: str
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- logger.debug(
- "Enqueueing doc, no decryption needed: %d/%d."
- % (idx + 1, total))
- self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
-
- #
- # Symmetric decryption of syncing docs
- #
-
- def clear_to_sync(self):
- """
- Return whether sync can proceed (ie, the received db table is empty).
-
- :return: Whether sync can proceed.
- :rtype: bool
- """
- if self._sync_decr_pool:
- return self._sync_decr_pool.count_docs_in_sync_db() == 0
- return True
-
- def set_decryption_callback(self, cb):
- """
- Set callback to be called when the decryption finishes.
-
- :param cb: The callback to be set.
- :type cb: callable
- """
- self._decryption_callback = cb
-
- def has_decryption_callback(self):
- """
- Return True if there is a decryption callback set.
- :rtype: bool
- """
- return self._decryption_callback is not None
-
- def has_syncdb(self):
- """
- Return True if we have an initialized syncdb.
- """
- return self._sync_db is not None
-
- def _decrypt_syncing_received_docs(self):
- """
- Decrypt the documents received from remote replica and insert them
- into the local one.
-
- Called periodically from LoopingCall self._sync_loop.
- """
- if sameProxiedObjects(
- self._insert_doc_cb.get(self.source_replica_uid),
- None):
- return
-
- decrypter = self._sync_decr_pool
- decrypter.raise_in_case_of_failed_async_calls()
- decrypter.decrypt_received_docs()
- decrypter.process_decrypted()
-
- def _sign_request(self, method, url_query, params):
- """
- Return an authorization header to be included in the HTTP request.
-
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type param: list
-
- :return: The Authorization header.
- :rtype: list of tuple
- """
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- def set_token_credentials(self, uuid, token):
- """
- Store given credentials so we can sign the request later.
-
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
- """
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _fetchall(self, *args, **kwargs):
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()