diff options
author | Tomás Touceda <chiiph@leap.se> | 2014-02-17 12:45:19 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2014-02-17 12:45:19 -0300 |
commit | 32e3c5ddaa5df30a573762e273f3a12f7eb3c255 (patch) | |
tree | 0924be918d990628e73eb2059fb6eb1200748b7c /src/leap/mail/messageflow.py | |
parent | 7828c517ae162de4676a71e05f77339598acd6f7 (diff) | |
parent | 985ff0a78a8df0eafb7789383f711b9e5ceb1cb6 (diff) |
Merge remote-tracking branch 'refs/remotes/kali/bug/separate_deferreds_threads' into develop
Diffstat (limited to 'src/leap/mail/messageflow.py')
-rw-r--r-- | src/leap/mail/messageflow.py | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index 80121c8..c8f224c 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -49,7 +49,7 @@ class IMessageProducer(Interface): entities. """ - def push(self, item): + def push(self, item, state=None): """ Push a new item in the queue. """ @@ -101,6 +101,10 @@ class MessageProducer(object): # and consumption is not likely (?) to consume huge amounts of memory in # our current settings, so the need to pause the stream is not urgent now. + # TODO use enum + STATE_NEW = 1 + STATE_DIRTY = 2 + def __init__(self, consumer, queue=Queue.Queue, period=1): """ Initializes the MessageProducer @@ -115,7 +119,8 @@ class MessageProducer(object): # it should implement a `consume` method self._consumer = consumer - self._queue = queue() + self._queue_new = queue() + self._queue_dirty = queue() self._period = period self._loop = LoopingCall(self._check_for_new) @@ -130,7 +135,7 @@ class MessageProducer(object): If the queue is found empty, the loop is stopped. It will be started again after the addition of new items. """ - self._consumer.consume(self._queue) + self._consumer.consume((self._queue_new, self._queue_dirty)) if self.is_queue_empty(): self.stop() @@ -138,11 +143,13 @@ class MessageProducer(object): """ Return True if queue is empty, False otherwise. """ - return self._queue.empty() + new = self._queue_new + dirty = self._queue_dirty + return new.empty() and dirty.empty() # public methods: IMessageProducer - def push(self, item): + def push(self, item, state=None): """ Push a new item in the queue. @@ -150,7 +157,14 @@ class MessageProducer(object): """ # XXX this might raise if the queue does not accept any new # items. what to do then? - self._queue.put(item) + queue = self._queue_new + + if state == self.STATE_NEW: + queue = self._queue_new + if state == self.STATE_DIRTY: + queue = self._queue_dirty + + queue.put(item) self.start() def start(self): |