diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 63 | 
1 files changed, 43 insertions, 20 deletions
| diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 527a6418..4edb77f4 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -151,6 +151,10 @@ class RetriableTransferError(Exception):      pass +class MaximumRetriesError(Exception): +    pass + +  def sleep(seconds):      d = defer.Deferred()      reactor.callLater(seconds, d.callback, None) @@ -346,19 +350,21 @@ class BlobManager(object):      @defer.inlineCallbacks      def __send_one(self, blob_id, namespace, i, total): -            logger.info("Sending blob to server (%d/%d): %s" -                        % (i, total, blob_id)) -            fd = yield self.local.get(blob_id, namespace=namespace) -            try: -                yield self._encrypt_and_upload(blob_id, fd) -                yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) -            except Exception as e: -                yield self.local.increment_retries(blob_id) -                _, retries = yield self.local.get_sync_status(blob_id) -                if retries > self.max_retries: -                    failed_upload = SyncStatus.FAILED_UPLOAD -                    yield self.local.update_sync_status(blob_id, failed_upload) -                raise e +        logger.info("Sending blob to server (%d/%d): %s" +                    % (i, total, blob_id)) +        fd = yield self.local.get(blob_id, namespace=namespace) +        try: +            yield self._encrypt_and_upload(blob_id, fd) +            yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) +        except Exception as e: +            yield self.local.increment_retries(blob_id) +            res = yield self.local.get_sync_status(blob_id) +            _, retries = res +            if (retries + 1) > self.max_retries: +                failed_upload = SyncStatus.FAILED_UPLOAD +                yield self.local.update_sync_status(blob_id, failed_upload) +                raise MaximumRetriesError(e) +            raise e      @defer.inlineCallbacks      def fetch_missing(self, namespace=''): @@ -489,6 +495,9 @@ class BlobManager(object):              logger.info("Found blob in local database: %s" % blob_id)              defer.returnValue(local_blob) +        yield self.local.update_sync_status( +            blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace) +          try:              result = yield self._download_and_decrypt(blob_id, namespace)          except Exception as e: @@ -505,12 +514,14 @@ class BlobManager(object):                  logger.error(message)              yield self.local.increment_retries(blob_id) +              if (retries + 1) >= self.max_retries:                  failed_download = SyncStatus.FAILED_DOWNLOAD -                yield self.local.update_sync_status(blob_id, failed_download) -                raise e -            else: -                raise RetriableTransferError(e) +                yield self.local.update_sync_status( +                    blob_id, failed_download, namespace=namespace) +                raise MaximumRetriesError(e) + +            raise RetriableTransferError(e)          if not result:              defer.returnValue(None) @@ -684,10 +695,22 @@ class SQLiteBlobBackend(object):          else:              defer.returnValue([]) -    def update_sync_status(self, blob_id, sync_status): -        query = 'update blobs set sync_status = ? where blob_id = ?' +    @defer.inlineCallbacks +    def update_sync_status(self, blob_id, sync_status, namespace=""): +        query = 'SELECT sync_status FROM blobs WHERE blob_id = ?' +        result = yield self.dbpool.runQuery(query, (blob_id,)) + +        if not result: +            insert = 'INSERT INTO blobs' +            insert += ' (blob_id, namespace, payload, sync_status)' +            insert += ' VALUES (?, ?, zeroblob(0), ?)' +            values = (blob_id, namespace, sync_status) +            yield self.dbpool.runOperation(insert, values) +            return + +        update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?'          values = (sync_status, blob_id,) -        return self.dbpool.runQuery(query, values) +        result = yield self.dbpool.runOperation(update, values)      def update_batch_sync_status(self, blob_id_list, sync_status,                                   namespace=''): | 
