summaryrefslogtreecommitdiff
path: root/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'client/src')
-rw-r--r--client/src/leap/soledad/client/adbapi.py3
-rw-r--r--client/src/leap/soledad/client/api.py2
-rw-r--r--client/src/leap/soledad/client/encdecpool.py2
-rw-r--r--client/src/leap/soledad/client/events.py4
-rw-r--r--client/src/leap/soledad/client/http_target.py711
-rw-r--r--client/src/leap/soledad/client/http_target/__init__.py90
-rw-r--r--client/src/leap/soledad/client/http_target/api.py229
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py252
-rw-r--r--client/src/leap/soledad/client/http_target/send.py102
-rw-r--r--client/src/leap/soledad/client/http_target/support.py203
-rw-r--r--client/src/leap/soledad/client/secrets.py35
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py1
12 files changed, 902 insertions, 732 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 237159bd..77822247 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -285,7 +285,8 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
A final close, only called by the shutdown trigger.
"""
self.shutdownID = None
- self.threadpool.stop()
+ if self.threadpool.started:
+ self.threadpool.stop()
self.running = False
for conn in self.connections.values():
self._close(conn)
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index a6a98551..a558addd 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -718,7 +718,7 @@ class Soledad(object):
return failure
def _emit_done_data_sync(passthrough):
- soledad_events.emit(
+ soledad_events.emit_async(
soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid)
return passthrough
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 2ad98767..6d3c11b9 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -74,6 +74,8 @@ class SyncEncryptDecryptPool(object):
self._started = True
def stop(self):
+ if not self._started:
+ return
self._started = False
self._destroy_pool()
# maybe cancel the next delayed call
diff --git a/client/src/leap/soledad/client/events.py b/client/src/leap/soledad/client/events.py
index b1379521..058be59c 100644
--- a/client/src/leap/soledad/client/events.py
+++ b/client/src/leap/soledad/client/events.py
@@ -20,7 +20,7 @@
Signaling functions.
"""
-from leap.common.events import emit
+from leap.common.events import emit_async
from leap.common.events import catalog
@@ -40,7 +40,7 @@ SOLEDAD_SYNC_RECEIVE_STATUS = catalog.SOLEDAD_SYNC_RECEIVE_STATUS
__all__ = [
"catalog",
- "emit",
+ "emit_async",
"SOLEDAD_CREATING_KEYS",
"SOLEDAD_DONE_CREATING_KEYS",
"SOLEDAD_DOWNLOADING_KEYS",
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
deleted file mode 100644
index a6ef2b0d..00000000
--- a/client/src/leap/soledad/client/http_target.py
+++ /dev/null
@@ -1,711 +0,0 @@
-# -*- coding: utf-8 -*-
-# http_target.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-A U1DB backend for encrypting data before sending to server and decrypting
-after receiving.
-"""
-
-
-import json
-import base64
-import logging
-import warnings
-
-from uuid import uuid4
-
-from twisted.internet import defer
-from twisted.web.error import Error
-from twisted.web.client import _ReadBodyProtocol
-from twisted.web.client import PartialDownloadError
-from twisted.web._newclient import ResponseDone
-from twisted.web._newclient import PotentialDataLoss
-
-from u1db import errors
-from u1db import SyncTarget
-from u1db.remote import utils
-from u1db.remote import http_errors
-
-from leap.common.http import HTTPClient
-
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.common.errors import InvalidAuthTokenError
-
-from leap.soledad.client.crypto import is_symmetrically_encrypted
-from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
-from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
-from leap.soledad.client.events import emit
-from leap.soledad.client.encdecpool import SyncDecrypterPool
-
-
-logger = logging.getLogger(__name__)
-
-
-# we want to make sure that HTTP errors will raise appropriate u1db errors,
-# that is, fire errbacks with the appropriate failures, in the context of
-# twisted. Because of that, we redefine the http body reader used by the HTTP
-# client below.
-
-class ReadBodyProtocol(_ReadBodyProtocol):
-
- def __init__(self, response, deferred):
- """
- Initialize the protocol, additionally storing the response headers.
- """
- _ReadBodyProtocol.__init__(
- self, response.code, response.phrase, deferred)
- self.headers = response.headers
-
- # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
- def _error(self, respdic):
- descr = respdic.get("error")
- exc_cls = errors.wire_description_to_exc.get(descr)
- if exc_cls is not None:
- message = respdic.get("message")
- self.deferred.errback(exc_cls(message))
- # ---8<--- end of snippet from u1db.remote.http_client
-
- def connectionLost(self, reason):
- """
- Deliver the accumulated response bytes to the waiting L{Deferred}, if
- the response body has been completely received without error.
- """
- if reason.check(ResponseDone):
-
- body = b''.join(self.dataBuffer)
-
- # ---8<--- snippet from u1db.remote.http_client
- if self.status in (200, 201):
- self.deferred.callback(body)
- elif self.status in http_errors.ERROR_STATUSES:
- try:
- respdic = json.loads(body)
- except ValueError:
- self.deferred.errback(
- errors.HTTPError(self.status, body, self.headers))
- else:
- self._error(respdic)
- # special cases
- elif self.status == 503:
- self.deferred.errback(errors.Unavailable(body, self.headers))
- else:
- self.deferred.errback(
- errors.HTTPError(self.status, body, self.headers))
- # ---8<--- end of snippet from u1db.remote.http_client
-
- elif reason.check(PotentialDataLoss):
- self.deferred.errback(
- PartialDownloadError(self.status, self.message,
- b''.join(self.dataBuffer)))
- else:
- self.deferred.errback(reason)
-
-
-def readBody(response):
- """
- Get the body of an L{IResponse} and return it as a byte string.
-
- This is a helper function for clients that don't want to incrementally
- receive the body of an HTTP response.
-
- @param response: The HTTP response for which the body will be read.
- @type response: L{IResponse} provider
-
- @return: A L{Deferred} which will fire with the body of the response.
- Cancelling it will close the connection to the server immediately.
- """
- def cancel(deferred):
- """
- Cancel a L{readBody} call, close the connection to the HTTP server
- immediately, if it is still open.
-
- @param deferred: The cancelled L{defer.Deferred}.
- """
- abort = getAbort()
- if abort is not None:
- abort()
-
- d = defer.Deferred(cancel)
- protocol = ReadBodyProtocol(response, d)
-
- def getAbort():
- return getattr(protocol.transport, 'abortConnection', None)
-
- response.deliverBody(protocol)
-
- if protocol.transport is not None and getAbort() is None:
- warnings.warn(
- 'Using readBody with a transport that does not have an '
- 'abortConnection method',
- category=DeprecationWarning,
- stacklevel=2)
-
- return d
-
-
-class SoledadHTTPSyncTarget(SyncTarget):
-
- """
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
-
- Normally encryption will have been written to the sync database upon
- document modification. The sync database is also used to write temporarily
- the parsed documents that the remote send us, before being decrypted and
- written to the main database.
- """
-
- def __init__(self, url, source_replica_uid, creds, crypto, cert_file,
- sync_db=None, sync_enc_pool=None):
- """
- Initialize the sync target.
-
- :param url: The server sync url.
- :type url: str
- :param source_replica_uid: The source replica uid which we use when
- deferring decryption.
- :type source_replica_uid: str
- :param creds: A dictionary containing the uuid and token.
- :type creds: creds
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
- :param cert_file: Path to the certificate of the ca used to validate
- the SSL certificate used by the remote soledad
- server.
- :type cert_file: str
- :param sync_db: Optional. handler for the db with the symmetric
- encryption of the syncing documents. If
- None, encryption will be done in-place,
- instead of retreiving it from the dedicated
- database.
- :type sync_db: Sqlite handler
- :param sync_enc_pool: The encryption pool to use to defer encryption.
- If None is passed the encryption will not be
- deferred.
- :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool
- """
- if url.endswith("/"):
- url = url[:-1]
- self._url = str(url) + "/sync-from/" + str(source_replica_uid)
- self.source_replica_uid = source_replica_uid
- self._auth_header = None
- self.set_creds(creds)
- self._crypto = crypto
- self._sync_db = sync_db
- self._sync_enc_pool = sync_enc_pool
- self._insert_doc_cb = None
- # asynchronous encryption/decryption attributes
- self._decryption_callback = None
- self._sync_decr_pool = None
- self._http = HTTPClient(cert_file)
-
- def close(self):
- self._http.close()
-
- def set_creds(self, creds):
- """
- Update credentials.
-
- :param creds: A dictionary containing the uuid and token.
- :type creds: dict
- """
- uuid = creds['token']['uuid']
- token = creds['token']['token']
- auth = '%s:%s' % (uuid, token)
- b64_token = base64.b64encode(auth)
- self._auth_header = {'Authorization': ['Token %s' % b64_token]}
-
- @property
- def _defer_encryption(self):
- return self._sync_enc_pool is not None
-
- #
- # SyncTarget API
- #
-
- @defer.inlineCallbacks
- def get_sync_info(self, source_replica_uid):
- """
- Return information about known state of remote database.
-
- Return the replica_uid and the current database generation of the
- remote database, and its last-seen database generation for the client
- replica.
-
- :param source_replica_uid: The client-size replica uid.
- :type source_replica_uid: str
-
- :return: A deferred which fires with (target_replica_uid,
- target_replica_generation, target_trans_id,
- source_replica_last_known_generation,
- source_replica_last_known_transaction_id)
- :rtype: twisted.internet.defer.Deferred
- """
- raw = yield self._http_request(self._url, headers=self._auth_header)
- res = json.loads(raw)
- defer.returnValue((
- res['target_replica_uid'],
- res['target_replica_generation'],
- res['target_replica_transaction_id'],
- res['source_replica_generation'],
- res['source_transaction_id']
- ))
-
- def record_sync_info(
- self, source_replica_uid, source_replica_generation,
- source_replica_transaction_id):
- """
- Record tip information for another replica.
-
- After sync_exchange has been processed, the caller will have
- received new content from this replica. This call allows the
- source replica instigating the sync to inform us what their
- generation became after applying the documents we returned.
-
- This is used to allow future sync operations to not need to repeat data
- that we just talked about. It also means that if this is called at the
- wrong time, there can be database records that will never be
- synchronized.
-
- :param source_replica_uid: The identifier for the source replica.
- :type source_replica_uid: str
- :param source_replica_generation: The database generation for the
- source replica.
- :type source_replica_generation: int
- :param source_replica_transaction_id: The transaction id associated
- with the source replica
- generation.
- :type source_replica_transaction_id: str
-
- :return: A deferred which fires with the result of the query.
- :rtype: twisted.internet.defer.Deferred
- """
- data = json.dumps({
- 'generation': source_replica_generation,
- 'transaction_id': source_replica_transaction_id
- })
- headers = self._auth_header.copy()
- headers.update({'content-type': ['application/json']})
- return self._http_request(
- self._url,
- method='PUT',
- headers=headers,
- body=data)
-
- @defer.inlineCallbacks
- def sync_exchange(self, docs_by_generation, source_replica_uid,
- last_known_generation, last_known_trans_id,
- insert_doc_cb, ensure_callback=None,
- defer_decryption=True, sync_id=None):
- """
- Find out which documents the remote database does not know about,
- encrypt and send them. After that, receive documents from the remote
- database.
-
- :param docs_by_generations: A list of (doc_id, generation, trans_id)
- of local documents that were changed since
- the last local generation the remote
- replica knows about.
- :type docs_by_generations: list of tuples
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
-
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
-
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :param insert_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type insert_doc_cb: function
-
- :param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just
- created.
- :type ensure_callback: function
-
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :return: A deferred which fires with the new generation and
- transaction id of the target replica.
- :rtype: twisted.internet.defer.Deferred
- """
-
- self._ensure_callback = ensure_callback
-
- if sync_id is None:
- sync_id = str(uuid4())
- self.source_replica_uid = source_replica_uid
-
- # save a reference to the callback so we can use it after decrypting
- self._insert_doc_cb = insert_doc_cb
-
- gen_after_send, trans_id_after_send = yield self._send_docs(
- docs_by_generation,
- last_known_generation,
- last_known_trans_id,
- sync_id)
-
- cur_target_gen, cur_target_trans_id = yield self._receive_docs(
- last_known_generation, last_known_trans_id,
- ensure_callback, sync_id,
- defer_decryption=defer_decryption)
-
- # update gen and trans id info in case we just sent and did not
- # receive docs.
- if gen_after_send is not None and gen_after_send > cur_target_gen:
- cur_target_gen = gen_after_send
- cur_target_trans_id = trans_id_after_send
-
- defer.returnValue([cur_target_gen, cur_target_trans_id])
-
- #
- # methods to send docs
- #
-
- def _prepare(self, comma, entries, **dic):
- entry = comma + '\r\n' + json.dumps(dic)
- entries.append(entry)
- return len(entry)
-
- @defer.inlineCallbacks
- def _send_docs(self, docs_by_generation, last_known_generation,
- last_known_trans_id, sync_id):
-
- if not docs_by_generation:
- defer.returnValue([None, None])
-
- headers = self._auth_header.copy()
- headers.update({'content-type': ['application/x-soledad-sync-put']})
- # add remote replica metadata to the request
- first_entries = ['[']
- self._prepare(
- '', first_entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- idx = 0
- total = len(docs_by_generation)
- for doc, gen, trans_id in docs_by_generation:
- idx += 1
- result = yield self._send_one_doc(
- headers, first_entries, doc,
- gen, trans_id, total, idx)
- if self._defer_encryption:
- self._sync_enc_pool.delete_encrypted_doc(
- doc.doc_id, doc.rev)
-
- msg = "%d/%d" % (idx, total)
- content = {'sent': idx, 'total': total}
- emit(SOLEDAD_SYNC_SEND_STATUS, content)
- logger.debug("Sync send status: %s" % msg)
-
- response_dict = json.loads(result)[0]
- gen_after_send = response_dict['new_generation']
- trans_id_after_send = response_dict['new_transaction_id']
- defer.returnValue([gen_after_send, trans_id_after_send])
-
- @defer.inlineCallbacks
- def _send_one_doc(self, headers, first_entries, doc, gen, trans_id,
- number_of_docs, doc_idx):
- entries = first_entries[:]
- # add the document to the request
- content = yield self._encrypt_doc(doc)
- self._prepare(
- ',', entries,
- id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs,
- doc_idx=doc_idx)
- entries.append('\r\n]')
- data = ''.join(entries)
- result = yield self._http_request(
- self._url,
- method='POST',
- headers=headers,
- body=data)
- defer.returnValue(result)
-
- def _encrypt_doc(self, doc):
- d = None
- if doc.is_tombstone():
- d = defer.succeed(None)
- elif not self._defer_encryption:
- # fallback case, for tests
- d = defer.succeed(self._crypto.encrypt_doc(doc))
- else:
-
- def _maybe_encrypt_doc_inline(doc_json):
- if doc_json is None:
- # the document is not marked as tombstone, but we got
- # nothing from the sync db. As it is not encrypted
- # yet, we force inline encryption.
- return self._crypto.encrypt_doc(doc)
- return doc_json
-
- d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
- d.addCallback(_maybe_encrypt_doc_inline)
- return d
-
- #
- # methods to receive doc
- #
-
- @defer.inlineCallbacks
- def _receive_docs(self, last_known_generation, last_known_trans_id,
- ensure_callback, sync_id, defer_decryption):
-
- self._queue_for_decrypt = defer_decryption \
- and self._sync_db is not None
-
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
-
- if self._queue_for_decrypt:
- logger.debug(
- "Soledad sync: will queue received docs for decrypting.")
-
- if defer_decryption:
- self._setup_sync_decr_pool()
-
- headers = self._auth_header.copy()
- headers.update({'content-type': ['application/x-soledad-sync-get']})
-
- # ---------------------------------------------------------------------
- # maybe receive the first document
- # ---------------------------------------------------------------------
-
- # we fetch the first document before fetching the rest because we need
- # to know the total number of documents to be received, and this
- # information comes as metadata to each request.
-
- doc = yield self._receive_one_doc(
- headers, last_known_generation, last_known_trans_id,
- sync_id, 0)
- self._received_docs = 0
- number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
-
- # update the target gen and trans_id in case a document was received
- if ngen:
- new_generation = ngen
- new_transaction_id = ntrans
-
- if defer_decryption:
- self._sync_decr_pool.start(number_of_changes)
-
- # ---------------------------------------------------------------------
- # maybe receive the rest of the documents
- # ---------------------------------------------------------------------
-
- # launch many asynchronous fetches and inserts of received documents
- # in the temporary sync db. Will wait for all results before
- # continuing.
-
- received = 1
- deferreds = []
- while received < number_of_changes:
- d = self._receive_one_doc(
- headers, last_known_generation,
- last_known_trans_id, sync_id, received)
- d.addCallback(
- self._insert_received_doc,
- received + 1, # the index of the current received doc
- number_of_changes)
- deferreds.append(d)
- received += 1
- results = yield defer.gatherResults(deferreds)
-
- # get generation and transaction id of target after insertions
- if deferreds:
- _, new_generation, new_transaction_id = results.pop()
-
- # ---------------------------------------------------------------------
- # wait for async decryption to finish
- # ---------------------------------------------------------------------
-
- if defer_decryption:
- yield self._sync_decr_pool.deferred
- self._sync_decr_pool.stop()
-
- defer.returnValue([new_generation, new_transaction_id])
-
- def _receive_one_doc(self, headers, last_known_generation,
- last_known_trans_id, sync_id, received):
- entries = ['[']
- # add remote replica metadata to the request
- self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- # inform server of how many documents have already been received
- self._prepare(
- ',', entries, received=received)
- entries.append('\r\n]')
- # send headers
- return self._http_request(
- self._url,
- method='POST',
- headers=headers,
- body=''.join(entries))
-
- def _insert_received_doc(self, response, idx, total):
- """
- Insert a received document into the local replica.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- new_generation, new_transaction_id, number_of_changes, doc_id, \
- rev, content, gen, trans_id = \
- self._parse_received_doc_response(response)
- if doc_id is not None:
- # decrypt incoming document and insert into local database
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # If arriving content was symmetrically encrypted, we decrypt it.
- # We do it inline if defer_decryption flag is False or no sync_db
- # was defined, otherwise we defer it writing it to the received
- # docs table.
- doc = SoledadDocument(doc_id, rev, content)
- if is_symmetrically_encrypted(doc):
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
- else:
- # defer_decryption is False or no-sync-db fallback
- doc.set_json(self._crypto.decrypt_doc(doc))
- self._insert_doc_cb(doc, gen, trans_id)
- else:
- # not symmetrically encrypted doc, insert it directly
- # or save it in the decrypted stage.
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
- else:
- self._insert_doc_cb(doc, gen, trans_id)
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- self._received_docs += 1
- msg = "%d/%d" % (self._received_docs, total)
- content = {'received': self._received_docs, 'total': total}
- emit(SOLEDAD_SYNC_RECEIVE_STATUS, content)
- logger.debug("Sync receive status: %s" % msg)
- return number_of_changes, new_generation, new_transaction_id
-
- def _parse_received_doc_response(self, response):
- """
- Parse the response from the server containing the received document.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
-
- :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
- content, gen, trans_id)
- :rtype: tuple
- """
- # decode incoming stream
- parts = response.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise errors.BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- try:
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- except (IndexError):
- raise errors.BrokenSyncStream
- try:
- metadata = json.loads(line)
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- number_of_changes = metadata['number_of_changes']
- except (ValueError, KeyError):
- raise errors.BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if self._ensure_callback and 'replica_uid' in metadata:
- self._ensure_callback(metadata['replica_uid'])
- # parse incoming document info
- doc_id = None
- rev = None
- content = None
- gen = None
- trans_id = None
- if number_of_changes > 0:
- try:
- entry = json.loads(data[1])
- doc_id = entry['id']
- rev = entry['rev']
- content = entry['content']
- gen = entry['gen']
- trans_id = entry['trans_id']
- except (IndexError, KeyError):
- raise errors.BrokenSyncStream
- return new_generation, new_transaction_id, number_of_changes, \
- doc_id, rev, content, gen, trans_id
-
- def _setup_sync_decr_pool(self):
- """
- Set up the SyncDecrypterPool for deferred decryption.
- """
- if self._sync_decr_pool is None and self._sync_db is not None:
- # initialize syncing queue decryption pool
- self._sync_decr_pool = SyncDecrypterPool(
- self._crypto,
- self._sync_db,
- insert_doc_cb=self._insert_doc_cb,
- source_replica_uid=self.source_replica_uid)
-
- def _http_request(self, url, method='GET', body=None, headers={}):
- d = self._http.request(url, method, body, headers, readBody)
- d.addErrback(_unauth_to_invalid_token_error)
- return d
-
-
-def _unauth_to_invalid_token_error(failure):
- """
- An errback to translate unauthorized errors to our own invalid token
- class.
-
- :param failure: The original failure.
- :type failure: twisted.python.failure.Failure
-
- :return: Either the original failure or an invalid auth token error.
- :rtype: twisted.python.failure.Failure
- """
- failure.trap(Error)
- if failure.getErrorMessage() == "401 Unauthorized":
- raise InvalidAuthTokenError
- return failure
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py
new file mode 100644
index 00000000..7a5cea9f
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/__init__.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A U1DB backend for encrypting data before sending to server and decrypting
+after receiving.
+"""
+
+
+import logging
+
+from leap.common.http import HTTPClient
+from leap.soledad.client.http_target.send import HTTPDocSender
+from leap.soledad.client.http_target.api import SyncTargetAPI
+from leap.soledad.client.http_target.fetch import HTTPDocFetcher
+
+
+logger = logging.getLogger(__name__)
+
+
+class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
+
+ """
+ A SyncTarget that encrypts data before sending and decrypts data after
+ receiving.
+
+ Normally encryption will have been written to the sync database upon
+ document modification. The sync database is also used to write temporarily
+ the parsed documents that the remote send us, before being decrypted and
+ written to the main database.
+ """
+ def __init__(self, url, source_replica_uid, creds, crypto, cert_file,
+ sync_db=None, sync_enc_pool=None):
+ """
+ Initialize the sync target.
+
+ :param url: The server sync url.
+ :type url: str
+ :param source_replica_uid: The source replica uid which we use when
+ deferring decryption.
+ :type source_replica_uid: str
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: creds
+ :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+ document contents when syncing.
+ :type crypto: soledad.crypto.SoledadCrypto
+ :param cert_file: Path to the certificate of the ca used to validate
+ the SSL certificate used by the remote soledad
+ server.
+ :type cert_file: str
+ :param sync_db: Optional. handler for the db with the symmetric
+ encryption of the syncing documents. If
+ None, encryption will be done in-place,
+ instead of retreiving it from the dedicated
+ database.
+ :type sync_db: Sqlite handler
+ :param sync_enc_pool: The encryption pool to use to defer encryption.
+ If None is passed the encryption will not be
+ deferred.
+ :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool
+ """
+ if url.endswith("/"):
+ url = url[:-1]
+ self._url = str(url) + "/sync-from/" + str(source_replica_uid)
+ self.source_replica_uid = source_replica_uid
+ self._auth_header = None
+ self.set_creds(creds)
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._sync_enc_pool = sync_enc_pool
+ self._insert_doc_cb = None
+ # asynchronous encryption/decryption attributes
+ self._decryption_callback = None
+ self._sync_decr_pool = None
+ self._http = HTTPClient(cert_file)
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
new file mode 100644
index 00000000..dcc762f6
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/api.py
@@ -0,0 +1,229 @@
+# -*- coding: utf-8 -*-
+# api.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import base64
+
+from uuid import uuid4
+from u1db import SyncTarget
+
+from twisted.web.error import Error
+from twisted.internet import defer
+
+from leap.soledad.common.errors import InvalidAuthTokenError
+from leap.soledad.client.http_target.support import readBody
+
+
+class SyncTargetAPI(SyncTarget):
+ """
+ Declares public methods and implements u1db.SyncTarget.
+ """
+
+ @defer.inlineCallbacks
+ def close(self):
+ if self._sync_enc_pool:
+ self._sync_enc_pool.stop()
+ if self._sync_decr_pool:
+ self._sync_decr_pool.stop()
+ yield self._http.close()
+
+ def set_creds(self, creds):
+ """
+ Update credentials.
+
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: dict
+ """
+ uuid = creds['token']['uuid']
+ token = creds['token']['token']
+ auth = '%s:%s' % (uuid, token)
+ b64_token = base64.b64encode(auth)
+ self._auth_header = {'Authorization': ['Token %s' % b64_token]}
+
+ @property
+ def _base_header(self):
+ return self._auth_header.copy() if self._auth_header else {}
+
+ @property
+ def _defer_encryption(self):
+ return self._sync_enc_pool is not None
+
+ def _http_request(self, url, method='GET', body=None, headers=None,
+ content_type=None):
+ headers = headers or self._base_header
+ if content_type:
+ headers.update({'content-type': [content_type]})
+ d = self._http.request(url, method, body, headers, readBody)
+ d.addErrback(_unauth_to_invalid_token_error)
+ return d
+
+ @defer.inlineCallbacks
+ def get_sync_info(self, source_replica_uid):
+ """
+ Return information about known state of remote database.
+
+ Return the replica_uid and the current database generation of the
+ remote database, and its last-seen database generation for the client
+ replica.
+
+ :param source_replica_uid: The client-size replica uid.
+ :type source_replica_uid: str
+
+ :return: A deferred which fires with (target_replica_uid,
+ target_replica_generation, target_trans_id,
+ source_replica_last_known_generation,
+ source_replica_last_known_transaction_id)
+ :rtype: twisted.internet.defer.Deferred
+ """
+ raw = yield self._http_request(self._url)
+ res = json.loads(raw)
+ defer.returnValue((
+ res['target_replica_uid'],
+ res['target_replica_generation'],
+ res['target_replica_transaction_id'],
+ res['source_replica_generation'],
+ res['source_transaction_id']
+ ))
+
+ def record_sync_info(
+ self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ """
+ Record tip information for another replica.
+
+ After sync_exchange has been processed, the caller will have
+ received new content from this replica. This call allows the
+ source replica instigating the sync to inform us what their
+ generation became after applying the documents we returned.
+
+ This is used to allow future sync operations to not need to repeat data
+ that we just talked about. It also means that if this is called at the
+ wrong time, there can be database records that will never be
+ synchronized.
+
+ :param source_replica_uid: The identifier for the source replica.
+ :type source_replica_uid: str
+ :param source_replica_generation: The database generation for the
+ source replica.
+ :type source_replica_generation: int
+ :param source_replica_transaction_id: The transaction id associated
+ with the source replica
+ generation.
+ :type source_replica_transaction_id: str
+
+ :return: A deferred which fires with the result of the query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ data = json.dumps({
+ 'generation': source_replica_generation,
+ 'transaction_id': source_replica_transaction_id
+ })
+ return self._http_request(
+ self._url,
+ method='PUT',
+ body=data,
+ content_type='application/json')
+
+ @defer.inlineCallbacks
+ def sync_exchange(self, docs_by_generation, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ insert_doc_cb, ensure_callback=None,
+ defer_decryption=True, sync_id=None):
+ """
+ Find out which documents the remote database does not know about,
+ encrypt and send them. After that, receive documents from the remote
+ database.
+
+ :param docs_by_generations: A list of (doc_id, generation, trans_id)
+ of local documents that were changed since
+ the last local generation the remote
+ replica knows about.
+ :type docs_by_generations: list of tuples
+
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+
+ :param ensure_callback: A callback that ensures we know the target
+ replica uid if the target replica was just
+ created.
+ :type ensure_callback: function
+
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :return: A deferred which fires with the new generation and
+ transaction id of the target replica.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ self._ensure_callback = ensure_callback
+
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+
+ # save a reference to the callback so we can use it after decrypting
+ self._insert_doc_cb = insert_doc_cb
+
+ gen_after_send, trans_id_after_send = yield self._send_docs(
+ docs_by_generation,
+ last_known_generation,
+ last_known_trans_id,
+ sync_id)
+
+ cur_target_gen, cur_target_trans_id = yield self._receive_docs(
+ last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id,
+ defer_decryption=defer_decryption)
+
+ # update gen and trans id info in case we just sent and did not
+ # receive docs.
+ if gen_after_send is not None and gen_after_send > cur_target_gen:
+ cur_target_gen = gen_after_send
+ cur_target_trans_id = trans_id_after_send
+
+ defer.returnValue([cur_target_gen, cur_target_trans_id])
+
+
+def _unauth_to_invalid_token_error(failure):
+ """
+ An errback to translate unauthorized errors to our own invalid token
+ class.
+
+ :param failure: The original failure.
+ :type failure: twisted.python.failure.Failure
+
+ :return: Either the original failure or an invalid auth token error.
+ :rtype: twisted.python.failure.Failure
+ """
+ failure.trap(Error)
+ if failure.getErrorMessage() == "401 Unauthorized":
+ raise InvalidAuthTokenError
+ return failure
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
new file mode 100644
index 00000000..65e576d9
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -0,0 +1,252 @@
+# -*- coding: utf-8 -*-
+# fetch.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import logging
+import json
+from u1db import errors
+from u1db.remote import utils
+from twisted.internet import defer
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import emit_async
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.encdecpool import SyncDecrypterPool
+from leap.soledad.client.http_target.support import RequestBody
+
+logger = logging.getLogger(__name__)
+
+
+class HTTPDocFetcher(object):
+ """
+ Handles Document fetching from Soledad server, using HTTP as transport.
+ Steps:
+ * Prepares metadata by asking server for one document
+ * Fetch the total on response and prepare to ask all remaining
+ * (async) Documents will come encrypted.
+ So we parse, decrypt and insert locally as they arrive.
+ """
+
+ @defer.inlineCallbacks
+ def _receive_docs(self, last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id, defer_decryption):
+
+ self._queue_for_decrypt = defer_decryption \
+ and self._sync_db is not None
+
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+
+ if self._queue_for_decrypt:
+ logger.debug(
+ "Soledad sync: will queue received docs for decrypting.")
+
+ if defer_decryption:
+ self._setup_sync_decr_pool()
+
+ # ---------------------------------------------------------------------
+ # maybe receive the first document
+ # ---------------------------------------------------------------------
+
+ # we fetch the first document before fetching the rest because we need
+ # to know the total number of documents to be received, and this
+ # information comes as metadata to each request.
+
+ doc = yield self._receive_one_doc(
+ last_known_generation, last_known_trans_id,
+ sync_id, 0)
+ self._received_docs = 0
+ number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
+
+ if ngen:
+ new_generation = ngen
+ new_transaction_id = ntrans
+
+ if defer_decryption:
+ self._sync_decr_pool.start(number_of_changes)
+
+ # ---------------------------------------------------------------------
+ # maybe receive the rest of the documents
+ # ---------------------------------------------------------------------
+
+ # launch many asynchronous fetches and inserts of received documents
+ # in the temporary sync db. Will wait for all results before
+ # continuing.
+
+ received = 1
+ deferreds = []
+ while received < number_of_changes:
+ d = self._receive_one_doc(
+ last_known_generation,
+ last_known_trans_id, sync_id, received)
+ d.addCallback(
+ self._insert_received_doc,
+ received + 1, # the index of the current received doc
+ number_of_changes)
+ deferreds.append(d)
+ received += 1
+ results = yield defer.gatherResults(deferreds)
+
+ # get generation and transaction id of target after insertions
+ if deferreds:
+ _, new_generation, new_transaction_id = results.pop()
+
+ # ---------------------------------------------------------------------
+ # wait for async decryption to finish
+ # ---------------------------------------------------------------------
+
+ if defer_decryption:
+ yield self._sync_decr_pool.deferred
+ self._sync_decr_pool.stop()
+
+ defer.returnValue([new_generation, new_transaction_id])
+
+ def _receive_one_doc(self, last_known_generation,
+ last_known_trans_id, sync_id, received):
+ # add remote replica metadata to the request
+ body = RequestBody(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # inform server of how many documents have already been received
+ body.insert_info(received=received)
+ # send headers
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=str(body),
+ content_type='application/x-soledad-sync-get')
+
+ def _insert_received_doc(self, response, idx, total):
+ """
+ Insert a received document into the local replica.
+
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ """
+ new_generation, new_transaction_id, number_of_changes, doc_id, \
+ rev, content, gen, trans_id = \
+ self._parse_received_doc_response(response)
+ if doc_id is not None:
+ # decrypt incoming document and insert into local database
+ # -------------------------------------------------------------
+ # symmetric decryption of document's contents
+ # -------------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt it.
+ # We do it inline if defer_decryption flag is False or no sync_db
+ # was defined, otherwise we defer it writing it to the received
+ # docs table.
+ doc = SoledadDocument(doc_id, rev, content)
+ if is_symmetrically_encrypted(doc):
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ # defer_decryption is False or no-sync-db fallback
+ doc.set_json(self._crypto.decrypt_doc(doc))
+ self._insert_doc_cb(doc, gen, trans_id)
+ else:
+ # not symmetrically encrypted doc, insert it directly
+ # or save it in the decrypted stage.
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ self._insert_doc_cb(doc, gen, trans_id)
+ # -------------------------------------------------------------
+ # end of symmetric decryption
+ # -------------------------------------------------------------
+ self._received_docs += 1
+ _emit_receive_status(self._received_docs, total)
+ return number_of_changes, new_generation, new_transaction_id
+
+ def _parse_received_doc_response(self, response):
+ """
+ Parse the response from the server containing the received document.
+
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+
+ :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
+ content, gen, trans_id)
+ :rtype: tuple
+ """
+ # decode incoming stream
+ parts = response.splitlines()
+ if not parts or parts[0] != '[' or parts[-1] != ']':
+ raise errors.BrokenSyncStream
+ data = parts[1:-1]
+ # decode metadata
+ try:
+ line, comma = utils.check_and_strip_comma(data[0])
+ metadata = None
+ except (IndexError):
+ raise errors.BrokenSyncStream
+ try:
+ metadata = json.loads(line)
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ number_of_changes = metadata['number_of_changes']
+ except (ValueError, KeyError):
+ raise errors.BrokenSyncStream
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ # parse incoming document info
+ doc_id = None
+ rev = None
+ content = None
+ gen = None
+ trans_id = None
+ if number_of_changes > 0:
+ try:
+ entry = json.loads(data[1])
+ doc_id = entry['id']
+ rev = entry['rev']
+ content = entry['content']
+ gen = entry['gen']
+ trans_id = entry['trans_id']
+ except (IndexError, KeyError):
+ raise errors.BrokenSyncStream
+ return new_generation, new_transaction_id, number_of_changes, \
+ doc_id, rev, content, gen, trans_id
+
+ def _setup_sync_decr_pool(self):
+ """
+ Set up the SyncDecrypterPool for deferred decryption.
+ """
+ if self._sync_decr_pool is None and self._sync_db is not None:
+ # initialize syncing queue decryption pool
+ self._sync_decr_pool = SyncDecrypterPool(
+ self._crypto,
+ self._sync_db,
+ insert_doc_cb=self._insert_doc_cb,
+ source_replica_uid=self.source_replica_uid)
+
+
+def _emit_receive_status(received_docs, total):
+ content = {'received': received_docs, 'total': total}
+ emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, content)
+
+ if received_docs % 20 == 0:
+ msg = "%d/%d" % (received_docs, total)
+ logger.debug("Sync receive status: %s" % msg)
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
new file mode 100644
index 00000000..80483f0d
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+# send.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import logging
+from twisted.internet import defer
+from leap.soledad.client.events import emit_async
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.http_target.support import RequestBody
+logger = logging.getLogger(__name__)
+
+
+class HTTPDocSender(object):
+ """
+ Handles Document uploading from Soledad server, using HTTP as transport.
+ They need to be encrypted and metadata prepared before sending.
+ """
+
+ @defer.inlineCallbacks
+ def _send_docs(self, docs_by_generation, last_known_generation,
+ last_known_trans_id, sync_id):
+
+ if not docs_by_generation:
+ defer.returnValue([None, None])
+
+ # add remote replica metadata to the request
+ body = RequestBody(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ total = len(docs_by_generation)
+ for idx, entry in enumerate(docs_by_generation, 1):
+ yield self._prepare_one_doc(entry, body, idx, total)
+ result = yield self._http_request(
+ self._url,
+ method='POST',
+ body=body.pop(1),
+ content_type='application/x-soledad-sync-put')
+ if self._defer_encryption:
+ self._delete_sent(idx, docs_by_generation)
+ _emit_send_status(idx, total)
+ response_dict = json.loads(result)[0]
+ gen_after_send = response_dict['new_generation']
+ trans_id_after_send = response_dict['new_transaction_id']
+ defer.returnValue([gen_after_send, trans_id_after_send])
+
+ def _delete_sent(self, idx, docs_by_generation):
+ doc = docs_by_generation[idx - 1][0]
+ self._sync_enc_pool.delete_encrypted_doc(
+ doc.doc_id, doc.rev)
+
+ @defer.inlineCallbacks
+ def _prepare_one_doc(self, entry, body, idx, total):
+ doc, gen, trans_id = entry
+ content = yield self._encrypt_doc(doc)
+ body.insert_info(
+ id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
+ trans_id=trans_id, number_of_docs=total,
+ doc_idx=idx)
+
+ def _encrypt_doc(self, doc):
+ d = None
+ if doc.is_tombstone():
+ d = defer.succeed(None)
+ elif not self._defer_encryption:
+ # fallback case, for tests
+ d = defer.succeed(self._crypto.encrypt_doc(doc))
+ else:
+
+ def _maybe_encrypt_doc_inline(doc_json):
+ if doc_json is None:
+ # the document is not marked as tombstone, but we got
+ # nothing from the sync db. As it is not encrypted
+ # yet, we force inline encryption.
+ return self._crypto.encrypt_doc(doc)
+ return doc_json
+
+ d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
+ d.addCallback(_maybe_encrypt_doc_inline)
+ return d
+
+
+def _emit_send_status(idx, total):
+ content = {'sent': idx, 'total': total}
+ emit_async(SOLEDAD_SYNC_SEND_STATUS, content)
+
+ msg = "%d/%d" % (idx, total)
+ logger.debug("Sync send status: %s" % msg)
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
new file mode 100644
index 00000000..44cd7089
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -0,0 +1,203 @@
+# -*- coding: utf-8 -*-
+# support.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import warnings
+import json
+from u1db import errors
+from u1db.remote import http_errors
+from twisted.internet import defer
+from twisted.web.client import _ReadBodyProtocol
+from twisted.web.client import PartialDownloadError
+from twisted.web._newclient import ResponseDone
+from twisted.web._newclient import PotentialDataLoss
+
+
+# we want to make sure that HTTP errors will raise appropriate u1db errors,
+# that is, fire errbacks with the appropriate failures, in the context of
+# twisted. Because of that, we redefine the http body reader used by the HTTP
+# client below.
+
+class ReadBodyProtocol(_ReadBodyProtocol):
+ """
+ From original Twisted implementation, focused on adding our error
+ handling and ensuring that the proper u1db error is raised.
+ """
+
+ def __init__(self, response, deferred):
+ """
+ Initialize the protocol, additionally storing the response headers.
+ """
+ _ReadBodyProtocol.__init__(
+ self, response.code, response.phrase, deferred)
+ self.headers = response.headers
+
+ # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
+ def _error(self, respdic):
+ descr = respdic.get("error")
+ exc_cls = errors.wire_description_to_exc.get(descr)
+ if exc_cls is not None:
+ message = respdic.get("message")
+ self.deferred.errback(exc_cls(message))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ def connectionLost(self, reason):
+ """
+ Deliver the accumulated response bytes to the waiting L{Deferred}, if
+ the response body has been completely received without error.
+ """
+ if reason.check(ResponseDone):
+
+ body = b''.join(self.dataBuffer)
+
+ # ---8<--- snippet from u1db.remote.http_client
+ if self.status in (200, 201):
+ self.deferred.callback(body)
+ elif self.status in http_errors.ERROR_STATUSES:
+ try:
+ respdic = json.loads(body)
+ except ValueError:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ else:
+ self._error(respdic)
+ # special cases
+ elif self.status == 503:
+ self.deferred.errback(errors.Unavailable(body, self.headers))
+ else:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ elif reason.check(PotentialDataLoss):
+ self.deferred.errback(
+ PartialDownloadError(self.status, self.message,
+ b''.join(self.dataBuffer)))
+ else:
+ self.deferred.errback(reason)
+
+
+def readBody(response):
+ """
+ Get the body of an L{IResponse} and return it as a byte string.
+
+ This is a helper function for clients that don't want to incrementally
+ receive the body of an HTTP response.
+
+ @param response: The HTTP response for which the body will be read.
+ @type response: L{IResponse} provider
+
+ @return: A L{Deferred} which will fire with the body of the response.
+ Cancelling it will close the connection to the server immediately.
+ """
+ def cancel(deferred):
+ """
+ Cancel a L{readBody} call, close the connection to the HTTP server
+ immediately, if it is still open.
+
+ @param deferred: The cancelled L{defer.Deferred}.
+ """
+ abort = getAbort()
+ if abort is not None:
+ abort()
+
+ d = defer.Deferred(cancel)
+ protocol = ReadBodyProtocol(response, d)
+
+ def getAbort():
+ return getattr(protocol.transport, 'abortConnection', None)
+
+ response.deliverBody(protocol)
+
+ if protocol.transport is not None and getAbort() is None:
+ warnings.warn(
+ 'Using readBody with a transport that does not have an '
+ 'abortConnection method',
+ category=DeprecationWarning,
+ stacklevel=2)
+
+ return d
+
+
+class RequestBody(object):
+ """
+ This class is a helper to generate send and fetch requests.
+ The expected format is something like:
+ [
+ {headers},
+ {entry1},
+ {...},
+ {entryN},
+ ]
+ """
+
+ def __init__(self, **header_dict):
+ """
+ Creates a new RequestBody holding header information.
+
+ :param header_dict: A dictionary with the headers.
+ :type header_dict: dict
+ """
+ self.headers = header_dict
+ self.entries = []
+
+ def insert_info(self, **entry_dict):
+ """
+ Dumps an entry into JSON format and add it to entries list.
+
+ :param entry_dict: Entry as a dictionary
+ :type entry_dict: dict
+
+ :return: length of the entry after JSON dumps
+ :rtype: int
+ """
+ entry = json.dumps(entry_dict)
+ self.entries.append(entry)
+ return len(entry)
+
+ def pop(self, number=1):
+ """
+ Removes an amount of entries and returns it formatted and ready
+ to be sent.
+
+ :param number: number of entries to pop and format
+ :type number: int
+
+ :return: formatted body ready to be sent
+ :rtype: str
+ """
+ entries = [self.entries.pop(0) for i in xrange(number)]
+ return self.entries_to_str(entries)
+
+ def __str__(self):
+ return self.entries_to_str(self.entries)
+
+ def __len__(self):
+ return len(self.entries)
+
+ def entries_to_str(self, entries=None):
+ """
+ Format a list of entries into the body format expected
+ by the server.
+
+ :param entries: entries to format
+ :type entries: list
+
+ :return: formatted body ready to be sent
+ :rtype: str
+ """
+ data = '[\r\n' + json.dumps(self.headers)
+ data += ''.join(',\r\n' + entry for entry in entries)
+ return data + '\r\n]'
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index ee3aacdb..c3c3dff5 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -261,6 +261,16 @@ class SoledadSecrets(object):
logger.info("Could not find a secret in local storage.")
return False
+ def _maybe_set_active_secret(self, active_secret):
+ """
+ If no secret_id is already set, choose the passed active secret, or
+ just choose first secret available if none.
+ """
+ if not self._secret_id:
+ if not active_secret:
+ active_secret = self._secrets.items()[0][0]
+ self.set_secret_id(active_secret)
+
def _load_secrets(self):
"""
Load storage secrets from local file.
@@ -270,12 +280,7 @@ class SoledadSecrets(object):
with open(self._secrets_path, 'r') as f:
content = json.loads(f.read())
_, active_secret = self._import_recovery_document(content)
- # choose first secret if no secret_id was given
- if self._secret_id is None:
- if active_secret is None:
- self.set_secret_id(self._secrets.items()[0][0])
- else:
- self.set_secret_id(active_secret)
+ self._maybe_set_active_secret(active_secret)
# enlarge secret if needed
enlarged = False
if len(self._secrets[self._secret_id]) < self.GEN_SECRET_LENGTH:
@@ -306,12 +311,8 @@ class SoledadSecrets(object):
'Found cryptographic secrets in shared recovery '
'database.')
_, active_secret = self._import_recovery_document(doc.content)
+ self._maybe_set_active_secret(active_secret)
self._store_secrets() # save new secrets in local file
- if self._secret_id is None:
- if active_secret is None:
- self.set_secret_id(self._secrets.items()[0][0])
- else:
- self.set_secret_id(active_secret)
else:
# STAGE 3 - there are no secrets in server also, so
# generate a secret and store it in remote db.
@@ -432,13 +433,13 @@ class SoledadSecrets(object):
:return: a document with encrypted key material in its contents
:rtype: document.SoledadDocument
"""
- events.emit(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid)
db = self._shared_db
if not db:
logger.warning('No shared db found')
return
doc = db.get_doc(self._shared_db_doc_id())
- events.emit(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid)
return doc
def _put_secrets_in_shared_db(self):
@@ -461,13 +462,13 @@ class SoledadSecrets(object):
# fill doc with encrypted secrets
doc.content = self._export_recovery_document()
# upload secrets to server
- events.emit(events.SOLEDAD_UPLOADING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_UPLOADING_KEYS, self._uuid)
db = self._shared_db
if not db:
logger.warning('No shared db found')
return
db.put_doc(doc)
- events.emit(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)
#
# Management of secret for symmetric encryption.
@@ -587,13 +588,13 @@ class SoledadSecrets(object):
:return: The id of the generated secret.
:rtype: str
"""
- events.emit(events.SOLEDAD_CREATING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_CREATING_KEYS, self._uuid)
# generate random secret
secret = os.urandom(self.GEN_SECRET_LENGTH)
secret_id = sha256(secret).hexdigest()
self._secrets[secret_id] = secret
self._store_secrets()
- events.emit(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid)
return secret_id
def _store_secrets(self):
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 2151884a..22ddc87d 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -559,6 +559,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
"""
Close the syncer and syncdb orderly
"""
+ super(SQLCipherU1DBSync, self).close()
# close all open syncers
for url in self._syncers.keys():
_, syncer = self._syncers[url]