summaryrefslogtreecommitdiff
path: root/src/leap/mail/messageflow.py
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2014-02-17 12:45:19 -0300
committerTomás Touceda <chiiph@leap.se>2014-02-17 12:45:19 -0300
commit32e3c5ddaa5df30a573762e273f3a12f7eb3c255 (patch)
tree0924be918d990628e73eb2059fb6eb1200748b7c /src/leap/mail/messageflow.py
parent7828c517ae162de4676a71e05f77339598acd6f7 (diff)
parent985ff0a78a8df0eafb7789383f711b9e5ceb1cb6 (diff)
Merge remote-tracking branch 'refs/remotes/kali/bug/separate_deferreds_threads' into develop
Diffstat (limited to 'src/leap/mail/messageflow.py')
-rw-r--r--src/leap/mail/messageflow.py26
1 files changed, 20 insertions, 6 deletions
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index 80121c8..c8f224c 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -49,7 +49,7 @@ class IMessageProducer(Interface):
entities.
"""
- def push(self, item):
+ def push(self, item, state=None):
"""
Push a new item in the queue.
"""
@@ -101,6 +101,10 @@ class MessageProducer(object):
# and consumption is not likely (?) to consume huge amounts of memory in
# our current settings, so the need to pause the stream is not urgent now.
+ # TODO use enum
+ STATE_NEW = 1
+ STATE_DIRTY = 2
+
def __init__(self, consumer, queue=Queue.Queue, period=1):
"""
Initializes the MessageProducer
@@ -115,7 +119,8 @@ class MessageProducer(object):
# it should implement a `consume` method
self._consumer = consumer
- self._queue = queue()
+ self._queue_new = queue()
+ self._queue_dirty = queue()
self._period = period
self._loop = LoopingCall(self._check_for_new)
@@ -130,7 +135,7 @@ class MessageProducer(object):
If the queue is found empty, the loop is stopped. It will be started
again after the addition of new items.
"""
- self._consumer.consume(self._queue)
+ self._consumer.consume((self._queue_new, self._queue_dirty))
if self.is_queue_empty():
self.stop()
@@ -138,11 +143,13 @@ class MessageProducer(object):
"""
Return True if queue is empty, False otherwise.
"""
- return self._queue.empty()
+ new = self._queue_new
+ dirty = self._queue_dirty
+ return new.empty() and dirty.empty()
# public methods: IMessageProducer
- def push(self, item):
+ def push(self, item, state=None):
"""
Push a new item in the queue.
@@ -150,7 +157,14 @@ class MessageProducer(object):
"""
# XXX this might raise if the queue does not accept any new
# items. what to do then?
- self._queue.put(item)
+ queue = self._queue_new
+
+ if state == self.STATE_NEW:
+ queue = self._queue_new
+ if state == self.STATE_DIRTY:
+ queue = self._queue_dirty
+
+ queue.put(item)
self.start()
def start(self):