diff options
Diffstat (limited to 'src/leap/soledad/client/_db')
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 63 |
1 files changed, 43 insertions, 20 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 527a6418..4edb77f4 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -151,6 +151,10 @@ class RetriableTransferError(Exception): pass +class MaximumRetriesError(Exception): + pass + + def sleep(seconds): d = defer.Deferred() reactor.callLater(seconds, d.callback, None) @@ -346,19 +350,21 @@ class BlobManager(object): @defer.inlineCallbacks def __send_one(self, blob_id, namespace, i, total): - logger.info("Sending blob to server (%d/%d): %s" - % (i, total, blob_id)) - fd = yield self.local.get(blob_id, namespace=namespace) - try: - yield self._encrypt_and_upload(blob_id, fd) - yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) - except Exception as e: - yield self.local.increment_retries(blob_id) - _, retries = yield self.local.get_sync_status(blob_id) - if retries > self.max_retries: - failed_upload = SyncStatus.FAILED_UPLOAD - yield self.local.update_sync_status(blob_id, failed_upload) - raise e + logger.info("Sending blob to server (%d/%d): %s" + % (i, total, blob_id)) + fd = yield self.local.get(blob_id, namespace=namespace) + try: + yield self._encrypt_and_upload(blob_id, fd) + yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) + except Exception as e: + yield self.local.increment_retries(blob_id) + res = yield self.local.get_sync_status(blob_id) + _, retries = res + if (retries + 1) > self.max_retries: + failed_upload = SyncStatus.FAILED_UPLOAD + yield self.local.update_sync_status(blob_id, failed_upload) + raise MaximumRetriesError(e) + raise e @defer.inlineCallbacks def fetch_missing(self, namespace=''): @@ -489,6 +495,9 @@ class BlobManager(object): logger.info("Found blob in local database: %s" % blob_id) defer.returnValue(local_blob) + yield self.local.update_sync_status( + blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace) + try: result = yield self._download_and_decrypt(blob_id, namespace) except Exception as e: @@ -505,12 +514,14 @@ class BlobManager(object): logger.error(message) yield self.local.increment_retries(blob_id) + if (retries + 1) >= self.max_retries: failed_download = SyncStatus.FAILED_DOWNLOAD - yield self.local.update_sync_status(blob_id, failed_download) - raise e - else: - raise RetriableTransferError(e) + yield self.local.update_sync_status( + blob_id, failed_download, namespace=namespace) + raise MaximumRetriesError(e) + + raise RetriableTransferError(e) if not result: defer.returnValue(None) @@ -684,10 +695,22 @@ class SQLiteBlobBackend(object): else: defer.returnValue([]) - def update_sync_status(self, blob_id, sync_status): - query = 'update blobs set sync_status = ? where blob_id = ?' + @defer.inlineCallbacks + def update_sync_status(self, blob_id, sync_status, namespace=""): + query = 'SELECT sync_status FROM blobs WHERE blob_id = ?' + result = yield self.dbpool.runQuery(query, (blob_id,)) + + if not result: + insert = 'INSERT INTO blobs' + insert += ' (blob_id, namespace, payload, sync_status)' + insert += ' VALUES (?, ?, zeroblob(0), ?)' + values = (blob_id, namespace, sync_status) + yield self.dbpool.runOperation(insert, values) + return + + update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?' values = (sync_status, blob_id,) - return self.dbpool.runQuery(query, values) + result = yield self.dbpool.runOperation(update, values) def update_batch_sync_status(self, blob_id_list, sync_status, namespace=''): |