From 1bce0175b98f94c784181b02a4e17d4c14c732c5 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 20 Dec 2017 13:58:21 -0300 Subject: [feature] add a producer protocol for upstream -- Resolves: #8809 --- src/leap/soledad/client/_db/blobs/__init__.py | 19 ++--- .../soledad/client/_db/blobs/upstream_producer.py | 89 ++++++++++++++++++++++ 2 files changed, 95 insertions(+), 13 deletions(-) create mode 100644 src/leap/soledad/client/_db/blobs/upstream_producer.py diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py index a0b48035..8d469760 100644 --- a/src/leap/soledad/client/_db/blobs/__init__.py +++ b/src/leap/soledad/client/_db/blobs/__init__.py @@ -50,6 +50,7 @@ from .sql import SyncStatus from .sql import Priority from .sql import SQLiteBlobBackend from .sync import BlobsSynchronizer +from .upstream_producer import BlobsUpstreamProducer from .errors import ( BlobAlreadyExistsError, MaximumRetriesError, RetriableTransferError, BlobNotFoundError, InvalidFlagsError) @@ -498,25 +499,17 @@ class BlobManager(BlobsSynchronizer): @defer.inlineCallbacks def _upstream(self, blobs_id_list, namespace=''): + local, secret = self.local, self.secret uri = urljoin(self.remote_stream, self.user) params = {'namespace': namespace} if namespace else None sizes = yield self.local.get_size_list(blobs_id_list, namespace) convert = get_unarmored_ciphertext_size - sizes = map(lambda (x, y): (x, convert(y)), sizes) - data = BytesIO() # TODO: stream from db - data.write(json.dumps(sizes) + '\n') - for blob_id in blobs_id_list: - blob_fd = yield self.local.get(blob_id, namespace=namespace) - doc_info = DocInfo(blob_id, FIXED_REV) - crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret, - armor=False) - fd = yield crypter.encrypt() - data.write(fd.read()) - data.seek(0) + sizes = map(lambda (blob_id, size): (blob_id, convert(size)), sizes) + producer = BlobsUpstreamProducer(local, sizes, namespace, secret) params = {'namespace': namespace} if namespace else {} params['direction'] = 'upload' - response = yield self._client.post(uri, data=data, params=params) - check_http_status(response.code, blob_id) + response = yield self._client.post(uri, data=producer, params=params) + check_http_status(response.code, 'stream') logger.info("Finished stream up: %s" % (blobs_id_list,)) @defer.inlineCallbacks diff --git a/src/leap/soledad/client/_db/blobs/upstream_producer.py b/src/leap/soledad/client/_db/blobs/upstream_producer.py new file mode 100644 index 00000000..5c8f0530 --- /dev/null +++ b/src/leap/soledad/client/_db/blobs/upstream_producer.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# upstream_producer.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import json +from zope.interface import implementer +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web.iweb import IBodyProducer +from twisted.web.iweb import UNKNOWN_LENGTH +from leap.soledad.client._crypto import DocInfo +from leap.soledad.client._crypto import BlobEncryptor + + +FIXED_REV = 'ImmutableRevision' # Blob content is immutable + + +@implementer(IBodyProducer) +class BlobsUpstreamProducer(object): + """ + Blob producer for upload streams. + """ + + def __init__(self, database, blobs_lengths, namespace, secret): + """ + Initialize the upload streamer. + + :param database: Local blobs SQLCipher backend instance + :type database: .sql.SQLiteBlobBackend + :param blobs_lengths: List of blobs with ids and sizes + :type blobs_lengths: [(blob_id:str, size:int)] + :param namespace: Namespace which this stream belongs + :type namespace: str + :param secret: The secret used to encrypt blobs. + :type secret: str + """ + self.blobs_lengths = blobs_lengths + self.db = database + self.length = UNKNOWN_LENGTH + self.pause = False + self.stop = False + self.namespace = namespace + self.secret = secret + + @defer.inlineCallbacks + def startProducing(self, consumer): + """ + Write blobs to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A Deferred that fires when production ends. + :rtype: twisted.internet.defer.Deferred + """ + consumer.write(json.dumps(self.blobs_lengths) + '\n') + for blob_id, _ in self.blobs_lengths: + blob_fd = yield self.db.get(blob_id, namespace=self.namespace) + doc_info = DocInfo(blob_id, FIXED_REV) + crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret, + armor=False) + fd = yield crypter.encrypt() + consumer.write(fd.read()) + + def sleep(self, secs): + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + def pauseProducing(self): + self.pause = True + + def stopProducing(self): + self.stop = True + + def resumeProducing(self): + self.pause = False -- cgit v1.2.3