From 4ae6ad57a0f80143e3ded867c1fdd2264804a775 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 21 Jan 2014 19:22:09 -0400 Subject: memory store for append/fetch/copy --- src/leap/mail/imap/memorystore.py | 478 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 478 insertions(+) create mode 100644 src/leap/mail/imap/memorystore.py (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py new file mode 100644 index 0000000..b8829e0 --- /dev/null +++ b/src/leap/mail/imap/memorystore.py @@ -0,0 +1,478 @@ +# -*- coding: utf-8 -*- +# memorystore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +In-memory transient store for a LEAPIMAPServer. +""" +import contextlib +import logging +import weakref + +from collections import namedtuple + +from twisted.internet.task import LoopingCall +from zope.interface import implements + +from leap.mail import size +from leap.mail.messageflow import MessageProducer +from leap.mail.messageparts import MessagePartType +from leap.mail.imap import interfaces +from leap.mail.imap.fields import fields + +logger = logging.getLogger(__name__) + + +""" +A MessagePartDoc is a light wrapper around the dictionary-like +data that we pass along for message parts. It can be used almost everywhere +that you would expect a SoledadDocument, since it has a dict under the +`content` attribute. + +We also keep some metadata on it, relative in part to the message as a whole, +and sometimes to a part in particular only. + +* `new` indicates that the document has just been created. SoledadStore + should just create a new doc for all the related message parts. +* `store` indicates the type of store a given MessagePartDoc lives in. + We currently use this to indicate that the document comes from memeory, + but we should probably get rid of it as soon as we extend the use of the + SoledadStore interface along LeapMessage, MessageCollection and Mailbox. +* `part` is one of the MessagePartType enums. + +* `dirty` indicates that, while we already have the document in Soledad, + we have modified its state in memory, so we need to put_doc instead while + dumping the MemoryStore contents. + `dirty` attribute would only apply to flags-docs and linkage-docs. + + + XXX this is still not implemented! + +""" + +MessagePartDoc = namedtuple( + 'MessagePartDoc', + ['new', 'dirty', 'part', 'store', 'content']) + + +class ReferenciableDict(dict): + """ + A dict that can be weak-referenced. + + Some builtin objects are not weak-referenciable unless + subclassed. So we do. + + Used to return pointers to the items in the MemoryStore. + """ + + +class MessageWrapper(object): + """ + A simple nested dictionary container around the different message subparts. 
+ """ + implements(interfaces.IMessageContainer) + + FDOC = "fdoc" + HDOC = "hdoc" + CDOCS = "cdocs" + + # XXX can use this to limit the memory footprint, + # or is it too premature to optimize? + # Does it work well together with the interfaces.implements? + + #__slots__ = ["_dict", "_new", "_dirty", "memstore"] + + def __init__(self, fdoc=None, hdoc=None, cdocs=None, + from_dict=None, memstore=None, + new=True, dirty=False): + self._dict = {} + + self._new = new + self._dirty = dirty + self.memstore = memstore + + if from_dict is not None: + self.from_dict(from_dict) + else: + if fdoc is not None: + self._dict[self.FDOC] = ReferenciableDict(fdoc) + if hdoc is not None: + self._dict[self.HDOC] = ReferenciableDict(hdoc) + if cdocs is not None: + self._dict[self.CDOCS] = ReferenciableDict(cdocs) + + # properties + + @property + def new(self): + return self._new + + def set_new(self, value=True): + self._new = value + + @property + def dirty(self): + return self._dirty + + def set_dirty(self, value=True): + self._dirty = value + + # IMessageContainer + + @property + def fdoc(self): + _fdoc = self._dict.get(self.FDOC, None) + if _fdoc: + content_ref = weakref.proxy(_fdoc) + else: + logger.warning("NO FDOC!!!") + content_ref = {} + return MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.fdoc, + content=content_ref) + + @property + def hdoc(self): + _hdoc = self._dict.get(self.HDOC, None) + if _hdoc: + content_ref = weakref.proxy(_hdoc) + else: + logger.warning("NO HDOC!!!!") + content_ref = {} + return MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.hdoc, + content=content_ref) + + @property + def cdocs(self): + _cdocs = self._dict.get(self.CDOCS, None) + if _cdocs: + return weakref.proxy(_cdocs) + else: + return {} + + def walk(self): + """ + Generator that iterates through all the parts, returning + MessagePartDoc. + """ + yield self.fdoc + yield self.hdoc + for cdoc in self.cdocs.values(): + # XXX this will break ---- + content_ref = weakref.proxy(cdoc) + yield MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.cdoc, + content=content_ref) + + # i/o + + def as_dict(self): + """ + Return a dict representation of the parts contained. + """ + return self._dict + + def from_dict(self, msg_dict): + """ + Populate MessageWrapper parts from a dictionary. + It expects the same format that we use in a + MessageWrapper. + """ + fdoc, hdoc, cdocs = map( + lambda part: msg_dict.get(part, None), + [self.FDOC, self.HDOC, self.CDOCS]) + self._dict[self.FDOC] = fdoc + self._dict[self.HDOC] = hdoc + self._dict[self.CDOCS] = cdocs + + +@contextlib.contextmanager +def set_bool_flag(obj, att): + """ + Set a boolean flag to True while we're doing our thing. + Just to let the world know. + """ + setattr(obj, att, True) + try: + yield True + except RuntimeError as exc: + logger.exception(exc) + finally: + setattr(obj, att, False) + + +class MemoryStore(object): + """ + An in-memory store to where we can write the different parts that + we split the messages into and buffer them until we write them to the + permanent storage. + + It uses MessageWrapper instances to represent the message-parts, which are + indexed by mailbox name and UID. + + It also can be passed a permanent storage as a paremeter (any implementor + of IMessageStore, in this case a SoledadStore). In this case, a periodic + dump of the messages stored in memory will be done. 
The period of the + writes to the permanent storage is controled by the write_period parameter + in the constructor. + """ + implements(interfaces.IMessageStore) + implements(interfaces.IMessageStoreWriter) + + producer = None + + # TODO We will want to index by chash when we transition to local-only + # UIDs. + # TODO should store RECENT-FLAGS too + # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass + # TODO do use dirty flag (maybe use namedtuples for that) so we can use it + # also as a read-cache. + + WRITING_FLAG = "_writing" + + def __init__(self, permanent_store=None, write_period=60): + """ + Initialize a MemoryStore. + + :param permanent_store: a IMessageStore implementor to dump + messages to. + :type permanent_store: IMessageStore + :param write_period: the interval to dump messages to disk, in seconds. + :type write_period: int + """ + self._permanent_store = permanent_store + self._write_period = write_period + + # Internal Storage + self._msg_store = {} + self._phash_store = {} + + # TODO ----------------- implement mailbox-level flags store too! ---- + self._rflags_store = {} + self._hdocset_store = {} + # TODO ----------------- implement mailbox-level flags store too! ---- + + # New and dirty flags, to set MessageWrapper State. + self._new = set([]) + self._dirty = set([]) + + # Flag for signaling we're busy writing to the disk storage. + setattr(self, self.WRITING_FLAG, False) + + if self._permanent_store is not None: + # this producer spits its messages to the permanent store + # consumer using a queue. We will use that to put + # our messages to be written. + self.producer = MessageProducer(permanent_store, + period=0.1) + # looping call for dumping to SoledadStore + self._write_loop = LoopingCall(self.write_messages, + permanent_store) + + # We can start the write loop right now, why wait? + self._start_write_loop() + + def _start_write_loop(self): + """ + Start loop for writing to disk database. + """ + if not self._write_loop.running: + self._write_loop.start(self._write_period, now=True) + + def _stop_write_loop(self): + """ + Stop loop for writing to disk database. + """ + if self._write_loop.running: + self._write_loop.stop() + + # IMessageStore + + # XXX this would work well for whole message operations. + # We would have to add a put_flags operation to modify only + # the flags doc (and set the dirty flag accordingly) + + def create_message(self, mbox, uid, message): + """ + Create the passed message into this MemoryStore. + + By default we consider that any message is a new message. + """ + print "adding new doc to memstore %s (%s)" % (mbox, uid) + key = mbox, uid + self._new.add(key) + + msg_dict = message.as_dict() + self._msg_store[key] = msg_dict + + cdocs = message.cdocs + + dirty = key in self._dirty + new = key in self._new + + # XXX should capture this in log... + + for cdoc_key in cdocs.keys(): + print "saving cdoc" + cdoc = self._msg_store[key]['cdocs'][cdoc_key] + + # XXX this should be done in the MessageWrapper constructor + # instead... + # first we make it weak-referenciable + referenciable_cdoc = ReferenciableDict(cdoc) + self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc( + new=new, dirty=dirty, store="mem", + part=MessagePartType.cdoc, + content=referenciable_cdoc) + phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) + if not phash: + continue + self._phash_store[phash] = weakref.proxy(referenciable_cdoc) + + def put_message(self, mbox, uid, msg): + """ + Put an existing message. 
+ """ + return NotImplementedError() + + def get_message(self, mbox, uid): + """ + Get a MessageWrapper for the given mbox and uid combination. + + :return: MessageWrapper or None + """ + key = mbox, uid + msg_dict = self._msg_store.get(key, None) + if msg_dict: + new, dirty = self._get_new_dirty_state(key) + return MessageWrapper(from_dict=msg_dict, + memstore=weakref.proxy(self)) + else: + return None + + def remove_message(self, mbox, uid): + """ + Remove a Message from this MemoryStore. + """ + raise NotImplementedError() + + # IMessageStoreWriter + + def write_messages(self, store): + """ + Write the message documents in this MemoryStore to a different store. + """ + # XXX pass if it's writing (ie, the queue is not empty...) + # See how to make the writing_flag aware of the queue state... + print "writing messages to producer..." + + with set_bool_flag(self, self.WRITING_FLAG): + for msg_wrapper in self.all_msg_iter(): + self.producer.push(msg_wrapper) + + # MemoryStore specific methods. + + def get_uids(self, mbox): + """ + Get all uids for a given mbox. + """ + all_keys = self._msg_store.keys() + return [uid for m, uid in all_keys if m == mbox] + + def get_last_uid(self, mbox): + """ + Get the highest UID for a given mbox. + """ + # XXX should get from msg_store keys instead! + if not self._new: + return 0 + return max(self.get_uids(mbox)) + + def count_new_mbox(self, mbox): + """ + Count the new messages by inbox. + """ + return len([(m, uid) for m, uid in self._new if mbox == mbox]) + + def count_new(self): + """ + Count all the new messages in the MemoryStore. + """ + return len(self._new) + + def get_by_phash(self, phash): + """ + Return a content-document by its payload-hash. + """ + doc = self._phash_store.get(phash, None) + + # XXX have to keep a mapping between phash and its linkage + # info, to know if this payload is been already saved or not. + # We will be able to get this from the linkage-docs, + # not yet implemented. + new = True + dirty = False + return MessagePartDoc( + new=new, dirty=dirty, store="mem", + part=MessagePartType.cdoc, + content=doc) + + def all_msg_iter(self): + """ + Return generator that iterates through all messages in the store. + """ + return (self.get_message(*key) + for key in sorted(self._msg_store.keys())) + + def _get_new_dirty_state(self, key): + """ + Return `new` and `dirty` flags for a given message. + """ + return map(lambda _set: key in _set, (self._new, self._dirty)) + + @property + def is_writing(self): + """ + Property that returns whether the store is currently writing its + internal state to a permanent storage. + + Used to evaluate whether the CHECK command can inform that the field + is clear to proceed, or waiting for the write operations to complete + is needed instead. + + :rtype: bool + """ + # XXX this should probably return a deferred !!! + return getattr(self, self.WRITING_FLAG) + + def put_part(self, part_type, value): + """ + Put the passed part into this IMessageStore. + `part` should be one of: fdoc, hdoc, cdoc + """ + # XXX turn that into a enum + + # Memory management. + + def get_size(self): + """ + Return the size of the internal storage. + Use for calculating the limit beyond which we should flush the store. 
+ """ + return size.get_size(self._msg_store) -- cgit v1.2.3 From e2218eec4fd91e4648160a05e3debc05efa0d0d9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 23 Jan 2014 02:36:38 -0400 Subject: add soledadstore class move parts-related bits to messageparts pass soledad in initialization for memory messages --- src/leap/mail/imap/memorystore.py | 185 ++------------------------------------ 1 file changed, 8 insertions(+), 177 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index b8829e0..7cb361f 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -21,187 +21,20 @@ import contextlib import logging import weakref -from collections import namedtuple - from twisted.internet.task import LoopingCall from zope.interface import implements from leap.mail import size from leap.mail.messageflow import MessageProducer -from leap.mail.messageparts import MessagePartType from leap.mail.imap import interfaces from leap.mail.imap.fields import fields +from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import ReferenciableDict logger = logging.getLogger(__name__) -""" -A MessagePartDoc is a light wrapper around the dictionary-like -data that we pass along for message parts. It can be used almost everywhere -that you would expect a SoledadDocument, since it has a dict under the -`content` attribute. - -We also keep some metadata on it, relative in part to the message as a whole, -and sometimes to a part in particular only. - -* `new` indicates that the document has just been created. SoledadStore - should just create a new doc for all the related message parts. -* `store` indicates the type of store a given MessagePartDoc lives in. - We currently use this to indicate that the document comes from memeory, - but we should probably get rid of it as soon as we extend the use of the - SoledadStore interface along LeapMessage, MessageCollection and Mailbox. -* `part` is one of the MessagePartType enums. - -* `dirty` indicates that, while we already have the document in Soledad, - we have modified its state in memory, so we need to put_doc instead while - dumping the MemoryStore contents. - `dirty` attribute would only apply to flags-docs and linkage-docs. - - - XXX this is still not implemented! - -""" - -MessagePartDoc = namedtuple( - 'MessagePartDoc', - ['new', 'dirty', 'part', 'store', 'content']) - - -class ReferenciableDict(dict): - """ - A dict that can be weak-referenced. - - Some builtin objects are not weak-referenciable unless - subclassed. So we do. - - Used to return pointers to the items in the MemoryStore. - """ - - -class MessageWrapper(object): - """ - A simple nested dictionary container around the different message subparts. - """ - implements(interfaces.IMessageContainer) - - FDOC = "fdoc" - HDOC = "hdoc" - CDOCS = "cdocs" - - # XXX can use this to limit the memory footprint, - # or is it too premature to optimize? - # Does it work well together with the interfaces.implements? 
- - #__slots__ = ["_dict", "_new", "_dirty", "memstore"] - - def __init__(self, fdoc=None, hdoc=None, cdocs=None, - from_dict=None, memstore=None, - new=True, dirty=False): - self._dict = {} - - self._new = new - self._dirty = dirty - self.memstore = memstore - - if from_dict is not None: - self.from_dict(from_dict) - else: - if fdoc is not None: - self._dict[self.FDOC] = ReferenciableDict(fdoc) - if hdoc is not None: - self._dict[self.HDOC] = ReferenciableDict(hdoc) - if cdocs is not None: - self._dict[self.CDOCS] = ReferenciableDict(cdocs) - - # properties - - @property - def new(self): - return self._new - - def set_new(self, value=True): - self._new = value - - @property - def dirty(self): - return self._dirty - - def set_dirty(self, value=True): - self._dirty = value - - # IMessageContainer - - @property - def fdoc(self): - _fdoc = self._dict.get(self.FDOC, None) - if _fdoc: - content_ref = weakref.proxy(_fdoc) - else: - logger.warning("NO FDOC!!!") - content_ref = {} - return MessagePartDoc(new=self.new, dirty=self.dirty, - store=self._storetype, - part=MessagePartType.fdoc, - content=content_ref) - - @property - def hdoc(self): - _hdoc = self._dict.get(self.HDOC, None) - if _hdoc: - content_ref = weakref.proxy(_hdoc) - else: - logger.warning("NO HDOC!!!!") - content_ref = {} - return MessagePartDoc(new=self.new, dirty=self.dirty, - store=self._storetype, - part=MessagePartType.hdoc, - content=content_ref) - - @property - def cdocs(self): - _cdocs = self._dict.get(self.CDOCS, None) - if _cdocs: - return weakref.proxy(_cdocs) - else: - return {} - - def walk(self): - """ - Generator that iterates through all the parts, returning - MessagePartDoc. - """ - yield self.fdoc - yield self.hdoc - for cdoc in self.cdocs.values(): - # XXX this will break ---- - content_ref = weakref.proxy(cdoc) - yield MessagePartDoc(new=self.new, dirty=self.dirty, - store=self._storetype, - part=MessagePartType.cdoc, - content=content_ref) - - # i/o - - def as_dict(self): - """ - Return a dict representation of the parts contained. - """ - return self._dict - - def from_dict(self, msg_dict): - """ - Populate MessageWrapper parts from a dictionary. - It expects the same format that we use in a - MessageWrapper. - """ - fdoc, hdoc, cdocs = map( - lambda part: msg_dict.get(part, None), - [self.FDOC, self.HDOC, self.CDOCS]) - self._dict[self.FDOC] = fdoc - self._dict[self.HDOC] = hdoc - self._dict[self.CDOCS] = cdocs - - @contextlib.contextmanager def set_bool_flag(obj, att): """ @@ -232,8 +65,8 @@ class MemoryStore(object): writes to the permanent storage is controled by the write_period parameter in the constructor. """ - implements(interfaces.IMessageStore) - implements(interfaces.IMessageStoreWriter) + implements(interfaces.IMessageStore, + interfaces.IMessageStoreWriter) producer = None @@ -332,7 +165,7 @@ class MemoryStore(object): print "saving cdoc" cdoc = self._msg_store[key]['cdocs'][cdoc_key] - # XXX this should be done in the MessageWrapper constructor + # FIXME this should be done in the MessageWrapper constructor # instead... # first we make it weak-referenciable referenciable_cdoc = ReferenciableDict(cdoc) @@ -399,10 +232,8 @@ class MemoryStore(object): """ Get the highest UID for a given mbox. """ - # XXX should get from msg_store keys instead! 
- if not self._new: - return 0 - return max(self.get_uids(mbox)) + uids = self.get_uids(mbox) + return uids and max(uids) or 0 def count_new_mbox(self, mbox): """ -- cgit v1.2.3 From ff28e22977db802c87f0b7be99e37c6de29183e9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 23 Jan 2014 13:32:01 -0400 Subject: Unset new flag after successful write --- src/leap/mail/imap/memorystore.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 7cb361f..f0bdab5 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -271,12 +271,28 @@ class MemoryStore(object): return (self.get_message(*key) for key in sorted(self._msg_store.keys())) + # new, dirty flags + def _get_new_dirty_state(self, key): """ Return `new` and `dirty` flags for a given message. """ return map(lambda _set: key in _set, (self._new, self._dirty)) + def set_new(self, key): + """ + Add the key value to the `new` set. + """ + self._new.add(key) + + def unset_new(self, key): + """ + Remove the key value from the `new` set. + """ + print "******************" + print "UNSETTING NEW FOR: %s" % str(key) + self._new.discard(key) + @property def is_writing(self): """ -- cgit v1.2.3 From e02db78b1b6d8fe021efd4adb250c64a1dd4bac4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 24 Jan 2014 05:39:13 -0400 Subject: flags use the memstore * add new/dirty deferred dict to notify when written to disk * fix eventual duplication after copy * fix flag flickering on first retrieval. --- src/leap/mail/imap/memorystore.py | 265 +++++++++++++++++++++++++++++++++----- 1 file changed, 234 insertions(+), 31 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index f0bdab5..f0c0d4b 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -21,10 +21,13 @@ import contextlib import logging import weakref +from twisted.internet import defer from twisted.internet.task import LoopingCall +from twisted.python import log from zope.interface import implements from leap.mail import size +from leap.mail.utils import empty from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces from leap.mail.imap.fields import fields @@ -34,6 +37,8 @@ from leap.mail.imap.messageparts import ReferenciableDict logger = logging.getLogger(__name__) +SOLEDAD_WRITE_PERIOD = 20 + @contextlib.contextmanager def set_bool_flag(obj, att): @@ -79,7 +84,8 @@ class MemoryStore(object): WRITING_FLAG = "_writing" - def __init__(self, permanent_store=None, write_period=60): + def __init__(self, permanent_store=None, + write_period=SOLEDAD_WRITE_PERIOD): """ Initialize a MemoryStore. @@ -92,10 +98,23 @@ class MemoryStore(object): self._permanent_store = permanent_store self._write_period = write_period - # Internal Storage + # Internal Storage: messages self._msg_store = {} + + # Internal Storage: payload-hash + """ + {'phash': weakreaf.proxy(dict)} + """ self._phash_store = {} + # Internal Storage: content-hash:fdoc + """ + {'chash': {'mbox-a': weakref.proxy(dict), + 'mbox-b': weakref.proxy(dict)} + } + """ + self._chash_fdoc_store = {} + # TODO ----------------- implement mailbox-level flags store too! ---- self._rflags_store = {} self._hdocset_store = {} @@ -103,7 +122,9 @@ class MemoryStore(object): # New and dirty flags, to set MessageWrapper State. 
self._new = set([]) + self._new_deferreds = {} self._dirty = set([]) + self._dirty_deferreds = {} # Flag for signaling we're busy writing to the disk storage. setattr(self, self.WRITING_FLAG, False) @@ -141,48 +162,141 @@ class MemoryStore(object): # We would have to add a put_flags operation to modify only # the flags doc (and set the dirty flag accordingly) - def create_message(self, mbox, uid, message): + def create_message(self, mbox, uid, message, notify_on_disk=True): """ Create the passed message into this MemoryStore. By default we consider that any message is a new message. + + :param mbox: the mailbox + :type mbox: basestring + :param uid: the UID for the message + :type uid: int + :param message: a to be added + :type message: MessageWrapper + :param notify_on_disk: + :type notify_on_disk: bool + + :return: a Deferred. if notify_on_disk is True, will be fired + when written to the db on disk. + Otherwise will fire inmediately + :rtype: Deferred """ print "adding new doc to memstore %s (%s)" % (mbox, uid) key = mbox, uid + + d = defer.Deferred() + d.addCallback(lambda result: log.msg("message save: %s" % result)) + self._new.add(key) + self._new_deferreds[key] = d + self._add_message(mbox, uid, message, notify_on_disk) + print "create message: ", d + return d - msg_dict = message.as_dict() - self._msg_store[key] = msg_dict + def put_message(self, mbox, uid, message, notify_on_disk=True): + """ + Put an existing message. - cdocs = message.cdocs + :param mbox: the mailbox + :type mbox: basestring + :param uid: the UID for the message + :type uid: int + :param message: a to be added + :type message: MessageWrapper + :param notify_on_disk: + :type notify_on_disk: bool - dirty = key in self._dirty - new = key in self._new + :return: a Deferred. if notify_on_disk is True, will be fired + when written to the db on disk. + Otherwise will fire inmediately + :rtype: Deferred + """ + key = mbox, uid + + d = defer.Deferred() + d.addCallback(lambda result: log.msg("message save: %s" % result)) + + self._dirty.add(key) + self._dirty_deferreds[key] = d + self._add_message(mbox, uid, message, notify_on_disk) + return d - # XXX should capture this in log... + def _add_message(self, mbox, uid, message, notify_on_disk=True): + # XXX have to differentiate between notify_new and notify_dirty + + key = mbox, uid + msg_dict = message.as_dict() + print "ADDING MESSAGE..." 
+ import pprint; pprint.pprint(msg_dict) + + # XXX use the enum as keys + + try: + store = self._msg_store[key] + except KeyError: + self._msg_store[key] = {'fdoc': {}, + 'hdoc': {}, + 'cdocs': {}, + 'docs_id': {}} + store = self._msg_store[key] + + print "In store (before):" + import pprint; pprint.pprint(store) + + #self._msg_store[key] = msg_dict + fdoc = msg_dict.get('fdoc', None) + if fdoc: + if not store.get('fdoc', None): + store['fdoc'] = ReferenciableDict({}) + store['fdoc'].update(fdoc) + + # content-hash indexing + chash = fdoc.get(fields.CONTENT_HASH_KEY) + chash_fdoc_store = self._chash_fdoc_store + if not chash in chash_fdoc_store: + chash_fdoc_store[chash] = {} + + chash_fdoc_store[chash][mbox] = weakref.proxy( + store['fdoc']) + + hdoc = msg_dict.get('hdoc', None) + if hdoc: + if not store.get('hdoc', None): + store['hdoc'] = ReferenciableDict({}) + store['hdoc'].update(hdoc) + + docs_id = msg_dict.get('docs_id', None) + if docs_id: + if not store.get('docs_id', None): + store['docs_id'] = {} + store['docs_id'].update(docs_id) + cdocs = message.cdocs for cdoc_key in cdocs.keys(): - print "saving cdoc" - cdoc = self._msg_store[key]['cdocs'][cdoc_key] + if not store.get('cdocs', None): + store['cdocs'] = {} - # FIXME this should be done in the MessageWrapper constructor - # instead... + cdoc = cdocs[cdoc_key] # first we make it weak-referenciable referenciable_cdoc = ReferenciableDict(cdoc) - self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc( - new=new, dirty=dirty, store="mem", - part=MessagePartType.cdoc, - content=referenciable_cdoc) + store['cdocs'][cdoc_key] = referenciable_cdoc phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) if not phash: continue self._phash_store[phash] = weakref.proxy(referenciable_cdoc) - def put_message(self, mbox, uid, msg): - """ - Put an existing message. - """ - return NotImplementedError() + def prune(seq, store): + for key in seq: + if key in store and empty(store.get(key)): + store.pop(key) + + prune(('fdoc', 'hdoc', 'cdocs', 'docs_id'), store) + #import ipdb; ipdb.set_trace() + + + print "after appending to store: ", key + import pprint; pprint.pprint(self._msg_store[key]) def get_message(self, mbox, uid): """ @@ -203,7 +317,13 @@ class MemoryStore(object): """ Remove a Message from this MemoryStore. """ - raise NotImplementedError() + try: + key = mbox, uid + self._new.discard(key) + self._dirty.discard(key) + self._msg_store.pop(key, None) + except Exception as exc: + logger.exception(exc) # IMessageStoreWriter @@ -211,12 +331,15 @@ class MemoryStore(object): """ Write the message documents in this MemoryStore to a different store. """ - # XXX pass if it's writing (ie, the queue is not empty...) - # See how to make the writing_flag aware of the queue state... - print "writing messages to producer..." + # For now, we pass if the queue is not empty, to avoid duplication. + # We would better use a flag to know when we've already enqueued an + # item. + if not self.producer.is_queue_empty(): + return + print "Writing messages to Soledad..." with set_bool_flag(self, self.WRITING_FLAG): - for msg_wrapper in self.all_msg_iter(): + for msg_wrapper in self.all_new_dirty_msg_iter(): self.producer.push(msg_wrapper) # MemoryStore specific methods. @@ -247,12 +370,14 @@ class MemoryStore(object): """ return len(self._new) - def get_by_phash(self, phash): + def get_cdoc_from_phash(self, phash): """ Return a content-document by its payload-hash. """ doc = self._phash_store.get(phash, None) + # XXX return None for consistency? 
+ # XXX have to keep a mapping between phash and its linkage # info, to know if this payload is been already saved or not. # We will be able to get this from the linkage-docs, @@ -262,7 +387,40 @@ class MemoryStore(object): return MessagePartDoc( new=new, dirty=dirty, store="mem", part=MessagePartType.cdoc, - content=doc) + content=doc, + doc_id=None) + + def get_fdoc_from_chash(self, chash, mbox): + """ + Return a flags-document by its content-hash and a given mailbox. + + :return: MessagePartDoc, or None. + """ + docs_dict = self._chash_fdoc_store.get(chash, None) + fdoc = docs_dict.get(mbox, None) if docs_dict else None + + print "GETTING FDOC BY CHASH:", fdoc + + # a couple of special cases. + # 1. We might have a doc with empty content... + if empty(fdoc): + return None + + # ...Or the message could exist, but being flagged for deletion. + # We want to create a new one in this case. + # Hmmm what if the deletion is un-done?? We would end with a + # duplicate... + if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: + return None + + # XXX get flags + new = True + dirty = False + return MessagePartDoc( + new=new, dirty=dirty, store="mem", + part=MessagePartType.fdoc, + content=fdoc, + doc_id=None) def all_msg_iter(self): """ @@ -271,6 +429,25 @@ class MemoryStore(object): return (self.get_message(*key) for key in sorted(self._msg_store.keys())) + def all_new_dirty_msg_iter(self): + """ + Return geneator that iterates through all new and dirty messages. + """ + return (self.get_message(*key) + for key in sorted(self._msg_store.keys()) + if key in self._new or key in self._dirty) + + def all_deleted_uid_iter(self, mbox): + """ + Return generator that iterates through the UIDs for all messags + with deleted flag in a given mailbox. + """ + all_deleted = ( + msg['fdoc']['uid'] for msg in self._msg_store.values() + if msg.get('fdoc', None) + and fields.DELETED_FLAG in msg['fdoc']['flags']) + return all_deleted + # new, dirty flags def _get_new_dirty_state(self, key): @@ -289,9 +466,35 @@ class MemoryStore(object): """ Remove the key value from the `new` set. """ - print "******************" - print "UNSETTING NEW FOR: %s" % str(key) + print "Unsetting NEW for: %s" % str(key) self._new.discard(key) + deferreds = self._new_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. + d.callback('%s, ok' % str(key)) + deferreds.pop(key) + + def set_dirty(self, key): + """ + Add the key value to the `dirty` set. + """ + self._dirty.add(key) + + def unset_dirty(self, key): + """ + Remove the key value from the `dirty` set. + """ + print "Unsetting DIRTY for: %s" % str(key) + self._dirty.discard(key) + deferreds = self._dirty_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. 
+ d.callback('%s, ok' % str(key)) + deferreds.pop(key) @property def is_writing(self): -- cgit v1.2.3 From b6f08b2fb731a4f3d1e6a04839bd3af71e9b2f5c Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 24 Jan 2014 21:09:38 -0400 Subject: use enums for dict keys --- src/leap/mail/imap/memorystore.py | 60 ++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 35 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index f0c0d4b..dcae6b0 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -224,32 +224,28 @@ class MemoryStore(object): def _add_message(self, mbox, uid, message, notify_on_disk=True): # XXX have to differentiate between notify_new and notify_dirty - key = mbox, uid msg_dict = message.as_dict() - print "ADDING MESSAGE..." - import pprint; pprint.pprint(msg_dict) - # XXX use the enum as keys + FDOC = MessagePartType.fdoc.key + HDOC = MessagePartType.hdoc.key + CDOCS = MessagePartType.cdocs.key + DOCS_ID = MessagePartType.docs_id.key try: store = self._msg_store[key] except KeyError: - self._msg_store[key] = {'fdoc': {}, - 'hdoc': {}, - 'cdocs': {}, - 'docs_id': {}} + self._msg_store[key] = {FDOC: {}, + HDOC: {}, + CDOCS: {}, + DOCS_ID: {}} store = self._msg_store[key] - print "In store (before):" - import pprint; pprint.pprint(store) - - #self._msg_store[key] = msg_dict - fdoc = msg_dict.get('fdoc', None) + fdoc = msg_dict.get(FDOC, None) if fdoc: - if not store.get('fdoc', None): - store['fdoc'] = ReferenciableDict({}) - store['fdoc'].update(fdoc) + if not store.get(FDOC, None): + store[FDOC] = ReferenciableDict({}) + store[FDOC].update(fdoc) # content-hash indexing chash = fdoc.get(fields.CONTENT_HASH_KEY) @@ -258,29 +254,29 @@ class MemoryStore(object): chash_fdoc_store[chash] = {} chash_fdoc_store[chash][mbox] = weakref.proxy( - store['fdoc']) + store[FDOC]) - hdoc = msg_dict.get('hdoc', None) + hdoc = msg_dict.get(HDOC, None) if hdoc: - if not store.get('hdoc', None): - store['hdoc'] = ReferenciableDict({}) - store['hdoc'].update(hdoc) + if not store.get(HDOC, None): + store[HDOC] = ReferenciableDict({}) + store[HDOC].update(hdoc) - docs_id = msg_dict.get('docs_id', None) + docs_id = msg_dict.get(DOCS_ID, None) if docs_id: - if not store.get('docs_id', None): - store['docs_id'] = {} - store['docs_id'].update(docs_id) + if not store.get(DOCS_ID, None): + store[DOCS_ID] = {} + store[DOCS_ID].update(docs_id) cdocs = message.cdocs for cdoc_key in cdocs.keys(): - if not store.get('cdocs', None): - store['cdocs'] = {} + if not store.get(CDOCS, None): + store[CDOCS] = {} cdoc = cdocs[cdoc_key] # first we make it weak-referenciable referenciable_cdoc = ReferenciableDict(cdoc) - store['cdocs'][cdoc_key] = referenciable_cdoc + store[CDOCS][cdoc_key] = referenciable_cdoc phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) if not phash: continue @@ -290,13 +286,7 @@ class MemoryStore(object): for key in seq: if key in store and empty(store.get(key)): store.pop(key) - - prune(('fdoc', 'hdoc', 'cdocs', 'docs_id'), store) - #import ipdb; ipdb.set_trace() - - - print "after appending to store: ", key - import pprint; pprint.pprint(self._msg_store[key]) + prune((FDOC, HDOC, CDOCS, DOCS_ID), store) def get_message(self, mbox, uid): """ -- cgit v1.2.3 From a5508429b90e2e9b58c5d073610ee5a10274663f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 24 Jan 2014 23:14:38 -0400 Subject: recent-flags use the memory store --- 
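Note: the change below keeps the per-mailbox `Recent` flags in memory as a pair of (soledad flags-doc id, set of UIDs), so flag reads and updates never hit disk and only the periodic write loop persists them. A minimal standalone sketch of that bookkeeping, with illustrative names that are not the ones used in the patch (the real store also remembers which mailboxes are dirty and dumps through RecentFlagsDoc namedtuples):

from collections import defaultdict


class RecentFlagsCache(object):
    """
    Illustrative stand-in for the in-memory recent-flags store.
    """

    def __init__(self):
        # one entry per mailbox: the u1db doc_id of the flags document
        # plus the set of UIDs currently flagged as Recent.
        self._store = defaultdict(lambda: {'doc_id': None, 'set': set()})

    def load(self, mbox, doc_id, uids):
        # prime the cache from the flags document fetched from Soledad
        self._store[mbox] = {'doc_id': doc_id, 'set': set(uids)}

    def set_recent(self, mbox, uid):
        self._store[mbox]['set'].add(uid)

    def unset_recent(self, mbox, uid):
        self._store[mbox]['set'].discard(uid)

    def get_recent(self, mbox):
        return self._store[mbox]['set']

    def dump(self):
        # what the periodic write loop would hand to the permanent store
        return [(mbox, entry['doc_id'], sorted(entry['set']))
                for mbox, entry in self._store.items()
                if entry['set']]

# e.g.: cache = RecentFlagsCache(); cache.set_recent('INBOX', 42); cache.dump()
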
src/leap/mail/imap/memorystore.py | 112 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 108 insertions(+), 4 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index dcae6b0..232a2fb 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -21,6 +21,8 @@ import contextlib import logging import weakref +from collections import defaultdict + from twisted.internet import defer from twisted.internet.task import LoopingCall from twisted.python import log @@ -32,6 +34,7 @@ from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces from leap.mail.imap.fields import fields from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc +from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import ReferenciableDict @@ -109,16 +112,38 @@ class MemoryStore(object): # Internal Storage: content-hash:fdoc """ + chash-fdoc-store keeps references to + the flag-documents indexed by content-hash. + {'chash': {'mbox-a': weakref.proxy(dict), 'mbox-b': weakref.proxy(dict)} } """ self._chash_fdoc_store = {} - # TODO ----------------- implement mailbox-level flags store too! ---- - self._rflags_store = {} + # Internal Storage: recent-flags store + """ + recent-flags store keeps one dict per mailbox, + with the document-id of the u1db document + and the set of the UIDs that have the recent flag. + + {'mbox-a': {'doc_id': 'deadbeef', + 'set': {1,2,3,4} + } + } + """ + # TODO this will have to transition to content-hash + # indexes after we move to local-only UIDs. + + self._rflags_store = defaultdict( + lambda: {'doc_id': None, 'set': set([])}) + + # TODO ----------------- implement mailbox-level flags store too? + # XXX maybe we don't need this anymore... + # let's see how good does it prefetch the headers if + # we cache them in the store. self._hdocset_store = {} - # TODO ----------------- implement mailbox-level flags store too! ---- + # -------------------------------------------------------------- # New and dirty flags, to set MessageWrapper State. self._new = set([]) @@ -224,6 +249,8 @@ class MemoryStore(object): def _add_message(self, mbox, uid, message, notify_on_disk=True): # XXX have to differentiate between notify_new and notify_dirty + # TODO defaultdict the hell outa here... + key = mbox, uid msg_dict = message.as_dict() @@ -331,6 +358,8 @@ class MemoryStore(object): with set_bool_flag(self, self.WRITING_FLAG): for msg_wrapper in self.all_new_dirty_msg_iter(): self.producer.push(msg_wrapper) + for rflags_doc_wrapper in self.all_rdocs_iter(): + self.producer.push(rflags_doc_wrapper) # MemoryStore specific methods. @@ -486,6 +515,79 @@ class MemoryStore(object): d.callback('%s, ok' % str(key)) deferreds.pop(key) + # Recent Flags + + # TODO --- nice but unused + def set_recent_flag(self, mbox, uid): + """ + Set the `Recent` flag for a given mailbox and UID. + """ + self._rflags_store[mbox]['set'].add(uid) + + # TODO --- nice but unused + def unset_recent_flag(self, mbox, uid): + """ + Unset the `Recent` flag for a given mailbox and UID. + """ + self._rflags_store[mbox]['set'].discard(uid) + + def set_recent_flags(self, mbox, value): + """ + Set the value for the set of the recent flags. + Used from the property in the MessageCollection. 
+ """ + self._rflags_store[mbox]['set'] = set(value) + + def load_recent_flags(self, mbox, flags_doc): + """ + Load the passed flags document in the recent flags store, for a given + mailbox. + + :param flags_doc: A dictionary containing the `doc_id` of the Soledad + flags-document for this mailbox, and the `set` + of uids marked with that flag. + """ + self._rflags_store[mbox] = flags_doc + + def get_recent_flags(self, mbox): + """ + Get the set of UIDs with the `Recent` flag for this mailbox. + + :return: set, or None + """ + rflag_for_mbox = self._rflags_store.get(mbox, None) + if not rflag_for_mbox: + return None + return self._rflags_store[mbox]['set'] + + def all_rdocs_iter(self): + """ + Return an iterator through all in-memory recent flag dicts, wrapped + under a RecentFlagsDoc namedtuple. + Used for saving to disk. + + :rtype: generator + """ + rflags_store = self._rflags_store + + # XXX use enums + DOC_ID = "doc_id" + SET = "set" + + print "LEN RFLAGS_STORE ------->", len(rflags_store) + return ( + RecentFlagsDoc( + doc_id=rflags_store[mbox][DOC_ID], + content={ + fields.TYPE_KEY: fields.TYPE_RECENT_VAL, + fields.MBOX_KEY: mbox, + fields.RECENTFLAGS_KEY: list( + rflags_store[mbox][SET]) + }) + for mbox in rflags_store) + + # Dump-to-disk controls. + @property def is_writing(self): """ @@ -498,7 +600,9 @@ class MemoryStore(object): :rtype: bool """ - # XXX this should probably return a deferred !!! + # FIXME this should return a deferred !!! + # XXX ----- can fire when all new + dirty deferreds + # are done (gatherResults) return getattr(self, self.WRITING_FLAG) def put_part(self, part_type, value): -- cgit v1.2.3 From f5365ae0c2edb8b3e879f876f2f7e42b25f4616a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 27 Jan 2014 16:11:53 -0400 Subject: handle last_uid property in memory store --- src/leap/mail/imap/memorystore.py | 236 +++++++++++++++++++++++++++++++------- 1 file changed, 197 insertions(+), 39 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 232a2fb..60e98c7 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -19,16 +19,20 @@ In-memory transient store for a LEAPIMAPServer. """ import contextlib import logging +import threading import weakref from collections import defaultdict +from copy import copy from twisted.internet import defer from twisted.internet.task import LoopingCall from twisted.python import log from zope.interface import implements +from leap.common.check import leap_assert_type from leap.mail import size +from leap.mail.decorators import deferred from leap.mail.utils import empty from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces @@ -40,7 +44,10 @@ from leap.mail.imap.messageparts import ReferenciableDict logger = logging.getLogger(__name__) -SOLEDAD_WRITE_PERIOD = 20 + +# The default period to do writebacks to the permanent +# soledad storage, in seconds. +SOLEDAD_WRITE_PERIOD = 10 @contextlib.contextmanager @@ -76,16 +83,11 @@ class MemoryStore(object): implements(interfaces.IMessageStore, interfaces.IMessageStoreWriter) - producer = None - # TODO We will want to index by chash when we transition to local-only # UIDs. - # TODO should store RECENT-FLAGS too - # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass - # TODO do use dirty flag (maybe use namedtuples for that) so we can use it - # also as a read-cache. 
WRITING_FLAG = "_writing" + _last_uid_lock = threading.Lock() def __init__(self, permanent_store=None, write_period=SOLEDAD_WRITE_PERIOD): @@ -138,17 +140,20 @@ class MemoryStore(object): self._rflags_store = defaultdict( lambda: {'doc_id': None, 'set': set([])}) - # TODO ----------------- implement mailbox-level flags store too? - # XXX maybe we don't need this anymore... - # let's see how good does it prefetch the headers if - # we cache them in the store. - self._hdocset_store = {} - # -------------------------------------------------------------- + """ + last-uid store keeps the count of the highest UID + per mailbox. + + {'mbox-a': 42, + 'mbox-b': 23} + """ + self._last_uid = {} # New and dirty flags, to set MessageWrapper State. self._new = set([]) self._new_deferreds = {} self._dirty = set([]) + self._rflags_dirty = set([]) self._dirty_deferreds = {} # Flag for signaling we're busy writing to the disk storage. @@ -210,14 +215,25 @@ class MemoryStore(object): print "adding new doc to memstore %s (%s)" % (mbox, uid) key = mbox, uid + self._add_message(mbox, uid, message, notify_on_disk) + d = defer.Deferred() d.addCallback(lambda result: log.msg("message save: %s" % result)) - self._new.add(key) + + # We store this deferred so we can keep track of the pending + # operations internally. self._new_deferreds[key] = d - self._add_message(mbox, uid, message, notify_on_disk) - print "create message: ", d - return d + + if notify_on_disk: + # Caller wants to be notified when the message is on disk + # so we pass the deferred that will be fired when the message + # has been written. + return d + else: + # Caller does not care, just fired and forgot, so we pass + # a defer that will inmediately have its callback triggered. + return defer.succeed('fire-and-forget:%s' % str(key)) def put_message(self, mbox, uid, message, notify_on_disk=True): """ @@ -238,13 +254,14 @@ class MemoryStore(object): :rtype: Deferred """ key = mbox, uid - d = defer.Deferred() - d.addCallback(lambda result: log.msg("message save: %s" % result)) + d.addCallback(lambda result: log.msg("message PUT save: %s" % result)) self._dirty.add(key) self._dirty_deferreds[key] = d self._add_message(mbox, uid, message, notify_on_disk) + #print "dirty ", self._dirty + #print "new ", self._new return d def _add_message(self, mbox, uid, message, notify_on_disk=True): @@ -315,6 +332,19 @@ class MemoryStore(object): store.pop(key) prune((FDOC, HDOC, CDOCS, DOCS_ID), store) + #print "after adding: " + #import pprint; pprint.pprint(self._msg_store[key]) + + def get_docid_for_fdoc(self, mbox, uid): + """ + Get Soledad document id for the flags-doc for a given mbox and uid. + """ + fdoc = self._permanent_store.get_flags_doc(mbox, uid) + if not fdoc: + return None + doc_id = fdoc.doc_id + return doc_id + def get_message(self, mbox, uid): """ Get a MessageWrapper for the given mbox and uid combination. @@ -326,6 +356,8 @@ class MemoryStore(object): if msg_dict: new, dirty = self._get_new_dirty_state(key) return MessageWrapper(from_dict=msg_dict, + new=new, + dirty=dirty, memstore=weakref.proxy(self)) else: return None @@ -334,6 +366,13 @@ class MemoryStore(object): """ Remove a Message from this MemoryStore. """ + # XXX For the moment we are only removing the flags and headers + # docs. The rest we leave there polluting your hard disk, + # until we think about a good way of deorphaning. + + # XXX implement elijah's idea of using a PUT document as a + # token to ensure consistency in the removal. 
+ try: key = mbox, uid self._new.discard(key) @@ -348,18 +387,22 @@ class MemoryStore(object): """ Write the message documents in this MemoryStore to a different store. """ - # For now, we pass if the queue is not empty, to avoid duplication. + # For now, we pass if the queue is not empty, to avoid duplicate + # queuing. # We would better use a flag to know when we've already enqueued an # item. + + # XXX this could return the deferred for all the enqueued operations + if not self.producer.is_queue_empty(): return print "Writing messages to Soledad..." with set_bool_flag(self, self.WRITING_FLAG): - for msg_wrapper in self.all_new_dirty_msg_iter(): - self.producer.push(msg_wrapper) for rflags_doc_wrapper in self.all_rdocs_iter(): self.producer.push(rflags_doc_wrapper) + for msg_wrapper in self.all_new_dirty_msg_iter(): + self.producer.push(msg_wrapper) # MemoryStore specific methods. @@ -370,12 +413,61 @@ class MemoryStore(object): all_keys = self._msg_store.keys() return [uid for m, uid in all_keys if m == mbox] + # last_uid + def get_last_uid(self, mbox): """ Get the highest UID for a given mbox. + It will be the highest between the highest uid in the message store for + the mailbox, and the soledad integer cache. """ uids = self.get_uids(mbox) - return uids and max(uids) or 0 + last_mem_uid = uids and max(uids) or 0 + last_soledad_uid = self.get_last_soledad_uid(mbox) + return max(last_mem_uid, last_soledad_uid) + + def get_last_soledad_uid(self, mbox): + """ + Get last uid for a given mbox from the soledad integer cache. + """ + return self._last_uid.get(mbox, 0) + + def set_last_soledad_uid(self, mbox, value): + """ + Set last uid for a given mbox in the soledad integer cache. + SoledadMailbox should prime this value during initialization. + Other methods (during message adding) SHOULD call + `increment_last_soledad_uid` instead. + """ + leap_assert_type(value, int) + print "setting last soledad uid for ", mbox, "to", value + # if we already have a vlue here, don't do anything + with self._last_uid_lock: + if not self._last_uid.get(mbox, None): + self._last_uid[mbox] = value + + def increment_last_soledad_uid(self, mbox): + """ + Increment by one the soledad integer cache for the last_uid for + this mbox, and fire a defer-to-thread to update the soledad value. + The caller should lock the call tho this method. + """ + with self._last_uid_lock: + self._last_uid[mbox] += 1 + value = self._last_uid[mbox] + self.write_last_uid(mbox, value) + return value + + @deferred + def write_last_uid(self, mbox, value): + """ + Increment the soledad cache, + """ + leap_assert_type(value, int) + if self._permanent_store: + self._permanent_store.write_last_uid(mbox, value) + + # Counting sheeps... def count_new_mbox(self, mbox): """ @@ -418,14 +510,12 @@ class MemoryStore(object): docs_dict = self._chash_fdoc_store.get(chash, None) fdoc = docs_dict.get(mbox, None) if docs_dict else None - print "GETTING FDOC BY CHASH:", fdoc - # a couple of special cases. # 1. We might have a doc with empty content... if empty(fdoc): return None - # ...Or the message could exist, but being flagged for deletion. + # 2. ...Or the message could exist, but being flagged for deletion. # We want to create a new one in this case. # Hmmm what if the deletion is un-done?? We would end with a # duplicate... 
@@ -456,15 +546,22 @@ class MemoryStore(object): for key in sorted(self._msg_store.keys()) if key in self._new or key in self._dirty) + def all_msg_dict_for_mbox(self, mbox): + """ + Return all the message dicts for a given mbox. + """ + return [self._msg_store[(mb, uid)] + for mb, uid in self._msg_store if mb == mbox] + def all_deleted_uid_iter(self, mbox): """ Return generator that iterates through the UIDs for all messags with deleted flag in a given mailbox. """ - all_deleted = ( - msg['fdoc']['uid'] for msg in self._msg_store.values() + all_deleted = [ + msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox) if msg.get('fdoc', None) - and fields.DELETED_FLAG in msg['fdoc']['flags']) + and fields.DELETED_FLAG in msg['fdoc']['flags']] return all_deleted # new, dirty flags @@ -473,6 +570,7 @@ class MemoryStore(object): """ Return `new` and `dirty` flags for a given message. """ + # XXX should return *first* the news, and *then* the dirty... return map(lambda _set: key in _set, (self._new, self._dirty)) def set_new(self, key): @@ -485,7 +583,7 @@ class MemoryStore(object): """ Remove the key value from the `new` set. """ - print "Unsetting NEW for: %s" % str(key) + #print "Unsetting NEW for: %s" % str(key) self._new.discard(key) deferreds = self._new_deferreds d = deferreds.get(key, None) @@ -505,7 +603,7 @@ class MemoryStore(object): """ Remove the key value from the `dirty` set. """ - print "Unsetting DIRTY for: %s" % str(key) + #print "Unsetting DIRTY for: %s" % str(key) self._dirty.discard(key) deferreds = self._dirty_deferreds d = deferreds.get(key, None) @@ -522,6 +620,7 @@ class MemoryStore(object): """ Set the `Recent` flag for a given mailbox and UID. """ + self._rflags_dirty.add(mbox) self._rflags_store[mbox]['set'].add(uid) # TODO --- nice but unused @@ -536,6 +635,7 @@ class MemoryStore(object): Set the value for the set of the recent flags. Used from the property in the MessageCollection. """ + self._rflags_dirty.add(mbox) self._rflags_store[mbox]['set'] = set(value) def load_recent_flags(self, mbox, flags_doc): @@ -568,23 +668,81 @@ class MemoryStore(object): :rtype: generator """ - rflags_store = self._rflags_store - # XXX use enums DOC_ID = "doc_id" SET = "set" - print "LEN RFLAGS_STORE ------->", len(rflags_store) - return ( - RecentFlagsDoc( + rflags_store = self._rflags_store + + def get_rdoc(mbox, rdict): + mbox_rflag_set = rdict[SET] + recent_set = copy(mbox_rflag_set) + # zero it! + mbox_rflag_set.difference_update(mbox_rflag_set) + return RecentFlagsDoc( doc_id=rflags_store[mbox][DOC_ID], content={ fields.TYPE_KEY: fields.TYPE_RECENT_VAL, fields.MBOX_KEY: mbox, - fields.RECENTFLAGS_KEY: list( - rflags_store[mbox][SET]) + fields.RECENTFLAGS_KEY: list(recent_set) }) - for mbox in rflags_store) + + return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items() + if not empty(rdict[SET])) + + # Methods that mirror the IMailbox interface + + def remove_all_deleted(self, mbox): + """ + Remove all messages flagged \\Deleted from this Memory Store only. + Called from `expunge` + """ + mem_deleted = self.all_deleted_uid_iter(mbox) + for uid in mem_deleted: + self.remove_message(mbox, uid) + return mem_deleted + + def expunge(self, mbox): + """ + Remove all messages flagged \\Deleted, from the Memory Store + and from the permanent store also. + """ + # TODO expunge should add itself as a callback to the ongoing + # writes. + soledad_store = self._permanent_store + + try: + # 1. Stop the writing call + self._stop_write_loop() + # 2. Enqueue a last write. 
+ #self.write_messages(soledad_store) + # 3. Should wait on the writebacks to finish ??? + # FIXME wait for this, and add all the rest of the method + # as a callback!!! + except Exception as exc: + logger.exception(exc) + + # Now, we...: + + try: + # 1. Delete all messages marked as deleted in soledad. + + # XXX this could be deferred for faster operation. + if soledad_store: + sol_deleted = soledad_store.remove_all_deleted(mbox) + else: + sol_deleted = [] + + # 2. Delete all messages marked as deleted in memory. + mem_deleted = self.remove_all_deleted(mbox) + + all_deleted = set(mem_deleted).union(set(sol_deleted)) + print "deleted ", all_deleted + except Exception as exc: + logger.exception(exc) + finally: + self._start_write_loop() + return all_deleted # Dump-to-disk controls. -- cgit v1.2.3 From a7e0054b595822325f749b0b1df7d25cab4e6486 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 28 Jan 2014 18:39:59 -0400 Subject: docstring fixes Also some fixes for None comparisons. --- src/leap/mail/imap/memorystore.py | 215 ++++++++++++++++++++++++++++++-------- 1 file changed, 172 insertions(+), 43 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 60e98c7..2d60b13 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -199,12 +199,14 @@ class MemoryStore(object): By default we consider that any message is a new message. :param mbox: the mailbox - :type mbox: basestring + :type mbox: str or unicode :param uid: the UID for the message :type uid: int - :param message: a to be added + :param message: a message to be added :type message: MessageWrapper - :param notify_on_disk: + :param notify_on_disk: whether the deferred that is returned should + wait until the message is written to disk to + be fired. :type notify_on_disk: bool :return: a Deferred. if notify_on_disk is True, will be fired @@ -212,7 +214,7 @@ class MemoryStore(object): Otherwise will fire inmediately :rtype: Deferred """ - print "adding new doc to memstore %s (%s)" % (mbox, uid) + log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid self._add_message(mbox, uid, message, notify_on_disk) @@ -239,13 +241,17 @@ class MemoryStore(object): """ Put an existing message. + This will set the dirty flag on the MemoryStore. + :param mbox: the mailbox - :type mbox: basestring + :type mbox: str or unicode :param uid: the UID for the message :type uid: int - :param message: a to be added + :param message: a message to be added :type message: MessageWrapper - :param notify_on_disk: + :param notify_on_disk: whether the deferred that is returned should + wait until the message is written to disk to + be fired. :type notify_on_disk: bool :return: a Deferred. if notify_on_disk is True, will be fired @@ -260,11 +266,13 @@ class MemoryStore(object): self._dirty.add(key) self._dirty_deferreds[key] = d self._add_message(mbox, uid, message, notify_on_disk) - #print "dirty ", self._dirty - #print "new ", self._new return d def _add_message(self, mbox, uid, message, notify_on_disk=True): + """ + Helper method, called by both create_message and put_message. + See those for parameter documentation. + """ # XXX have to differentiate between notify_new and notify_dirty # TODO defaultdict the hell outa here... 
@@ -332,15 +340,19 @@ class MemoryStore(object): store.pop(key) prune((FDOC, HDOC, CDOCS, DOCS_ID), store) - #print "after adding: " - #import pprint; pprint.pprint(self._msg_store[key]) - def get_docid_for_fdoc(self, mbox, uid): """ - Get Soledad document id for the flags-doc for a given mbox and uid. + Return Soledad document id for the flags-doc for a given mbox and uid, + or None of no flags document could be found. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + :rtype: unicode or None """ fdoc = self._permanent_store.get_flags_doc(mbox, uid) - if not fdoc: + if empty(fdoc): return None doc_id = fdoc.doc_id return doc_id @@ -349,22 +361,30 @@ class MemoryStore(object): """ Get a MessageWrapper for the given mbox and uid combination. + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + :return: MessageWrapper or None """ key = mbox, uid msg_dict = self._msg_store.get(key, None) - if msg_dict: - new, dirty = self._get_new_dirty_state(key) - return MessageWrapper(from_dict=msg_dict, - new=new, - dirty=dirty, - memstore=weakref.proxy(self)) - else: + if empty(msg_dict): return None + new, dirty = self._get_new_dirty_state(key) + return MessageWrapper(from_dict=msg_dict, + new=new, dirty=dirty, + memstore=weakref.proxy(self)) def remove_message(self, mbox, uid): """ Remove a Message from this MemoryStore. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int """ # XXX For the moment we are only removing the flags and headers # docs. The rest we leave there polluting your hard disk, @@ -386,6 +406,8 @@ class MemoryStore(object): def write_messages(self, store): """ Write the message documents in this MemoryStore to a different store. + + :param store: the IMessageStore to write to """ # For now, we pass if the queue is not empty, to avoid duplicate # queuing. @@ -397,7 +419,10 @@ class MemoryStore(object): if not self.producer.is_queue_empty(): return - print "Writing messages to Soledad..." + logger.info("Writing messages to Soledad...") + + # TODO change for lock, and make the property access + # is accquired with set_bool_flag(self, self.WRITING_FLAG): for rflags_doc_wrapper in self.all_rdocs_iter(): self.producer.push(rflags_doc_wrapper) @@ -409,6 +434,9 @@ class MemoryStore(object): def get_uids(self, mbox): """ Get all uids for a given mbox. + + :param mbox: the mailbox + :type mbox: str or unicode """ all_keys = self._msg_store.keys() return [uid for m, uid in all_keys if m == mbox] @@ -420,6 +448,9 @@ class MemoryStore(object): Get the highest UID for a given mbox. It will be the highest between the highest uid in the message store for the mailbox, and the soledad integer cache. + + :param mbox: the mailbox + :type mbox: str or unicode """ uids = self.get_uids(mbox) last_mem_uid = uids and max(uids) or 0 @@ -429,6 +460,9 @@ class MemoryStore(object): def get_last_soledad_uid(self, mbox): """ Get last uid for a given mbox from the soledad integer cache. + + :param mbox: the mailbox + :type mbox: str or unicode """ return self._last_uid.get(mbox, 0) @@ -438,10 +472,16 @@ class MemoryStore(object): SoledadMailbox should prime this value during initialization. Other methods (during message adding) SHOULD call `increment_last_soledad_uid` instead. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int """ leap_assert_type(value, int) - print "setting last soledad uid for ", mbox, "to", value - # if we already have a vlue here, don't do anything + logger.info("setting last soledad uid for %s to %s" % + (mbox, value)) + # if we already have a value here, don't do anything with self._last_uid_lock: if not self._last_uid.get(mbox, None): self._last_uid[mbox] = value @@ -451,6 +491,9 @@ class MemoryStore(object): Increment by one the soledad integer cache for the last_uid for this mbox, and fire a defer-to-thread to update the soledad value. The caller should lock the call tho this method. + + :param mbox: the mailbox + :type mbox: str or unicode """ with self._last_uid_lock: self._last_uid[mbox] += 1 @@ -461,7 +504,12 @@ class MemoryStore(object): @deferred def write_last_uid(self, mbox, value): """ - Increment the soledad cache, + Increment the soledad integer cache for the highest uid value. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int """ leap_assert_type(value, int) if self._permanent_store: @@ -472,18 +520,30 @@ class MemoryStore(object): def count_new_mbox(self, mbox): """ Count the new messages by inbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: number of new messages + :rtype: int """ return len([(m, uid) for m, uid in self._new if mbox == mbox]) + # XXX used at all? def count_new(self): """ Count all the new messages in the MemoryStore. + + :rtype: int """ return len(self._new) def get_cdoc_from_phash(self, phash): """ Return a content-document by its payload-hash. + + :param phash: the payload hash to check against + :type phash: str or unicode + :rtype: MessagePartDoc """ doc = self._phash_store.get(phash, None) @@ -504,8 +564,16 @@ class MemoryStore(object): def get_fdoc_from_chash(self, chash, mbox): """ Return a flags-document by its content-hash and a given mailbox. + Used during content-duplication detection while copying or adding a + message. + + :param chash: the content hash to check against + :type chash: str or unicode + :param mbox: the mailbox + :type mbox: str or unicode - :return: MessagePartDoc, or None. + :return: MessagePartDoc. It will return None if the flags document + has empty content or it is flagged as \\Deleted. """ docs_dict = self._chash_fdoc_store.get(chash, None) fdoc = docs_dict.get(mbox, None) if docs_dict else None @@ -522,9 +590,10 @@ class MemoryStore(object): if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: return None - # XXX get flags - new = True - dirty = False + uid = fdoc.content[fields.UID_KEY] + key = mbox, uid + new = key in self._new + dirty = key in self._dirty return MessagePartDoc( new=new, dirty=dirty, store="mem", part=MessagePartType.fdoc, @@ -534,13 +603,19 @@ class MemoryStore(object): def all_msg_iter(self): """ Return generator that iterates through all messages in the store. + + :return: generator of MessageWrappers + :rtype: generator """ return (self.get_message(*key) for key in sorted(self._msg_store.keys())) def all_new_dirty_msg_iter(self): """ - Return geneator that iterates through all new and dirty messages. + Return generator that iterates through all new and dirty messages. 
+ + :return: generator of MessageWrappers + :rtype: generator """ return (self.get_message(*key) for key in sorted(self._msg_store.keys()) @@ -549,15 +624,29 @@ class MemoryStore(object): def all_msg_dict_for_mbox(self, mbox): """ Return all the message dicts for a given mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: list of dictionaries + :rtype: list """ + # This *needs* to return a fixed sequence. Otherwise the dictionary len + # will change during iteration, when we modify it return [self._msg_store[(mb, uid)] for mb, uid in self._msg_store if mb == mbox] def all_deleted_uid_iter(self, mbox): """ - Return generator that iterates through the UIDs for all messags + Return a list with the UIDs for all messags with deleted flag in a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: list of integers + :rtype: list """ + # This *needs* to return a fixed sequence. Otherwise the dictionary len + # will change during iteration, when we modify it all_deleted = [ msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox) if msg.get('fdoc', None) @@ -569,6 +658,11 @@ class MemoryStore(object): def _get_new_dirty_state(self, key): """ Return `new` and `dirty` flags for a given message. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + :return: tuple of bools + :rtype: tuple """ # XXX should return *first* the news, and *then* the dirty... return map(lambda _set: key in _set, (self._new, self._dirty)) @@ -576,14 +670,19 @@ class MemoryStore(object): def set_new(self, key): """ Add the key value to the `new` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple """ self._new.add(key) def unset_new(self, key): """ Remove the key value from the `new` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple """ - #print "Unsetting NEW for: %s" % str(key) self._new.discard(key) deferreds = self._new_deferreds d = deferreds.get(key, None) @@ -596,14 +695,19 @@ class MemoryStore(object): def set_dirty(self, key): """ Add the key value to the `dirty` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple """ self._dirty.add(key) def unset_dirty(self, key): """ Remove the key value from the `dirty` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple """ - #print "Unsetting DIRTY for: %s" % str(key) self._dirty.discard(key) deferreds = self._dirty_deferreds d = deferreds.get(key, None) @@ -619,6 +723,11 @@ class MemoryStore(object): def set_recent_flag(self, mbox, uid): """ Set the `Recent` flag for a given mailbox and UID. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int """ self._rflags_dirty.add(mbox) self._rflags_store[mbox]['set'].add(uid) @@ -627,6 +736,11 @@ class MemoryStore(object): def unset_recent_flag(self, mbox, uid): """ Unset the `Recent` flag for a given mailbox and UID. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int """ self._rflags_store[mbox]['set'].discard(uid) @@ -634,6 +748,11 @@ class MemoryStore(object): """ Set the value for the set of the recent flags. Used from the property in the MessageCollection. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + :param value: a sequence of flags to set + :type value: sequence """ self._rflags_dirty.add(mbox) self._rflags_store[mbox]['set'] = set(value) @@ -643,6 +762,8 @@ class MemoryStore(object): Load the passed flags document in the recent flags store, for a given mailbox. + :param mbox: the mailbox + :type mbox: str or unicode :param flags_doc: A dictionary containing the `doc_id` of the Soledad flags-document for this mailbox, and the `set` of uids marked with that flag. @@ -651,9 +772,11 @@ class MemoryStore(object): def get_recent_flags(self, mbox): """ - Get the set of UIDs with the `Recent` flag for this mailbox. + Return the set of UIDs with the `Recent` flag for this mailbox. - :return: set, or None + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: set, or None """ rflag_for_mbox = self._rflags_store.get(mbox, None) if not rflag_for_mbox: @@ -666,6 +789,7 @@ class MemoryStore(object): under a RecentFlagsDoc namedtuple. Used for saving to disk. + :return: a generator of RecentFlagDoc :rtype: generator """ # XXX use enums @@ -696,6 +820,11 @@ class MemoryStore(object): """ Remove all messages flagged \\Deleted from this Memory Store only. Called from `expunge` + + :param mbox: the mailbox + :type mbox: str or unicode + :return: a list of UIDs + :rtype: list """ mem_deleted = self.all_deleted_uid_iter(mbox) for uid in mem_deleted: @@ -706,6 +835,11 @@ class MemoryStore(object): """ Remove all messages flagged \\Deleted, from the Memory Store and from the permanent store also. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: a list of UIDs + :rtype: list """ # TODO expunge should add itself as a callback to the ongoing # writes. @@ -737,7 +871,7 @@ class MemoryStore(object): mem_deleted = self.remove_all_deleted(mbox) all_deleted = set(mem_deleted).union(set(sol_deleted)) - print "deleted ", all_deleted + logger.debug("deleted %r" % all_deleted) except Exception as exc: logger.exception(exc) finally: @@ -763,18 +897,13 @@ class MemoryStore(object): # are done (gatherResults) return getattr(self, self.WRITING_FLAG) - def put_part(self, part_type, value): - """ - Put the passed part into this IMessageStore. - `part` should be one of: fdoc, hdoc, cdoc - """ - # XXX turn that into a enum - # Memory management. def get_size(self): """ Return the size of the internal storage. Use for calculating the limit beyond which we should flush the store. + + :rtype: int """ return size.get_size(self._msg_store) -- cgit v1.2.3 From 1b71ba510a2e6680f1ecc84eacfc492b0bbe24fc Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 29 Jan 2014 00:54:20 -0400 Subject: Fix copy and deletion problems * reorganize and simplify STORE command processing * add the notification after the processing of the whole sequence --- src/leap/mail/imap/memorystore.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 2d60b13..fac66ad 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -357,7 +357,7 @@ class MemoryStore(object): doc_id = fdoc.doc_id return doc_id - def get_message(self, mbox, uid): + def get_message(self, mbox, uid, flags_only=False): """ Get a MessageWrapper for the given mbox and uid combination. 
@@ -365,17 +365,27 @@ class MemoryStore(object): :type mbox: str or unicode :param uid: the message UID :type uid: int + :param flags_only: whether the message should carry only a reference + to the flags document. + :type flags_only: bool :return: MessageWrapper or None """ key = mbox, uid + FDOC = MessagePartType.fdoc.key + msg_dict = self._msg_store.get(key, None) if empty(msg_dict): return None new, dirty = self._get_new_dirty_state(key) - return MessageWrapper(from_dict=msg_dict, - new=new, dirty=dirty, - memstore=weakref.proxy(self)) + if flags_only: + return MessageWrapper(fdoc=msg_dict[FDOC], + new=new, dirty=dirty, + memstore=weakref.proxy(self)) + else: + return MessageWrapper(from_dict=msg_dict, + new=new, dirty=dirty, + memstore=weakref.proxy(self)) def remove_message(self, mbox, uid): """ @@ -590,7 +600,7 @@ class MemoryStore(object): if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: return None - uid = fdoc.content[fields.UID_KEY] + uid = fdoc[fields.UID_KEY] key = mbox, uid new = key in self._new dirty = key in self._dirty -- cgit v1.2.3 From ff7de0c9bc760e097c0286d2d62a19095be3f35e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 30 Jan 2014 18:35:03 -0400 Subject: prime-uids We pre-fetch the uids from soledad on mailbox initialization --- src/leap/mail/imap/memorystore.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index fac66ad..217ad8e 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -149,6 +149,14 @@ class MemoryStore(object): """ self._last_uid = {} + """ + known-uids keeps a count of the uids that soledad knows for a given + mailbox + + {'mbox-a': set([1,2,3])} + """ + self._known_uids = defaultdict(set) + # New and dirty flags, to set MessageWrapper State. self._new = set([]) self._new_deferreds = {} @@ -447,10 +455,20 @@ class MemoryStore(object): :param mbox: the mailbox :type mbox: str or unicode + :rtype: list """ all_keys = self._msg_store.keys() return [uid for m, uid in all_keys if m == mbox] + def get_soledad_known_uids(self, mbox): + """ + Get all uids that soledad knows about, from the memory cache. + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: list + """ + return self._known_uids.get(mbox, []) + # last_uid def get_last_uid(self, mbox): @@ -496,6 +514,18 @@ class MemoryStore(object): if not self._last_uid.get(mbox, None): self._last_uid[mbox] = value + def set_known_uids(self, mbox, value): + """ + Set the value fo the known-uids set for this mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: a sequence of integers to be added to the set. + :type value: tuple + """ + current = self._known_uids[mbox] + self._known_uids[mbox] = current.union(set(value)) + def increment_last_soledad_uid(self, mbox): """ Increment by one the soledad integer cache for the last_uid for -- cgit v1.2.3 From 0f6a8e1c83995cffec51e81f626d4bb29d4f7345 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 31 Jan 2014 03:34:03 -0400 Subject: properly implement deferreds in several commands Passing along a deferred as an observer whose callback will be called with the proper result. Returning to thread in the appropiate points. just let's remember that twisted APIs are not thread safe! SoledadStore process_item also properly returned to thread. Changed @deferred to @deferred_to_thread so it results less confusing to read. 
"know the territory". aha! --- src/leap/mail/imap/memorystore.py | 43 ++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 21 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 217ad8e..211d282 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -32,7 +32,7 @@ from zope.interface import implements from leap.common.check import leap_assert_type from leap.mail import size -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import empty from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces @@ -200,7 +200,8 @@ class MemoryStore(object): # We would have to add a put_flags operation to modify only # the flags doc (and set the dirty flag accordingly) - def create_message(self, mbox, uid, message, notify_on_disk=True): + def create_message(self, mbox, uid, message, observer, + notify_on_disk=True): """ Create the passed message into this MemoryStore. @@ -212,38 +213,38 @@ class MemoryStore(object): :type uid: int :param message: a message to be added :type message: MessageWrapper - :param notify_on_disk: whether the deferred that is returned should + :param observer: the deferred that will fire with the + UID of the message. If notify_on_disk is True, + this will happen when the message is written to + Soledad. Otherwise it will fire as soon as we've + added the message to the memory store. + :type observer: Deferred + :param notify_on_disk: whether the `observer` deferred should wait until the message is written to disk to be fired. :type notify_on_disk: bool - - :return: a Deferred. if notify_on_disk is True, will be fired - when written to the db on disk. - Otherwise will fire inmediately - :rtype: Deferred """ log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid self._add_message(mbox, uid, message, notify_on_disk) - - d = defer.Deferred() - d.addCallback(lambda result: log.msg("message save: %s" % result)) self._new.add(key) - # We store this deferred so we can keep track of the pending - # operations internally. - self._new_deferreds[key] = d + def log_add(result): + log.msg("message save: %s" % result) + return result + observer.addCallback(log_add) if notify_on_disk: - # Caller wants to be notified when the message is on disk - # so we pass the deferred that will be fired when the message - # has been written. - return d - else: + # We store this deferred so we can keep track of the pending + # operations internally. + # TODO this should fire with the UID !!! -- change that in + # the soledad store code. + self._new_deferreds[key] = observer + if not notify_on_disk: # Caller does not care, just fired and forgot, so we pass # a defer that will inmediately have its callback triggered. - return defer.succeed('fire-and-forget:%s' % str(key)) + observer.callback(uid) def put_message(self, mbox, uid, message, notify_on_disk=True): """ @@ -541,7 +542,7 @@ class MemoryStore(object): self.write_last_uid(mbox, value) return value - @deferred + @deferred_to_thread def write_last_uid(self, mbox, value): """ Increment the soledad integer cache for the highest uid value. 
-- cgit v1.2.3 From 18fed49c4143eb764ae9e806882d24f8f4e95744 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Sun, 2 Feb 2014 09:26:37 -0400 Subject: fix missing content after in-memory add because THE KEYS WILL BE STRINGS AFTER ADDED TO SOLEDAD Can I remember that? * Fix copy from local folders * Fix copy when we already have a copy of the message in the inbox, marked as deleted. * Fix also bad deferred.succeed in add_msg when it already exist. --- src/leap/mail/imap/memorystore.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 211d282..542e227 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -318,7 +318,7 @@ class MemoryStore(object): store[FDOC]) hdoc = msg_dict.get(HDOC, None) - if hdoc: + if hdoc is not None: if not store.get(HDOC, None): store[HDOC] = ReferenciableDict({}) store[HDOC].update(hdoc) @@ -438,7 +438,8 @@ class MemoryStore(object): if not self.producer.is_queue_empty(): return - logger.info("Writing messages to Soledad...") + if any(map(lambda i: not empty(i), (self._new, self._dirty))): + logger.info("Writing messages to Soledad...") # TODO change for lock, and make the property access # is accquired @@ -885,6 +886,7 @@ class MemoryStore(object): # TODO expunge should add itself as a callback to the ongoing # writes. soledad_store = self._permanent_store + all_deleted = [] try: # 1. Stop the writing call -- cgit v1.2.3 From 8201146254a204fec92395bf497a2a6f76274b85 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Sun, 2 Feb 2014 16:26:58 -0400 Subject: re-add expunge deferred --- src/leap/mail/imap/memorystore.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 542e227..0632d1c 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -873,13 +873,15 @@ class MemoryStore(object): self.remove_message(mbox, uid) return mem_deleted - def expunge(self, mbox): + def expunge(self, mbox, observer): """ Remove all messages flagged \\Deleted, from the Memory Store and from the permanent store also. :param mbox: the mailbox :type mbox: str or unicode + :param observer: a deferred that will be fired when expunge is done + :type observer: Deferred :return: a list of UIDs :rtype: list """ @@ -910,6 +912,11 @@ class MemoryStore(object): else: sol_deleted = [] + try: + self._known_uids[mbox].difference_update(set(sol_deleted)) + except Exception as exc: + logger.exception(exc) + # 2. Delete all messages marked as deleted in memory. mem_deleted = self.remove_all_deleted(mbox) @@ -919,6 +926,7 @@ class MemoryStore(object): logger.exception(exc) finally: self._start_write_loop() + observer.callback(True) return all_deleted # Dump-to-disk controls. -- cgit v1.2.3 From 23e28bae2c3cb74e00e29ee8add0b73adeb65c2b Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 4 Feb 2014 10:57:49 -0400 Subject: fixes after review * Some more docstring completion/fixes. * Removed unneeded str coertion. * Handle mailbox name in logs. * Separate manhole boilerplate into its own file. 
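The expunge signature reintroduced a couple of patches above takes the same kind of observer deferred. A rough usage sketch follows; the caller name and callback body are invented, and note that at this point the observer is fired with True, while a later patch in this series changes it to fire with the list of deleted UIDs:

from twisted.internet import defer
from twisted.python import log

def expunge_mailbox(memstore, mbox):
    # expunge() fires the observer once deleted messages are gone from
    # both the memory store and the permanent store.
    observer = defer.Deferred()
    observer.addCallback(lambda result: log.msg("expunge done: %r" % (result,)))
    observer.addErrback(log.err)
    memstore.expunge(mbox, observer)
    return observer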
--- src/leap/mail/imap/memorystore.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 0632d1c..195cef7 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -475,12 +475,13 @@ class MemoryStore(object): def get_last_uid(self, mbox): """ - Get the highest UID for a given mbox. + Return the highest UID for a given mbox. It will be the highest between the highest uid in the message store for the mailbox, and the soledad integer cache. :param mbox: the mailbox :type mbox: str or unicode + :rtype: int """ uids = self.get_uids(mbox) last_mem_uid = uids and max(uids) or 0 -- cgit v1.2.3 From 498c6745abd91652dfef94045dfe005be0422bf2 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 4 Feb 2014 15:39:52 -0400 Subject: Rebased dreb's commit to update sizes dictionary for faster calculation of sizes. https://github.com/andrejb/leap_mail/commit/8b88e85fab3c2b75da16b16c8d492c001b8076c6 --- src/leap/mail/imap/memorystore.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 195cef7..a99148f 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -106,6 +106,12 @@ class MemoryStore(object): # Internal Storage: messages self._msg_store = {} + # Sizes + """ + {'mbox, uid': } + """ + self._sizes = {} + # Internal Storage: payload-hash """ {'phash': weakreaf.proxy(dict)} @@ -347,8 +353,12 @@ class MemoryStore(object): for key in seq: if key in store and empty(store.get(key)): store.pop(key) + prune((FDOC, HDOC, CDOCS, DOCS_ID), store) + # Update memory store size + self._sizes[key] = size(self._msg_store[key]) + def get_docid_for_fdoc(self, mbox, uid): """ Return Soledad document id for the flags-doc for a given mbox and uid, @@ -417,6 +427,9 @@ class MemoryStore(object): self._new.discard(key) self._dirty.discard(key) self._msg_store.pop(key, None) + if key in self._sizes: + del self._sizes[key] + except Exception as exc: logger.exception(exc) @@ -958,4 +971,4 @@ class MemoryStore(object): :rtype: int """ - return size.get_size(self._msg_store) + return reduce(lambda x, y: x + y, self._sizes, 0) -- cgit v1.2.3 From 423624e5f2c4d3f8cfe8f15f4d6649ed3eea11dc Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 5 Feb 2014 11:48:20 -0400 Subject: fix expunge deferreds so they wait --- src/leap/mail/imap/memorystore.py | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 195cef7..f4a4522 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -879,30 +879,43 @@ class MemoryStore(object): Remove all messages flagged \\Deleted, from the Memory Store and from the permanent store also. + It first queues up a last write, and wait for the deferreds to be done + before continuing. + :param mbox: the mailbox :type mbox: str or unicode :param observer: a deferred that will be fired when expunge is done :type observer: Deferred - :return: a list of UIDs - :rtype: list """ - # TODO expunge should add itself as a callback to the ongoing - # writes. soledad_store = self._permanent_store - all_deleted = [] - try: # 1. 
Stop the writing call self._stop_write_loop() # 2. Enqueue a last write. - #self.write_messages(soledad_store) - # 3. Should wait on the writebacks to finish ??? - # FIXME wait for this, and add all the rest of the method - # as a callback!!! + self.write_messages(soledad_store) + # 3. Wait on the writebacks to finish + + pending_deferreds = (self._new_deferreds.get(mbox, []) + + self._dirty_deferreds.get(mbox, [])) + d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) + d1.addCallback( + self._delete_from_soledad_and_memory, mbox, observer) except Exception as exc: logger.exception(exc) - # Now, we...: + def _delete_from_soledad_and_memory(self, result, mbox, observer): + """ + Remove all messages marked as deleted from soledad and memory. + + :param result: ignored. the result of the deferredList that triggers + this as a callback from `expunge`. + :param mbox: the mailbox + :type mbox: str or unicode + :param observer: a deferred that will be fired when expunge is done + :type observer: Deferred + """ + all_deleted = [] + soledad_store = self._permanent_store try: # 1. Delete all messages marked as deleted in soledad. @@ -927,8 +940,7 @@ class MemoryStore(object): logger.exception(exc) finally: self._start_write_loop() - observer.callback(True) - return all_deleted + observer.callback(all_deleted) # Dump-to-disk controls. -- cgit v1.2.3 From bf9db4b5381230b4e2a1e1d2d4b2acc31c29ff87 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 5 Feb 2014 16:47:36 -0400 Subject: Fix the fallback for the memoized call for bodies/content. Changed to "empty" to consider empty strings too. --- src/leap/mail/imap/memorystore.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index f4a4522..9c7973d 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -230,10 +230,11 @@ class MemoryStore(object): self._add_message(mbox, uid, message, notify_on_disk) self._new.add(key) - def log_add(result): - log.msg("message save: %s" % result) - return result - observer.addCallback(log_add) + # XXX use this while debugging the callback firing, + # remove after unittesting this. + #def log_add(result): + #return result + #observer.addCallback(log_add) if notify_on_disk: # We store this deferred so we can keep track of the pending -- cgit v1.2.3 From 362aaec0897261973e58b4282f5c054985d1f113 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 6 Feb 2014 15:46:01 -0200 Subject: Flush IMAP data to disk when stopping. Closes #5095. --- src/leap/mail/imap/memorystore.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 9c7973d..3eba59a 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -875,6 +875,15 @@ class MemoryStore(object): self.remove_message(mbox, uid) return mem_deleted + def stop_and_flush(self): + """ + Stop the write loop and trigger a write to the producer. + """ + self._stop_write_loop() + if self._permanent_store is not None: + self.write_messages(self._permanent_store) + self.producer.flush() + def expunge(self, mbox, observer): """ Remove all messages flagged \\Deleted, from the Memory Store @@ -890,12 +899,9 @@ class MemoryStore(object): """ soledad_store = self._permanent_store try: - # 1. 
Stop the writing call - self._stop_write_loop() - # 2. Enqueue a last write. - self.write_messages(soledad_store) - # 3. Wait on the writebacks to finish - + # Stop and trigger last write + self.stop_and_flush() + # Wait on the writebacks to finish pending_deferreds = (self._new_deferreds.get(mbox, []) + self._dirty_deferreds.get(mbox, [])) d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) @@ -962,6 +968,10 @@ class MemoryStore(object): # are done (gatherResults) return getattr(self, self.WRITING_FLAG) + @property + def permanent_store(self): + return self._permanent_store + # Memory management. def get_size(self): -- cgit v1.2.3 From 12ffea333922d99ee7f7b4ab2cd46cfcec6a0d05 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 17 Feb 2014 11:31:40 -0400 Subject: fix get_size call --- src/leap/mail/imap/memorystore.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index ed2b3f2..d0321ae 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -350,6 +350,12 @@ class MemoryStore(object): continue self._phash_store[phash] = weakref.proxy(referenciable_cdoc) + # Update memory store size + # XXX this should use [mbox][uid] + key = mbox, uid + self._sizes[key] = size.get_size(self._fdoc_store[key]) + # TODO add hdoc and cdocs sizes too + def prune(seq, store): for key in seq: if key in store and empty(store.get(key)): -- cgit v1.2.3 From 06556ec6dc56a4859736fc2782779ee2eb9c1f55 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 5 Feb 2014 23:44:23 -0400 Subject: defer parse to thread --- src/leap/mail/imap/memorystore.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index d0321ae..8deddda 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -230,6 +230,8 @@ class MemoryStore(object): be fired. :type notify_on_disk: bool """ + from twisted.internet import reactor + log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid @@ -251,7 +253,7 @@ class MemoryStore(object): if not notify_on_disk: # Caller does not care, just fired and forgot, so we pass # a defer that will inmediately have its callback triggered. - observer.callback(uid) + reactor.callLater(0, observer.callback, uid) def put_message(self, mbox, uid, message, notify_on_disk=True): """ -- cgit v1.2.3 From bd83f834920709db3350c58dedd3cd2181c1b2cc Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 6 Feb 2014 02:28:54 -0400 Subject: prefetch flag docs --- src/leap/mail/imap/memorystore.py | 53 +++++++++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 5 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 8deddda..00cf2cc 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -49,6 +49,11 @@ logger = logging.getLogger(__name__) # soledad storage, in seconds. 
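The stop_and_flush helper added a few hunks above gives callers a single entry point to drain the memory store before shutdown. A sketch of plausible wiring; the event-trigger registration is an assumption for illustration, not something this series adds:

from twisted.internet import reactor

def flush_memstore_on_shutdown(memstore):
    # Queue a last write of everything still sitting in memory and stop
    # the periodic write loop, so nothing is lost when the service exits.
    memstore.stop_and_flush()

# Hypothetical wiring, assuming a `memstore` instance is in scope:
# reactor.addSystemEventTrigger('before', 'shutdown',
#                               flush_memstore_on_shutdown, memstore)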
SOLEDAD_WRITE_PERIOD = 10 +FDOC = MessagePartType.fdoc.key +HDOC = MessagePartType.hdoc.key +CDOCS = MessagePartType.cdocs.key +DOCS_ID = MessagePartType.docs_id.key + @contextlib.contextmanager def set_bool_flag(obj, att): @@ -104,6 +109,11 @@ class MemoryStore(object): self._write_period = write_period # Internal Storage: messages + # TODO this probably will have better access times if we + # use msg_store[mbox][uid] insted of the current key scheme. + """ + key is str(mbox,uid) + """ self._msg_store = {} # Sizes @@ -297,11 +307,6 @@ class MemoryStore(object): key = mbox, uid msg_dict = message.as_dict() - FDOC = MessagePartType.fdoc.key - HDOC = MessagePartType.hdoc.key - CDOCS = MessagePartType.cdocs.key - DOCS_ID = MessagePartType.docs_id.key - try: store = self._msg_store[key] except KeyError: @@ -580,6 +585,44 @@ class MemoryStore(object): if self._permanent_store: self._permanent_store.write_last_uid(mbox, value) + def load_flag_docs(self, mbox, flag_docs): + """ + Load the flag documents for the given mbox. + Used during initial flag docs prefetch. + + :param mbox: the mailbox + :type mbox: str or unicode + :param flag_docs: a dict with the content for the flag docs. + :type flag_docs: dict + """ + # We can do direct assignments cause we know this will only + # be called during initialization of the mailbox. + msg_store = self._msg_store + for uid in flag_docs: + key = mbox, uid + msg_store[key] = {} + msg_store[key][FDOC] = ReferenciableDict(flag_docs[uid]) + + def all_flags(self, mbox): + """ + Return a dictionary with all the flags for a given mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: dict + """ + flags_dict = {} + uids = self.get_uids(mbox) + store = self._msg_store + for uid in uids: + key = mbox, uid + try: + flags = store[key][FDOC][fields.FLAGS_KEY] + flags_dict[uid] = flags + except KeyError: + continue + return flags_dict + # Counting sheeps... def count_new_mbox(self, mbox): -- cgit v1.2.3 From ff3a6a640fdb345449a5f9cd3379bbaefa36111e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 6 Feb 2014 15:46:17 -0400 Subject: take recent count from memstore --- src/leap/mail/imap/memorystore.py | 1 - 1 file changed, 1 deletion(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 00cf2cc..bc40a8e 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -827,7 +827,6 @@ class MemoryStore(object): # Recent Flags - # TODO --- nice but unused def set_recent_flag(self, mbox, uid): """ Set the `Recent` flag for a given mailbox and UID. -- cgit v1.2.3 From b849dbb2e79427aabb7c6d2d6364c73778d548d3 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 6 Feb 2014 15:46:52 -0400 Subject: increase writeback period for debug --- src/leap/mail/imap/memorystore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index bc40a8e..4a6a3ed 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -47,7 +47,7 @@ logger = logging.getLogger(__name__) # The default period to do writebacks to the permanent # soledad storage, in seconds. 
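The prefetch path added above lets the mailbox initialization code push all the flags documents into memory once, after which flag queries for the whole mailbox are served without touching Soledad. A small sketch, assuming a MemoryStore instance named memstore is already in scope; the sample values are invented and the literal 'uid' and 'flags' keys stand in for fields.UID_KEY and fields.FLAGS_KEY:

# Prime the store during mailbox initialization, keyed by UID...
flag_docs = {
    1: {'uid': 1, 'flags': ('\\Seen',)},
    2: {'uid': 2, 'flags': ()},
}
memstore.load_flag_docs('INBOX', flag_docs)

# ...and later answer flag lookups for the whole mailbox from memory.
all_flags = memstore.all_flags('INBOX')
# -> {1: ('\\Seen',), 2: ()}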
-SOLEDAD_WRITE_PERIOD = 10 +SOLEDAD_WRITE_PERIOD = 30 FDOC = MessagePartType.fdoc.key HDOC = MessagePartType.hdoc.key -- cgit v1.2.3 From fec92585f933b6ce9b8c2701a9e28a8b7490d32a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 6 Feb 2014 19:01:58 -0400 Subject: enable memory-only store --- src/leap/mail/imap/memorystore.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 4a6a3ed..04e0af6 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -195,11 +195,17 @@ class MemoryStore(object): # We can start the write loop right now, why wait? self._start_write_loop() + else: + # We have a memory-only store. + self.producer = None + self._write_loop = None def _start_write_loop(self): """ Start loop for writing to disk database. """ + if self._write_loop is None: + return if not self._write_loop.running: self._write_loop.start(self._write_period, now=True) @@ -207,6 +213,8 @@ class MemoryStore(object): """ Stop loop for writing to disk database. """ + if self._write_loop is None: + return if self._write_loop.running: self._write_loop.stop() @@ -961,6 +969,12 @@ class MemoryStore(object): :type observer: Deferred """ soledad_store = self._permanent_store + if soledad_store is None: + # just-in memory store, easy then. + self._delete_from_memory(mbox, observer) + return + + # We have a soledad storage. try: # Stop and trigger last write self.stop_and_flush() @@ -973,6 +987,18 @@ class MemoryStore(object): except Exception as exc: logger.exception(exc) + def _delete_from_memory(self, mbox, observer): + """ + Remove all messages marked as deleted from soledad and memory. + + :param mbox: the mailbox + :type mbox: str or unicode + :param observer: a deferred that will be fired when expunge is done + :type observer: Deferred + """ + mem_deleted = self.remove_all_deleted(mbox) + observer.callback(mem_deleted) + def _delete_from_soledad_and_memory(self, result, mbox, observer): """ Remove all messages marked as deleted from soledad and memory. @@ -989,12 +1015,7 @@ class MemoryStore(object): try: # 1. Delete all messages marked as deleted in soledad. - - # XXX this could be deferred for faster operation. 
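The guards added above make the Soledad-backed machinery optional, so the store degrades to a purely transient cache when no permanent store is passed. A construction sketch; building the actual SoledadStore is out of scope here, so it is taken as a parameter:

from leap.mail.imap.memorystore import MemoryStore

def build_stores(soledad_store):
    # Memory-only: no producer, no write loop; expunge only touches RAM.
    volatile = MemoryStore(permanent_store=None)

    # Soledad-backed: a LoopingCall periodically dumps new and dirty
    # documents, every SOLEDAD_WRITE_PERIOD seconds by default.
    persistent = MemoryStore(permanent_store=soledad_store)
    return volatile, persistent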
- if soledad_store: - sol_deleted = soledad_store.remove_all_deleted(mbox) - else: - sol_deleted = [] + sol_deleted = soledad_store.remove_all_deleted(mbox) try: self._known_uids[mbox].difference_update(set(sol_deleted)) -- cgit v1.2.3 From a5c45803dfdc62f22db592d1e542fcbd07170a43 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 6 Feb 2014 19:44:25 -0400 Subject: make last_uid a defaultdict --- src/leap/mail/imap/memorystore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 04e0af6..3f3cf83 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -163,7 +163,7 @@ class MemoryStore(object): {'mbox-a': 42, 'mbox-b': 23} """ - self._last_uid = {} + self._last_uid = defaultdict(lambda: 0) """ known-uids keeps a count of the uids that soledad knows for a given -- cgit v1.2.3 From 6586c21a12bfa0d9026068629a9d34203ad577c7 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 7 Feb 2014 02:27:58 -0400 Subject: change internal storage and keying scheme in memstore --- src/leap/mail/imap/memorystore.py | 187 +++++++++++++++++++------------------- 1 file changed, 94 insertions(+), 93 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 3f3cf83..b198e12 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -109,13 +109,14 @@ class MemoryStore(object): self._write_period = write_period # Internal Storage: messages - # TODO this probably will have better access times if we - # use msg_store[mbox][uid] insted of the current key scheme. """ - key is str(mbox,uid) + Flags document store. + _fdoc_store[mbox][uid] = { 'content': 'aaa' } """ - self._msg_store = {} + self._fdoc_store = defaultdict(lambda: defaultdict( + lambda: ReferenciableDict({}))) +<<<<<<< HEAD # Sizes """ {'mbox, uid': } @@ -123,10 +124,24 @@ class MemoryStore(object): self._sizes = {} # Internal Storage: payload-hash +======= + # Internal Storage: content-hash:hdoc +>>>>>>> change internal storage and keying scheme in memstore """ - {'phash': weakreaf.proxy(dict)} + hdoc-store keeps references to + the header-documents indexed by content-hash. + + {'chash': { dict-stuff } + } + """ + self._hdoc_store = defaultdict(lambda: ReferenciableDict({})) + + # Internal Storage: payload-hash:cdoc + """ + content-docs stored by payload-hash + {'phash': { dict-stuff } } """ - self._phash_store = {} + self._cdoc_store = defaultdict(lambda: ReferenciableDict({})) # Internal Storage: content-hash:fdoc """ @@ -309,26 +324,12 @@ class MemoryStore(object): Helper method, called by both create_message and put_message. See those for parameter documentation. """ - # XXX have to differentiate between notify_new and notify_dirty - # TODO defaultdict the hell outa here... 
- - key = mbox, uid msg_dict = message.as_dict() - try: - store = self._msg_store[key] - except KeyError: - self._msg_store[key] = {FDOC: {}, - HDOC: {}, - CDOCS: {}, - DOCS_ID: {}} - store = self._msg_store[key] - fdoc = msg_dict.get(FDOC, None) - if fdoc: - if not store.get(FDOC, None): - store[FDOC] = ReferenciableDict({}) - store[FDOC].update(fdoc) + if fdoc is not None: + fdoc_store = self._fdoc_store[mbox][uid] + fdoc_store.update(fdoc) # content-hash indexing chash = fdoc.get(fields.CONTENT_HASH_KEY) @@ -337,33 +338,21 @@ class MemoryStore(object): chash_fdoc_store[chash] = {} chash_fdoc_store[chash][mbox] = weakref.proxy( - store[FDOC]) + fdoc_store) hdoc = msg_dict.get(HDOC, None) if hdoc is not None: - if not store.get(HDOC, None): - store[HDOC] = ReferenciableDict({}) - store[HDOC].update(hdoc) - - docs_id = msg_dict.get(DOCS_ID, None) - if docs_id: - if not store.get(DOCS_ID, None): - store[DOCS_ID] = {} - store[DOCS_ID].update(docs_id) + chash = hdoc.get(fields.CONTENT_HASH_KEY) + hdoc_store = self._hdoc_store[chash] + hdoc_store.update(hdoc) cdocs = message.cdocs - for cdoc_key in cdocs.keys(): - if not store.get(CDOCS, None): - store[CDOCS] = {} - - cdoc = cdocs[cdoc_key] - # first we make it weak-referenciable - referenciable_cdoc = ReferenciableDict(cdoc) - store[CDOCS][cdoc_key] = referenciable_cdoc + for cdoc in cdocs.values(): phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) if not phash: continue - self._phash_store[phash] = weakref.proxy(referenciable_cdoc) + cdoc_store = self._cdoc_store[phash] + cdoc_store.update(cdoc) # Update memory store size # XXX this should use [mbox][uid] @@ -371,15 +360,13 @@ class MemoryStore(object): self._sizes[key] = size.get_size(self._fdoc_store[key]) # TODO add hdoc and cdocs sizes too - def prune(seq, store): - for key in seq: - if key in store and empty(store.get(key)): - store.pop(key) - - prune((FDOC, HDOC, CDOCS, DOCS_ID), store) + # XXX what to do with this? 
+ #docs_id = msg_dict.get(DOCS_ID, None) + #if docs_id is not None: + #if not store.get(DOCS_ID, None): + #store[DOCS_ID] = {} + #store[DOCS_ID].update(docs_id) - # Update memory store size - self._sizes[key] = size(self._msg_store[key]) def get_docid_for_fdoc(self, mbox, uid): """ @@ -413,18 +400,20 @@ class MemoryStore(object): :return: MessageWrapper or None """ key = mbox, uid - FDOC = MessagePartType.fdoc.key - msg_dict = self._msg_store.get(key, None) - if empty(msg_dict): + fdoc = self._fdoc_store[mbox][uid] + if empty(fdoc): return None + new, dirty = self._get_new_dirty_state(key) if flags_only: - return MessageWrapper(fdoc=msg_dict[FDOC], + return MessageWrapper(fdoc=fdoc, new=new, dirty=dirty, memstore=weakref.proxy(self)) else: - return MessageWrapper(from_dict=msg_dict, + chash = fdoc.get(fields.CONTENT_HASH_KEY) + hdoc = self._hdoc_store[chash] + return MessageWrapper(fdoc=fdoc, hdoc=hdoc, new=new, dirty=dirty, memstore=weakref.proxy(self)) @@ -448,10 +437,14 @@ class MemoryStore(object): key = mbox, uid self._new.discard(key) self._dirty.discard(key) +<<<<<<< HEAD self._msg_store.pop(key, None) if key in self._sizes: del self._sizes[key] +======= + self._fdoc_store[mbox].pop(uid, None) +>>>>>>> change internal storage and keying scheme in memstore except Exception as exc: logger.exception(exc) @@ -494,8 +487,7 @@ class MemoryStore(object): :type mbox: str or unicode :rtype: list """ - all_keys = self._msg_store.keys() - return [uid for m, uid in all_keys if m == mbox] + return self._fdoc_store[mbox].keys() def get_soledad_known_uids(self, mbox): """ @@ -605,11 +597,9 @@ class MemoryStore(object): """ # We can do direct assignments cause we know this will only # be called during initialization of the mailbox. - msg_store = self._msg_store + fdoc_store = self._fdoc_store[mbox] for uid in flag_docs: - key = mbox, uid - msg_store[key] = {} - msg_store[key][FDOC] = ReferenciableDict(flag_docs[uid]) + fdoc_store[uid] = ReferenciableDict(flag_docs[uid]) def all_flags(self, mbox): """ @@ -621,11 +611,10 @@ class MemoryStore(object): """ flags_dict = {} uids = self.get_uids(mbox) - store = self._msg_store + fdoc_store = self._fdoc_store for uid in uids: - key = mbox, uid try: - flags = store[key][FDOC][fields.FLAGS_KEY] + flags = fdoc_store[uid][fields.FLAGS_KEY] flags_dict[uid] = flags except KeyError: continue @@ -635,7 +624,7 @@ class MemoryStore(object): def count_new_mbox(self, mbox): """ - Count the new messages by inbox. + Count the new messages by mailbox. :param mbox: the mailbox :type mbox: str or unicode @@ -653,6 +642,32 @@ class MemoryStore(object): """ return len(self._new) + def count(self, mbox): + """ + Return the count of messages for a given mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: number of messages + :rtype: int + """ + return len(self._fdoc_store[mbox]) + + def unseen_iter(self, mbox): + """ + Get an iterator for the message UIDs with no `seen` flag + for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: iterator through unseen message doc UIDs + :rtype: iterable + """ + fdocs = self._fdoc_store[mbox] + return [uid for uid, value + in fdocs.items() + if fields.SEEN_FLAG not in value["flags"]] + def get_cdoc_from_phash(self, phash): """ Return a content-document by its payload-hash. 
@@ -661,7 +676,7 @@ class MemoryStore(object): :type phash: str or unicode :rtype: MessagePartDoc """ - doc = self._phash_store.get(phash, None) + doc = self._cdoc_store.get(phash, None) # XXX return None for consistency? @@ -716,15 +731,15 @@ class MemoryStore(object): content=fdoc, doc_id=None) - def all_msg_iter(self): + def iter_fdoc_keys(self): """ - Return generator that iterates through all messages in the store. - - :return: generator of MessageWrappers - :rtype: generator + Return a generator through all the mbox, uid keys in the flags-doc + store. """ - return (self.get_message(*key) - for key in sorted(self._msg_store.keys())) + fdoc_store = self._fdoc_store + for mbox in fdoc_store: + for uid in fdoc_store[mbox]: + yield mbox, uid def all_new_dirty_msg_iter(self): """ @@ -734,23 +749,9 @@ class MemoryStore(object): :rtype: generator """ return (self.get_message(*key) - for key in sorted(self._msg_store.keys()) + for key in sorted(self.iter_fdoc_keys()) if key in self._new or key in self._dirty) - def all_msg_dict_for_mbox(self, mbox): - """ - Return all the message dicts for a given mbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :return: list of dictionaries - :rtype: list - """ - # This *needs* to return a fixed sequence. Otherwise the dictionary len - # will change during iteration, when we modify it - return [self._msg_store[(mb, uid)] - for mb, uid in self._msg_store if mb == mbox] - def all_deleted_uid_iter(self, mbox): """ Return a list with the UIDs for all messags @@ -763,11 +764,10 @@ class MemoryStore(object): """ # This *needs* to return a fixed sequence. Otherwise the dictionary len # will change during iteration, when we modify it - all_deleted = [ - msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox) - if msg.get('fdoc', None) - and fields.DELETED_FLAG in msg['fdoc']['flags']] - return all_deleted + fdocs = self._fdoc_store[mbox] + return [uid for uid, value + in fdocs.items() + if fields.DELETED_FLAG in value["flags"]] # new, dirty flags @@ -780,6 +780,7 @@ class MemoryStore(object): :return: tuple of bools :rtype: tuple """ + # TODO change indexing of sets to [mbox][key] too. # XXX should return *first* the news, and *then* the dirty... return map(lambda _set: key in _set, (self._new, self._dirty)) -- cgit v1.2.3 From 813db4a356141592337f39f9c801203367c63193 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 7 Feb 2014 02:54:52 -0400 Subject: remove hdoc copy since it's in its own structure now --- src/leap/mail/imap/memorystore.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index b198e12..4156c0b 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -592,7 +592,8 @@ class MemoryStore(object): :param mbox: the mailbox :type mbox: str or unicode - :param flag_docs: a dict with the content for the flag docs. + :param flag_docs: a dict with the content for the flag docs, indexed + by uid. :type flag_docs: dict """ # We can do direct assignments cause we know this will only @@ -601,6 +602,20 @@ class MemoryStore(object): for uid in flag_docs: fdoc_store[uid] = ReferenciableDict(flag_docs[uid]) + def load_header_docs(self, header_docs): + """ + Load the flag documents for the given mbox. + Used during header docs prefetch, and during cache after + a read from soledad if the hdoc property in message did not + find its value in here. 
+ + :param flag_docs: a dict with the content for the flag docs. + :type flag_docs: dict + """ + hdoc_store = self._hdoc_store + for chash in header_docs: + hdoc_store[chash] = ReferenciableDict(header_docs[chash]) + def all_flags(self, mbox): """ Return a dictionary with all the flags for a given mbox. -- cgit v1.2.3 From b92e63c316c1cf9f8b6481dbfa70737acfb3eee9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 7 Feb 2014 05:50:55 -0400 Subject: separate better dirty/new flags; add cdocs --- src/leap/mail/imap/memorystore.py | 88 ++++++++++++++++++++++++++------------- 1 file changed, 58 insertions(+), 30 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 4156c0b..ee3ee92 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -24,6 +24,7 @@ import weakref from collections import defaultdict from copy import copy +from itertools import chain from twisted.internet import defer from twisted.internet.task import LoopingCall @@ -33,7 +34,7 @@ from zope.interface import implements from leap.common.check import leap_assert_type from leap.mail import size from leap.mail.decorators import deferred_to_thread -from leap.mail.utils import empty +from leap.mail.utils import empty, phash_iter from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces from leap.mail.imap.fields import fields @@ -110,13 +111,12 @@ class MemoryStore(object): # Internal Storage: messages """ - Flags document store. + flags document store. _fdoc_store[mbox][uid] = { 'content': 'aaa' } """ self._fdoc_store = defaultdict(lambda: defaultdict( lambda: ReferenciableDict({}))) -<<<<<<< HEAD # Sizes """ {'mbox, uid': } @@ -124,9 +124,14 @@ class MemoryStore(object): self._sizes = {} # Internal Storage: payload-hash -======= + """ + fdocs:doc-id store, stores document IDs for putting + the dirty flags-docs. + """ + self._fdoc_id_store = defaultdict(lambda: defaultdict( + lambda: '')) + # Internal Storage: content-hash:hdoc ->>>>>>> change internal storage and keying scheme in memstore """ hdoc-store keeps references to the header-documents indexed by content-hash. @@ -360,14 +365,6 @@ class MemoryStore(object): self._sizes[key] = size.get_size(self._fdoc_store[key]) # TODO add hdoc and cdocs sizes too - # XXX what to do with this? - #docs_id = msg_dict.get(DOCS_ID, None) - #if docs_id is not None: - #if not store.get(DOCS_ID, None): - #store[DOCS_ID] = {} - #store[DOCS_ID].update(docs_id) - - def get_docid_for_fdoc(self, mbox, uid): """ Return Soledad document id for the flags-doc for a given mbox and uid, @@ -379,13 +376,18 @@ class MemoryStore(object): :type uid: int :rtype: unicode or None """ - fdoc = self._permanent_store.get_flags_doc(mbox, uid) - if empty(fdoc): - return None - doc_id = fdoc.doc_id + doc_id = self._fdoc_id_store[mbox][uid] + + if empty(doc_id): + fdoc = self._permanent_store.get_flags_doc(mbox, uid) + if empty(fdoc.content): + return None + doc_id = fdoc.doc_id + self._fdoc_id_store[mbox][uid] = doc_id + return doc_id - def get_message(self, mbox, uid, flags_only=False): + def get_message(self, mbox, uid, dirtystate="none", flags_only=False): """ Get a MessageWrapper for the given mbox and uid combination. 
@@ -393,19 +395,32 @@ class MemoryStore(object): :type mbox: str or unicode :param uid: the message UID :type uid: int + :param dirtystate: one of `dirty`, `new` or `none` (default) + :type dirtystate: str :param flags_only: whether the message should carry only a reference to the flags document. :type flags_only: bool + : :return: MessageWrapper or None """ + if dirtystate == "dirty": + flags_only = True + key = mbox, uid fdoc = self._fdoc_store[mbox][uid] if empty(fdoc): return None - new, dirty = self._get_new_dirty_state(key) + new, dirty = False, False + if dirtystate == "none": + new, dirty = self._get_new_dirty_state(key) + if dirtystate == "dirty": + new, dirty = False, True + if dirtystate == "new": + new, dirty = True, False + if flags_only: return MessageWrapper(fdoc=fdoc, new=new, dirty=dirty, @@ -413,7 +428,22 @@ class MemoryStore(object): else: chash = fdoc.get(fields.CONTENT_HASH_KEY) hdoc = self._hdoc_store[chash] - return MessageWrapper(fdoc=fdoc, hdoc=hdoc, + if empty(hdoc): + hdoc = self._permanent_store.get_headers_doc(chash) + if not empty(hdoc.content): + self._hdoc_store[chash] = hdoc.content + hdoc = hdoc.content + cdocs = None + + pmap = hdoc.get(fields.PARTS_MAP_KEY, None) + if new and pmap is not None: + # take the different cdocs for write... + cdoc_store = self._cdoc_store + cdocs_list = phash_iter(hdoc) + cdocs = dict(enumerate( + [cdoc_store[phash] for phash in cdocs_list], 1)) + + return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs, new=new, dirty=dirty, memstore=weakref.proxy(self)) @@ -437,14 +467,9 @@ class MemoryStore(object): key = mbox, uid self._new.discard(key) self._dirty.discard(key) -<<<<<<< HEAD - self._msg_store.pop(key, None) if key in self._sizes: del self._sizes[key] - -======= self._fdoc_store[mbox].pop(uid, None) ->>>>>>> change internal storage and keying scheme in memstore except Exception as exc: logger.exception(exc) @@ -464,7 +489,7 @@ class MemoryStore(object): # XXX this could return the deferred for all the enqueued operations if not self.producer.is_queue_empty(): - return + return False if any(map(lambda i: not empty(i), (self._new, self._dirty))): logger.info("Writing messages to Soledad...") @@ -598,6 +623,7 @@ class MemoryStore(object): """ # We can do direct assignments cause we know this will only # be called during initialization of the mailbox. + fdoc_store = self._fdoc_store[mbox] for uid in flag_docs: fdoc_store[uid] = ReferenciableDict(flag_docs[uid]) @@ -626,7 +652,8 @@ class MemoryStore(object): """ flags_dict = {} uids = self.get_uids(mbox) - fdoc_store = self._fdoc_store + fdoc_store = self._fdoc_store[mbox] + for uid in uids: try: flags = fdoc_store[uid][fields.FLAGS_KEY] @@ -763,9 +790,10 @@ class MemoryStore(object): :return: generator of MessageWrappers :rtype: generator """ - return (self.get_message(*key) - for key in sorted(self.iter_fdoc_keys()) - if key in self._new or key in self._dirty) + gm = self.get_message + new = (gm(*key) for key in self._new) + dirty = (gm(*key, flags_only=True) for key in self._dirty) + return chain(new, dirty) def all_deleted_uid_iter(self, mbox): """ -- cgit v1.2.3 From 9ffcaa09c2d6a57f3f34350298eff8412b540bc9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 7 Feb 2014 07:27:38 -0400 Subject: defer_to_thread the bulk of write operations and batch the notifications back to the memorystore, within the reactor thread. 
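The pattern the commit message above refers to is the usual Twisted one: run the blocking Soledad write in a thread pool and let the resulting deferred fire back in the reactor thread, which is the only place the non-thread-safe MemoryStore bookkeeping may be touched. A generic sketch, with invented function names and a simplified process_item signature:

from twisted.internet import threads

def write_and_notify(soledad_store, memstore, msg_wrapper, key):
    def blocking_write():
        # Runs in a worker thread; Soledad I/O may block here.
        return soledad_store.process_item(msg_wrapper)

    def notify_back(result):
        # deferToThread delivers its result in the reactor thread, so it
        # is safe to update the MemoryStore state from this callback.
        memstore.unset_new(key)
        return result

    return threads.deferToThread(blocking_write).addCallback(notify_back)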
--- src/leap/mail/imap/memorystore.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index ee3ee92..786a9c4 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -380,7 +380,7 @@ class MemoryStore(object): if empty(doc_id): fdoc = self._permanent_store.get_flags_doc(mbox, uid) - if empty(fdoc.content): + if empty(fdoc) or empty(fdoc.content): return None doc_id = fdoc.doc_id self._fdoc_id_store[mbox][uid] = doc_id @@ -706,9 +706,10 @@ class MemoryStore(object): :rtype: iterable """ fdocs = self._fdoc_store[mbox] + return [uid for uid, value in fdocs.items() - if fields.SEEN_FLAG not in value["flags"]] + if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])] def get_cdoc_from_phash(self, phash): """ @@ -760,7 +761,7 @@ class MemoryStore(object): # We want to create a new one in this case. # Hmmm what if the deletion is un-done?? We would end with a # duplicate... - if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: + if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []): return None uid = fdoc[fields.UID_KEY] @@ -810,7 +811,7 @@ class MemoryStore(object): fdocs = self._fdoc_store[mbox] return [uid for uid, value in fdocs.items() - if fields.DELETED_FLAG in value["flags"]] + if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])] # new, dirty flags -- cgit v1.2.3 From 4338368aa2ba0efaee742e9000e21b81af34d3db Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 01:43:14 -0400 Subject: separate new and dirty queues --- src/leap/mail/imap/memorystore.py | 80 +++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 25 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 786a9c4..a053f3f 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -24,7 +24,6 @@ import weakref from collections import defaultdict from copy import copy -from itertools import chain from twisted.internet import defer from twisted.internet.task import LoopingCall @@ -33,7 +32,6 @@ from zope.interface import implements from leap.common.check import leap_assert_type from leap.mail import size -from leap.mail.decorators import deferred_to_thread from leap.mail.utils import empty, phash_iter from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces @@ -48,7 +46,7 @@ logger = logging.getLogger(__name__) # The default period to do writebacks to the permanent # soledad storage, in seconds. -SOLEDAD_WRITE_PERIOD = 30 +SOLEDAD_WRITE_PERIOD = 15 FDOC = MessagePartType.fdoc.key HDOC = MessagePartType.hdoc.key @@ -106,6 +104,9 @@ class MemoryStore(object): :param write_period: the interval to dump messages to disk, in seconds. :type write_period: int """ + from twisted.internet import reactor + self.reactor = reactor + self._permanent_store = permanent_store self._write_period = write_period @@ -195,11 +196,15 @@ class MemoryStore(object): # New and dirty flags, to set MessageWrapper State. self._new = set([]) + self._new_queue = set([]) self._new_deferreds = {} + self._dirty = set([]) - self._rflags_dirty = set([]) + self._dirty_queue = set([]) self._dirty_deferreds = {} + self._rflags_dirty = set([]) + # Flag for signaling we're busy writing to the disk storage. 
setattr(self, self.WRITING_FLAG, False) @@ -297,7 +302,7 @@ class MemoryStore(object): """ Put an existing message. - This will set the dirty flag on the MemoryStore. + This will also set the dirty flag on the MemoryStore. :param mbox: the mailbox :type mbox: str or unicode @@ -498,9 +503,14 @@ class MemoryStore(object): # is accquired with set_bool_flag(self, self.WRITING_FLAG): for rflags_doc_wrapper in self.all_rdocs_iter(): - self.producer.push(rflags_doc_wrapper) - for msg_wrapper in self.all_new_dirty_msg_iter(): - self.producer.push(msg_wrapper) + self.producer.push(rflags_doc_wrapper, + state=self.producer.STATE_DIRTY) + for msg_wrapper in self.all_new_msg_iter(): + self.producer.push(msg_wrapper, + state=self.producer.STATE_NEW) + for msg_wrapper in self.all_dirty_msg_iter(): + self.producer.push(msg_wrapper, + state=self.producer.STATE_DIRTY) # MemoryStore specific methods. @@ -784,17 +794,34 @@ class MemoryStore(object): for uid in fdoc_store[mbox]: yield mbox, uid - def all_new_dirty_msg_iter(self): + def all_new_msg_iter(self): """ - Return generator that iterates through all new and dirty messages. + Return generator that iterates through all new messages. :return: generator of MessageWrappers :rtype: generator """ gm = self.get_message - new = (gm(*key) for key in self._new) - dirty = (gm(*key, flags_only=True) for key in self._dirty) - return chain(new, dirty) + new = [gm(*key) for key in self._new] + # move content from new set to the queue + self._new_queue.update(self._new) + self._new.difference_update(self._new) + return new + + def all_dirty_msg_iter(self): + """ + Return generator that iterates through all dirty messages. + + :return: generator of MessageWrappers + :rtype: generator + """ + gm = self.get_message + dirty = [gm(*key, flags_only=True) for key in self._dirty] + # move content from new and dirty sets to the queue + + self._dirty_queue.update(self._dirty) + self._dirty.difference_update(self._dirty) + return dirty def all_deleted_uid_iter(self, mbox): """ @@ -826,25 +853,28 @@ class MemoryStore(object): """ # TODO change indexing of sets to [mbox][key] too. # XXX should return *first* the news, and *then* the dirty... + + # TODO should query in queues too , true? + # return map(lambda _set: key in _set, (self._new, self._dirty)) - def set_new(self, key): + def set_new_queued(self, key): """ - Add the key value to the `new` set. + Add the key value to the `new-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._new.add(key) + self._new_queue.add(key) - def unset_new(self, key): + def unset_new_queued(self, key): """ - Remove the key value from the `new` set. + Remove the key value from the `new-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._new.discard(key) + self._new_queue.discard(key) deferreds = self._new_deferreds d = deferreds.get(key, None) if d: @@ -853,23 +883,23 @@ class MemoryStore(object): d.callback('%s, ok' % str(key)) deferreds.pop(key) - def set_dirty(self, key): + def set_dirty_queued(self, key): """ - Add the key value to the `dirty` set. + Add the key value to the `dirty-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._dirty.add(key) + self._dirty_queue.add(key) - def unset_dirty(self, key): + def unset_dirty_queued(self, key): """ - Remove the key value from the `dirty` set. + Remove the key value from the `dirty-queue` set. 
:param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._dirty.discard(key) + self._dirty_queue.discard(key) deferreds = self._dirty_deferreds d = deferreds.get(key, None) if d: -- cgit v1.2.3 From 04dfa3afdbb2080c717bfd32d6e47641615967fc Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 01:52:21 -0400 Subject: improve flag-docs relative internal storage --- src/leap/mail/imap/memorystore.py | 58 +++++++++++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 14 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index a053f3f..2835826 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -92,6 +92,7 @@ class MemoryStore(object): WRITING_FLAG = "_writing" _last_uid_lock = threading.Lock() + _fdoc_docid_lock = threading.Lock() def __init__(self, permanent_store=None, write_period=SOLEDAD_WRITE_PERIOD): @@ -158,7 +159,7 @@ class MemoryStore(object): 'mbox-b': weakref.proxy(dict)} } """ - self._chash_fdoc_store = {} + self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None)) # Internal Storage: recent-flags store """ @@ -275,7 +276,7 @@ class MemoryStore(object): """ from twisted.internet import reactor - log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) + log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid self._add_message(mbox, uid, message, notify_on_disk) @@ -340,15 +341,12 @@ class MemoryStore(object): if fdoc is not None: fdoc_store = self._fdoc_store[mbox][uid] fdoc_store.update(fdoc) + chash_fdoc_store = self._chash_fdoc_store # content-hash indexing chash = fdoc.get(fields.CONTENT_HASH_KEY) - chash_fdoc_store = self._chash_fdoc_store - if not chash in chash_fdoc_store: - chash_fdoc_store[chash] = {} - chash_fdoc_store[chash][mbox] = weakref.proxy( - fdoc_store) + self._fdoc_store[mbox][uid]) hdoc = msg_dict.get(HDOC, None) if hdoc is not None: @@ -381,7 +379,8 @@ class MemoryStore(object): :type uid: int :rtype: unicode or None """ - doc_id = self._fdoc_id_store[mbox][uid] + with self._fdoc_docid_lock: + doc_id = self._fdoc_id_store[mbox][uid] if empty(doc_id): fdoc = self._permanent_store.get_flags_doc(mbox, uid) @@ -475,6 +474,8 @@ class MemoryStore(object): if key in self._sizes: del self._sizes[key] self._fdoc_store[mbox].pop(uid, None) + with self._fdoc_docid_lock: + self._fdoc_id_store[mbox].pop(uid, None) except Exception as exc: logger.exception(exc) @@ -571,7 +572,8 @@ class MemoryStore(object): :param value: the value to set :type value: int """ - leap_assert_type(value, int) + # can be long??? + #leap_assert_type(value, int) logger.info("setting last soledad uid for %s to %s" % (mbox, value)) # if we already have a value here, don't do anything @@ -603,10 +605,9 @@ class MemoryStore(object): with self._last_uid_lock: self._last_uid[mbox] += 1 value = self._last_uid[mbox] - self.write_last_uid(mbox, value) + self.reactor.callInThread(self.write_last_uid, mbox, value) return value - @deferred_to_thread def write_last_uid(self, mbox, value): """ Increment the soledad integer cache for the highest uid value. @@ -633,10 +634,36 @@ class MemoryStore(object): """ # We can do direct assignments cause we know this will only # be called during initialization of the mailbox. 
+ # TODO could hook here a sanity-check + # for duplicates fdoc_store = self._fdoc_store[mbox] + chash_fdoc_store = self._chash_fdoc_store for uid in flag_docs: - fdoc_store[uid] = ReferenciableDict(flag_docs[uid]) + rdict = ReferenciableDict(flag_docs[uid]) + fdoc_store[uid] = rdict + # populate chash dict too, to avoid fdoc duplication + chash = flag_docs[uid]["chash"] + chash_fdoc_store[chash][mbox] = weakref.proxy( + self._fdoc_store[mbox][uid]) + + def update_flags(self, mbox, uid, fdoc): + """ + Update the flag document for a given mbox and uid combination, + and set the dirty flag. + We could use put_message, but this is faster. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the uid of the message + :type uid: int + + :param fdoc: a dict with the content for the flag docs + :type fdoc: dict + """ + key = mbox, uid + self._fdoc_store[mbox][uid].update(fdoc) + self._dirty.add(key) def load_header_docs(self, header_docs): """ @@ -759,8 +786,7 @@ class MemoryStore(object): :return: MessagePartDoc. It will return None if the flags document has empty content or it is flagged as \\Deleted. """ - docs_dict = self._chash_fdoc_store.get(chash, None) - fdoc = docs_dict.get(mbox, None) if docs_dict else None + fdoc = self._chash_fdoc_store[chash][mbox] # a couple of special cases. # 1. We might have a doc with empty content... @@ -778,6 +804,7 @@ class MemoryStore(object): key = mbox, uid new = key in self._new dirty = key in self._dirty + return MessagePartDoc( new=new, dirty=dirty, store="mem", part=MessagePartType.fdoc, @@ -1027,6 +1054,8 @@ class MemoryStore(object): """ self._stop_write_loop() if self._permanent_store is not None: + # XXX we should check if we did get a True value on this + # operation. If we got False we should retry! (queue was not empty) self.write_messages(self._permanent_store) self.producer.flush() @@ -1090,6 +1119,7 @@ class MemoryStore(object): try: # 1. Delete all messages marked as deleted in soledad. + logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,)) sol_deleted = soledad_store.remove_all_deleted(mbox) try: -- cgit v1.2.3 From fd9c8c2e3c88476b90805b689f6914fe5eac16df Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 02:53:28 -0400 Subject: defer fetch to thread also, dispatch query for all headers to its own method. --- src/leap/mail/imap/memorystore.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 2835826..e8e8152 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -434,6 +434,8 @@ class MemoryStore(object): hdoc = self._hdoc_store[chash] if empty(hdoc): hdoc = self._permanent_store.get_headers_doc(chash) + if empty(hdoc): + return None if not empty(hdoc.content): self._hdoc_store[chash] = hdoc.content hdoc = hdoc.content @@ -699,6 +701,31 @@ class MemoryStore(object): continue return flags_dict + def all_headers(self, mbox): + """ + Return a dictionary with all the header docs for a given mbox. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: dict + """ + headers_dict = {} + uids = self.get_uids(mbox) + fdoc_store = self._fdoc_store[mbox] + hdoc_store = self._hdoc_store + + for uid in uids: + try: + chash = fdoc_store[uid][fields.CONTENT_HASH_KEY] + hdoc = hdoc_store[chash] + if not empty(hdoc): + headers_dict[uid] = hdoc + except KeyError: + continue + + import pprint; pprint.pprint(headers_dict) + return headers_dict + # Counting sheeps... def count_new_mbox(self, mbox): -- cgit v1.2.3 From f6566fe83c93625b918664526e8858f7be667354 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 16:20:26 -0400 Subject: defer appends too and cut some more time by firing the callback as soon as we've got an UID. --- src/leap/mail/imap/memorystore.py | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index e8e8152..423b891 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -274,30 +274,24 @@ class MemoryStore(object): be fired. :type notify_on_disk: bool """ - from twisted.internet import reactor - log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid self._add_message(mbox, uid, message, notify_on_disk) self._new.add(key) - # XXX use this while debugging the callback firing, - # remove after unittesting this. - #def log_add(result): - #return result - #observer.addCallback(log_add) - - if notify_on_disk: - # We store this deferred so we can keep track of the pending - # operations internally. - # TODO this should fire with the UID !!! -- change that in - # the soledad store code. - self._new_deferreds[key] = observer - if not notify_on_disk: - # Caller does not care, just fired and forgot, so we pass - # a defer that will inmediately have its callback triggered. - reactor.callLater(0, observer.callback, uid) + if observer is not None: + if notify_on_disk: + # We store this deferred so we can keep track of the pending + # operations internally. + # TODO this should fire with the UID !!! -- change that in + # the soledad store code. + self._new_deferreds[key] = observer + + else: + # Caller does not care, just fired and forgot, so we pass + # a defer that will inmediately have its callback triggered. + self.reactor.callFromThread(observer.callback, uid) def put_message(self, mbox, uid, message, notify_on_disk=True): """ @@ -722,8 +716,6 @@ class MemoryStore(object): headers_dict[uid] = hdoc except KeyError: continue - - import pprint; pprint.pprint(headers_dict) return headers_dict # Counting sheeps... -- cgit v1.2.3 From 54114126d0b8e16784b67ee972e549e5c152c9d0 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 12 Feb 2014 12:37:31 -0400 Subject: purge empty fdocs on select --- src/leap/mail/imap/memorystore.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 423b891..4aaee75 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -362,6 +362,27 @@ class MemoryStore(object): self._sizes[key] = size.get_size(self._fdoc_store[key]) # TODO add hdoc and cdocs sizes too + def purge_fdoc_store(self, mbox): + """ + Purge the empty documents from a fdoc store. 
+ Called during initialization of the SoledadMailbox + + :param mbox: the mailbox + :type mbox: str or unicode + """ + # XXX This is really a workaround until I find the conditions + # that are making the empty items remain there. + # This happens, for instance, after running several times + # the regression test, that issues a store deleted + expunge + select + # The items are being correclty deleted, but in succesive appends + # the empty items with previously deleted uids reappear as empty + # documents. I suspect it's a timing condition with a previously + # evaluated sequence being used after the items has been removed. + + for uid, value in self._fdoc_store[mbox].items(): + if empty(value): + del self._fdoc_store[mbox][uid] + def get_docid_for_fdoc(self, mbox, uid): """ Return Soledad document id for the flags-doc for a given mbox and uid, -- cgit v1.2.3 From b520a60d0e48f36dcebe03d19b65839afc460fe9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 12 Feb 2014 12:39:33 -0400 Subject: move mbox-doc handling to soledadstore, and lock it --- src/leap/mail/imap/memorystore.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 4aaee75..ba444b0 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -293,6 +293,7 @@ class MemoryStore(object): # a defer that will inmediately have its callback triggered. self.reactor.callFromThread(observer.callback, uid) + def put_message(self, mbox, uid, message, notify_on_disk=True): """ Put an existing message. @@ -1176,8 +1177,43 @@ class MemoryStore(object): logger.exception(exc) finally: self._start_write_loop() + observer.callback(all_deleted) + # Mailbox documents and attributes + + # This could be also be cached in memstore, but proxying directly + # to soledad since it's not too performance-critical. + + def get_mbox_doc(self, mbox): + """ + Return the soledad document for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: SoledadDocument or None. + """ + return self.permanent_store.get_mbox_document(mbox) + + def get_mbox_closed(self, mbox): + """ + Return the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: bool + """ + return self.permanent_store.get_mbox_closed(mbox) + + def set_mbox_closed(self, mbox, closed): + """ + Set the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + """ + self.permanent_store.set_mbox_closed(mbox, closed) + # Dump-to-disk controls. @property -- cgit v1.2.3 From ac4c70f0be36c985e16e3f4ec0a38ef6f8d48166 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 12 Feb 2014 12:42:02 -0400 Subject: remove all refs during removal, and protect from empty docs --- src/leap/mail/imap/memorystore.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index ba444b0..1e4262a 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -485,16 +485,26 @@ class MemoryStore(object): # XXX implement elijah's idea of using a PUT document as a # token to ensure consistency in the removal. 
+ try: + del self._fdoc_store[mbox][uid] + except KeyError: + pass + try: key = mbox, uid self._new.discard(key) self._dirty.discard(key) if key in self._sizes: del self._sizes[key] - self._fdoc_store[mbox].pop(uid, None) + self._known_uids[mbox].discard(uid) + except Exception as exc: + logger.error("error while removing message!") + logger.exception(exc) + try: with self._fdoc_docid_lock: - self._fdoc_id_store[mbox].pop(uid, None) + del self._fdoc_id_store[mbox][uid] except Exception as exc: + logger.error("error while removing message!") logger.exception(exc) # IMessageStoreWriter @@ -1124,6 +1134,8 @@ class MemoryStore(object): # Stop and trigger last write self.stop_and_flush() # Wait on the writebacks to finish + + # XXX what if pending deferreds is empty? pending_deferreds = (self._new_deferreds.get(mbox, []) + self._dirty_deferreds.get(mbox, [])) d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) @@ -1169,6 +1181,7 @@ class MemoryStore(object): logger.exception(exc) # 2. Delete all messages marked as deleted in memory. + logger.debug("DELETING FROM MEM ALL FOR %r" % (mbox,)) mem_deleted = self.remove_all_deleted(mbox) all_deleted = set(mem_deleted).union(set(sol_deleted)) -- cgit v1.2.3 From 45733a231128cc06e123f352b4eb9886d6820878 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 14 Feb 2014 12:41:58 -0400 Subject: docstring fixes --- src/leap/mail/imap/memorystore.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 1e4262a..53b8d99 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -25,6 +25,7 @@ import weakref from collections import defaultdict from copy import copy +from enum import Enum from twisted.internet import defer from twisted.internet.task import LoopingCall from twisted.python import log @@ -69,6 +70,9 @@ def set_bool_flag(obj, att): setattr(obj, att, False) +DirtyState = Enum("none", "dirty", "new") + + class MemoryStore(object): """ An in-memory store to where we can write the different parts that @@ -293,7 +297,6 @@ class MemoryStore(object): # a defer that will inmediately have its callback triggered. self.reactor.callFromThread(observer.callback, uid) - def put_message(self, mbox, uid, message, notify_on_disk=True): """ Put an existing message. @@ -407,7 +410,8 @@ class MemoryStore(object): return doc_id - def get_message(self, mbox, uid, dirtystate="none", flags_only=False): + def get_message(self, mbox, uid, dirtystate=DirtyState.none, + flags_only=False): """ Get a MessageWrapper for the given mbox and uid combination. @@ -415,8 +419,9 @@ class MemoryStore(object): :type mbox: str or unicode :param uid: the message UID :type uid: int - :param dirtystate: one of `dirty`, `new` or `none` (default) - :type dirtystate: str + :param dirtystate: DirtyState enum: one of `dirty`, `new` + or `none` (default) + :type dirtystate: enum :param flags_only: whether the message should carry only a reference to the flags document. 
:type flags_only: bool @@ -424,7 +429,7 @@ class MemoryStore(object): :return: MessageWrapper or None """ - if dirtystate == "dirty": + if dirtystate == DirtyState.dirty: flags_only = True key = mbox, uid @@ -434,11 +439,11 @@ class MemoryStore(object): return None new, dirty = False, False - if dirtystate == "none": + if dirtystate == DirtyState.none: new, dirty = self._get_new_dirty_state(key) - if dirtystate == "dirty": + if dirtystate == DirtyState.dirty: new, dirty = False, True - if dirtystate == "new": + if dirtystate == DirtyState.new: new, dirty = True, False if flags_only: @@ -514,6 +519,7 @@ class MemoryStore(object): Write the message documents in this MemoryStore to a different store. :param store: the IMessageStore to write to + :rtype: False if queue is not empty, None otherwise. """ # For now, we pass if the queue is not empty, to avoid duplicate # queuing. @@ -880,7 +886,7 @@ class MemoryStore(object): :rtype: generator """ gm = self.get_message - new = [gm(*key) for key in self._new] + new = [gm(*key, dirtystate=DirtyState.new) for key in self._new] # move content from new set to the queue self._new_queue.update(self._new) self._new.difference_update(self._new) @@ -894,7 +900,8 @@ class MemoryStore(object): :rtype: generator """ gm = self.get_message - dirty = [gm(*key, flags_only=True) for key in self._dirty] + dirty = [gm(*key, flags_only=True, dirtystate=DirtyState.dirty) + for key in self._dirty] # move content from new and dirty sets to the queue self._dirty_queue.update(self._dirty) -- cgit v1.2.3 From 5af059a237833f52869a632e490ff932315a4939 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 17 Feb 2014 10:52:17 -0400 Subject: defer message push to thread --- src/leap/mail/imap/memorystore.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 53b8d99..2d1f95b 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -42,6 +42,8 @@ from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import ReferenciableDict +from leap.mail.decorators import deferred_to_thread + logger = logging.getLogger(__name__) @@ -514,6 +516,7 @@ class MemoryStore(object): # IMessageStoreWriter + @deferred_to_thread def write_messages(self, store): """ Write the message documents in this MemoryStore to a different store. 
-- cgit v1.2.3 From e9488bf377f07f6f05d3fdd2eb316843cf561605 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 17 Feb 2014 10:52:48 -0400 Subject: freeze dirty/new sets to avoid changes during iteration --- src/leap/mail/imap/memorystore.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 2d1f95b..f23a234 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -889,7 +889,8 @@ class MemoryStore(object): :rtype: generator """ gm = self.get_message - new = [gm(*key, dirtystate=DirtyState.new) for key in self._new] + # need to freeze, set can change during iteration + new = [gm(*key, dirtystate=DirtyState.new) for key in tuple(self._new)] # move content from new set to the queue self._new_queue.update(self._new) self._new.difference_update(self._new) @@ -903,8 +904,9 @@ class MemoryStore(object): :rtype: generator """ gm = self.get_message + # need to freeze, set can change during iteration dirty = [gm(*key, flags_only=True, dirtystate=DirtyState.dirty) - for key in self._dirty] + for key in tuple(self._dirty)] # move content from new and dirty sets to the queue self._dirty_queue.update(self._dirty) -- cgit v1.2.3 From e87dba3288345ba28dce5a844a7faf9f5a5a0b9c Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 17 Feb 2014 12:19:02 -0400 Subject: remove size calculation until we defer it to thread properly --- src/leap/mail/imap/memorystore.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index f23a234..56cd000 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -364,9 +364,11 @@ class MemoryStore(object): # Update memory store size # XXX this should use [mbox][uid] - key = mbox, uid - self._sizes[key] = size.get_size(self._fdoc_store[key]) + # TODO --- this has to be deferred to thread, # TODO add hdoc and cdocs sizes too + # it's slowing things down here. 
+ #key = mbox, uid + #self._sizes[key] = size.get_size(self._fdoc_store[key]) def purge_fdoc_store(self, mbox): """ -- cgit v1.2.3 From 0f2f53c8819133e36780e521fecbfadda331255a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 17 Feb 2014 13:00:41 -0400 Subject: defer fetch-all-flags too --- src/leap/mail/imap/memorystore.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 56cd000..875b1b8 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -726,17 +726,16 @@ class MemoryStore(object): :type mbox: str or unicode :rtype: dict """ - flags_dict = {} + fdict = {} uids = self.get_uids(mbox) - fdoc_store = self._fdoc_store[mbox] + fstore = self._fdoc_store[mbox] for uid in uids: try: - flags = fdoc_store[uid][fields.FLAGS_KEY] - flags_dict[uid] = flags + fdict[uid] = fstore[uid][fields.FLAGS_KEY] except KeyError: continue - return flags_dict + return fdict def all_headers(self, mbox): """ -- cgit v1.2.3 From 4bcb32639bff9a5aab076dba2bdc7667cea60c7f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 20 Feb 2014 01:11:26 -0400 Subject: fix rdoc duplication --- src/leap/mail/imap/memorystore.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 875b1b8..aa7da3d 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -506,6 +506,8 @@ class MemoryStore(object): if key in self._sizes: del self._sizes[key] self._known_uids[mbox].discard(uid) + except KeyError: + pass except Exception as exc: logger.error("error while removing message!") logger.exception(exc) -- cgit v1.2.3 From 976ec85451bef3fd380f69c64e803d7740d7dae4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 20 Feb 2014 01:27:17 -0400 Subject: ignore keyerror on deletion --- src/leap/mail/imap/memorystore.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index aa7da3d..6206468 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -514,6 +514,8 @@ class MemoryStore(object): try: with self._fdoc_docid_lock: del self._fdoc_id_store[mbox][uid] + except KeyError: + pass except Exception as exc: logger.error("error while removing message!") logger.exception(exc) -- cgit v1.2.3 From 733994d68b9f3ce528b552f67e9cbec005e57e9f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 25 Feb 2014 22:38:29 -0400 Subject: rename all fdocs when folder is renamed --- src/leap/mail/imap/memorystore.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'src/leap/mail/imap/memorystore.py') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 6206468..d383b79 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -1244,6 +1244,27 @@ class MemoryStore(object): """ self.permanent_store.set_mbox_closed(mbox, closed) + # Rename flag-documents + + def rename_fdocs_mailbox(self, old_mbox, new_mbox): + """ + Change the mailbox name for all flag documents in a given mailbox. 
+ Used from account.rename + + :param old_mbox: name for the old mbox + :type old_mbox: str or unicode + :param new_mbox: name for the new mbox + :type new_mbox: str or unicode + """ + fs = self._fdoc_store + keys = fs[old_mbox].keys() + for k in keys: + fdoc = fs[old_mbox][k] + fdoc['mbox'] = new_mbox + fs[new_mbox][k] = fdoc + fs[old_mbox].pop(k) + self._dirty.add((new_mbox, k)) + # Dump-to-disk controls. @property -- cgit v1.2.3
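As a closing illustration of how the folder rename above interacts with the dirty-state machinery, here is a self-contained sketch that mimics the same re-key-and-mark-dirty steps on plain dictionaries. MiniStore is a hypothetical stand-in used only for this example, not the actual MemoryStore class:

    from collections import defaultdict

    class MiniStore(object):
        """Toy stand-in for the flag-doc store and the dirty set."""

        def __init__(self):
            self._fdoc_store = defaultdict(dict)
            self._dirty = set()

        def rename_fdocs_mailbox(self, old_mbox, new_mbox):
            fs = self._fdoc_store
            for uid in list(fs[old_mbox].keys()):
                fdoc = fs[old_mbox].pop(uid)
                fdoc['mbox'] = new_mbox           # rewrite the mailbox field
                fs[new_mbox][uid] = fdoc          # re-key under the new name
                self._dirty.add((new_mbox, uid))  # flush on next write loop

    store = MiniStore()
    store._fdoc_store['inbox'][1] = {'mbox': 'inbox', 'flags': ['\\Seen']}
    store.rename_fdocs_mailbox('inbox', 'archive')
    assert store._fdoc_store['archive'][1]['mbox'] == 'archive'
    assert ('archive', 1) in store._dirty

Marking each (new_mbox, uid) pair as dirty is what makes the next write pass pick up the renamed flag documents and push them back to the permanent store.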