diff options
| -rw-r--r-- | mail/src/leap/mail/imap/account.py | 13 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/interfaces.py | 93 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 62 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 478 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 206 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/service/imap.py | 20 | ||||
| -rw-r--r-- | mail/src/leap/mail/messageflow.py | 39 | ||||
| -rw-r--r-- | mail/src/leap/mail/size.py | 57 | 
8 files changed, 884 insertions, 84 deletions
| diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py index ce83079..7641ea8 100644 --- a/mail/src/leap/mail/imap/account.py +++ b/mail/src/leap/mail/imap/account.py @@ -48,7 +48,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):      selected = None      closed = False -    def __init__(self, account_name, soledad=None): +    def __init__(self, account_name, soledad=None, memstore=None):          """          Creates a SoledadAccountIndex that keeps track of the mailboxes          and subscriptions handled by this account. @@ -57,7 +57,9 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          :type acct_name: str          :param soledad: a Soledad instance. -        :param soledad: Soledad +        :type soledad: Soledad +        :param memstore: a MemoryStore instance. +        :type memstore: MemoryStore          """          leap_assert(soledad, "Need a soledad instance to initialize")          leap_assert_type(soledad, Soledad) @@ -67,6 +69,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          self._account_name = self._parse_mailbox_name(account_name)          self._soledad = soledad +        self._memstore = memstore          self.initialize_db() @@ -131,7 +134,8 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          if name not in self.mailboxes:              raise imap4.MailboxException("No such mailbox: %r" % name) -        return SoledadMailbox(name, soledad=self._soledad) +        return SoledadMailbox(name, soledad=self._soledad, +                              memstore=self._memstore)      ##      ## IAccount @@ -221,8 +225,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          self.selected = name          return SoledadMailbox( -            name, rw=readwrite, -            soledad=self._soledad) +            name, self._soledad, self._memstore, readwrite)      def delete(self, name, force=False):          """ diff --git a/mail/src/leap/mail/imap/interfaces.py b/mail/src/leap/mail/imap/interfaces.py new file mode 100644 index 0000000..585165a --- /dev/null +++ b/mail/src/leap/mail/imap/interfaces.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# interfaces.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Interfaces for the IMAP module. +""" +from zope.interface import Interface, Attribute + + +class IMessageContainer(Interface): +    """ +    I am a container around the different documents that a message +    is split into. +    """ +    fdoc = Attribute('The flags document for this message, if any.') +    hdoc = Attribute('The headers document for this message, if any.') +    cdocs = Attribute('The dict of content documents for this message, ' +                      'if any.') + +    def walk(self): +        """ +        Return an iterator to the docs for all the parts. + +        :rtype: iterator +        """ + + +class IMessageStore(Interface): +    """ +    I represent a generic storage for LEAP Messages. +    """ + +    def create_message(self, mbox, uid, message): +        """ +        Put the passed message into this IMessageStore. + +        :param mbox: the mbox this message belongs. +        :param uid: the UID that identifies this message in this mailbox. +        :param message: a IMessageContainer implementor. +        """ + +    def put_message(self, mbox, uid, message): +        """ +        Put the passed message into this IMessageStore. + +        :param mbox: the mbox this message belongs. +        :param uid: the UID that identifies this message in this mailbox. +        :param message: a IMessageContainer implementor. +        """ + +    def remove_message(self, mbox, uid): +        """ +        Remove the given message from this IMessageStore. + +        :param mbox: the mbox this message belongs. +        :param uid: the UID that identifies this message in this mailbox. +        """ + +    def get_message(self, mbox, uid): +        """ +        Get a IMessageContainer for the given mbox and uid combination. + +        :param mbox: the mbox this message belongs. +        :param uid: the UID that identifies this message in this mailbox. +        """ + + +class IMessageStoreWriter(Interface): +    """ +    I represent a storage that is able to write its contents to another +    different IMessageStore. +    """ + +    def write_messages(self, store): +        """ +        Write the documents in this IMessageStore to a different +        storage. Usually this will be done from a MemoryStorage to a DbStorage. + +        :param store: another IMessageStore implementor. +        """ diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 0131ce0..9babe6b 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -37,6 +37,7 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.check import leap_assert, leap_assert_type  from leap.mail.decorators import deferred  from leap.mail.imap.fields import WithMsgFields, fields +from leap.mail.imap.memorystore import MessageDict  from leap.mail.imap.messages import MessageCollection  from leap.mail.imap.parser import MBoxParser @@ -80,7 +81,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):      next_uid_lock = threading.Lock() -    def __init__(self, mbox, soledad=None, rw=1): +    def __init__(self, mbox, soledad, memstore, rw=1):          """          SoledadMailbox constructor. Needs to get passed a name, plus a          Soledad instance. @@ -91,9 +92,13 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :param soledad: a Soledad instance.          :type soledad: Soledad -        :param rw: read-and-write flags +        :param memstore: a MemoryStore instance +        :type memstore: MemoryStore + +        :param rw: read-and-write flag for this mailbox          :type rw: int          """ +        print "got memstore: ", memstore          leap_assert(mbox, "Need a mailbox name to initialize")          leap_assert(soledad, "Need a soledad instance to initialize") @@ -105,9 +110,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          self.rw = rw          self._soledad = soledad +        self._memstore = memstore          self.messages = MessageCollection( -            mbox=mbox, soledad=self._soledad) +            mbox=mbox, soledad=self._soledad, memstore=self._memstore)          if not self.getFlags():              self.setFlags(self.INIT_FLAGS) @@ -231,7 +237,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              # XXX It looks like it has been corrupted.              # We need to be able to survive this.              return None -        return mbox.content.get(self.LAST_UID_KEY, 1) +        last = mbox.content.get(self.LAST_UID_KEY, 1) +        if self._memstore: +            last = max(last, self._memstore.get_last_uid(mbox)) +        return last      def _set_last_uid(self, uid):          """ @@ -259,6 +268,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              value = count          mbox.content[key] = value +        # XXX this should be set in the memorystore instead!!!          self._soledad.put_doc(mbox)      last_uid = property( @@ -532,12 +542,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          # can treat them all the same.          # Change this to the flag that twisted expects when we          # switch to content-hash based index + local UID table. +        print +        print "FETCHING..."          sequence = False          #sequence = True if uid == 0 else False          messages_asked = self._bound_seq(messages_asked) +        print "asked: ", messages_asked          seq_messg = self._filter_msg_seq(messages_asked) + +        print "seq: ", seq_messg          getmsg = lambda uid: self.messages.get_msg_by_uid(uid)          # for sequence numbers (uid = 0) @@ -769,36 +784,41 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          uid_next = self.getUIDNext()          msg = messageObject -        # XXX DEBUG ---------------------------------------- -        #print "copying MESSAGE from %s (%s) to %s (%s)" % ( -            #msg._mbox, msg._uid, self.mbox, uid_next) -          # XXX should use a public api instead          fdoc = msg._fdoc +        hdoc = msg._hdoc          if not fdoc:              logger.debug("Tried to copy a MSG with no fdoc")              return +        #old_mbox = fdoc.content[self.MBOX_KEY] +        #old_uid = fdoc.content[self.UID_KEY] +        #old_key = old_mbox, old_uid +        #print "copying from OLD MBOX ", old_mbox + +        # XXX bit doubt... to duplicate in memory +        # or not to...? +        # I think it should be ok to duplicate as long as we're +        # careful at the hour of writes... +        # We could use also proxies, but it will break when +        # the original mailbox is flushed. + +        # XXX DEBUG ---------------------------------------- +        #print "copying MESSAGE from %s (%s) to %s (%s)" % ( +            #msg._mbox, msg._uid, self.mbox, uid_next) +          new_fdoc = copy.deepcopy(fdoc.content)          new_fdoc[self.UID_KEY] = uid_next          new_fdoc[self.MBOX_KEY] = self.mbox -        self._do_add_doc(new_fdoc) +        self._memstore.put(self.mbox, uid_next, MessageDict( +            new_fdoc, hdoc.content)) -        # XXX should use a public api instead -        hdoc = msg._hdoc -        self.messages.add_hdocset_docid(hdoc.doc_id) +        # XXX use memory store +        if hasattr(hdoc, 'doc_id'): +            self.messages.add_hdocset_docid(hdoc.doc_id)          deferLater(reactor, 1, self.notify_new) -    def _do_add_doc(self, doc): -        """ -        Defer the adding of a new doc. - -        :param doc: document to be created in soledad. -        :type doc: dict -        """ -        self._soledad.create_doc(doc) -      # convenience fun      def deleteAllDocs(self): diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py new file mode 100644 index 0000000..b8829e0 --- /dev/null +++ b/mail/src/leap/mail/imap/memorystore.py @@ -0,0 +1,478 @@ +# -*- coding: utf-8 -*- +# memorystore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +In-memory transient store for a LEAPIMAPServer. +""" +import contextlib +import logging +import weakref + +from collections import namedtuple + +from twisted.internet.task import LoopingCall +from zope.interface import implements + +from leap.mail import size +from leap.mail.messageflow import MessageProducer +from leap.mail.messageparts import MessagePartType +from leap.mail.imap import interfaces +from leap.mail.imap.fields import fields + +logger = logging.getLogger(__name__) + + +""" +A MessagePartDoc is a light wrapper around the dictionary-like +data that we pass along for message parts. It can be used almost everywhere +that you would expect a SoledadDocument, since it has a dict under the +`content` attribute. + +We also keep some metadata on it, relative in part to the message as a whole, +and sometimes to a part in particular only. + +* `new` indicates that the document has just been created. SoledadStore +  should just create a new doc for all the related message parts. +* `store` indicates the type of store a given MessagePartDoc lives in. +  We currently use this to indicate that  the document comes from memeory, +  but we should probably get rid of it as soon as we extend the use of the +  SoledadStore interface along LeapMessage, MessageCollection and Mailbox. +* `part` is one of the MessagePartType enums. + +* `dirty` indicates that, while we already have the document in Soledad, +  we have modified its state in memory, so we need to put_doc instead while +  dumping the MemoryStore contents. +  `dirty` attribute would only apply to flags-docs and linkage-docs. + + +  XXX this is still not implemented! + +""" + +MessagePartDoc = namedtuple( +    'MessagePartDoc', +    ['new', 'dirty', 'part', 'store', 'content']) + + +class ReferenciableDict(dict): +    """ +    A dict that can be weak-referenced. + +    Some builtin objects are not weak-referenciable unless +    subclassed. So we do. + +    Used to return pointers to the items in the MemoryStore. +    """ + + +class MessageWrapper(object): +    """ +    A simple nested dictionary container around the different message subparts. +    """ +    implements(interfaces.IMessageContainer) + +    FDOC = "fdoc" +    HDOC = "hdoc" +    CDOCS = "cdocs" + +    # XXX can use this to limit the memory footprint, +    # or is it too premature to optimize? +    # Does it work well together with the interfaces.implements? + +    #__slots__ = ["_dict", "_new", "_dirty", "memstore"] + +    def __init__(self, fdoc=None, hdoc=None, cdocs=None, +                 from_dict=None, memstore=None, +                 new=True, dirty=False): +        self._dict = {} + +        self._new = new +        self._dirty = dirty +        self.memstore = memstore + +        if from_dict is not None: +            self.from_dict(from_dict) +        else: +            if fdoc is not None: +                self._dict[self.FDOC] = ReferenciableDict(fdoc) +            if hdoc is not None: +                self._dict[self.HDOC] = ReferenciableDict(hdoc) +            if cdocs is not None: +                self._dict[self.CDOCS] = ReferenciableDict(cdocs) + +    # properties + +    @property +    def new(self): +        return self._new + +    def set_new(self, value=True): +        self._new = value + +    @property +    def dirty(self): +        return self._dirty + +    def set_dirty(self, value=True): +        self._dirty = value + +    # IMessageContainer + +    @property +    def fdoc(self): +        _fdoc = self._dict.get(self.FDOC, None) +        if _fdoc: +            content_ref = weakref.proxy(_fdoc) +        else: +            logger.warning("NO FDOC!!!") +            content_ref = {} +        return MessagePartDoc(new=self.new, dirty=self.dirty, +                              store=self._storetype, +                              part=MessagePartType.fdoc, +                              content=content_ref) + +    @property +    def hdoc(self): +        _hdoc = self._dict.get(self.HDOC, None) +        if _hdoc: +            content_ref = weakref.proxy(_hdoc) +        else: +            logger.warning("NO HDOC!!!!") +            content_ref = {} +        return MessagePartDoc(new=self.new, dirty=self.dirty, +                              store=self._storetype, +                              part=MessagePartType.hdoc, +                              content=content_ref) + +    @property +    def cdocs(self): +        _cdocs = self._dict.get(self.CDOCS, None) +        if _cdocs: +            return weakref.proxy(_cdocs) +        else: +            return {} + +    def walk(self): +        """ +        Generator that iterates through all the parts, returning +        MessagePartDoc. +        """ +        yield self.fdoc +        yield self.hdoc +        for cdoc in self.cdocs.values(): +            # XXX this will break ---- +            content_ref = weakref.proxy(cdoc) +            yield MessagePartDoc(new=self.new, dirty=self.dirty, +                                 store=self._storetype, +                                 part=MessagePartType.cdoc, +                                 content=content_ref) + +    # i/o + +    def as_dict(self): +        """ +        Return a dict representation of the parts contained. +        """ +        return self._dict + +    def from_dict(self, msg_dict): +        """ +        Populate MessageWrapper parts from a dictionary. +        It expects the same format that we use in a +        MessageWrapper. +        """ +        fdoc, hdoc, cdocs = map( +            lambda part: msg_dict.get(part, None), +            [self.FDOC, self.HDOC, self.CDOCS]) +        self._dict[self.FDOC] = fdoc +        self._dict[self.HDOC] = hdoc +        self._dict[self.CDOCS] = cdocs + + +@contextlib.contextmanager +def set_bool_flag(obj, att): +    """ +    Set a boolean flag to True while we're doing our thing. +    Just to let the world know. +    """ +    setattr(obj, att, True) +    try: +        yield True +    except RuntimeError as exc: +        logger.exception(exc) +    finally: +        setattr(obj, att, False) + + +class MemoryStore(object): +    """ +    An in-memory store to where we can write the different parts that +    we split the messages into and buffer them until we write them to the +    permanent storage. + +    It uses MessageWrapper instances to represent the message-parts, which are +    indexed by mailbox name and UID. + +    It also can be passed a permanent storage as a paremeter (any implementor +    of IMessageStore, in this case a SoledadStore). In this case, a periodic +    dump of the messages stored in memory will be done. The period of the +    writes to the permanent storage is controled by the write_period parameter +    in the constructor. +    """ +    implements(interfaces.IMessageStore) +    implements(interfaces.IMessageStoreWriter) + +    producer = None + +    # TODO We will want to index by chash when we transition to local-only +    # UIDs. +    # TODO should store RECENT-FLAGS too +    # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass +    # TODO do use dirty flag (maybe use namedtuples for that) so we can use it +    # also as a read-cache. + +    WRITING_FLAG = "_writing" + +    def __init__(self, permanent_store=None, write_period=60): +        """ +        Initialize a MemoryStore. + +        :param permanent_store: a IMessageStore implementor to dump +                                messages to. +        :type permanent_store: IMessageStore +        :param write_period: the interval to dump messages to disk, in seconds. +        :type write_period: int +        """ +        self._permanent_store = permanent_store +        self._write_period = write_period + +        # Internal Storage +        self._msg_store = {} +        self._phash_store = {} + +        # TODO ----------------- implement mailbox-level flags store too! ---- +        self._rflags_store = {} +        self._hdocset_store = {} +        # TODO ----------------- implement mailbox-level flags store too! ---- + +        # New and dirty flags, to set MessageWrapper State. +        self._new = set([]) +        self._dirty = set([]) + +        # Flag for signaling we're busy writing to the disk storage. +        setattr(self, self.WRITING_FLAG, False) + +        if self._permanent_store is not None: +            # this producer spits its messages to the permanent store +            # consumer using a queue. We will use that to put +            # our messages to be written. +            self.producer = MessageProducer(permanent_store, +                                            period=0.1) +            # looping call for dumping to SoledadStore +            self._write_loop = LoopingCall(self.write_messages, +                                           permanent_store) + +            # We can start the write loop right now, why wait? +            self._start_write_loop() + +    def _start_write_loop(self): +        """ +        Start loop for writing to disk database. +        """ +        if not self._write_loop.running: +            self._write_loop.start(self._write_period, now=True) + +    def _stop_write_loop(self): +        """ +        Stop loop for writing to disk database. +        """ +        if self._write_loop.running: +            self._write_loop.stop() + +    # IMessageStore + +    # XXX this would work well for whole message operations. +    # We would have to add a put_flags operation to modify only +    # the flags doc (and set the dirty flag accordingly) + +    def create_message(self, mbox, uid, message): +        """ +        Create the passed message into this MemoryStore. + +        By default we consider that any message is a new message. +        """ +        print "adding new doc to memstore %s (%s)" % (mbox, uid) +        key = mbox, uid +        self._new.add(key) + +        msg_dict = message.as_dict() +        self._msg_store[key] = msg_dict + +        cdocs = message.cdocs + +        dirty = key in self._dirty +        new = key in self._new + +        # XXX should capture this in log... + +        for cdoc_key in cdocs.keys(): +            print "saving cdoc" +            cdoc = self._msg_store[key]['cdocs'][cdoc_key] + +            # XXX this should be done in the MessageWrapper constructor +            # instead... +            # first we make it weak-referenciable +            referenciable_cdoc = ReferenciableDict(cdoc) +            self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc( +                new=new, dirty=dirty, store="mem", +                part=MessagePartType.cdoc, +                content=referenciable_cdoc) +            phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) +            if not phash: +                continue +            self._phash_store[phash] = weakref.proxy(referenciable_cdoc) + +    def put_message(self, mbox, uid, msg): +        """ +        Put an existing message. +        """ +        return NotImplementedError() + +    def get_message(self, mbox, uid): +        """ +        Get a MessageWrapper for the given mbox and uid combination. + +        :return: MessageWrapper or None +        """ +        key = mbox, uid +        msg_dict = self._msg_store.get(key, None) +        if msg_dict: +            new, dirty = self._get_new_dirty_state(key) +            return MessageWrapper(from_dict=msg_dict, +                                  memstore=weakref.proxy(self)) +        else: +            return None + +    def remove_message(self, mbox, uid): +        """ +        Remove a Message from this MemoryStore. +        """ +        raise NotImplementedError() + +    # IMessageStoreWriter + +    def write_messages(self, store): +        """ +        Write the message documents in this MemoryStore to a different store. +        """ +        # XXX pass if it's writing (ie, the queue is not empty...) +        # See how to make the writing_flag aware of the queue state... +        print "writing messages to producer..." + +        with set_bool_flag(self, self.WRITING_FLAG): +            for msg_wrapper in self.all_msg_iter(): +                self.producer.push(msg_wrapper) + +    # MemoryStore specific methods. + +    def get_uids(self, mbox): +        """ +        Get all uids for a given mbox. +        """ +        all_keys = self._msg_store.keys() +        return [uid for m, uid in all_keys if m == mbox] + +    def get_last_uid(self, mbox): +        """ +        Get the highest UID for a given mbox. +        """ +        # XXX should get from msg_store keys instead! +        if not self._new: +            return 0 +        return max(self.get_uids(mbox)) + +    def count_new_mbox(self, mbox): +        """ +        Count the new messages by inbox. +        """ +        return len([(m, uid) for m, uid in self._new if mbox == mbox]) + +    def count_new(self): +        """ +        Count all the new messages in the MemoryStore. +        """ +        return len(self._new) + +    def get_by_phash(self, phash): +        """ +        Return a content-document by its payload-hash. +        """ +        doc = self._phash_store.get(phash, None) + +        # XXX have to keep a mapping between phash and its linkage +        # info, to know if this payload is been already saved or not. +        # We will be able to get this from the linkage-docs, +        # not yet implemented. +        new = True +        dirty = False +        return MessagePartDoc( +            new=new, dirty=dirty, store="mem", +            part=MessagePartType.cdoc, +            content=doc) + +    def all_msg_iter(self): +        """ +        Return generator that iterates through all messages in the store. +        """ +        return (self.get_message(*key) +                for key in sorted(self._msg_store.keys())) + +    def _get_new_dirty_state(self, key): +        """ +        Return `new` and `dirty` flags for a given message. +        """ +        return map(lambda _set: key in _set, (self._new, self._dirty)) + +    @property +    def is_writing(self): +        """ +        Property that returns whether the store is currently writing its +        internal state to a permanent storage. + +        Used to evaluate whether the CHECK command can inform that the field +        is clear to proceed, or waiting for the write operations to complete +        is needed instead. + +        :rtype: bool +        """ +        # XXX this should probably return a deferred !!! +        return getattr(self, self.WRITING_FLAG) + +    def put_part(self, part_type, value): +        """ +        Put the passed part into this IMessageStore. +        `part` should be one of: fdoc, hdoc, cdoc +        """ +        # XXX turn that into a enum + +    # Memory management. + +    def get_size(self): +        """ +        Return the size of the internal storage. +        Use for calculating the limit beyond which we should flush the store. +        """ +        return size.get_size(self._msg_store) diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 34304ea..ef0b0a1 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -42,6 +42,7 @@ from leap.mail.utils import first, find_charset  from leap.mail.decorators import deferred  from leap.mail.imap.index import IndexedDB  from leap.mail.imap.fields import fields, WithMsgFields +from leap.mail.imap.memorystore import MessageDict  from leap.mail.imap.parser import MailParser, MBoxParser  from leap.mail.messageflow import IMessageConsumer @@ -49,11 +50,20 @@ logger = logging.getLogger(__name__)  # TODO ------------------------------------------------------------ +# [ ] Add ref to incoming message during add_msg  # [ ] Add linked-from info.  # [ ] Delete incoming mail only after successful write!  # [ ] Remove UID from syncable db. Store only those indexes locally. +# XXX no longer needed, since i'm using proxies instead of direct weakrefs +def maybe_call(thing): +    """ +    Return the same thing, or the result of its invocation if it is a callable. +    """ +    return thing() if callable(thing) else thing + +  def lowerdict(_dict):      """      Return a dict with the keys in lowercase. @@ -333,7 +343,7 @@ class LeapMessage(fields, MailParser, MBoxParser):      implements(imap4.IMessage) -    def __init__(self, soledad, uid, mbox, collection=None): +    def __init__(self, soledad, uid, mbox, collection=None, container=None):          """          Initializes a LeapMessage. @@ -345,12 +355,15 @@ class LeapMessage(fields, MailParser, MBoxParser):          :type mbox: basestring          :param collection: a reference to the parent collection object          :type collection: MessageCollection +        :param container: a IMessageContainer implementor instance +        :type container: IMessageContainer          """          MailParser.__init__(self)          self._soledad = soledad          self._uid = int(uid)          self._mbox = self._parse_mailbox_name(mbox)          self._collection = collection +        self._container = container          self.__chash = None          self.__bdoc = None @@ -361,13 +374,29 @@ class LeapMessage(fields, MailParser, MBoxParser):          An accessor to the flags document.          """          if all(map(bool, (self._uid, self._mbox))): -            fdoc = self._get_flags_doc() +            fdoc = None +            if self._container is not None: +                fdoc = self._container.fdoc +            if not fdoc: +                fdoc = self._get_flags_doc()              if fdoc: -                self.__chash = fdoc.content.get( +                fdoc_content = maybe_call(fdoc.content) +                self.__chash = fdoc_content.get(                      fields.CONTENT_HASH_KEY, None)              return fdoc      @property +    def _hdoc(self): +        """ +        An accessor to the headers document. +        """ +        if self._container is not None: +            hdoc = self._container.hdoc +            if hdoc: +                return hdoc +        return self._get_headers_doc() + +    @property      def _chash(self):          """          An accessor to the content hash for this message. @@ -375,18 +404,11 @@ class LeapMessage(fields, MailParser, MBoxParser):          if not self._fdoc:              return None          if not self.__chash and self._fdoc: -            self.__chash = self._fdoc.content.get( +            self.__chash = maybe_call(self._fdoc.content).get(                  fields.CONTENT_HASH_KEY, None)          return self.__chash      @property -    def _hdoc(self): -        """ -        An accessor to the headers document. -        """ -        return self._get_headers_doc() - -    @property      def _bdoc(self):          """          An accessor to the body document. @@ -422,7 +444,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          flags = []          fdoc = self._fdoc          if fdoc: -            flags = fdoc.content.get(self.FLAGS_KEY, None) +            flags = maybe_call(fdoc.content).get(self.FLAGS_KEY, None)          msgcol = self._collection @@ -449,6 +471,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          :return: a SoledadDocument instance          :rtype: SoledadDocument          """ +        # XXX use memory store ...! +          leap_assert(isinstance(flags, tuple), "flags need to be a tuple")          log.msg('setting flags: %s (%s)' % (self._uid, flags)) @@ -461,7 +485,9 @@ class LeapMessage(fields, MailParser, MBoxParser):          doc.content[self.FLAGS_KEY] = flags          doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags          doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags -        self._soledad.put_doc(doc) + +        if getattr(doc, 'store', None) != "mem": +            self._soledad.put_doc(doc)      def addFlags(self, flags):          """ @@ -521,18 +547,26 @@ class LeapMessage(fields, MailParser, MBoxParser):          """          # TODO refactor with getBodyFile in MessagePart          fd = StringIO.StringIO() -        bdoc = self._bdoc -        if bdoc: -            body = self._bdoc.content.get(self.RAW_KEY, "") -            content_type = bdoc.content.get('content-type', "") +        if self._bdoc is not None: +            bdoc_content = self._bdoc.content +            body = bdoc_content.get(self.RAW_KEY, "") +            content_type = bdoc_content.get('content-type', "")              charset = find_charset(content_type) +            logger.debug('got charset from content-type: %s' % charset)              if charset is None:                  charset = self._get_charset(body)              try:                  body = body.encode(charset)              except UnicodeError as e: -                logger.error("Unicode error, using 'replace'. {0!r}".format(e)) -                body = body.encode(charset, 'replace') +                logger.error("Unicode error {0}".format(e)) +                logger.debug("Attempted to encode with: %s" % charset) +                try: +                    body = body.encode(charset, 'replace') +                except UnicodeError as e: +                    try: +                        body = body.encode('utf-8', 'replace') +                    except: +                        pass          # We are still returning funky characters from here.          else: @@ -567,7 +601,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          """          size = None          if self._fdoc: -            size = self._fdoc.content.get(self.SIZE_KEY, False) +            fdoc_content = maybe_call(self._fdoc.content) +            size = fdoc_content.get(self.SIZE_KEY, False)          else:              logger.warning("No FLAGS doc for %s:%s" % (self._mbox,                                                         self._uid)) @@ -632,7 +667,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          Return the headers dict for this message.          """          if self._hdoc is not None: -            headers = self._hdoc.content.get(self.HEADERS_KEY, {}) +            hdoc_content = maybe_call(self._hdoc.content) +            headers = hdoc_content.get(self.HEADERS_KEY, {})              return headers          else: @@ -646,7 +682,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          Return True if this message is multipart.          """          if self._fdoc: -            is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False) +            fdoc_content = maybe_call(self._fdoc.content) +            is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)              return is_multipart          else:              logger.warning( @@ -688,7 +725,8 @@ class LeapMessage(fields, MailParser, MBoxParser):              logger.warning("Tried to get part but no HDOC found!")              return None -        pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {}) +        hdoc_content = maybe_call(self._hdoc.content) +        pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})          return pmap[str(part)]      def _get_flags_doc(self): @@ -724,16 +762,33 @@ class LeapMessage(fields, MailParser, MBoxParser):          Return the document that keeps the body for this          message.          """ -        body_phash = self._hdoc.content.get( +        hdoc_content = maybe_call(self._hdoc.content) +        body_phash = hdoc_content.get(              fields.BODY_KEY, None)          if not body_phash:              logger.warning("No body phash for this document!")              return None -        body_docs = self._soledad.get_from_index( -            fields.TYPE_P_HASH_IDX, -            fields.TYPE_CONTENT_VAL, str(body_phash)) -        return first(body_docs) +        # XXX get from memstore too... +        # if memstore: memstore.get_phrash +        # memstore should keep a dict with weakrefs to the +        # phash doc... + +        if self._container is not None: +            bdoc = self._container.memstore.get_by_phash(body_phash) +            if bdoc: +                return bdoc +            else: +                print "no doc for that phash found!" + +        # no memstore or no doc found there +        if self._soledad: +            body_docs = self._soledad.get_from_index( +                fields.TYPE_P_HASH_IDX, +                fields.TYPE_CONTENT_VAL, str(body_phash)) +            return first(body_docs) +        else: +            logger.error("No phash in container, and no soledad found!")      def __getitem__(self, key):          """ @@ -746,7 +801,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          :return: The content value indexed by C{key} or None          :rtype: str          """ -        return self._fdoc.content.get(key, None) +        return maybe_call(self._fdoc.content).get(key, None)      # setters @@ -790,6 +845,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          # until we think about a good way of deorphaning.          # Maybe a crawler of unreferenced docs. +        # XXX remove from memory store!!! +          # XXX implement elijah's idea of using a PUT document as a          # token to ensure consistency in the removal. @@ -957,7 +1014,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,      """      A collection of messages, surprisingly. -    It is tied to a selected mailbox name that is passed to constructor. +    It is tied to a selected mailbox name that is passed to its constructor.      Implements a filter query over the messages contained in a soledad      database.      """ @@ -1058,7 +1115,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,      _hdocset_lock = threading.Lock()      _hdocset_property_lock = threading.Lock() -    def __init__(self, mbox=None, soledad=None): +    def __init__(self, mbox=None, soledad=None, memstore=None):          """          Constructor for MessageCollection. @@ -1068,13 +1125,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,          MessageCollection for each mailbox, instead of treating them          as a property of each message. +        We are passed an instance of MemoryStore, the same for the +        SoledadBackedAccount, that we use as a read cache and a buffer +        for writes. +          :param mbox: the name of the mailbox. It is the name                       with which we filter the query over the -                     messages database +                     messages database.          :type mbox: str -          :param soledad: Soledad database          :type soledad: Soledad instance +        :param memstore: a MemoryStore instance +        :type memstore: MemoryStore          """          MailParser.__init__(self)          leap_assert(mbox, "Need a mailbox name to initialize") @@ -1086,6 +1148,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,          # okay, all in order, keep going...          self.mbox = self._parse_mailbox_name(mbox)          self._soledad = soledad +        self._memstore = memstore +          self.__rflags = None          self.__hdocset = None          self.initialize_db() @@ -1241,6 +1305,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,          # when all the processing is done.          # TODO add the linked-from info ! +        # TODO add reference to the original message          logger.debug('adding message')          if flags is None: @@ -1273,24 +1338,29 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,              hd[key] = parts_map[key]          del parts_map -        # Saving ---------------------------------------- -        self.set_recent_flag(uid) +        # The MessageContainer expects a dict, zero-indexed +        # XXX review-me +        cdocs = dict((index, doc) for index, doc in +                     enumerate(walk.get_raw_docs(msg, parts))) +        print "cdocs is", cdocs -        # first, regular docs: flags and headers -        self._soledad.create_doc(fd) +        # Saving ----------------------------------------          # XXX should check for content duplication on headers too          # but with chash. !!! -        hdoc = self._soledad.create_doc(hd) + +        # XXX adapt hdocset to use memstore +        #hdoc = self._soledad.create_doc(hd)          # We add the newly created hdoc to the fast-access set of          # headers documents associated with the mailbox. -        self.add_hdocset_docid(hdoc.doc_id) +        #self.add_hdocset_docid(hdoc.doc_id) -        # and last, but not least, try to create -        # content docs if not already there. -        cdocs = walk.get_raw_docs(msg, parts) -        for cdoc in cdocs: -            if not self._content_does_exist(cdoc): -                self._soledad.create_doc(cdoc) +        # XXX move to memory store too +        # self.set_recent_flag(uid) + +        # TODO ---- add reference to original doc, to be deleted +        # after writes are done. +        msg_container = MessageDict(fd, hd, cdocs) +        self._memstore.put(self.mbox, uid, msg_container)      def _remove_cb(self, result):          return result @@ -1321,6 +1391,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,      #      # recent flags +    # XXX FIXME ------------------------------------- +    # This should be rewritten to use memory store.      def _get_recent_flags(self):          """ @@ -1390,6 +1462,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,      # headers-docs-set +    # XXX FIXME ------------------------------------- +    # This should be rewritten to use memory store. +      def _get_hdocset(self):          """          An accessor for the hdocs-set for this mailbox. @@ -1532,7 +1607,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,                   or None if not found.          :rtype: LeapMessage          """ -        msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) +        print "getting msg by id!" +        msg_container = self._memstore.get(self.mbox, uid) +        print "msg container", msg_container +        if msg_container is not None: +            print "getting LeapMessage (from memstore)" +            msg = LeapMessage(None, uid, self.mbox, collection=self, +                              container=msg_container) +            print "got msg:", msg +        else: +            msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)          if not msg.does_exist():              return None          return msg @@ -1570,11 +1654,19 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,          ascending order.          """          # XXX we should get this from the uid table, local-only -        all_uids = (doc.content[self.UID_KEY] for doc in -                    self._soledad.get_from_index( -                        fields.TYPE_MBOX_IDX, -                        fields.TYPE_FLAGS_VAL, self.mbox)) -        return (u for u in sorted(all_uids)) +        # XXX FIXME ------------- +        # This should be cached in the memstoretoo +        db_uids = set([doc.content[self.UID_KEY] for doc in +                       self._soledad.get_from_index( +                           fields.TYPE_MBOX_IDX, +                           fields.TYPE_FLAGS_VAL, self.mbox)]) +        if self._memstore is not None: +            mem_uids = self._memstore.get_uids(self.mbox) +            uids = db_uids.union(set(mem_uids)) +        else: +            uids = db_uids + +        return (u for u in sorted(uids))      def reset_last_uid(self, param):          """ @@ -1592,12 +1684,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,          """          Return a dict with all flags documents for this mailbox.          """ +        # XXX get all from memstore and cahce it there          all_flags = dict(((              doc.content[self.UID_KEY],              doc.content[self.FLAGS_KEY]) for doc in              self._soledad.get_from_index(                  fields.TYPE_MBOX_IDX,                  fields.TYPE_FLAGS_VAL, self.mbox))) +        if self._memstore is not None: +            # XXX +            uids = self._memstore.get_uids(self.mbox) +            fdocs = [(uid, self._memstore.get(self.mbox, uid).fdoc) +                     for uid in uids] +            for uid, doc in fdocs: +                all_flags[uid] = doc.content[self.FLAGS_KEY] +          return all_flags      def all_flags_chash(self): @@ -1630,9 +1731,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,          :rtype: int          """ +        # XXX We could cache this in memstore too until next write...          count = self._soledad.get_count_from_index(              fields.TYPE_MBOX_IDX,              fields.TYPE_FLAGS_VAL, self.mbox) +        if self._memstore is not None: +            count += self._memstore.count_new()          return count      # unseen messages diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index ad22da6..71b9950 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -36,6 +36,7 @@ from leap.common.check import leap_assert, leap_assert_type, leap_check  from leap.keymanager import KeyManager  from leap.mail.imap.account import SoledadBackedAccount  from leap.mail.imap.fetch import LeapIncomingMail +from leap.mail.imap.memorystore import MemoryStore  from leap.soledad.client import Soledad  # The default port in which imap service will run @@ -69,6 +70,8 @@ except Exception:  ###################################################### +# TODO move this to imap.server +  class LeapIMAPServer(imap4.IMAP4Server):      """      An IMAP4 Server with mailboxes backed by soledad @@ -256,11 +259,15 @@ class LeapIMAPFactory(ServerFactory):          self._uuid = uuid          self._userid = userid          self._soledad = soledad +        self._memstore = MemoryStore()          theAccount = SoledadBackedAccount( -            uuid, soledad=soledad) +            uuid, soledad=soledad, +            memstore=self._memstore)          self.theAccount = theAccount +        # XXX how to pass the store along? +      def buildProtocol(self, addr):          "Return a protocol suitable for the job."          imapProtocol = LeapIMAPServer( @@ -323,3 +330,14 @@ def run_service(*args, **kwargs):      # not ok, signal error.      leap_events.signal(IMAP_SERVICE_FAILED_TO_START, str(port)) + +    def checkpoint(self): +        """ +        Called when the client issues a CHECK command. + +        This should perform any checkpoint operations required by the server. +        It may be a long running operation, but may not block.  If it returns +        a deferred, the client will only be informed of success (or failure) +        when the deferred's callback (or errback) is invoked. +        """ +        return None diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py index ac26e45..ed6abcd 100644 --- a/mail/src/leap/mail/messageflow.py +++ b/mail/src/leap/mail/messageflow.py @@ -25,12 +25,15 @@ from zope.interface import Interface, implements  class IMessageConsumer(Interface): +    """ +    I consume messages from a queue. +    """      def consume(self, queue):          """          Consumes the passed item. -        :param item: q queue where we put the object to be consumed. +        :param item: a queue where we put the object to be consumed.          :type item: object          """          # TODO we could add an optional type to be passed @@ -40,6 +43,28 @@ class IMessageConsumer(Interface):          # the queue, maybe wrapped in an object with a retries attribute. +class IMessageProducer(Interface): +    """ +    I produce messages and put them in a store to be consumed by other +    entities. +    """ + +    def push(self, item): +        """ +        Push a new item in the queue. +        """ + +    def start(self): +        """ +        Start producing items. +        """ + +    def stop(self): +        """ +        Stop producing items. +        """ + +  class DummyMsgConsumer(object):      implements(IMessageConsumer) @@ -62,6 +87,8 @@ class MessageProducer(object):      deferred chain and leave further processing detached from the calling loop,      as in the case of smtp.      """ +    implements(IMessageProducer) +      # TODO this can be seen as a first step towards properly implementing      # components that implement IPushProducer / IConsumer  interfaces.      # However, I need to think more about how to pause the streaming. @@ -92,7 +119,7 @@ class MessageProducer(object):      def _check_for_new(self):          """ -        Checks for new items in the internal queue, and calls the consume +        Check for new items in the internal queue, and calls the consume          method in the consumer.          If the queue is found empty, the loop is stopped. It will be started @@ -102,11 +129,11 @@ class MessageProducer(object):          if self._queue.empty():              self.stop() -    # public methods +    # public methods: IMessageProducer -    def put(self, item): +    def push(self, item):          """ -        Puts a new item in the queue. +        Push a new item in the queue.          If the queue was empty, we will start the loop again.          """ @@ -117,7 +144,7 @@ class MessageProducer(object):      def start(self):          """ -        Starts polling for new items. +        Start polling for new items.          """          if not self._loop.running:              self._loop.start(self._period, now=True) diff --git a/mail/src/leap/mail/size.py b/mail/src/leap/mail/size.py new file mode 100644 index 0000000..4880d71 --- /dev/null +++ b/mail/src/leap/mail/size.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# size.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Recursively get size of objects. +""" +from gc import collect +from itertools import chain +from sys import getsizeof + + +def _get_size(item, seen): +    known_types = {dict: lambda d: chain.from_iterable(d.items())} +    default_size = getsizeof(0) + +    def size_walk(item): +        if id(item) in seen: +            return 0 +        seen.add(id(item)) +        s = getsizeof(item, default_size) +        for _type, fun in known_types.iteritems(): +            if isinstance(item, _type): +                s += sum(map(size_walk, fun(item))) +                break +        return s + +    return size_walk(item) + + +def get_size(item): +    """ +    Return the cumulative size of a given object. + +    Currently it supports only dictionaries, and seemingly leaks +    some memory, so use with care. + +    :param item: the item which size wants to be computed +    """ +    seen = set() +    size = _get_size(item, seen) +    #print "len(seen) ", len(seen) +    del seen +    collect() +    return size | 
