summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client/_db/blobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/client/_db/blobs.py')
-rw-r--r--src/leap/soledad/client/_db/blobs.py63
1 files changed, 43 insertions, 20 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 527a6418..4edb77f4 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -151,6 +151,10 @@ class RetriableTransferError(Exception):
pass
+class MaximumRetriesError(Exception):
+ pass
+
+
def sleep(seconds):
d = defer.Deferred()
reactor.callLater(seconds, d.callback, None)
@@ -346,19 +350,21 @@ class BlobManager(object):
@defer.inlineCallbacks
def __send_one(self, blob_id, namespace, i, total):
- logger.info("Sending blob to server (%d/%d): %s"
- % (i, total, blob_id))
- fd = yield self.local.get(blob_id, namespace=namespace)
- try:
- yield self._encrypt_and_upload(blob_id, fd)
- yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
- except Exception as e:
- yield self.local.increment_retries(blob_id)
- _, retries = yield self.local.get_sync_status(blob_id)
- if retries > self.max_retries:
- failed_upload = SyncStatus.FAILED_UPLOAD
- yield self.local.update_sync_status(blob_id, failed_upload)
- raise e
+ logger.info("Sending blob to server (%d/%d): %s"
+ % (i, total, blob_id))
+ fd = yield self.local.get(blob_id, namespace=namespace)
+ try:
+ yield self._encrypt_and_upload(blob_id, fd)
+ yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
+ except Exception as e:
+ yield self.local.increment_retries(blob_id)
+ res = yield self.local.get_sync_status(blob_id)
+ _, retries = res
+ if (retries + 1) > self.max_retries:
+ failed_upload = SyncStatus.FAILED_UPLOAD
+ yield self.local.update_sync_status(blob_id, failed_upload)
+ raise MaximumRetriesError(e)
+ raise e
@defer.inlineCallbacks
def fetch_missing(self, namespace=''):
@@ -489,6 +495,9 @@ class BlobManager(object):
logger.info("Found blob in local database: %s" % blob_id)
defer.returnValue(local_blob)
+ yield self.local.update_sync_status(
+ blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace)
+
try:
result = yield self._download_and_decrypt(blob_id, namespace)
except Exception as e:
@@ -505,12 +514,14 @@ class BlobManager(object):
logger.error(message)
yield self.local.increment_retries(blob_id)
+
if (retries + 1) >= self.max_retries:
failed_download = SyncStatus.FAILED_DOWNLOAD
- yield self.local.update_sync_status(blob_id, failed_download)
- raise e
- else:
- raise RetriableTransferError(e)
+ yield self.local.update_sync_status(
+ blob_id, failed_download, namespace=namespace)
+ raise MaximumRetriesError(e)
+
+ raise RetriableTransferError(e)
if not result:
defer.returnValue(None)
@@ -684,10 +695,22 @@ class SQLiteBlobBackend(object):
else:
defer.returnValue([])
- def update_sync_status(self, blob_id, sync_status):
- query = 'update blobs set sync_status = ? where blob_id = ?'
+ @defer.inlineCallbacks
+ def update_sync_status(self, blob_id, sync_status, namespace=""):
+ query = 'SELECT sync_status FROM blobs WHERE blob_id = ?'
+ result = yield self.dbpool.runQuery(query, (blob_id,))
+
+ if not result:
+ insert = 'INSERT INTO blobs'
+ insert += ' (blob_id, namespace, payload, sync_status)'
+ insert += ' VALUES (?, ?, zeroblob(0), ?)'
+ values = (blob_id, namespace, sync_status)
+ yield self.dbpool.runOperation(insert, values)
+ return
+
+ update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?'
values = (sync_status, blob_id,)
- return self.dbpool.runQuery(query, values)
+ result = yield self.dbpool.runOperation(update, values)
def update_batch_sync_status(self, blob_id_list, sync_status,
namespace=''):