summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client/http_target
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/client/http_target')
-rw-r--r--src/leap/soledad/client/http_target/__init__.py94
-rw-r--r--src/leap/soledad/client/http_target/api.py248
-rw-r--r--src/leap/soledad/client/http_target/fetch.py161
-rw-r--r--src/leap/soledad/client/http_target/fetch_protocol.py157
-rw-r--r--src/leap/soledad/client/http_target/send.py107
-rw-r--r--src/leap/soledad/client/http_target/send_protocol.py75
-rw-r--r--src/leap/soledad/client/http_target/support.py220
7 files changed, 1062 insertions, 0 deletions
diff --git a/src/leap/soledad/client/http_target/__init__.py b/src/leap/soledad/client/http_target/__init__.py
new file mode 100644
index 00000000..b67d03f6
--- /dev/null
+++ b/src/leap/soledad/client/http_target/__init__.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A U1DB backend for encrypting data before sending to server and decrypting
+after receiving.
+"""
+
+
+import os
+
+from twisted.web.client import Agent
+from twisted.internet import reactor
+
+from leap.common.certs import get_compatible_ssl_context_factory
+from leap.soledad.common.log import getLogger
+from leap.soledad.client.http_target.send import HTTPDocSender
+from leap.soledad.client.http_target.api import SyncTargetAPI
+from leap.soledad.client.http_target.fetch import HTTPDocFetcher
+from leap.soledad.client import crypto as old_crypto
+
+
+logger = getLogger(__name__)
+
+
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
+class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
+
+ """
+ A SyncTarget that encrypts data before sending and decrypts data after
+ receiving.
+
+ Normally encryption will have been written to the sync database upon
+ document modification. The sync database is also used to write temporarily
+ the parsed documents that the remote send us, before being decrypted and
+ written to the main database.
+ """
+ def __init__(self, url, source_replica_uid, creds, crypto, cert_file):
+ """
+ Initialize the sync target.
+
+ :param url: The server sync url.
+ :type url: str
+ :param source_replica_uid: The source replica uid which we use when
+ deferring decryption.
+ :type source_replica_uid: str
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: creds
+ :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+ document contents when syncing.
+ :type crypto: soledad._crypto.SoledadCrypto
+ :param cert_file: Path to the certificate of the ca used to validate
+ the SSL certificate used by the remote soledad
+ server.
+ :type cert_file: str
+ """
+ if url.endswith("/"):
+ url = url[:-1]
+ self._url = str(url) + "/sync-from/" + str(source_replica_uid)
+ self.source_replica_uid = source_replica_uid
+ self._auth_header = None
+ self._uuid = None
+ self.set_creds(creds)
+ self._crypto = crypto
+ # TODO: DEPRECATED CRYPTO
+ self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret)
+ self._insert_doc_cb = None
+
+ # Twisted default Agent with our own ssl context factory
+ factory = get_compatible_ssl_context_factory(cert_file)
+ self._http = Agent(reactor, factory)
+
+ if DO_STATS:
+ self.sync_exchange_phase = [0]
diff --git a/src/leap/soledad/client/http_target/api.py b/src/leap/soledad/client/http_target/api.py
new file mode 100644
index 00000000..c68185c6
--- /dev/null
+++ b/src/leap/soledad/client/http_target/api.py
@@ -0,0 +1,248 @@
+# -*- coding: utf-8 -*-
+# api.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import json
+import base64
+
+from six import StringIO
+from uuid import uuid4
+
+from twisted.internet import defer
+from twisted.web.http_headers import Headers
+from twisted.web.client import FileBodyProducer
+
+from leap.soledad.client.http_target.support import readBody
+from leap.soledad.common.errors import InvalidAuthTokenError
+from leap.soledad.common.l2db.errors import HTTPError
+from leap.soledad.common.l2db import SyncTarget
+
+
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
+class SyncTargetAPI(SyncTarget):
+ """
+ Declares public methods and implements u1db.SyncTarget.
+ """
+
+ @property
+ def uuid(self):
+ return self._uuid
+
+ def set_creds(self, creds):
+ """
+ Update credentials.
+
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: dict
+ """
+ uuid = creds['token']['uuid']
+ token = creds['token']['token']
+ self._uuid = uuid
+ auth = '%s:%s' % (uuid, token)
+ b64_token = base64.b64encode(auth)
+ self._auth_header = {'Authorization': ['Token %s' % b64_token]}
+
+ @property
+ def _base_header(self):
+ return self._auth_header.copy() if self._auth_header else {}
+
+ def _http_request(self, url, method='GET', body=None, headers=None,
+ content_type=None, body_reader=readBody,
+ body_producer=None):
+ headers = headers or self._base_header
+ if content_type:
+ headers.update({'content-type': [content_type]})
+ if not body_producer and body:
+ body = FileBodyProducer(StringIO(body))
+ elif body_producer:
+ # Upload case, check send.py
+ body = body_producer(body)
+ d = self._http.request(
+ method, url, headers=Headers(headers), bodyProducer=body)
+ d.addCallback(body_reader)
+ d.addErrback(_unauth_to_invalid_token_error)
+ return d
+
+ @defer.inlineCallbacks
+ def get_sync_info(self, source_replica_uid):
+ """
+ Return information about known state of remote database.
+
+ Return the replica_uid and the current database generation of the
+ remote database, and its last-seen database generation for the client
+ replica.
+
+ :param source_replica_uid: The client-size replica uid.
+ :type source_replica_uid: str
+
+ :return: A deferred which fires with (target_replica_uid,
+ target_replica_generation, target_trans_id,
+ source_replica_last_known_generation,
+ source_replica_last_known_transaction_id)
+ :rtype: twisted.internet.defer.Deferred
+ """
+ raw = yield self._http_request(self._url)
+ res = json.loads(raw)
+ defer.returnValue((
+ res['target_replica_uid'],
+ res['target_replica_generation'],
+ res['target_replica_transaction_id'],
+ res['source_replica_generation'],
+ res['source_transaction_id']
+ ))
+
+ def record_sync_info(
+ self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ """
+ Record tip information for another replica.
+
+ After sync_exchange has been processed, the caller will have
+ received new content from this replica. This call allows the
+ source replica instigating the sync to inform us what their
+ generation became after applying the documents we returned.
+
+ This is used to allow future sync operations to not need to repeat data
+ that we just talked about. It also means that if this is called at the
+ wrong time, there can be database records that will never be
+ synchronized.
+
+ :param source_replica_uid: The identifier for the source replica.
+ :type source_replica_uid: str
+ :param source_replica_generation: The database generation for the
+ source replica.
+ :type source_replica_generation: int
+ :param source_replica_transaction_id: The transaction id associated
+ with the source replica
+ generation.
+ :type source_replica_transaction_id: str
+
+ :return: A deferred which fires with the result of the query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ data = json.dumps({
+ 'generation': source_replica_generation,
+ 'transaction_id': source_replica_transaction_id
+ })
+ return self._http_request(
+ self._url,
+ method='PUT',
+ body=data,
+ content_type='application/json')
+
+ @defer.inlineCallbacks
+ def sync_exchange(self, docs_by_generation, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ insert_doc_cb, ensure_callback=None,
+ sync_id=None):
+ """
+ Find out which documents the remote database does not know about,
+ encrypt and send them. After that, receive documents from the remote
+ database.
+
+ :param docs_by_generations: A list of (doc_id, generation, trans_id)
+ of local documents that were changed since
+ the last local generation the remote
+ replica knows about.
+ :type docs_by_generations: list of tuples
+
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+
+ :param ensure_callback: A callback that ensures we know the target
+ replica uid if the target replica was just
+ created.
+ :type ensure_callback: function
+
+ :return: A deferred which fires with the new generation and
+ transaction id of the target replica.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # ---------- phase 1: send docs to server ----------------------------
+ if DO_STATS:
+ self.sync_exchange_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ self._ensure_callback = ensure_callback
+
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+
+ # save a reference to the callback so we can use it after decrypting
+ self._insert_doc_cb = insert_doc_cb
+
+ gen_after_send, trans_id_after_send = yield self._send_docs(
+ docs_by_generation,
+ last_known_generation,
+ last_known_trans_id,
+ sync_id)
+
+ # ---------- phase 2: receive docs -----------------------------------
+ if DO_STATS:
+ self.sync_exchange_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ cur_target_gen, cur_target_trans_id = yield self._receive_docs(
+ last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id)
+
+ # update gen and trans id info in case we just sent and did not
+ # receive docs.
+ if gen_after_send is not None and gen_after_send > cur_target_gen:
+ cur_target_gen = gen_after_send
+ cur_target_trans_id = trans_id_after_send
+
+ # ---------- phase 3: sync exchange is over --------------------------
+ if DO_STATS:
+ self.sync_exchange_phase[0] += 1
+ # --------------------------------------------------------------------
+
+ defer.returnValue([cur_target_gen, cur_target_trans_id])
+
+
+def _unauth_to_invalid_token_error(failure):
+ """
+ An errback to translate unauthorized errors to our own invalid token
+ class.
+
+ :param failure: The original failure.
+ :type failure: twisted.python.failure.Failure
+
+ :return: Either the original failure or an invalid auth token error.
+ :rtype: twisted.python.failure.Failure
+ """
+ failure.trap(HTTPError)
+ if failure.value.status == 401:
+ raise InvalidAuthTokenError
+ return failure
diff --git a/src/leap/soledad/client/http_target/fetch.py b/src/leap/soledad/client/http_target/fetch.py
new file mode 100644
index 00000000..9d456830
--- /dev/null
+++ b/src/leap/soledad/client/http_target/fetch.py
@@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+# fetch.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+from twisted.internet import defer
+from twisted.internet import threads
+
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import emit_async
+from leap.soledad.client.http_target.support import RequestBody
+from leap.soledad.common.log import getLogger
+from leap.soledad.client._crypto import is_symmetrically_encrypted
+from leap.soledad.common.l2db import errors
+from leap.soledad.client import crypto as old_crypto
+
+from .._document import Document
+from . import fetch_protocol
+
+logger = getLogger(__name__)
+
+
+class HTTPDocFetcher(object):
+ """
+ Handles Document fetching from Soledad server, using HTTP as transport.
+ Steps:
+ * Prepares metadata by asking server for one document
+ * Fetch the total on response and prepare to ask all remaining
+ * (async) Documents will come encrypted.
+ So we parse, decrypt and insert locally as they arrive.
+ """
+
+ # The uuid of the local replica.
+ # Any class inheriting from this one should provide a meaningful attribute
+ # if the sync status event is meant to be used somewhere else.
+
+ uuid = 'undefined'
+ userid = 'undefined'
+
+ @defer.inlineCallbacks
+ def _receive_docs(self, last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id):
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+ # Acts as a queue, ensuring line order on async processing
+ # as `self._insert_doc_cb` cant be run concurrently or out of order.
+ # DeferredSemaphore solves the concurrency and its implementation uses
+ # a queue, solving the ordering.
+ # FIXME: Find a proper solution to avoid surprises on Twisted changes
+ self.semaphore = defer.DeferredSemaphore(1)
+
+ metadata = yield self._fetch_all(
+ last_known_generation, last_known_trans_id,
+ sync_id)
+ number_of_changes, ngen, ntrans = self._parse_metadata(metadata)
+
+ # wait for pending inserts
+ yield self.semaphore.acquire()
+
+ if ngen:
+ new_generation = ngen
+ new_transaction_id = ntrans
+
+ defer.returnValue([new_generation, new_transaction_id])
+
+ def _fetch_all(self, last_known_generation,
+ last_known_trans_id, sync_id):
+ # add remote replica metadata to the request
+ body = RequestBody(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ self._received_docs = 0
+ # build a stream reader with _doc_parser as a callback
+ body_reader = fetch_protocol.build_body_reader(self._doc_parser)
+ # start download stream
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=str(body),
+ content_type='application/x-soledad-sync-get',
+ body_reader=body_reader)
+
+ @defer.inlineCallbacks
+ def _doc_parser(self, doc_info, content, total):
+ """
+ Insert a received document into the local replica, decrypting
+ if necessary. The case where it's not decrypted is when a doc gets
+ inserted from Server side with a GPG encrypted content.
+
+ :param doc_info: Dictionary representing Document information.
+ :type doc_info: dict
+ :param content: The Document's content.
+ :type idx: str
+ :param total: The total number of operations.
+ :type total: int
+ """
+ yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
+ total)
+
+ @defer.inlineCallbacks
+ def __atomic_doc_parse(self, doc_info, content, total):
+ doc = Document(doc_info['id'], doc_info['rev'], content)
+ if is_symmetrically_encrypted(content):
+ content = (yield self._crypto.decrypt_doc(doc)).getvalue()
+ elif old_crypto.is_symmetrically_encrypted(doc):
+ content = self._deprecated_crypto.decrypt_doc(doc)
+ doc.set_json(content)
+
+ # TODO insert blobs here on the blob backend
+ # FIXME: This is wrong. Using the very same SQLite connection object
+ # from multiple threads is dangerous. We should bring the dbpool here
+ # or find an alternative. Deferring to a thread only helps releasing
+ # the reactor for other tasks as this is an IO intensive call.
+ yield threads.deferToThread(self._insert_doc_cb,
+ doc, doc_info['gen'], doc_info['trans_id'])
+ self._received_docs += 1
+ user_data = {'uuid': self.uuid, 'userid': self.userid}
+ _emit_receive_status(user_data, self._received_docs, total=total)
+
+ def _parse_metadata(self, metadata):
+ """
+ Parse the response from the server containing the sync metadata.
+
+ :param response: Metadata as string
+ :type response: str
+
+ :return: (number_of_changes, new_gen, new_trans_id)
+ :rtype: tuple
+ """
+ try:
+ metadata = json.loads(metadata)
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ return (metadata['number_of_changes'], metadata['new_generation'],
+ metadata['new_transaction_id'])
+ except (ValueError, KeyError):
+ raise errors.BrokenSyncStream('Metadata parsing failed')
+
+
+def _emit_receive_status(user_data, received_docs, total):
+ content = {'received': received_docs, 'total': total}
+ emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content)
+
+ if received_docs % 20 == 0:
+ msg = "%d/%d" % (received_docs, total)
+ logger.debug("Sync receive status: %s" % msg)
diff --git a/src/leap/soledad/client/http_target/fetch_protocol.py b/src/leap/soledad/client/http_target/fetch_protocol.py
new file mode 100644
index 00000000..851eb3a1
--- /dev/null
+++ b/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -0,0 +1,157 @@
+# -*- coding: utf-8 -*-
+# fetch_protocol.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+from functools import partial
+from six import StringIO
+from twisted.web._newclient import ResponseDone
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import utils
+from leap.soledad.common.log import getLogger
+from .support import ReadBodyProtocol
+from .support import readBody
+
+logger = getLogger(__name__)
+
+
+class DocStreamReceiver(ReadBodyProtocol):
+ """
+ A protocol implementation that can parse incoming data from server based
+ on a line format specified on u1db implementation. Except that we split doc
+ attributes from content to ease parsing and increment throughput for larger
+ documents.
+ [\r\n
+ {metadata},\r\n
+ {doc_info},\r\n
+ {content},\r\n
+ ...
+ {doc_info},\r\n
+ {content},\r\n
+ ]
+ """
+
+ def __init__(self, response, deferred, doc_reader):
+ self.deferred = deferred
+ self.status = response.code if response else None
+ self.message = response.phrase if response else None
+ self.headers = response.headers if response else {}
+ self.delimiter = '\r\n'
+ self.metadata = ''
+ self._doc_reader = doc_reader
+ self.reset()
+
+ def reset(self):
+ self._line = 0
+ self._buffer = StringIO()
+ self._properly_finished = False
+
+ def connectionLost(self, reason):
+ """
+ Deliver the accumulated response bytes to the waiting L{Deferred}, if
+ the response body has been completely received without error.
+ """
+ if self.deferred.called:
+ return
+ try:
+ if reason.check(ResponseDone):
+ self.dataBuffer = self.metadata
+ else:
+ self.dataBuffer = self.finish()
+ except errors.BrokenSyncStream as e:
+ return self.deferred.errback(e)
+ return ReadBodyProtocol.connectionLost(self, reason)
+
+ def consumeBufferLines(self):
+ """
+ Consumes lines from buffer and rewind it, writing remaining data
+ that didn't formed a line back into buffer.
+ """
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.seek(0)
+ lines = content.split(self.delimiter)
+ self._buffer.write(lines.pop(-1))
+ return lines
+
+ def dataReceived(self, data):
+ """
+ Buffer incoming data until a line breaks comes in. We check only
+ the incoming data for efficiency.
+ """
+ self._buffer.write(data)
+ if '\n' not in data:
+ return
+ lines = self.consumeBufferLines()
+ while lines:
+ line, _ = utils.check_and_strip_comma(lines.pop(0))
+ self.lineReceived(line)
+ self._line += 1
+
+ def lineReceived(self, line):
+ """
+ Protocol implementation.
+ 0: [\r\n
+ 1: {metadata},\r\n
+ (even): {doc_info},\r\n
+ (odd): {data},\r\n
+ (last): ]
+ """
+ if self._properly_finished:
+ raise errors.BrokenSyncStream("Reading a finished stream")
+ if ']' == line:
+ self._properly_finished = True
+ elif self._line == 0:
+ if line is not '[':
+ raise errors.BrokenSyncStream("Invalid start")
+ elif self._line == 1:
+ self.metadata = line
+ if 'error' in self.metadata:
+ raise errors.BrokenSyncStream("Error from server: %s" % line)
+ self.total = json.loads(line).get('number_of_changes', -1)
+ elif (self._line % 2) == 0:
+ self.current_doc = json.loads(line)
+ if 'error' in self.current_doc:
+ raise errors.BrokenSyncStream("Error from server: %s" % line)
+ else:
+ d = self._doc_reader(
+ self.current_doc, line.strip() or None, self.total)
+ d.addErrback(self.deferred.errback)
+
+ def finish(self):
+ """
+ Checks that ']' came and stream was properly closed.
+ """
+ if not self._properly_finished:
+ raise errors.BrokenSyncStream('Stream not properly closed')
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.close()
+ return content
+
+
+def build_body_reader(doc_reader):
+ """
+ Get the documents from a sync stream and call doc_reader on each
+ doc received.
+
+ @param doc_reader: Function to be called for processing an incoming doc.
+ Will be called with doc metadata (dict parsed from 1st line) and doc
+ content (string)
+ @type doc_reader: function
+
+ @return: A function that can be called by the http Agent to create and
+ configure the proper protocol.
+ """
+ protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader)
+ return partial(readBody, protocolClass=protocolClass)
diff --git a/src/leap/soledad/client/http_target/send.py b/src/leap/soledad/client/http_target/send.py
new file mode 100644
index 00000000..2b286ec5
--- /dev/null
+++ b/src/leap/soledad/client/http_target/send.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+# send.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+
+from twisted.internet import defer
+
+from leap.soledad.common.log import getLogger
+from leap.soledad.client.events import emit_async
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.http_target.support import RequestBody
+from .send_protocol import DocStreamProducer
+
+logger = getLogger(__name__)
+
+
+class HTTPDocSender(object):
+ """
+ Handles Document uploading from Soledad server, using HTTP as transport.
+ They need to be encrypted and metadata prepared before sending.
+ """
+
+ # The uuid of the local replica.
+ # Any class inheriting from this one should provide a meaningful attribute
+ # if the sync status event is meant to be used somewhere else.
+
+ uuid = 'undefined'
+ userid = 'undefined'
+
+ @defer.inlineCallbacks
+ def _send_docs(self, docs_by_generation, last_known_generation,
+ last_known_trans_id, sync_id):
+
+ if not docs_by_generation:
+ defer.returnValue([None, None])
+
+ # add remote replica metadata to the request
+ body = RequestBody(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ result = yield self._send_batch(body, docs_by_generation)
+ response_dict = json.loads(result)[0]
+ gen_after_send = response_dict['new_generation']
+ trans_id_after_send = response_dict['new_transaction_id']
+ defer.returnValue([gen_after_send, trans_id_after_send])
+
+ @defer.inlineCallbacks
+ def _send_batch(self, body, docs):
+ total, calls = len(docs), []
+ for i, entry in enumerate(docs):
+ calls.append((self._prepare_one_doc,
+ entry, body, i + 1, total))
+ result = yield self._send_request(body, calls)
+ _emit_send_status(self.uuid, body.consumed, total)
+
+ defer.returnValue(result)
+
+ def _send_request(self, body, calls):
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=(body, calls),
+ content_type='application/x-soledad-sync-put',
+ body_producer=DocStreamProducer)
+
+ @defer.inlineCallbacks
+ def _prepare_one_doc(self, entry, body, idx, total):
+ get_doc_call, gen, trans_id = entry
+ doc, content = yield self._encrypt_doc(get_doc_call)
+ body.insert_info(
+ id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
+ trans_id=trans_id, number_of_docs=total,
+ doc_idx=idx)
+ _emit_send_status(self.uuid, body.consumed, total)
+
+ @defer.inlineCallbacks
+ def _encrypt_doc(self, get_doc_call):
+ f, args, kwargs = get_doc_call
+ doc = yield f(*args, **kwargs)
+ if doc.is_tombstone():
+ defer.returnValue((doc, None))
+ else:
+ content = yield self._crypto.encrypt_doc(doc)
+ defer.returnValue((doc, content))
+
+
+def _emit_send_status(user_data, idx, total):
+ content = {'sent': idx, 'total': total}
+ emit_async(SOLEDAD_SYNC_SEND_STATUS, user_data, content)
+
+ msg = "%d/%d" % (idx, total)
+ logger.debug("Sync send status: %s" % msg)
diff --git a/src/leap/soledad/client/http_target/send_protocol.py b/src/leap/soledad/client/http_target/send_protocol.py
new file mode 100644
index 00000000..4941aa34
--- /dev/null
+++ b/src/leap/soledad/client/http_target/send_protocol.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# send_protocol.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from zope.interface import implementer
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.web.iweb import IBodyProducer
+from twisted.web.iweb import UNKNOWN_LENGTH
+
+
+@implementer(IBodyProducer)
+class DocStreamProducer(object):
+ """
+ A producer that writes the body of a request to a consumer.
+ """
+
+ def __init__(self, producer):
+ """
+ Initialize the string produer.
+
+ :param producer: A RequestBody instance and a list of producer calls
+ :type producer: (.support.RequestBody, [(function, *args)])
+ """
+ self.body, self.producer = producer
+ self.length = UNKNOWN_LENGTH
+ self.pause = False
+ self.stop = False
+
+ @defer.inlineCallbacks
+ def startProducing(self, consumer):
+ """
+ Write the body to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A Deferred that fires when production ends.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ while self.producer and not self.stop:
+ if self.pause:
+ yield self.sleep(0.001)
+ continue
+ call = self.producer.pop(0)
+ fun, args = call[0], call[1:]
+ yield fun(*args)
+ consumer.write(self.body.pop(1, leave_open=True))
+ consumer.write(self.body.pop(0)) # close stream
+
+ def sleep(self, secs):
+ d = defer.Deferred()
+ reactor.callLater(secs, d.callback, None)
+ return d
+
+ def pauseProducing(self):
+ self.pause = True
+
+ def stopProducing(self):
+ self.stop = True
+
+ def resumeProducing(self):
+ self.pause = False
diff --git a/src/leap/soledad/client/http_target/support.py b/src/leap/soledad/client/http_target/support.py
new file mode 100644
index 00000000..d8d8e420
--- /dev/null
+++ b/src/leap/soledad/client/http_target/support.py
@@ -0,0 +1,220 @@
+# -*- coding: utf-8 -*-
+# support.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import warnings
+import json
+
+from twisted.internet import defer
+from twisted.web.client import _ReadBodyProtocol
+from twisted.web.client import PartialDownloadError
+from twisted.web._newclient import ResponseDone
+from twisted.web._newclient import PotentialDataLoss
+
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import http_errors
+
+# we want to make sure that HTTP errors will raise appropriate u1db errors,
+# that is, fire errbacks with the appropriate failures, in the context of
+# twisted. Because of that, we redefine the http body reader used by the HTTP
+# client below.
+
+
+class ReadBodyProtocol(_ReadBodyProtocol):
+ """
+ From original Twisted implementation, focused on adding our error
+ handling and ensuring that the proper u1db error is raised.
+ """
+
+ def __init__(self, response, deferred):
+ """
+ Initialize the protocol, additionally storing the response headers.
+ """
+ _ReadBodyProtocol.__init__(
+ self, response.code, response.phrase, deferred)
+ self.headers = response.headers
+
+ # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
+ def _error(self, respdic):
+ descr = respdic.get("error")
+ exc_cls = errors.wire_description_to_exc.get(descr)
+ if exc_cls is not None:
+ message = respdic.get("message")
+ self.deferred.errback(exc_cls(message))
+ else:
+ self.deferred.errback(
+ errors.HTTPError(self.status, respdic, self.headers))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ def connectionLost(self, reason):
+ """
+ Deliver the accumulated response bytes to the waiting L{Deferred}, if
+ the response body has been completely received without error.
+ """
+ if reason.check(ResponseDone):
+
+ body = b''.join(self.dataBuffer)
+
+ # ---8<--- snippet from u1db.remote.http_client
+ if self.status in (200, 201):
+ self.deferred.callback(body)
+ elif self.status in http_errors.ERROR_STATUSES:
+ try:
+ respdic = json.loads(body)
+ except ValueError:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ else:
+ self._error(respdic)
+ # special cases
+ elif self.status == 503:
+ self.deferred.errback(errors.Unavailable(body, self.headers))
+ else:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ elif reason.check(PotentialDataLoss):
+ self.deferred.errback(
+ PartialDownloadError(self.status, self.message,
+ b''.join(self.dataBuffer)))
+ else:
+ self.deferred.errback(reason)
+
+
+def readBody(response, protocolClass=ReadBodyProtocol):
+ """
+ Get the body of an L{IResponse} and return it as a byte string.
+
+ This is a helper function for clients that don't want to incrementally
+ receive the body of an HTTP response.
+
+ @param response: The HTTP response for which the body will be read.
+ @type response: L{IResponse} provider
+
+ @return: A L{Deferred} which will fire with the body of the response.
+ Cancelling it will close the connection to the server immediately.
+ """
+ def cancel(deferred):
+ """
+ Cancel a L{readBody} call, close the connection to the HTTP server
+ immediately, if it is still open.
+
+ @param deferred: The cancelled L{defer.Deferred}.
+ """
+ abort = getAbort()
+ if abort is not None:
+ abort()
+
+ d = defer.Deferred(cancel)
+ protocol = protocolClass(response, d)
+
+ def getAbort():
+ return getattr(protocol.transport, 'abortConnection', None)
+
+ response.deliverBody(protocol)
+
+ if protocol.transport is not None and getAbort() is None:
+ warnings.warn(
+ 'Using readBody with a transport that does not have an '
+ 'abortConnection method',
+ category=DeprecationWarning,
+ stacklevel=2)
+
+ return d
+
+
+class RequestBody(object):
+ """
+ This class is a helper to generate send and fetch requests.
+ The expected format is something like:
+ [
+ {headers},
+ {entry1},
+ {...},
+ {entryN},
+ ]
+ """
+
+ def __init__(self, **header_dict):
+ """
+ Creates a new RequestBody holding header information.
+
+ :param header_dict: A dictionary with the headers.
+ :type header_dict: dict
+ """
+ self.headers = header_dict
+ self.entries = []
+ self.consumed = 0
+
+ def insert_info(self, **entry_dict):
+ """
+ Dumps an entry into JSON format and add it to entries list.
+ Adds 'content' key on a new line if it's present.
+
+ :param entry_dict: Entry as a dictionary
+ :type entry_dict: dict
+ """
+ content = ''
+ if 'content' in entry_dict:
+ content = ',\r\n' + (entry_dict['content'] or '')
+ entry = json.dumps(entry_dict) + content
+ self.entries.append(entry)
+
+ def pop(self, amount=10, leave_open=False):
+ """
+ Removes entries and returns it formatted and ready
+ to be sent.
+
+ :param amount: number of entries to pop and format
+ :type amount: int
+
+ :param leave_open: flag to skip stream closing
+ :type amount: bool
+
+ :return: formatted body ready to be sent
+ :rtype: str
+ """
+ start = self.consumed == 0
+ amount = min([len(self.entries), amount])
+ entries = [self.entries.pop(0) for i in xrange(amount)]
+ self.consumed += amount
+ end = len(self.entries) == 0 if not leave_open else False
+ return self.entries_to_str(entries, start, end)
+
+ def __str__(self):
+ return self.pop(len(self.entries))
+
+ def __len__(self):
+ return len(self.entries)
+
+ def entries_to_str(self, entries=None, start=True, end=True):
+ """
+ Format a list of entries into the body format expected
+ by the server.
+
+ :param entries: entries to format
+ :type entries: list
+
+ :return: formatted body ready to be sent
+ :rtype: str
+ """
+ data = ''
+ if start:
+ data = '[\r\n' + json.dumps(self.headers)
+ data += ''.join(',\r\n' + entry for entry in entries)
+ if end:
+ data += '\r\n]'
+ return data