summaryrefslogtreecommitdiff
path: root/src/leap/mail/messageflow.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-11 01:43:14 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:39:48 -0400
commit4338368aa2ba0efaee742e9000e21b81af34d3db (patch)
tree3d3fff4030567a6f9a0d90682335de6c213f7d08 /src/leap/mail/messageflow.py
parentde762b5c6e529f4e668bee1ec848eb1f6380369b (diff)
separate new and dirty queues
Diffstat (limited to 'src/leap/mail/messageflow.py')
-rw-r--r--src/leap/mail/messageflow.py26
1 files changed, 20 insertions, 6 deletions
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index 80121c8..c8f224c 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -49,7 +49,7 @@ class IMessageProducer(Interface):
entities.
"""
- def push(self, item):
+ def push(self, item, state=None):
"""
Push a new item in the queue.
"""
@@ -101,6 +101,10 @@ class MessageProducer(object):
# and consumption is not likely (?) to consume huge amounts of memory in
# our current settings, so the need to pause the stream is not urgent now.
+ # TODO use enum
+ STATE_NEW = 1
+ STATE_DIRTY = 2
+
def __init__(self, consumer, queue=Queue.Queue, period=1):
"""
Initializes the MessageProducer
@@ -115,7 +119,8 @@ class MessageProducer(object):
# it should implement a `consume` method
self._consumer = consumer
- self._queue = queue()
+ self._queue_new = queue()
+ self._queue_dirty = queue()
self._period = period
self._loop = LoopingCall(self._check_for_new)
@@ -130,7 +135,7 @@ class MessageProducer(object):
If the queue is found empty, the loop is stopped. It will be started
again after the addition of new items.
"""
- self._consumer.consume(self._queue)
+ self._consumer.consume((self._queue_new, self._queue_dirty))
if self.is_queue_empty():
self.stop()
@@ -138,11 +143,13 @@ class MessageProducer(object):
"""
Return True if queue is empty, False otherwise.
"""
- return self._queue.empty()
+ new = self._queue_new
+ dirty = self._queue_dirty
+ return new.empty() and dirty.empty()
# public methods: IMessageProducer
- def push(self, item):
+ def push(self, item, state=None):
"""
Push a new item in the queue.
@@ -150,7 +157,14 @@ class MessageProducer(object):
"""
# XXX this might raise if the queue does not accept any new
# items. what to do then?
- self._queue.put(item)
+ queue = self._queue_new
+
+ if state == self.STATE_NEW:
+ queue = self._queue_new
+ if state == self.STATE_DIRTY:
+ queue = self._queue_dirty
+
+ queue.put(item)
self.start()
def start(self):