diff options
author | Kali Kaneko <kali@leap.se> | 2014-02-11 01:43:14 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-02-17 11:39:48 -0400 |
commit | d6c352a72766a17df9d3804f58890b876370bc93 (patch) | |
tree | d883fbb45e5e4c7f5b30bb5fa6e49888d4879b66 | |
parent | 5bba9574dd0a8906178a928e4b7e8f1877a75a12 (diff) |
separate new and dirty queues
-rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 80 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/messageparts.py | 25 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 20 | ||||
-rw-r--r-- | mail/src/leap/mail/messageflow.py | 26 |
4 files changed, 102 insertions, 49 deletions
diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index 786a9c4..a053f3f 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -24,7 +24,6 @@ import weakref from collections import defaultdict from copy import copy -from itertools import chain from twisted.internet import defer from twisted.internet.task import LoopingCall @@ -33,7 +32,6 @@ from zope.interface import implements from leap.common.check import leap_assert_type from leap.mail import size -from leap.mail.decorators import deferred_to_thread from leap.mail.utils import empty, phash_iter from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces @@ -48,7 +46,7 @@ logger = logging.getLogger(__name__) # The default period to do writebacks to the permanent # soledad storage, in seconds. -SOLEDAD_WRITE_PERIOD = 30 +SOLEDAD_WRITE_PERIOD = 15 FDOC = MessagePartType.fdoc.key HDOC = MessagePartType.hdoc.key @@ -106,6 +104,9 @@ class MemoryStore(object): :param write_period: the interval to dump messages to disk, in seconds. :type write_period: int """ + from twisted.internet import reactor + self.reactor = reactor + self._permanent_store = permanent_store self._write_period = write_period @@ -195,11 +196,15 @@ class MemoryStore(object): # New and dirty flags, to set MessageWrapper State. self._new = set([]) + self._new_queue = set([]) self._new_deferreds = {} + self._dirty = set([]) - self._rflags_dirty = set([]) + self._dirty_queue = set([]) self._dirty_deferreds = {} + self._rflags_dirty = set([]) + # Flag for signaling we're busy writing to the disk storage. setattr(self, self.WRITING_FLAG, False) @@ -297,7 +302,7 @@ class MemoryStore(object): """ Put an existing message. - This will set the dirty flag on the MemoryStore. + This will also set the dirty flag on the MemoryStore. :param mbox: the mailbox :type mbox: str or unicode @@ -498,9 +503,14 @@ class MemoryStore(object): # is accquired with set_bool_flag(self, self.WRITING_FLAG): for rflags_doc_wrapper in self.all_rdocs_iter(): - self.producer.push(rflags_doc_wrapper) - for msg_wrapper in self.all_new_dirty_msg_iter(): - self.producer.push(msg_wrapper) + self.producer.push(rflags_doc_wrapper, + state=self.producer.STATE_DIRTY) + for msg_wrapper in self.all_new_msg_iter(): + self.producer.push(msg_wrapper, + state=self.producer.STATE_NEW) + for msg_wrapper in self.all_dirty_msg_iter(): + self.producer.push(msg_wrapper, + state=self.producer.STATE_DIRTY) # MemoryStore specific methods. @@ -784,17 +794,34 @@ class MemoryStore(object): for uid in fdoc_store[mbox]: yield mbox, uid - def all_new_dirty_msg_iter(self): + def all_new_msg_iter(self): """ - Return generator that iterates through all new and dirty messages. + Return generator that iterates through all new messages. :return: generator of MessageWrappers :rtype: generator """ gm = self.get_message - new = (gm(*key) for key in self._new) - dirty = (gm(*key, flags_only=True) for key in self._dirty) - return chain(new, dirty) + new = [gm(*key) for key in self._new] + # move content from new set to the queue + self._new_queue.update(self._new) + self._new.difference_update(self._new) + return new + + def all_dirty_msg_iter(self): + """ + Return generator that iterates through all dirty messages. + + :return: generator of MessageWrappers + :rtype: generator + """ + gm = self.get_message + dirty = [gm(*key, flags_only=True) for key in self._dirty] + # move content from new and dirty sets to the queue + + self._dirty_queue.update(self._dirty) + self._dirty.difference_update(self._dirty) + return dirty def all_deleted_uid_iter(self, mbox): """ @@ -826,25 +853,28 @@ class MemoryStore(object): """ # TODO change indexing of sets to [mbox][key] too. # XXX should return *first* the news, and *then* the dirty... + + # TODO should query in queues too , true? + # return map(lambda _set: key in _set, (self._new, self._dirty)) - def set_new(self, key): + def set_new_queued(self, key): """ - Add the key value to the `new` set. + Add the key value to the `new-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._new.add(key) + self._new_queue.add(key) - def unset_new(self, key): + def unset_new_queued(self, key): """ - Remove the key value from the `new` set. + Remove the key value from the `new-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._new.discard(key) + self._new_queue.discard(key) deferreds = self._new_deferreds d = deferreds.get(key, None) if d: @@ -853,23 +883,23 @@ class MemoryStore(object): d.callback('%s, ok' % str(key)) deferreds.pop(key) - def set_dirty(self, key): + def set_dirty_queued(self, key): """ - Add the key value to the `dirty` set. + Add the key value to the `dirty-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._dirty.add(key) + self._dirty_queue.add(key) - def unset_dirty(self, key): + def unset_dirty_queued(self, key): """ - Remove the key value from the `dirty` set. + Remove the key value from the `dirty-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._dirty.discard(key) + self._dirty_queue.discard(key) deferreds = self._dirty_deferreds d = deferreds.get(key, None) if d: diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index b1f333a..9b7de86 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -98,7 +98,7 @@ class MessageWrapper(object): CDOCS = "cdocs" DOCS_ID = "docs_id" - # Using slots to limit some the memory footprint, + # Using slots to limit some the memory use, # Add your attribute here. __slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"] @@ -148,7 +148,7 @@ class MessageWrapper(object): """ return self._new - def _set_new(self, value=True): + def _set_new(self, value=False): """ Set the value for the `new` flag, and propagate it to the memory store if any. @@ -161,8 +161,8 @@ class MessageWrapper(object): mbox = self.fdoc.content['mbox'] uid = self.fdoc.content['uid'] key = mbox, uid - fun = [self.memstore.unset_new, - self.memstore.set_new][int(value)] + fun = [self.memstore.unset_new_queued, + self.memstore.set_new_queued][int(value)] fun(key) else: logger.warning("Could not find a memstore referenced from this " @@ -193,8 +193,8 @@ class MessageWrapper(object): mbox = self.fdoc.content['mbox'] uid = self.fdoc.content['uid'] key = mbox, uid - fun = [self.memstore.unset_dirty, - self.memstore.set_dirty][int(value)] + fun = [self.memstore.unset_dirty_queued, + self.memstore.set_dirty_queued][int(value)] fun(key) else: logger.warning("Could not find a memstore referenced from this " @@ -271,11 +271,14 @@ class MessageWrapper(object): :rtype: generator """ if self._dirty: - mbox = self.fdoc.content[fields.MBOX_KEY] - uid = self.fdoc.content[fields.UID_KEY] - docid_dict = self._dict[self.DOCS_ID] - docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( - mbox, uid) + try: + mbox = self.fdoc.content[fields.MBOX_KEY] + uid = self.fdoc.content[fields.UID_KEY] + docid_dict = self._dict[self.DOCS_ID] + docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( + mbox, uid) + except Exception as exc: + logger.exception(exc) if not empty(self.fdoc.content): yield self.fdoc diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index e7c6b29..667e64d 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -220,12 +220,15 @@ class SoledadStore(ContentDedup): to be inserted. :type queue: Queue """ - from twisted.internet import reactor - - while not queue.empty(): - doc_wrapper = queue.get() - reactor.callInThread(self._consume_doc, doc_wrapper, - self.docs_notify_queue) + new, dirty = queue + while not new.empty(): + doc_wrapper = new.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) + while not dirty.empty(): + doc_wrapper = dirty.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) # Queue empty, flush the notifications queue. self.docs_notify_queue(None, flush=True) @@ -239,7 +242,8 @@ class SoledadStore(ContentDedup): :type doc_wrapper: MessageWrapper """ if isinstance(doc_wrapper, MessageWrapper): - logger.info("unsetting new flag!") + # XXX still needed for debug quite often + #logger.info("unsetting new flag!") doc_wrapper.new = False doc_wrapper.dirty = False @@ -284,6 +288,8 @@ class SoledadStore(ContentDedup): try: self._try_call(call, item) except Exception as exc: + logger.debug("ITEM WAS: %s" % str(item)) + logger.debug("ITEM CONTENT WAS: %s" % str(item.content)) logger.exception(exc) failed = True continue diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py index 80121c8..c8f224c 100644 --- a/mail/src/leap/mail/messageflow.py +++ b/mail/src/leap/mail/messageflow.py @@ -49,7 +49,7 @@ class IMessageProducer(Interface): entities. """ - def push(self, item): + def push(self, item, state=None): """ Push a new item in the queue. """ @@ -101,6 +101,10 @@ class MessageProducer(object): # and consumption is not likely (?) to consume huge amounts of memory in # our current settings, so the need to pause the stream is not urgent now. + # TODO use enum + STATE_NEW = 1 + STATE_DIRTY = 2 + def __init__(self, consumer, queue=Queue.Queue, period=1): """ Initializes the MessageProducer @@ -115,7 +119,8 @@ class MessageProducer(object): # it should implement a `consume` method self._consumer = consumer - self._queue = queue() + self._queue_new = queue() + self._queue_dirty = queue() self._period = period self._loop = LoopingCall(self._check_for_new) @@ -130,7 +135,7 @@ class MessageProducer(object): If the queue is found empty, the loop is stopped. It will be started again after the addition of new items. """ - self._consumer.consume(self._queue) + self._consumer.consume((self._queue_new, self._queue_dirty)) if self.is_queue_empty(): self.stop() @@ -138,11 +143,13 @@ class MessageProducer(object): """ Return True if queue is empty, False otherwise. """ - return self._queue.empty() + new = self._queue_new + dirty = self._queue_dirty + return new.empty() and dirty.empty() # public methods: IMessageProducer - def push(self, item): + def push(self, item, state=None): """ Push a new item in the queue. @@ -150,7 +157,14 @@ class MessageProducer(object): """ # XXX this might raise if the queue does not accept any new # items. what to do then? - self._queue.put(item) + queue = self._queue_new + + if state == self.STATE_NEW: + queue = self._queue_new + if state == self.STATE_DIRTY: + queue = self._queue_dirty + + queue.put(item) self.start() def start(self): |