summaryrefslogtreecommitdiff
path: root/src/leap/soledad
diff options
context:
space:
mode:
authorVictor Shyba <victor1984@riseup.net>2017-07-25 05:59:12 -0300
committerVictor Shyba <victor1984@riseup.net>2017-08-03 05:33:01 -0300
commitccc13a7038a5f524b0549ab0b1a782e7cd4b45cc (patch)
tree611764185309cfc1f68d1a22c218ff8672c2df82 /src/leap/soledad
parent7e624f40b2b60a582db9bf297f00b743e3a91c96 (diff)
[feature] integrate incoming loop into server
Adds a IncomingBox implementation that can be used by the incoming loop to interact with the server. Includes end to end test from message creation on Incoming API to callback consumer. -- Related: #8914
Diffstat (limited to 'src/leap/soledad')
-rw-r--r--src/leap/soledad/client/incoming.py80
1 files changed, 79 insertions, 1 deletions
diff --git a/src/leap/soledad/client/incoming.py b/src/leap/soledad/client/incoming.py
index c2213276..58e0885b 100644
--- a/src/leap/soledad/client/incoming.py
+++ b/src/leap/soledad/client/incoming.py
@@ -20,13 +20,14 @@ See: http://soledad.readthedocs.io/en/latest/incoming_box.html
or docs/incoming_box.rst
"""
import sys
+from leap.soledad.common.blobs import Flags
from twisted.logger import Logger
from twisted.internet.task import LoopingCall
from twisted.internet import defer
log = Logger()
-class IncomingBoxProcessingLoop(object):
+class IncomingBoxProcessingLoop:
"""
Implements the client-side processing flow for Incoming Box feature,
maintaining a loop that fetches incoming messages from remote replica,
@@ -100,3 +101,80 @@ class IncomingBoxProcessingLoop(object):
yield self.incoming_box.set_failed(item_id)
else:
yield self.incoming_box.delete(item_id)
+
+
+class IncomingBox:
+ """
+ A BlobManager proxy that represents an user's Incoming Box.
+ It locks all queries to a namespace and deals with parameters and
+ implementation details as defined on specification for client/server
+ interactions.
+ """
+
+ def __init__(self, blob_manager, namespace):
+ self.blob_manager = blob_manager
+ self.namespace = namespace
+
+ @defer.inlineCallbacks
+ def fetch_for_processing(self, blob_id):
+ """
+ Try to flag a blob as PROCESSING, if server allows (no other replica
+ with it), then it gets reserved and fetched. Otherwise `None` is
+ returned, making the loop skip this item as another replica reserved it
+ already.
+ :param blob_id: Unique identifier of a blob.
+ :type blob_id: str
+ :return: A deferred that fires when operation finishes.
+ It will hold None if reservation fails or a file-like object with
+ the requested blob.
+ :rtype: Deferred
+ """
+ try:
+ yield self.blob_manager.set_flags(blob_id, [Flags.PROCESSED],
+ namespace=self.namespace)
+ except:
+ defer.returnValue([])
+ blob = yield self.blob_manager.get(blob_id, namespace=self.namespace)
+ defer.returnValue(blob)
+
+ def list_pending(self):
+ """
+ Lists blobs sorted by date (older first).
+ :return: A deferred that fires with the requested list.
+ :rtype: Deferred
+ """
+ return self.blob_manager.remote_list(namespace=self.namespace,
+ order_by='+date',
+ filter_flag=Flags.PENDING)
+
+ def set_processed(self, blob_id):
+ """
+ Flag a blob as Flags.FAILED
+ :param blob_id: Unique identifier of a blob.
+ :type blob_id: str
+ :return: A deferred that fires when operation finishes.
+ :rtype: Deferred
+ """
+ return self.blob_manager.set_flags(blob_id, [Flags.PROCESSED],
+ namespace=self.namespace)
+
+ def set_failed(self, blob_id):
+ """
+ Flag a blob as Flags.FAILED
+ :param blob_id: Unique identifier of a blob.
+ :type blob_id: str
+ :return: A deferred that fires when operation finishes.
+ :rtype: Deferred
+ """
+ return self.blob_manager.set_flags(blob_id, [Flags.FAILED],
+ namespace=self.namespace)
+
+ def delete(self, blob_id):
+ """
+ Deletes a blob belonging to a namespace.
+ :param blob_id: Unique identifier of a blob.
+ :type blob_id: str
+ :return: A deferred that fires when operation finishes.
+ :rtype: Deferred
+ """
+ return self.blob_manager.delete(blob_id, namespace=self.namespace)