summary refs log tree commit diff
path: root/src/leap/mail/messageflow.py
blob: 21f6d62a69f60470dcc3c1047a299a1eb7458fec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# -*- coding: utf-8 -*-
# messageflow.py
# Copyright (C) 2013 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Message Producers and Consumers for flow control.
"""
import Queue

from twisted.internet.task import LoopingCall

from zope.interface import Interface, implements


class IMessageConsumer(Interface):
    """
    Interface for objects able to consume the items handed over by a
    message producer.
    """

    # NOTE: methods declared on a zope Interface describe the signature as
    # seen by the caller, so `self` is deliberately left out. Declaring it
    # would make signature verification count it as an extra required
    # argument and reject otherwise correct implementations.
    def consume(item):
        """
        Consumes the passed item.

        :param item: an object to be consumed.
        :type item: object
        """
        # TODO we could add an optional type to be passed
        # for doing type check.

        # TODO in case of errors, we could return the object to
        # the queue, maybe wrapped in an object with a retries attribute.


class DummyMsgConsumer(object):
    """
    Trivial consumer that writes every item it receives to standard output.
    """

    implements(IMessageConsumer)

    def consume(self, item):
        """
        Prints the passed item.

        :param item: the object to be shown.
        :type item: object
        """
        message = "got item %s" % item
        # Single parenthesised argument: the emitted text is the same whether
        # `print` is parsed as a statement or called as a function.
        print(message)


class MessageProducer(object):
    """
    A Producer class that we can use to temporarily buffer the production
    of messages so that different objects can consume them.

    This is useful for serializing the consumption of the messages stream
    in the case of a slow resource (db), or for returning early from a
    deferred chain and leaving further processing detached from the calling
    loop, as in the case of smtp.

    Items are buffered in an internal queue and handed to the consumer one
    at a time, once per polling period.
    """
    # TODO this can be seen as a first step towards properly implementing
    # components that implement IPushProducer / IConsumer  interfaces.
    # However, I need to think more about how to pause the streaming.
    # In any case, the differential rate between message production
    # and consumption is not likely (?) to consume huge amounts of memory in
    # our current settings, so the need to pause the stream is not urgent now.

    def __init__(self, consumer, queue=Queue.Queue, period=1):
        """
        Initializes the MessageProducer

        :param consumer: an instance of a IMessageConsumer that will consume
                         the new messages.
        :param queue: any queue implementation to be used as the temporary
                      buffer for new items. It is called with no arguments
                      to build the buffer, and the resulting object must
                      offer `empty`, `get` and `put`.
                      Default is a FIFO Queue.
        :param period: the period to check for new items, in seconds.
        """
        # XXX should assert it implements IConsumer / IMailConsumer
        # it should implement a `consume` method
        self._consumer = consumer

        # `queue` is a factory (class or callable), not an instance, so every
        # producer gets its own buffer.
        self._queue = queue()
        self._period = period

        self._loop = LoopingCall(self._check_for_new)

    # private methods

    def _check_for_new(self):
        """
        Checks for new items in the internal queue, and calls the consume
        method in the consumer.

        At most one item is consumed per call.

        If the queue is found empty, the loop is stopped. It will be started
        again after the addition of new items.
        """
        # XXX right now I'm assuming that the period is good enough to allow
        # a right pace of processing. but we could also pass the queue object
        # to the consumer and let it choose whether process a new item or not.

        if self._queue.empty():
            self.stop()
        else:
            self._consumer.consume(self._queue.get())

    # public methods

    def put(self, item):
        """
        Puts a new item in the queue and makes sure the polling loop is
        running.

        The loop is (re)started on every call, not only when the queue was
        empty beforehand. If the loop was stopped while items were still
        pending (an explicit call to `stop`, or the loop halting because the
        consumer raised), checking for emptiness alone would leave it stopped
        and the buffered items would never be consumed.

        :param item: the object to be buffered for the consumer.
        :type item: object
        """
        # XXX this might raise if the queue does not accept any new
        # items. what to do then?
        self._queue.put(item)

        # `start` does nothing when the loop is already running, so calling
        # it unconditionally is safe.
        self.start()

    def start(self):
        """
        Starts polling for new items.

        Does nothing if the loop is already running.
        """
        if not self._loop.running:
            self._loop.start(self._period)

    def stop(self):
        """
        Stop polling for new items.

        Does nothing if the loop is not running. Items still in the queue
        are kept.
        """
        if self._loop.running:
            self._loop.stop()


if __name__ == "__main__":
    # Manual demo: feed a few items to a producer at staggered times and let
    # the dummy consumer print them as they are consumed.
    from twisted.internet import reactor

    producer = MessageProducer(DummyMsgConsumer())
    producer.start()

    # (delay, item) pairs; each delay is passed to reactor.callLater.
    schedule = (
        (2, 1), (3, 2), (4, 3),
        (6, 4), (7, 5), (8, 6), (8.2, 7),
        (15, 'a'), (16, 'b'), (17, 'c'),
    )
    for when, payload in schedule:
        reactor.callLater(when, producer.put, payload)

    reactor.run()