diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/leap/soledad/client/incoming.py | 80 | 
1 files changed, 79 insertions, 1 deletions
| diff --git a/src/leap/soledad/client/incoming.py b/src/leap/soledad/client/incoming.py index c2213276..58e0885b 100644 --- a/src/leap/soledad/client/incoming.py +++ b/src/leap/soledad/client/incoming.py @@ -20,13 +20,14 @@ See: http://soledad.readthedocs.io/en/latest/incoming_box.html       or docs/incoming_box.rst  """  import sys +from leap.soledad.common.blobs import Flags  from twisted.logger import Logger  from twisted.internet.task import LoopingCall  from twisted.internet import defer  log = Logger() -class IncomingBoxProcessingLoop(object): +class IncomingBoxProcessingLoop:      """      Implements the client-side processing flow for Incoming Box feature,      maintaining a loop that fetches incoming messages from remote replica, @@ -100,3 +101,80 @@ class IncomingBoxProcessingLoop(object):                  yield self.incoming_box.set_failed(item_id)              else:                  yield self.incoming_box.delete(item_id) + + +class IncomingBox: +    """ +    A BlobManager proxy that represents an user's Incoming Box. +    It locks all queries to a namespace and deals with parameters and +    implementation details as defined on specification for client/server +    interactions. +    """ + +    def __init__(self, blob_manager, namespace): +        self.blob_manager = blob_manager +        self.namespace = namespace + +    @defer.inlineCallbacks +    def fetch_for_processing(self, blob_id): +        """ +        Try to flag a blob as PROCESSING, if server allows (no other replica +        with it), then it gets reserved and fetched. Otherwise `None` is +        returned, making the loop skip this item as another replica reserved it +        already. +        :param blob_id: Unique identifier of a blob. +        :type blob_id: str +        :return: A deferred that fires when operation finishes. +            It will hold None if reservation fails or a file-like object with +            the requested blob. +        :rtype: Deferred +        """ +        try: +            yield self.blob_manager.set_flags(blob_id, [Flags.PROCESSED], +                                              namespace=self.namespace) +        except: +            defer.returnValue([]) +        blob = yield self.blob_manager.get(blob_id, namespace=self.namespace) +        defer.returnValue(blob) + +    def list_pending(self): +        """ +        Lists blobs sorted by date (older first). +        :return: A deferred that fires with the requested list. +        :rtype: Deferred +        """ +        return self.blob_manager.remote_list(namespace=self.namespace, +                                             order_by='+date', +                                             filter_flag=Flags.PENDING) + +    def set_processed(self, blob_id): +        """ +        Flag a blob as Flags.FAILED +        :param blob_id: Unique identifier of a blob. +        :type blob_id: str +        :return: A deferred that fires when operation finishes. +        :rtype: Deferred +        """ +        return self.blob_manager.set_flags(blob_id, [Flags.PROCESSED], +                                           namespace=self.namespace) + +    def set_failed(self, blob_id): +        """ +        Flag a blob as Flags.FAILED +        :param blob_id: Unique identifier of a blob. +        :type blob_id: str +        :return: A deferred that fires when operation finishes. +        :rtype: Deferred +        """ +        return self.blob_manager.set_flags(blob_id, [Flags.FAILED], +                                           namespace=self.namespace) + +    def delete(self, blob_id): +        """ +        Deletes a blob belonging to a namespace. +        :param blob_id: Unique identifier of a blob. +        :type blob_id: str +        :return: A deferred that fires when operation finishes. +        :rtype: Deferred +        """ +        return self.blob_manager.delete(blob_id, namespace=self.namespace) | 
