diff options
| author | Kali Kaneko <kali@leap.se> | 2014-02-11 01:43:14 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2014-02-17 11:39:48 -0400 | 
| commit | d6c352a72766a17df9d3804f58890b876370bc93 (patch) | |
| tree | d883fbb45e5e4c7f5b30bb5fa6e49888d4879b66 /mail/src | |
| parent | 5bba9574dd0a8906178a928e4b7e8f1877a75a12 (diff) | |
separate new and dirty queues
Diffstat (limited to 'mail/src')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 80 |
| -rw-r--r-- | mail/src/leap/mail/imap/messageparts.py | 25 |
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 20 |
| -rw-r--r-- | mail/src/leap/mail/messageflow.py | 26 |
4 files changed, 102 insertions, 49 deletions
| diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index 786a9c4..a053f3f 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -24,7 +24,6 @@ import weakref  from collections import defaultdict  from copy import copy -from itertools import chain  from twisted.internet import defer  from twisted.internet.task import LoopingCall @@ -33,7 +32,6 @@ from zope.interface import implements  from leap.common.check import leap_assert_type  from leap.mail import size -from leap.mail.decorators import deferred_to_thread  from leap.mail.utils import empty, phash_iter  from leap.mail.messageflow import MessageProducer  from leap.mail.imap import interfaces @@ -48,7 +46,7 @@ logger = logging.getLogger(__name__)  # The default period to do writebacks to the permanent  # soledad storage, in seconds. -SOLEDAD_WRITE_PERIOD = 30 +SOLEDAD_WRITE_PERIOD = 15  FDOC = MessagePartType.fdoc.key  HDOC = MessagePartType.hdoc.key @@ -106,6 +104,9 @@ class MemoryStore(object):          :param write_period: the interval to dump messages to disk, in seconds.          :type write_period: int          """ +        from twisted.internet import reactor +        self.reactor = reactor +          self._permanent_store = permanent_store          self._write_period = write_period @@ -195,11 +196,15 @@ class MemoryStore(object):          # New and dirty flags, to set MessageWrapper State.          self._new = set([]) +        self._new_queue = set([])          self._new_deferreds = {} +          self._dirty = set([]) -        self._rflags_dirty = set([]) +        self._dirty_queue = set([])          self._dirty_deferreds = {} +        self._rflags_dirty = set([]) +          # Flag for signaling we're busy writing to the disk storage.          setattr(self, self.WRITING_FLAG, False) @@ -297,7 +302,7 @@ class MemoryStore(object):          """          Put an existing message. 
-        This will set the dirty flag on the MemoryStore. +        This will also set the dirty flag on the MemoryStore.          :param mbox: the mailbox          :type mbox: str or unicode @@ -498,9 +503,14 @@ class MemoryStore(object):          # is accquired          with set_bool_flag(self, self.WRITING_FLAG):              for rflags_doc_wrapper in self.all_rdocs_iter(): -                self.producer.push(rflags_doc_wrapper) -            for msg_wrapper in self.all_new_dirty_msg_iter(): -                self.producer.push(msg_wrapper) +                self.producer.push(rflags_doc_wrapper, +                                   state=self.producer.STATE_DIRTY) +            for msg_wrapper in self.all_new_msg_iter(): +                self.producer.push(msg_wrapper, +                                   state=self.producer.STATE_NEW) +            for msg_wrapper in self.all_dirty_msg_iter(): +                self.producer.push(msg_wrapper, +                                   state=self.producer.STATE_DIRTY)      # MemoryStore specific methods. @@ -784,17 +794,34 @@ class MemoryStore(object):              for uid in fdoc_store[mbox]:                  yield mbox, uid -    def all_new_dirty_msg_iter(self): +    def all_new_msg_iter(self):          """ -        Return generator that iterates through all new and dirty messages. +        Return generator that iterates through all new messages.          
:return: generator of MessageWrappers          :rtype: generator          """          gm = self.get_message -        new = (gm(*key) for key in self._new) -        dirty = (gm(*key, flags_only=True) for key in self._dirty) -        return chain(new, dirty) +        new = [gm(*key) for key in self._new] +        # move content from new set to the queue +        self._new_queue.update(self._new) +        self._new.difference_update(self._new) +        return new + +    def all_dirty_msg_iter(self): +        """ +        Return generator that iterates through all dirty messages. + +        :return: generator of MessageWrappers +        :rtype: generator +        """ +        gm = self.get_message +        dirty = [gm(*key, flags_only=True) for key in self._dirty] +        # move content from new and dirty sets to the queue + +        self._dirty_queue.update(self._dirty) +        self._dirty.difference_update(self._dirty) +        return dirty      def all_deleted_uid_iter(self, mbox):          """ @@ -826,25 +853,28 @@ class MemoryStore(object):          """          # TODO change indexing of sets to [mbox][key] too.          # XXX should return *first* the news, and *then* the dirty... + +        # TODO should query in queues too , true? +        #          return map(lambda _set: key in _set, (self._new, self._dirty)) -    def set_new(self, key): +    def set_new_queued(self, key):          """ -        Add the key value to the `new` set. +        Add the key value to the `new-queue` set.          :param key: the key for the message, in the form mbox, uid          :type key: tuple          """ -        self._new.add(key) +        self._new_queue.add(key) -    def unset_new(self, key): +    def unset_new_queued(self, key):          """ -        Remove the key value from the `new` set. +        Remove the key value from the `new-queue` set.          
:param key: the key for the message, in the form mbox, uid          :type key: tuple          """ -        self._new.discard(key) +        self._new_queue.discard(key)          deferreds = self._new_deferreds          d = deferreds.get(key, None)          if d: @@ -853,23 +883,23 @@ class MemoryStore(object):              d.callback('%s, ok' % str(key))              deferreds.pop(key) -    def set_dirty(self, key): +    def set_dirty_queued(self, key):          """ -        Add the key value to the `dirty` set. +        Add the key value to the `dirty-queue` set.          :param key: the key for the message, in the form mbox, uid          :type key: tuple          """ -        self._dirty.add(key) +        self._dirty_queue.add(key) -    def unset_dirty(self, key): +    def unset_dirty_queued(self, key):          """ -        Remove the key value from the `dirty` set. +        Remove the key value from the `dirty-queue` set.          :param key: the key for the message, in the form mbox, uid          :type key: tuple          """ -        self._dirty.discard(key) +        self._dirty_queue.discard(key)          deferreds = self._dirty_deferreds          d = deferreds.get(key, None)          if d: diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index b1f333a..9b7de86 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -98,7 +98,7 @@ class MessageWrapper(object):      CDOCS = "cdocs"      DOCS_ID = "docs_id" -    # Using slots to limit some the memory footprint, +    # Using slots to limit some the memory use,      # Add your attribute here.      
__slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"] @@ -148,7 +148,7 @@ class MessageWrapper(object):          """          return self._new -    def _set_new(self, value=True): +    def _set_new(self, value=False):          """          Set the value for the `new` flag, and propagate it          to the memory store if any. @@ -161,8 +161,8 @@ class MessageWrapper(object):              mbox = self.fdoc.content['mbox']              uid = self.fdoc.content['uid']              key = mbox, uid -            fun = [self.memstore.unset_new, -                   self.memstore.set_new][int(value)] +            fun = [self.memstore.unset_new_queued, +                   self.memstore.set_new_queued][int(value)]              fun(key)          else:              logger.warning("Could not find a memstore referenced from this " @@ -193,8 +193,8 @@ class MessageWrapper(object):              mbox = self.fdoc.content['mbox']              uid = self.fdoc.content['uid']              key = mbox, uid -            fun = [self.memstore.unset_dirty, -                   self.memstore.set_dirty][int(value)] +            fun = [self.memstore.unset_dirty_queued, +                   self.memstore.set_dirty_queued][int(value)]              fun(key)          else:              logger.warning("Could not find a memstore referenced from this " @@ -271,11 +271,14 @@ class MessageWrapper(object):          :rtype: generator          """          if self._dirty: -            mbox = self.fdoc.content[fields.MBOX_KEY] -            uid = self.fdoc.content[fields.UID_KEY] -            docid_dict = self._dict[self.DOCS_ID] -            docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( -                mbox, uid) +            try: +                mbox = self.fdoc.content[fields.MBOX_KEY] +                uid = self.fdoc.content[fields.UID_KEY] +                docid_dict = self._dict[self.DOCS_ID] +                docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( +                
    mbox, uid) +            except Exception as exc: +                logger.exception(exc)          if not empty(self.fdoc.content):              yield self.fdoc diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index e7c6b29..667e64d 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -220,12 +220,15 @@ class SoledadStore(ContentDedup):                        to be inserted.          :type queue: Queue          """ -        from twisted.internet import reactor - -        while not queue.empty(): -            doc_wrapper = queue.get() -            reactor.callInThread(self._consume_doc, doc_wrapper, -                                 self.docs_notify_queue) +        new, dirty = queue +        while not new.empty(): +            doc_wrapper = new.get() +            self.reactor.callInThread(self._consume_doc, doc_wrapper, +                                      self.docs_notify_queue) +        while not dirty.empty(): +            doc_wrapper = dirty.get() +            self.reactor.callInThread(self._consume_doc, doc_wrapper, +                                      self.docs_notify_queue)          # Queue empty, flush the notifications queue.          
self.docs_notify_queue(None, flush=True) @@ -239,7 +242,8 @@ class SoledadStore(ContentDedup):          :type doc_wrapper: MessageWrapper          """          if isinstance(doc_wrapper, MessageWrapper): -            logger.info("unsetting new flag!") +            # XXX still needed for debug quite often +            #logger.info("unsetting new flag!")              doc_wrapper.new = False              doc_wrapper.dirty = False @@ -284,6 +288,8 @@ class SoledadStore(ContentDedup):              try:                  self._try_call(call, item)              except Exception as exc: +                logger.debug("ITEM WAS: %s" % str(item)) +                logger.debug("ITEM CONTENT WAS: %s" % str(item.content))                  logger.exception(exc)                  failed = True                  continue diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py index 80121c8..c8f224c 100644 --- a/mail/src/leap/mail/messageflow.py +++ b/mail/src/leap/mail/messageflow.py @@ -49,7 +49,7 @@ class IMessageProducer(Interface):      entities.      """ -    def push(self, item): +    def push(self, item, state=None):          """          Push a new item in the queue.          """ @@ -101,6 +101,10 @@ class MessageProducer(object):      # and consumption is not likely (?) to consume huge amounts of memory in      # our current settings, so the need to pause the stream is not urgent now. 
+    # TODO use enum +    STATE_NEW = 1 +    STATE_DIRTY = 2 +      def __init__(self, consumer, queue=Queue.Queue, period=1):          """          Initializes the MessageProducer @@ -115,7 +119,8 @@ class MessageProducer(object):          # it should implement a `consume` method          self._consumer = consumer -        self._queue = queue() +        self._queue_new = queue() +        self._queue_dirty = queue()          self._period = period          self._loop = LoopingCall(self._check_for_new) @@ -130,7 +135,7 @@ class MessageProducer(object):          If the queue is found empty, the loop is stopped. It will be started          again after the addition of new items.          """ -        self._consumer.consume(self._queue) +        self._consumer.consume((self._queue_new, self._queue_dirty))          if self.is_queue_empty():              self.stop() @@ -138,11 +143,13 @@ class MessageProducer(object):          """          Return True if queue is empty, False otherwise.          """ -        return self._queue.empty() +        new = self._queue_new +        dirty = self._queue_dirty +        return new.empty() and dirty.empty()      # public methods: IMessageProducer -    def push(self, item): +    def push(self, item, state=None):          """          Push a new item in the queue. @@ -150,7 +157,14 @@ class MessageProducer(object):          """          # XXX this might raise if the queue does not accept any new          # items. what to do then? -        self._queue.put(item) +        queue = self._queue_new + +        if state == self.STATE_NEW: +            queue = self._queue_new +        if state == self.STATE_DIRTY: +            queue = self._queue_dirty + +        queue.put(item)          self.start()      def start(self): | 
