diff options
| -rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 55 | ||||
| -rw-r--r-- | tests/blobs/test_blob_manager.py | 7 | 
2 files changed, 41 insertions, 21 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index afc92864..73f011c2 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -327,18 +327,27 @@ class BlobManager(object):          d = self.local_list(namespace=namespace,                              sync_status=SyncStatus.PENDING_UPLOAD)          missing = yield d -        logger.info("Amount of documents missing on server: %s" % len(missing)) -        while missing: -            deferreds = [] -            for _ in range(min(self.concurrency_limit, len(missing))): -                blob_id = missing.pop() -                d = with_retry(self.__send_one, blob_id, namespace) -                deferreds.append(d) -            yield defer.gatherResults(deferreds) +        total = len(missing) +        logger.info("Will send %d blobs to server." % total) +        deferreds = [] +        semaphore = defer.DeferredSemaphore(self.concurrency_limit) + +        def release(result): +            semaphore.release() +            return result + +        for i in xrange(total): +            yield semaphore.acquire() +            blob_id = missing.pop() +            d = with_retry(self.__send_one, blob_id, namespace, i, total) +            d.addCallbacks(release, release) +            deferreds.append(d) +        yield defer.gatherResults(deferreds)      @defer.inlineCallbacks -    def __send_one(self, blob_id, namespace): -            logger.info("Upload local blob: %s" % blob_id) +    def __send_one(self, blob_id, namespace, i, total): +            logger.info("Sending blob to server (%d/%d): %s" +                        % (i, total, blob_id))              fd = yield self.local.get(blob_id, namespace=namespace)              try:                  yield self._encrypt_and_upload(blob_id, fd) @@ -365,15 +374,23 @@ class BlobManager(object):          d = self.local_list(namespace=namespace,                              sync_status=SyncStatus.PENDING_DOWNLOAD)          docs_we_want = yield d -        logger.info("Fetching new docs from server: %s" % len(docs_we_want)) -        while docs_we_want: -            deferreds = [] -            for _ in range(min(self.concurrency_limit, len(docs_we_want))): -                blob_id = docs_we_want.pop() -                logger.info("Fetching new doc: %s" % blob_id) -                d = with_retry(self.get, blob_id, namespace) -                deferreds.append(d) -            yield defer.gatherResults(deferreds) +        total = len(docs_we_want) +        logger.info("Will fetch %d blobs from server." % total) +        deferreds = [] +        semaphore = defer.DeferredSemaphore(self.concurrency_limit) + +        def release(result): +            semaphore.release() +            return result + +        for i in xrange(len(docs_we_want)): +            yield semaphore.acquire() +            blob_id = docs_we_want.pop() +            logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id)) +            d = with_retry(self.get, blob_id, namespace) +            d.addCallbacks(release, release) +            deferreds.append(d) +        yield defer.gatherResults(deferreds)      @defer.inlineCallbacks      def sync(self, namespace=''): diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py index 81379c73..c6f84e29 100644 --- a/tests/blobs/test_blob_manager.py +++ b/tests/blobs/test_blob_manager.py @@ -19,6 +19,7 @@ Tests for BlobManager.  """  from twisted.trial import unittest  from twisted.internet import defer +from twisted.web.error import SchemeNotSupported  from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV  from leap.soledad.client._db.blobs import BlobAlreadyExistsError  from leap.soledad.client._db.blobs import SyncStatus @@ -154,7 +155,8 @@ class BlobManagerTestCase(unittest.TestCase):          self.manager._encrypt_and_upload = Mock(return_value=upload_failure)          content, blob_id = "Blob content", uuid4().hex          doc1 = BlobDoc(BytesIO(content), blob_id) -        with pytest.raises(Exception): +        with pytest.raises(SchemeNotSupported): +            # should fail because manager URL is invalid              yield self.manager.put(doc1, len(content))          pending_upload = SyncStatus.PENDING_UPLOAD          local_list = yield self.manager.local_list(sync_status=pending_upload) @@ -166,7 +168,8 @@ class BlobManagerTestCase(unittest.TestCase):          self.manager.remote_list = Mock(return_value=[])          content, blob_id = "Blob content", uuid4().hex          doc1 = BlobDoc(BytesIO(content), blob_id) -        with pytest.raises(Exception): +        with pytest.raises(SchemeNotSupported): +            # should fail because manager URL is invalid              yield self.manager.put(doc1, len(content))          for _ in range(self.manager.max_retries + 1):              with pytest.raises(defer.FirstError):  | 
