summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/leap/soledad/client/_db/blobs.py44
1 files changed, 37 insertions, 7 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index d25a5ca6..bbb21b7c 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -41,6 +41,7 @@ from leap.common.files import mkdir_p
from .._document import BlobDoc
from .._crypto import DocInfo
+from .._crypto import InvalidBlob
from .._crypto import BlobEncryptor
from .._crypto import BlobDecryptor
from .._crypto import EncryptionSchemeNotImplementedException
@@ -146,22 +147,31 @@ def check_http_status(code, blob_id=None, flags=None):
raise SoledadError("Server Error: %s" % code)
+class RetriableTransferException(Exception):
+ pass
+
+
def sleep(seconds):
d = defer.Deferred()
reactor.callLater(seconds, d.callback, None)
return d
+MAX_WAIT = 60 # In seconds. Max time between retries
+
+
@defer.inlineCallbacks
def with_retry(func, *args, **kwargs):
- retry_wait, max_wait = 1, 60
+ retry_wait = 1
+ retriable_errors = (error.ConnectError, error.ConnectionClosed,
+ RetriableTransferException,)
while True:
try:
yield func(*args, **kwargs)
break
- except(error.ConnectError, error.ConnectionClosed):
+ except retriable_errors:
yield sleep(retry_wait)
- retry_wait = min(retry_wait + 10, max_wait)
+ retry_wait = min(retry_wait + 10, MAX_WAIT)
class DecrypterBuffer(object):
@@ -365,9 +375,12 @@ class BlobManager(object):
@defer.inlineCallbacks
def sync(self, namespace=''):
- yield self.refresh_sync_status_from_server(namespace)
- yield self.fetch_missing(namespace)
- yield self.send_missing(namespace)
+ try:
+ yield self.refresh_sync_status_from_server(namespace)
+ yield self.fetch_missing(namespace)
+ yield self.send_missing(namespace)
+ except defer.FirstError, e:
+ e.subFailure.raiseException()
@defer.inlineCallbacks
def put(self, doc, size, namespace=''):
@@ -515,7 +528,24 @@ class BlobManager(object):
# incrementally collect the body of the response
yield treq.collect(response, buf.write)
- fd, size = buf.close()
+ try:
+ fd, size = buf.close()
+ except InvalidBlob, e:
+ _, retries = yield self.local.get_sync_status(blob_id)
+ message = "Corrupted blob received from server! ID: %s\n"
+ message += "Retries: %s - Attempts left: %s\n"
+ message += "There is a chance of tampering. If this problem "
+ message += "persists, please check your connection then report to "
+ message += "your provider sysadmin and submit a bug report."
+ message = message % (blob_id, retries, 3 - retries)
+ logger.error(message)
+ yield self.local.increment_retries(blob_id)
+ if (retries + 1) >= 3:
+ failed_download = SyncStatus.FAILED_DOWNLOAD
+ yield self.local.update_sync_status(blob_id, failed_download)
+ raise e
+ else:
+ raise RetriableTransferException()
logger.info("Finished download: (%s, %d)" % (blob_id, size))
defer.returnValue((fd, size))