diff options
author | Victor Shyba <victor1984@riseup.net> | 2017-07-21 05:05:59 -0300 |
---|---|---|
committer | Victor Shyba <victor1984@riseup.net> | 2017-08-03 05:33:01 -0300 |
commit | 8fc244cb0946f46ba9aff019b70568d08a226b29 (patch) | |
tree | 33c521813ca35f7caf7a44c98e20465c67a7e04c | |
parent | 97fbe66401d1de0f667df6bf3c09506bdce8c7c5 (diff) |
[feature] incoming processing flow
This class implements a process flow between the ones defined at #8881
and #8874. It uses a LoopingCall to keep running in a loop and accepts
IIncomingBoxConsumers defined by soledad users, such as leap mail.
- Resolves: #8874
-rw-r--r-- | src/leap/soledad/client/incoming.py | 102 | ||||
-rw-r--r-- | testing/tests/client/test_incoming_processing_flow.py | 189 |
2 files changed, 291 insertions, 0 deletions
diff --git a/src/leap/soledad/client/incoming.py b/src/leap/soledad/client/incoming.py new file mode 100644 index 00000000..b029e5b8 --- /dev/null +++ b/src/leap/soledad/client/incoming.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# incoming.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Code to interact with Incoming Box feature. +See: http://soledad.readthedocs.io/en/latest/incoming_box.html + or docs/incoming_box.rst +""" +import sys +from twisted.logger import Logger +from twisted.internet.task import LoopingCall +from twisted.internet import defer +log = Logger() + + +class IncomingBoxProcessingLoop(object): + """ + Implements the client-side processing flow for Incoming Box feature, + maintaining a loop that fetches incoming messages from remote replica, + delivers it to consumers and changes flags as necessary. This is defined at + "Processing Incoming Messages" section on Incoming Box Specifications: + http://soledad.readthedocs.io/en/latest/incoming_box.html + """ + + def __init__(self, incoming_box): + self.incoming_box = incoming_box + self.consumers = [] + self._loop = LoopingCall(self._process) + + def __call__(self): + return self._loop() + + @property + def running(self): + return self._loop.running + + def start(self, interval=30): + """ + Starts the inner LoopingCall, triggering the loop. + :param interval: Time between interactions in seconds. + :type interval: int + """ + return self._loop.start(interval) + + def stop(self): + """ + Stops the inner LoopingCall, stopping the loop. + """ + return self._loop.stop() + + def add_consumer(self, consumer): + """ + Adds a consumer to the consumers list, so it can be used to process and + persist incoming box items. + :param consumer: Consumer implementation + :type consumer: leap.soledad.client.interfaces.IIncomingBoxConsumer + """ + self.consumers.append(consumer) + + @defer.inlineCallbacks + def _process(self): + pending = yield self.incoming_box.list_pending() + for item_id in pending: + item = yield self.incoming_box.fetch_for_processing(item_id) + if not item: + log.warn("Couldn't reserve item %s for processing, skipping.") + continue + failed = False + for consumer in self.consumers: + try: + parts = yield consumer.process(item, item_id=item_id) + except: + msg = "Consumer %s failed to process item %s: %s" + msg %= (consumer.name, item_id, sys.exc_info()[0]) + log.error(msg) + failed = True + continue + yield self.incoming_box.set_processed(item_id) + try: + yield consumer.save(parts, item_id=item_id) + except: + msg = "Consumer %s failed to save item %s: %s" + msg %= (consumer.name, item_id, sys.exc_info()[0]) + log.error(msg) + failed = True + if failed: + yield self.incoming_box.set_failed(item_id) + else: + yield self.incoming_box.delete(item_id) diff --git a/testing/tests/client/test_incoming_processing_flow.py b/testing/tests/client/test_incoming_processing_flow.py new file mode 100644 index 00000000..d2c8bffa --- /dev/null +++ b/testing/tests/client/test_incoming_processing_flow.py @@ -0,0 +1,189 @@ +# -*- coding: utf-8 -*- +# test_incoming_server.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Unit tests for incoming box processing flow. +""" +from mock import Mock, call +from leap.soledad.client import interfaces +from leap.soledad.client.incoming import IncomingBoxProcessingLoop +from twisted.internet import defer +from twisted.trial import unittest +from zope.interface import implementer + + +@implementer(interfaces.IIncomingBoxConsumer) +class GoodConsumer(object): + def __init__(self): + self.name = 'GoodConsumer' + self.processed, self.saved = [], [] + + def process(self, item, item_id, encrypted=True): + self.processed.append(item_id) + return defer.succeed([item_id]) + + def save(self, parts, item_id): + self.saved.append(item_id) + return defer.succeed(None) + + +class ProcessingFailedConsumer(GoodConsumer): + def __init__(self): + self.name = 'ProcessingFailedConsumer' + self.processed, self.saved = [], [] + + def process(self, item, item_id, encrypted=True): + return defer.fail() + + +class SavingFailedConsumer(GoodConsumer): + def __init__(self): + self.name = 'ProcessingFailedConsumer' + self.processed, self.saved = [], [] + + def save(self, parts, item_id): + return defer.fail() + + +class IncomingBoxProcessingTestCase(unittest.TestCase): + + def setUp(self): + self.box = Mock() + self.loop = IncomingBoxProcessingLoop(self.box) + + def _set_pending_items(self, pending): + self.box.list_pending.return_value = defer.succeed(pending) + pending_iter = iter([defer.succeed(item) for item in pending]) + self.box.fetch_for_processing.side_effect = pending_iter + + @defer.inlineCallbacks + def test_processing_flow_reserves_a_message(self): + self._set_pending_items(['one_item']) + self.loop.add_consumer(GoodConsumer()) + yield self.loop() + self.box.fetch_for_processing.assert_called_once_with('one_item') + + @defer.inlineCallbacks + def test_pending_list_with_multiple_items(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + calls = [call('one'), call('two'), call('three')] + self.box.fetch_for_processing.assert_has_calls(calls) + + @defer.inlineCallbacks + def test_good_consumer_process_all(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.assertEquals(items, consumer.processed) + + @defer.inlineCallbacks + def test_good_consumer_saves_all(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.assertEquals(items, consumer.saved) + + @defer.inlineCallbacks + def test_multiple_good_consumers_process_all(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + consumer2 = GoodConsumer() + self.loop.add_consumer(consumer) + self.loop.add_consumer(consumer2) + yield self.loop() + self.assertEquals(items, consumer.processed) + self.assertEquals(items, consumer2.processed) + + @defer.inlineCallbacks + def test_good_consumer_marks_as_processed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_processed.has_calls([call(x) for x in items]) + + @defer.inlineCallbacks + def test_good_consumer_deletes_items(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.delete.has_calls([call(x) for x in items]) + + @defer.inlineCallbacks + def test_processing_failed_doesnt_mark_as_processed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = ProcessingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_processed.assert_not_called() + + @defer.inlineCallbacks + def test_processing_failed_doesnt_delete(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = ProcessingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.delete.assert_not_called() + + @defer.inlineCallbacks + def test_processing_failed_marks_as_failed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = ProcessingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_failed.assert_has_calls([call(x) for x in items]) + + @defer.inlineCallbacks + def test_saving_failed_marks_as_processed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = SavingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_processed.assert_has_calls([call(x) for x in items]) + + @defer.inlineCallbacks + def test_saving_failed_doesnt_delete(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = SavingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.delete.assert_not_called() + + @defer.inlineCallbacks + def test_saving_failed_marks_as_failed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = SavingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_failed.assert_has_calls([call(x) for x in items]) |