diff options
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 14 | ||||
-rw-r--r-- | tests/server/test_blobs_server.py | 28 |
2 files changed, 39 insertions, 3 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 76746c71..05802dab 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -31,6 +31,7 @@ from functools import partial from twisted.logger import Logger from twisted.enterprise import adbapi from twisted.internet import defer +from twisted.internet import error import treq @@ -338,10 +339,21 @@ class BlobManager(object): for _ in range(min(self.concurrency_limit, len(docs_we_want))): blob_id = docs_we_want.pop() logger.info("Fetching new doc: %s" % blob_id) - deferreds.append(self.get(blob_id, namespace)) + d = self.__with_retry(self.get, blob_id, namespace) + deferreds.append(d) yield defer.gatherResults(deferreds) @defer.inlineCallbacks + def __with_retry(self, func, *args, **kwargs): + retries, max_retries = 0, 300 + while retries < max_retries: + try: + yield func(*args, **kwargs) + break + except(error.ConnectError, error.ConnectionClosed): + retries += 1 + + @defer.inlineCallbacks def sync(self, namespace=''): yield self.refresh_sync_status_from_server(namespace) yield self.fetch_missing(namespace) diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py index 7e8533fe..f27fa985 100644 --- a/tests/server/test_blobs_server.py +++ b/tests/server/test_blobs_server.py @@ -39,8 +39,8 @@ class BlobServerTestCase(unittest.TestCase): def setUp(self): root = server_blobs.BlobsResource("filesystem", self.tempdir) - site = Site(root) - self.port = reactor.listenTCP(0, site, interface='127.0.0.1') + self.site = Site(root) + self.port = reactor.listenTCP(0, self.site, interface='127.0.0.1') self.host = self.port.getHost() self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) self.secret = 'A' * 96 @@ -270,6 +270,30 @@ class BlobServerTestCase(unittest.TestCase): @defer.inlineCallbacks @pytest.mark.usefixtures("method_tmpdir") + def test_sync_fetch_missing_retry(self): + manager = BlobManager(self.tempdir, self.uri, self.secret, + self.secret, uuid4().hex) + self.addCleanup(manager.close) + blob_id = 'remote_only_blob_id' + yield manager._encrypt_and_upload(blob_id, BytesIO("X")) + yield manager.refresh_sync_status_from_server() + yield self.port.stopListening() + + def sleep(x): + d = defer.Deferred() + reactor.callLater(x, d.callback, None) + return d + d = manager.fetch_missing() + yield sleep(1) + self.port = reactor.listenTCP( + self.host.port, self.site, interface='127.0.0.1') + yield d + result = yield manager.local.get(blob_id) + self.assertIsNotNone(result) + self.assertEquals(result.getvalue(), "X") + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") def test_upload_then_delete_updates_list(self): manager = BlobManager('', self.uri, self.secret, self.secret, uuid4().hex) |