summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client/_db/blobs
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/client/_db/blobs')
-rw-r--r--src/leap/soledad/client/_db/blobs/__init__.py19
-rw-r--r--src/leap/soledad/client/_db/blobs/upstream_producer.py89
2 files changed, 95 insertions, 13 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py
index a0b48035..8d469760 100644
--- a/src/leap/soledad/client/_db/blobs/__init__.py
+++ b/src/leap/soledad/client/_db/blobs/__init__.py
@@ -50,6 +50,7 @@ from .sql import SyncStatus
from .sql import Priority
from .sql import SQLiteBlobBackend
from .sync import BlobsSynchronizer
+from .upstream_producer import BlobsUpstreamProducer
from .errors import (
BlobAlreadyExistsError, MaximumRetriesError,
RetriableTransferError, BlobNotFoundError, InvalidFlagsError)
@@ -498,25 +499,17 @@ class BlobManager(BlobsSynchronizer):
@defer.inlineCallbacks
def _upstream(self, blobs_id_list, namespace=''):
+ local, secret = self.local, self.secret
uri = urljoin(self.remote_stream, self.user)
params = {'namespace': namespace} if namespace else None
sizes = yield self.local.get_size_list(blobs_id_list, namespace)
convert = get_unarmored_ciphertext_size
- sizes = map(lambda (x, y): (x, convert(y)), sizes)
- data = BytesIO() # TODO: stream from db
- data.write(json.dumps(sizes) + '\n')
- for blob_id in blobs_id_list:
- blob_fd = yield self.local.get(blob_id, namespace=namespace)
- doc_info = DocInfo(blob_id, FIXED_REV)
- crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret,
- armor=False)
- fd = yield crypter.encrypt()
- data.write(fd.read())
- data.seek(0)
+ sizes = map(lambda (blob_id, size): (blob_id, convert(size)), sizes)
+ producer = BlobsUpstreamProducer(local, sizes, namespace, secret)
params = {'namespace': namespace} if namespace else {}
params['direction'] = 'upload'
- response = yield self._client.post(uri, data=data, params=params)
- check_http_status(response.code, blob_id)
+ response = yield self._client.post(uri, data=producer, params=params)
+ check_http_status(response.code, 'stream')
logger.info("Finished stream up: %s" % (blobs_id_list,))
@defer.inlineCallbacks
diff --git a/src/leap/soledad/client/_db/blobs/upstream_producer.py b/src/leap/soledad/client/_db/blobs/upstream_producer.py
new file mode 100644
index 00000000..5c8f0530
--- /dev/null
+++ b/src/leap/soledad/client/_db/blobs/upstream_producer.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+# upstream_producer.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+from zope.interface import implementer
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.web.iweb import IBodyProducer
+from twisted.web.iweb import UNKNOWN_LENGTH
+from leap.soledad.client._crypto import DocInfo
+from leap.soledad.client._crypto import BlobEncryptor
+
+
+FIXED_REV = 'ImmutableRevision' # Blob content is immutable
+
+
+@implementer(IBodyProducer)
+class BlobsUpstreamProducer(object):
+ """
+ Blob producer for upload streams.
+ """
+
+ def __init__(self, database, blobs_lengths, namespace, secret):
+ """
+ Initialize the upload streamer.
+
+ :param database: Local blobs SQLCipher backend instance
+ :type database: .sql.SQLiteBlobBackend
+ :param blobs_lengths: List of blobs with ids and sizes
+ :type blobs_lengths: [(blob_id:str, size:int)]
+ :param namespace: Namespace which this stream belongs
+ :type namespace: str
+ :param secret: The secret used to encrypt blobs.
+ :type secret: str
+ """
+ self.blobs_lengths = blobs_lengths
+ self.db = database
+ self.length = UNKNOWN_LENGTH
+ self.pause = False
+ self.stop = False
+ self.namespace = namespace
+ self.secret = secret
+
+ @defer.inlineCallbacks
+ def startProducing(self, consumer):
+ """
+ Write blobs to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A Deferred that fires when production ends.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ consumer.write(json.dumps(self.blobs_lengths) + '\n')
+ for blob_id, _ in self.blobs_lengths:
+ blob_fd = yield self.db.get(blob_id, namespace=self.namespace)
+ doc_info = DocInfo(blob_id, FIXED_REV)
+ crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret,
+ armor=False)
+ fd = yield crypter.encrypt()
+ consumer.write(fd.read())
+
+ def sleep(self, secs):
+ d = defer.Deferred()
+ reactor.callLater(secs, d.callback, None)
+ return d
+
+ def pauseProducing(self):
+ self.pause = True
+
+ def stopProducing(self):
+ self.stop = True
+
+ def resumeProducing(self):
+ self.pause = False