diff options
| -rw-r--r-- | src/leap/soledad/client/incoming.py | 80 | ||||
| -rw-r--r-- | testing/tests/server/test_incoming_flow_integration.py | 97 | 
2 files changed, 176 insertions, 1 deletions
| diff --git a/src/leap/soledad/client/incoming.py b/src/leap/soledad/client/incoming.py index c2213276..58e0885b 100644 --- a/src/leap/soledad/client/incoming.py +++ b/src/leap/soledad/client/incoming.py @@ -20,13 +20,14 @@ See: http://soledad.readthedocs.io/en/latest/incoming_box.html       or docs/incoming_box.rst  """  import sys +from leap.soledad.common.blobs import Flags  from twisted.logger import Logger  from twisted.internet.task import LoopingCall  from twisted.internet import defer  log = Logger() -class IncomingBoxProcessingLoop(object): +class IncomingBoxProcessingLoop:      """      Implements the client-side processing flow for Incoming Box feature,      maintaining a loop that fetches incoming messages from remote replica, @@ -100,3 +101,80 @@ class IncomingBoxProcessingLoop(object):                  yield self.incoming_box.set_failed(item_id)              else:                  yield self.incoming_box.delete(item_id) + + +class IncomingBox: +    """ +    A BlobManager proxy that represents an user's Incoming Box. +    It locks all queries to a namespace and deals with parameters and +    implementation details as defined on specification for client/server +    interactions. +    """ + +    def __init__(self, blob_manager, namespace): +        self.blob_manager = blob_manager +        self.namespace = namespace + +    @defer.inlineCallbacks +    def fetch_for_processing(self, blob_id): +        """ +        Try to flag a blob as PROCESSING, if server allows (no other replica +        with it), then it gets reserved and fetched. Otherwise `None` is +        returned, making the loop skip this item as another replica reserved it +        already. +        :param blob_id: Unique identifier of a blob. +        :type blob_id: str +        :return: A deferred that fires when operation finishes. +            It will hold None if reservation fails or a file-like object with +            the requested blob. +        :rtype: Deferred +        """ +        try: +            yield self.blob_manager.set_flags(blob_id, [Flags.PROCESSED], +                                              namespace=self.namespace) +        except: +            defer.returnValue([]) +        blob = yield self.blob_manager.get(blob_id, namespace=self.namespace) +        defer.returnValue(blob) + +    def list_pending(self): +        """ +        Lists blobs sorted by date (older first). +        :return: A deferred that fires with the requested list. +        :rtype: Deferred +        """ +        return self.blob_manager.remote_list(namespace=self.namespace, +                                             order_by='+date', +                                             filter_flag=Flags.PENDING) + +    def set_processed(self, blob_id): +        """ +        Flag a blob as Flags.FAILED +        :param blob_id: Unique identifier of a blob. +        :type blob_id: str +        :return: A deferred that fires when operation finishes. +        :rtype: Deferred +        """ +        return self.blob_manager.set_flags(blob_id, [Flags.PROCESSED], +                                           namespace=self.namespace) + +    def set_failed(self, blob_id): +        """ +        Flag a blob as Flags.FAILED +        :param blob_id: Unique identifier of a blob. +        :type blob_id: str +        :return: A deferred that fires when operation finishes. +        :rtype: Deferred +        """ +        return self.blob_manager.set_flags(blob_id, [Flags.FAILED], +                                           namespace=self.namespace) + +    def delete(self, blob_id): +        """ +        Deletes a blob belonging to a namespace. +        :param blob_id: Unique identifier of a blob. +        :type blob_id: str +        :return: A deferred that fires when operation finishes. +        :rtype: Deferred +        """ +        return self.blob_manager.delete(blob_id, namespace=self.namespace) diff --git a/testing/tests/server/test_incoming_flow_integration.py b/testing/tests/server/test_incoming_flow_integration.py new file mode 100644 index 00000000..b492534f --- /dev/null +++ b/testing/tests/server/test_incoming_flow_integration.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# test_incoming_flow_integration.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Integration tests for the complete flow of IncomingBox feature +""" +import pytest +from uuid import uuid4 +from twisted.trial import unittest +from twisted.web.server import Site +from twisted.internet import reactor +from twisted.internet import defer +from twisted.web.resource import Resource +from zope.interface import implementer + +from leap.soledad.client.incoming import IncomingBoxProcessingLoop +from leap.soledad.client.incoming import IncomingBox +from leap.soledad.server import _blobs as server_blobs +from leap.soledad.client._db.blobs import BlobManager +from leap.soledad.server._incoming import IncomingResource +from leap.soledad.server._blobs import BlobsServerState +from leap.soledad.client import interfaces + + +@implementer(interfaces.IIncomingBoxConsumer) +class GoodConsumer(object): +    def __init__(self): +        self.name = 'GoodConsumer' +        self.processed, self.saved = [], [] + +    def process(self, item, item_id, encrypted=True): +        self.processed.append(item_id) +        return defer.succeed([item_id]) + +    def save(self, parts, item_id): +        self.saved.append(item_id) +        return defer.succeed(None) + + +class IncomingFlowIntegrationTestCase(unittest.TestCase): + +    def setUp(self): +        root = Resource() +        state = BlobsServerState('filesystem', blobs_path=self.tempdir) +        incoming_resource = IncomingResource(state) +        blobs_resource = server_blobs.BlobsResource("filesystem", self.tempdir) +        root.putChild('blobs', blobs_resource) +        root.putChild('incoming', incoming_resource) +        site = Site(root) +        self.port = reactor.listenTCP(0, site, interface='127.0.0.1') +        self.host = self.port.getHost() +        self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) +        self.blobs_uri = self.uri + 'blobs/' +        self.incoming_uri = self.uri + 'incoming' +        self.user_id = 'user-' + uuid4().hex +        self.secret = 'A' * 96 +        self.blob_manager = BlobManager(self.tempdir, self.blobs_uri, +                                        self.secret, self.secret, +                                        self.user_id) +        self.box = IncomingBox(self.blob_manager, 'MX') +        self.loop = IncomingBoxProcessingLoop(self.box) +        # FIXME: We use blob_manager client only to avoid DelayedCalls +        # Somehow treq being used here keeps a connection pool open +        self.client = self.blob_manager._client + +    def fill(self, messages): +        deferreds = [] +        for message_id, message in messages: +            uri = '%s/%s/%s' % (self.incoming_uri, self.user_id, message_id) +            deferreds.append(self.blob_manager._client.put(uri, data=message)) +        return defer.gatherResults(deferreds) + +    def tearDown(self): +        self.port.stopListening() +        self.blob_manager.close() + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_consume_a_incoming_message(self): +        yield self.fill([('msg1', 'blob')]) +        consumer = GoodConsumer() +        self.loop.add_consumer(consumer) +        yield self.loop() +        self.assertIn('msg1', consumer.processed) | 
