1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
# -*- coding: utf-8 -*-
# messageflow.py
# Copyright (C) 2013 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Message Producers and Consumers for flow control.
"""
import Queue
from twisted.internet.task import LoopingCall
from zope.interface import Interface, implements
class IMessageConsumer(Interface):
"""
I consume messages from a queue.
"""
def consume(self, queue):
"""
Consumes the passed item.
:param item: a queue where we put the object to be consumed.
:type item: object
"""
# TODO we could add an optional type to be passed
# for doing type check.
# TODO in case of errors, we could return the object to
# the queue, maybe wrapped in an object with a retries attribute.
class IMessageProducer(Interface):
"""
I produce messages and put them in a store to be consumed by other
entities.
"""
def push(self, item):
"""
Push a new item in the queue.
"""
def start(self):
"""
Start producing items.
"""
def stop(self):
"""
Stop producing items.
"""
class DummyMsgConsumer(object):
implements(IMessageConsumer)
def consume(self, queue):
"""
Just prints the passed item.
"""
if not queue.empty():
print "got item %s" % queue.get()
class MessageProducer(object):
"""
A Producer class that we can use to temporarily buffer the production
of messages so that different objects can consume them.
This is useful for serializing the consumption of the messages stream
in the case of an slow resource (db), or for returning early from a
deferred chain and leave further processing detached from the calling loop,
as in the case of smtp.
"""
implements(IMessageProducer)
# TODO this can be seen as a first step towards properly implementing
# components that implement IPushProducer / IConsumer interfaces.
# However, I need to think more about how to pause the streaming.
# In any case, the differential rate between message production
# and consumption is not likely (?) to consume huge amounts of memory in
# our current settings, so the need to pause the stream is not urgent now.
def __init__(self, consumer, queue=Queue.Queue, period=1):
"""
Initializes the MessageProducer
:param consumer: an instance of a IMessageConsumer that will consume
the new messages.
:param queue: any queue implementation to be used as the temporary
buffer for new items. Default is a FIFO Queue.
:param period: the period to check for new items, in seconds.
"""
# XXX should assert it implements IConsumer / IMailConsumer
# it should implement a `consume` method
self._consumer = consumer
self._queue = queue()
self._period = period
self._loop = LoopingCall(self._check_for_new)
# private methods
def _check_for_new(self):
"""
Check for new items in the internal queue, and calls the consume
method in the consumer.
If the queue is found empty, the loop is stopped. It will be started
again after the addition of new items.
"""
self._consumer.consume(self._queue)
if self._queue.empty():
self.stop()
# public methods: IMessageProducer
def push(self, item):
"""
Push a new item in the queue.
If the queue was empty, we will start the loop again.
"""
# XXX this might raise if the queue does not accept any new
# items. what to do then?
self._queue.put(item)
self.start()
def start(self):
"""
Start polling for new items.
"""
if not self._loop.running:
self._loop.start(self._period, now=True)
def stop(self):
"""
Stop polling for new items.
"""
if self._loop.running:
self._loop.stop()
if __name__ == "__main__":
from twisted.internet import reactor
producer = MessageProducer(DummyMsgConsumer())
producer.start()
for delay, item in ((2, 1), (3, 2), (4, 3),
(6, 4), (7, 5), (8, 6), (8.2, 7),
(15, 'a'), (16, 'b'), (17, 'c')):
reactor.callLater(delay, producer.put, item)
reactor.run()
|