diff options
author | Victor Shyba <victor1984@riseup.net> | 2017-07-25 05:59:12 -0300 |
---|---|---|
committer | Victor Shyba <victor1984@riseup.net> | 2017-08-03 05:33:01 -0300 |
commit | ccc13a7038a5f524b0549ab0b1a782e7cd4b45cc (patch) | |
tree | 611764185309cfc1f68d1a22c218ff8672c2df82 /src/leap/soledad | |
parent | 7e624f40b2b60a582db9bf297f00b743e3a91c96 (diff) |
[feature] integrate incoming loop into server
Adds a IncomingBox implementation that can be used by the incoming loop
to interact with the server. Includes end to end test from message
creation on Incoming API to callback consumer.
-- Related: #8914
Diffstat (limited to 'src/leap/soledad')
-rw-r--r-- | src/leap/soledad/client/incoming.py | 80 |
1 files changed, 79 insertions, 1 deletions
diff --git a/src/leap/soledad/client/incoming.py b/src/leap/soledad/client/incoming.py index c2213276..58e0885b 100644 --- a/src/leap/soledad/client/incoming.py +++ b/src/leap/soledad/client/incoming.py @@ -20,13 +20,14 @@ See: http://soledad.readthedocs.io/en/latest/incoming_box.html or docs/incoming_box.rst """ import sys +from leap.soledad.common.blobs import Flags from twisted.logger import Logger from twisted.internet.task import LoopingCall from twisted.internet import defer log = Logger() -class IncomingBoxProcessingLoop(object): +class IncomingBoxProcessingLoop: """ Implements the client-side processing flow for Incoming Box feature, maintaining a loop that fetches incoming messages from remote replica, @@ -100,3 +101,80 @@ class IncomingBoxProcessingLoop(object): yield self.incoming_box.set_failed(item_id) else: yield self.incoming_box.delete(item_id) + + +class IncomingBox: + """ + A BlobManager proxy that represents an user's Incoming Box. + It locks all queries to a namespace and deals with parameters and + implementation details as defined on specification for client/server + interactions. + """ + + def __init__(self, blob_manager, namespace): + self.blob_manager = blob_manager + self.namespace = namespace + + @defer.inlineCallbacks + def fetch_for_processing(self, blob_id): + """ + Try to flag a blob as PROCESSING, if server allows (no other replica + with it), then it gets reserved and fetched. Otherwise `None` is + returned, making the loop skip this item as another replica reserved it + already. + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :return: A deferred that fires when operation finishes. + It will hold None if reservation fails or a file-like object with + the requested blob. + :rtype: Deferred + """ + try: + yield self.blob_manager.set_flags(blob_id, [Flags.PROCESSED], + namespace=self.namespace) + except: + defer.returnValue([]) + blob = yield self.blob_manager.get(blob_id, namespace=self.namespace) + defer.returnValue(blob) + + def list_pending(self): + """ + Lists blobs sorted by date (older first). + :return: A deferred that fires with the requested list. + :rtype: Deferred + """ + return self.blob_manager.remote_list(namespace=self.namespace, + order_by='+date', + filter_flag=Flags.PENDING) + + def set_processed(self, blob_id): + """ + Flag a blob as Flags.FAILED + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :return: A deferred that fires when operation finishes. + :rtype: Deferred + """ + return self.blob_manager.set_flags(blob_id, [Flags.PROCESSED], + namespace=self.namespace) + + def set_failed(self, blob_id): + """ + Flag a blob as Flags.FAILED + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :return: A deferred that fires when operation finishes. + :rtype: Deferred + """ + return self.blob_manager.set_flags(blob_id, [Flags.FAILED], + namespace=self.namespace) + + def delete(self, blob_id): + """ + Deletes a blob belonging to a namespace. + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :return: A deferred that fires when operation finishes. + :rtype: Deferred + """ + return self.blob_manager.delete(blob_id, namespace=self.namespace) |