From ccc13a7038a5f524b0549ab0b1a782e7cd4b45cc Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 25 Jul 2017 05:59:12 -0300 Subject: [feature] integrate incoming loop into server Adds a IncomingBox implementation that can be used by the incoming loop to interact with the server. Includes end to end test from message creation on Incoming API to callback consumer. -- Related: #8914 --- src/leap/soledad/client/incoming.py | 80 ++++++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/leap/soledad/client/incoming.py b/src/leap/soledad/client/incoming.py index c2213276..58e0885b 100644 --- a/src/leap/soledad/client/incoming.py +++ b/src/leap/soledad/client/incoming.py @@ -20,13 +20,14 @@ See: http://soledad.readthedocs.io/en/latest/incoming_box.html or docs/incoming_box.rst """ import sys +from leap.soledad.common.blobs import Flags from twisted.logger import Logger from twisted.internet.task import LoopingCall from twisted.internet import defer log = Logger() -class IncomingBoxProcessingLoop(object): +class IncomingBoxProcessingLoop: """ Implements the client-side processing flow for Incoming Box feature, maintaining a loop that fetches incoming messages from remote replica, @@ -100,3 +101,80 @@ class IncomingBoxProcessingLoop(object): yield self.incoming_box.set_failed(item_id) else: yield self.incoming_box.delete(item_id) + + +class IncomingBox: + """ + A BlobManager proxy that represents an user's Incoming Box. + It locks all queries to a namespace and deals with parameters and + implementation details as defined on specification for client/server + interactions. + """ + + def __init__(self, blob_manager, namespace): + self.blob_manager = blob_manager + self.namespace = namespace + + @defer.inlineCallbacks + def fetch_for_processing(self, blob_id): + """ + Try to flag a blob as PROCESSING, if server allows (no other replica + with it), then it gets reserved and fetched. Otherwise `None` is + returned, making the loop skip this item as another replica reserved it + already. + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :return: A deferred that fires when operation finishes. + It will hold None if reservation fails or a file-like object with + the requested blob. + :rtype: Deferred + """ + try: + yield self.blob_manager.set_flags(blob_id, [Flags.PROCESSED], + namespace=self.namespace) + except: + defer.returnValue([]) + blob = yield self.blob_manager.get(blob_id, namespace=self.namespace) + defer.returnValue(blob) + + def list_pending(self): + """ + Lists blobs sorted by date (older first). + :return: A deferred that fires with the requested list. + :rtype: Deferred + """ + return self.blob_manager.remote_list(namespace=self.namespace, + order_by='+date', + filter_flag=Flags.PENDING) + + def set_processed(self, blob_id): + """ + Flag a blob as Flags.FAILED + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :return: A deferred that fires when operation finishes. + :rtype: Deferred + """ + return self.blob_manager.set_flags(blob_id, [Flags.PROCESSED], + namespace=self.namespace) + + def set_failed(self, blob_id): + """ + Flag a blob as Flags.FAILED + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :return: A deferred that fires when operation finishes. + :rtype: Deferred + """ + return self.blob_manager.set_flags(blob_id, [Flags.FAILED], + namespace=self.namespace) + + def delete(self, blob_id): + """ + Deletes a blob belonging to a namespace. + :param blob_id: Unique identifier of a blob. + :type blob_id: str + :return: A deferred that fires when operation finishes. + :rtype: Deferred + """ + return self.blob_manager.delete(blob_id, namespace=self.namespace) -- cgit v1.2.3