diff options
| -rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 63 | ||||
| -rw-r--r-- | tests/blobs/test_blob_manager.py | 49 | ||||
| -rw-r--r-- | tests/server/test_blobs_server.py | 7 | 
3 files changed, 86 insertions, 33 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 527a6418..4edb77f4 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -151,6 +151,10 @@ class RetriableTransferError(Exception):      pass +class MaximumRetriesError(Exception): +    pass + +  def sleep(seconds):      d = defer.Deferred()      reactor.callLater(seconds, d.callback, None) @@ -346,19 +350,21 @@ class BlobManager(object):      @defer.inlineCallbacks      def __send_one(self, blob_id, namespace, i, total): -            logger.info("Sending blob to server (%d/%d): %s" -                        % (i, total, blob_id)) -            fd = yield self.local.get(blob_id, namespace=namespace) -            try: -                yield self._encrypt_and_upload(blob_id, fd) -                yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) -            except Exception as e: -                yield self.local.increment_retries(blob_id) -                _, retries = yield self.local.get_sync_status(blob_id) -                if retries > self.max_retries: -                    failed_upload = SyncStatus.FAILED_UPLOAD -                    yield self.local.update_sync_status(blob_id, failed_upload) -                raise e +        logger.info("Sending blob to server (%d/%d): %s" +                    % (i, total, blob_id)) +        fd = yield self.local.get(blob_id, namespace=namespace) +        try: +            yield self._encrypt_and_upload(blob_id, fd) +            yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) +        except Exception as e: +            yield self.local.increment_retries(blob_id) +            res = yield self.local.get_sync_status(blob_id) +            _, retries = res +            if (retries + 1) > self.max_retries: +                failed_upload = SyncStatus.FAILED_UPLOAD +                yield self.local.update_sync_status(blob_id, failed_upload) +                raise MaximumRetriesError(e) +            raise e      @defer.inlineCallbacks      def fetch_missing(self, namespace=''): @@ -489,6 +495,9 @@ class BlobManager(object):              logger.info("Found blob in local database: %s" % blob_id)              defer.returnValue(local_blob) +        yield self.local.update_sync_status( +            blob_id, SyncStatus.PENDING_DOWNLOAD, namespace=namespace) +          try:              result = yield self._download_and_decrypt(blob_id, namespace)          except Exception as e: @@ -505,12 +514,14 @@ class BlobManager(object):                  logger.error(message)              yield self.local.increment_retries(blob_id) +              if (retries + 1) >= self.max_retries:                  failed_download = SyncStatus.FAILED_DOWNLOAD -                yield self.local.update_sync_status(blob_id, failed_download) -                raise e -            else: -                raise RetriableTransferError(e) +                yield self.local.update_sync_status( +                    blob_id, failed_download, namespace=namespace) +                raise MaximumRetriesError(e) + +            raise RetriableTransferError(e)          if not result:              defer.returnValue(None) @@ -684,10 +695,22 @@ class SQLiteBlobBackend(object):          else:              defer.returnValue([]) -    def update_sync_status(self, blob_id, sync_status): -        query = 'update blobs set sync_status = ? where blob_id = ?' +    @defer.inlineCallbacks +    def update_sync_status(self, blob_id, sync_status, namespace=""): +        query = 'SELECT sync_status FROM blobs WHERE blob_id = ?' +        result = yield self.dbpool.runQuery(query, (blob_id,)) + +        if not result: +            insert = 'INSERT INTO blobs' +            insert += ' (blob_id, namespace, payload, sync_status)' +            insert += ' VALUES (?, ?, zeroblob(0), ?)' +            values = (blob_id, namespace, sync_status) +            yield self.dbpool.runOperation(insert, values) +            return + +        update = 'UPDATE blobs SET sync_status = ? WHERE blob_id = ?'          values = (sync_status, blob_id,) -        return self.dbpool.runQuery(query, values) +        result = yield self.dbpool.runOperation(update, values)      def update_batch_sync_status(self, blob_id_list, sync_status,                                   namespace=''): diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py index c6f84e29..c151d8c3 100644 --- a/tests/blobs/test_blob_manager.py +++ b/tests/blobs/test_blob_manager.py @@ -19,16 +19,20 @@ Tests for BlobManager.  """  from twisted.trial import unittest  from twisted.internet import defer -from twisted.web.error import SchemeNotSupported  from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV  from leap.soledad.client._db.blobs import BlobAlreadyExistsError  from leap.soledad.client._db.blobs import SyncStatus +from leap.soledad.client._db.blobs import RetriableTransferError  from io import BytesIO  from mock import Mock  from uuid import uuid4  import pytest  import os +# monkey-patch the blobmanager MAX_WAIT time so tests run faster +from leap.soledad.client._db import blobs +blobs.MAX_WAIT = 1 +  class BlobManagerTestCase(unittest.TestCase): @@ -155,8 +159,7 @@ class BlobManagerTestCase(unittest.TestCase):          self.manager._encrypt_and_upload = Mock(return_value=upload_failure)          content, blob_id = "Blob content", uuid4().hex          doc1 = BlobDoc(BytesIO(content), blob_id) -        with pytest.raises(SchemeNotSupported): -            # should fail because manager URL is invalid +        with pytest.raises(Exception):              yield self.manager.put(doc1, len(content))          pending_upload = SyncStatus.PENDING_UPLOAD          local_list = yield self.manager.local_list(sync_status=pending_upload) @@ -165,18 +168,44 @@ class BlobManagerTestCase(unittest.TestCase):      @defer.inlineCallbacks      @pytest.mark.usefixtures("method_tmpdir")      def test_upload_retry_limit(self): +        # prepare the manager to fail accordingly          self.manager.remote_list = Mock(return_value=[]) +        self.manager._encrypt_and_upload = Mock( +            side_effect=RetriableTransferError) +        # put a blob in local storage          content, blob_id = "Blob content", uuid4().hex -        doc1 = BlobDoc(BytesIO(content), blob_id) -        with pytest.raises(SchemeNotSupported): -            # should fail because manager URL is invalid -            yield self.manager.put(doc1, len(content)) -        for _ in range(self.manager.max_retries + 1): -            with pytest.raises(defer.FirstError): -                yield self.manager.send_missing() +        yield self.manager.local.put(blob_id, BytesIO(content), len(content)) +        # try to send missing +        with pytest.raises(defer.FirstError): +            yield self.manager.send_missing() +        # assert failed state and number of retries          failed_upload = SyncStatus.FAILED_UPLOAD          local_list = yield self.manager.local_list(sync_status=failed_upload)          self.assertIn(blob_id, local_list) +        sync_status, retries = \ +            yield self.manager.local.get_sync_status(blob_id) +        self.assertEqual(failed_upload, sync_status) +        self.assertEqual(self.manager.max_retries, retries) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_download_retry_limit(self): +        # prepare the manager to fail accordingly +        blob_id = uuid4().hex +        self.manager.local_list = Mock(return_value=[blob_id]) +        self.manager._download_and_decrypt = Mock( +            side_effect=RetriableTransferError) +        # try to fetch missing +        with pytest.raises(defer.FirstError): +            yield self.manager.fetch_missing() +        # assert failed state and number of retries +        failed_download = SyncStatus.FAILED_DOWNLOAD +        local_list = yield self.manager.local.list(sync_status=failed_download) +        self.assertIn(blob_id, local_list) +        sync_status, retries = \ +            yield self.manager.local.get_sync_status(blob_id) +        self.assertEqual(failed_download, sync_status) +        self.assertEqual(self.manager.max_retries, retries)      @defer.inlineCallbacks      @pytest.mark.usefixtures("method_tmpdir") diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py index 892ae5ff..c4a00ab5 100644 --- a/tests/server/test_blobs_server.py +++ b/tests/server/test_blobs_server.py @@ -34,8 +34,9 @@ from leap.soledad.client._db.blobs import BlobAlreadyExistsError  from leap.soledad.client._db.blobs import InvalidFlagsError  from leap.soledad.client._db.blobs import SoledadError  from leap.soledad.client._db.blobs import SyncStatus +from leap.soledad.client._db.blobs import RetriableTransferError +from leap.soledad.client._db.blobs import MaximumRetriesError  from leap.soledad.client._db import blobs as client_blobs -from leap.soledad.client._crypto import InvalidBlob  def sleep(x): @@ -333,7 +334,7 @@ class BlobServerTestCase(unittest.TestCase):              # Corrupt the tag (last 16 bytes)              corrupted_blob.seek(-16, 2)              corrupted_blob.write('x' * 16) -        with pytest.raises(InvalidBlob): +        with pytest.raises(MaximumRetriesError):              yield manager.sync()          status, retries = yield manager.local.get_sync_status(blob_id)          self.assertEquals(status, SyncStatus.FAILED_DOWNLOAD) @@ -370,7 +371,7 @@ class BlobServerTestCase(unittest.TestCase):          manager = BlobManager(self.tempdir, self.uri, self.secret,                                self.secret, uuid4().hex)          self.addCleanup(manager.close) -        with pytest.raises(SoledadError): +        with pytest.raises(RetriableTransferError):              yield manager.get('missing_id')      @defer.inlineCallbacks  | 
