summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/client/_db/blobs.py14
-rw-r--r--tests/server/test_blobs_server.py28
2 files changed, 39 insertions, 3 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 76746c71..05802dab 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -31,6 +31,7 @@ from functools import partial
from twisted.logger import Logger
from twisted.enterprise import adbapi
from twisted.internet import defer
+from twisted.internet import error
import treq
@@ -338,10 +339,21 @@ class BlobManager(object):
for _ in range(min(self.concurrency_limit, len(docs_we_want))):
blob_id = docs_we_want.pop()
logger.info("Fetching new doc: %s" % blob_id)
- deferreds.append(self.get(blob_id, namespace))
+ d = self.__with_retry(self.get, blob_id, namespace)
+ deferreds.append(d)
yield defer.gatherResults(deferreds)
@defer.inlineCallbacks
+ def __with_retry(self, func, *args, **kwargs):
+ retries, max_retries = 0, 300
+ while retries < max_retries:
+ try:
+ yield func(*args, **kwargs)
+ break
+ except(error.ConnectError, error.ConnectionClosed):
+ retries += 1
+
+ @defer.inlineCallbacks
def sync(self, namespace=''):
yield self.refresh_sync_status_from_server(namespace)
yield self.fetch_missing(namespace)
diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py
index 7e8533fe..f27fa985 100644
--- a/tests/server/test_blobs_server.py
+++ b/tests/server/test_blobs_server.py
@@ -39,8 +39,8 @@ class BlobServerTestCase(unittest.TestCase):
def setUp(self):
root = server_blobs.BlobsResource("filesystem", self.tempdir)
- site = Site(root)
- self.port = reactor.listenTCP(0, site, interface='127.0.0.1')
+ self.site = Site(root)
+ self.port = reactor.listenTCP(0, self.site, interface='127.0.0.1')
self.host = self.port.getHost()
self.uri = 'http://%s:%s/' % (self.host.host, self.host.port)
self.secret = 'A' * 96
@@ -270,6 +270,30 @@ class BlobServerTestCase(unittest.TestCase):
@defer.inlineCallbacks
@pytest.mark.usefixtures("method_tmpdir")
+ def test_sync_fetch_missing_retry(self):
+ manager = BlobManager(self.tempdir, self.uri, self.secret,
+ self.secret, uuid4().hex)
+ self.addCleanup(manager.close)
+ blob_id = 'remote_only_blob_id'
+ yield manager._encrypt_and_upload(blob_id, BytesIO("X"))
+ yield manager.refresh_sync_status_from_server()
+ yield self.port.stopListening()
+
+ def sleep(x):
+ d = defer.Deferred()
+ reactor.callLater(x, d.callback, None)
+ return d
+ d = manager.fetch_missing()
+ yield sleep(1)
+ self.port = reactor.listenTCP(
+ self.host.port, self.site, interface='127.0.0.1')
+ yield d
+ result = yield manager.local.get(blob_id)
+ self.assertIsNotNone(result)
+ self.assertEquals(result.getvalue(), "X")
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
def test_upload_then_delete_updates_list(self):
manager = BlobManager('', self.uri, self.secret,
self.secret, uuid4().hex)