path: root/src/leap/mail/imap/memorystore.py
author     Kali Kaneko <kali@leap.se>    2014-01-21 19:22:09 -0400
committer  Kali Kaneko <kali@leap.se>    2014-01-28 19:38:09 -0400
commit     4ae6ad57a0f80143e3ded867c1fdd2264804a775 (patch)
tree       cb3d3a7ad91af220a292cf8ab3de332b6138ab32 /src/leap/mail/imap/memorystore.py
parent     781bd2f4d2a047088d1a0ecd673a38c80ea0c0c0 (diff)
memory store for append/fetch/copy
Diffstat (limited to 'src/leap/mail/imap/memorystore.py')
-rw-r--r--  src/leap/mail/imap/memorystore.py  478
1 file changed, 478 insertions, 0 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
new file mode 100644
index 0000000..b8829e0
--- /dev/null
+++ b/src/leap/mail/imap/memorystore.py
@@ -0,0 +1,478 @@
+# -*- coding: utf-8 -*-
+# memorystore.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+In-memory transient store for a LEAPIMAPServer.
+"""
+import contextlib
+import logging
+import weakref
+
+from collections import namedtuple
+
+from twisted.internet.task import LoopingCall
+from zope.interface import implements
+
+from leap.mail import size
+from leap.mail.messageflow import MessageProducer
+from leap.mail.messageparts import MessagePartType
+from leap.mail.imap import interfaces
+from leap.mail.imap.fields import fields
+
+logger = logging.getLogger(__name__)
+
+
+"""
+A MessagePartDoc is a light wrapper around the dictionary-like
+data that we pass along for message parts. It can be used almost everywhere
+that you would expect a SoledadDocument, since it has a dict under the
+`content` attribute.
+
+We also keep some metadata on it, some relative to the message as a whole,
+and some to a particular part only.
+
+* `new` indicates that the document has just been created. SoledadStore
+  should just create a new doc for all the related message parts.
+* `store` indicates the type of store a given MessagePartDoc lives in.
+  We currently use this to indicate that the document comes from memory,
+  but we should probably get rid of it as soon as we extend the use of the
+  SoledadStore interface along LeapMessage, MessageCollection and Mailbox.
+* `part` is one of the MessagePartType enums.
+* `dirty` indicates that, while we already have the document in Soledad,
+  we have modified its state in memory, so we need to put_doc instead while
+  dumping the MemoryStore contents.
+  The `dirty` attribute only applies to flags-docs and linkage-docs.
+
+  XXX the `dirty` flag is not implemented yet!
+"""
+
+MessagePartDoc = namedtuple(
+ 'MessagePartDoc',
+ ['new', 'dirty', 'part', 'store', 'content'])
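+
+# A minimal usage sketch (illustrative only): a flags-doc part wrapped in a
+# MessagePartDoc, in the same shape that MessageWrapper.fdoc and
+# MemoryStore.get_by_phash return. The `content` values here are
+# hypothetical.
+#
+#   >>> part = MessagePartDoc(new=True, dirty=False,
+#   ...                       part=MessagePartType.fdoc,
+#   ...                       store="mem",
+#   ...                       content={'flags': ('\\Seen',), 'uid': 1})
+#   >>> part.content['uid']
+#   1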
+
+
+class ReferenciableDict(dict):
+ """
+ A dict that can be weak-referenced.
+
+    Some built-in objects cannot be weakly referenced unless they are
+    subclassed, so we do that here.
+
+ Used to return pointers to the items in the MemoryStore.
+ """
+
+
+class MessageWrapper(object):
+ """
+ A simple nested dictionary container around the different message subparts.
+ """
+ implements(interfaces.IMessageContainer)
+
+ FDOC = "fdoc"
+ HDOC = "hdoc"
+ CDOCS = "cdocs"
+
+ # XXX can use this to limit the memory footprint,
+ # or is it too premature to optimize?
+ # Does it work well together with the interfaces.implements?
+
+ #__slots__ = ["_dict", "_new", "_dirty", "memstore"]
+
+ def __init__(self, fdoc=None, hdoc=None, cdocs=None,
+ from_dict=None, memstore=None,
+ new=True, dirty=False):
+ self._dict = {}
+
+ self._new = new
+ self._dirty = dirty
+        self.memstore = memstore
+
+        # the store type reported by the parts of this wrapper; documents
+        # kept in the MemoryStore are marked as coming from "mem".
+        self._storetype = "mem"
+
+ if from_dict is not None:
+ self.from_dict(from_dict)
+ else:
+ if fdoc is not None:
+ self._dict[self.FDOC] = ReferenciableDict(fdoc)
+ if hdoc is not None:
+ self._dict[self.HDOC] = ReferenciableDict(hdoc)
+ if cdocs is not None:
+ self._dict[self.CDOCS] = ReferenciableDict(cdocs)
+
+ # properties
+
+ @property
+ def new(self):
+ return self._new
+
+ def set_new(self, value=True):
+ self._new = value
+
+ @property
+ def dirty(self):
+ return self._dirty
+
+ def set_dirty(self, value=True):
+ self._dirty = value
+
+ # IMessageContainer
+
+ @property
+ def fdoc(self):
+ _fdoc = self._dict.get(self.FDOC, None)
+ if _fdoc:
+ content_ref = weakref.proxy(_fdoc)
+ else:
+ logger.warning("NO FDOC!!!")
+ content_ref = {}
+ return MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.fdoc,
+ content=content_ref)
+
+ @property
+ def hdoc(self):
+ _hdoc = self._dict.get(self.HDOC, None)
+ if _hdoc:
+ content_ref = weakref.proxy(_hdoc)
+ else:
+ logger.warning("NO HDOC!!!!")
+ content_ref = {}
+ return MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.hdoc,
+ content=content_ref)
+
+ @property
+ def cdocs(self):
+ _cdocs = self._dict.get(self.CDOCS, None)
+ if _cdocs:
+ return weakref.proxy(_cdocs)
+ else:
+ return {}
+
+ def walk(self):
+ """
+ Generator that iterates through all the parts, returning
+ MessagePartDoc.
+ """
+ yield self.fdoc
+ yield self.hdoc
+        for cdoc in self.cdocs.values():
+            # XXX this will break: cdocs coming through from_dict are plain
+            # dicts (or MessagePartDoc tuples) and cannot be weakly
+            # referenced.
+            content_ref = weakref.proxy(cdoc)
+ yield MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.cdoc,
+ content=content_ref)
+
+ # i/o
+
+ def as_dict(self):
+ """
+ Return a dict representation of the parts contained.
+ """
+ return self._dict
+
+ def from_dict(self, msg_dict):
+ """
+ Populate MessageWrapper parts from a dictionary.
+ It expects the same format that we use in a
+ MessageWrapper.
+ """
+ fdoc, hdoc, cdocs = map(
+ lambda part: msg_dict.get(part, None),
+ [self.FDOC, self.HDOC, self.CDOCS])
+ self._dict[self.FDOC] = fdoc
+ self._dict[self.HDOC] = hdoc
+ self._dict[self.CDOCS] = cdocs
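+
+# Illustrative sketch of the dict layout that as_dict() returns and that
+# from_dict() expects (the field names inside each doc are hypothetical):
+#
+#   >>> wrapper = MessageWrapper(fdoc={'flags': (), 'uid': 1},
+#   ...                          hdoc={'subject': 'hi'},
+#   ...                          cdocs={1: {'raw': 'payload'}})
+#   >>> sorted(wrapper.as_dict().keys())
+#   ['cdocs', 'fdoc', 'hdoc']
+#   >>> copy = MessageWrapper(from_dict=wrapper.as_dict(),
+#   ...                       new=False, dirty=True)
+#   >>> copy.dirty
+#   True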
+
+
+@contextlib.contextmanager
+def set_bool_flag(obj, att):
+ """
+ Set a boolean flag to True while we're doing our thing.
+ Just to let the world know.
+ """
+ setattr(obj, att, True)
+ try:
+ yield True
+ except RuntimeError as exc:
+ logger.exception(exc)
+ finally:
+ setattr(obj, att, False)
+
+
+class MemoryStore(object):
+ """
+    An in-memory store where we can write the different parts that we
+    split messages into, buffering them until we write them to permanent
+    storage.
+
+    It uses MessageWrapper instances to represent the message parts, which
+    are indexed by mailbox name and UID.
+
+    It can also be passed a permanent store as a parameter (any implementor
+    of IMessageStore, in this case a SoledadStore). In that case, the
+    messages kept in memory will be periodically dumped to it. The period
+    of the writes to the permanent store is controlled by the write_period
+    parameter in the constructor.
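+
+    A minimal construction sketch (illustrative; `soledad_store` stands for
+    any IMessageStore implementor, e.g. a SoledadStore)::
+
+        memstore = MemoryStore(permanent_store=soledad_store,
+                               write_period=30)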
+ """
+    implements(interfaces.IMessageStore,
+               interfaces.IMessageStoreWriter)
+
+ producer = None
+
+ # TODO We will want to index by chash when we transition to local-only
+ # UIDs.
+ # TODO should store RECENT-FLAGS too
+ # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass
+ # TODO do use dirty flag (maybe use namedtuples for that) so we can use it
+ # also as a read-cache.
+
+ WRITING_FLAG = "_writing"
+
+ def __init__(self, permanent_store=None, write_period=60):
+ """
+ Initialize a MemoryStore.
+
+        :param permanent_store: an IMessageStore implementor to dump
+ messages to.
+ :type permanent_store: IMessageStore
+ :param write_period: the interval to dump messages to disk, in seconds.
+ :type write_period: int
+ """
+ self._permanent_store = permanent_store
+ self._write_period = write_period
+
+ # Internal Storage
+ self._msg_store = {}
+ self._phash_store = {}
+
+ # TODO ----------------- implement mailbox-level flags store too! ----
+ self._rflags_store = {}
+ self._hdocset_store = {}
+ # TODO ----------------- implement mailbox-level flags store too! ----
+
+ # New and dirty flags, to set MessageWrapper State.
+ self._new = set([])
+ self._dirty = set([])
+
+ # Flag for signaling we're busy writing to the disk storage.
+ setattr(self, self.WRITING_FLAG, False)
+
+ if self._permanent_store is not None:
+ # this producer spits its messages to the permanent store
+ # consumer using a queue. We will use that to put
+ # our messages to be written.
+ self.producer = MessageProducer(permanent_store,
+ period=0.1)
+ # looping call for dumping to SoledadStore
+ self._write_loop = LoopingCall(self.write_messages,
+ permanent_store)
+
+ # We can start the write loop right now, why wait?
+ self._start_write_loop()
+
+ def _start_write_loop(self):
+ """
+ Start loop for writing to disk database.
+ """
+ if not self._write_loop.running:
+ self._write_loop.start(self._write_period, now=True)
+
+ def _stop_write_loop(self):
+ """
+ Stop loop for writing to disk database.
+ """
+ if self._write_loop.running:
+ self._write_loop.stop()
+
+ # IMessageStore
+
+ # XXX this would work well for whole message operations.
+ # We would have to add a put_flags operation to modify only
+ # the flags doc (and set the dirty flag accordingly)
+
+ def create_message(self, mbox, uid, message):
+ """
+        Create the passed message in this MemoryStore.
+
+ By default we consider that any message is a new message.
+ """
+ print "adding new doc to memstore %s (%s)" % (mbox, uid)
+ key = mbox, uid
+ self._new.add(key)
+
+ msg_dict = message.as_dict()
+ self._msg_store[key] = msg_dict
+
+ cdocs = message.cdocs
+
+ dirty = key in self._dirty
+ new = key in self._new
+
+        for cdoc_key in cdocs.keys():
+            logger.debug("saving cdoc")
+ cdoc = self._msg_store[key]['cdocs'][cdoc_key]
+
+ # XXX this should be done in the MessageWrapper constructor
+ # instead...
+            # first we make it weakly referenceable
+ referenciable_cdoc = ReferenciableDict(cdoc)
+ self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.cdoc,
+ content=referenciable_cdoc)
+ phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)
+ if not phash:
+ continue
+ self._phash_store[phash] = weakref.proxy(referenciable_cdoc)
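+
+        # Illustrative usage sketch (mailbox name, uid and doc contents are
+        # hypothetical):
+        #
+        #   >>> wrapper = MessageWrapper(fdoc={'flags': (), 'uid': 9},
+        #   ...                          hdoc={'subject': 'hi'},
+        #   ...                          cdocs={1: {'raw': 'payload'}})
+        #   >>> memstore.create_message('INBOX', 9, wrapper)
+        #   >>> memstore.count_new()
+        #   1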
+
+ def put_message(self, mbox, uid, msg):
+ """
+ Put an existing message.
+ """
+        raise NotImplementedError()
+
+ def get_message(self, mbox, uid):
+ """
+ Get a MessageWrapper for the given mbox and uid combination.
+
+ :return: MessageWrapper or None
+ """
+ key = mbox, uid
+ msg_dict = self._msg_store.get(key, None)
+ if msg_dict:
+ new, dirty = self._get_new_dirty_state(key)
+            return MessageWrapper(from_dict=msg_dict,
+                                  new=new, dirty=dirty,
+                                  memstore=weakref.proxy(self))
+ else:
+ return None
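+
+        # Illustrative sketch, continuing the create_message example above
+        # (mailbox name and uid are hypothetical):
+        #
+        #   >>> msg = memstore.get_message('INBOX', 9)
+        #   >>> msg.fdoc.content['uid']
+        #   9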
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove a Message from this MemoryStore.
+ """
+ raise NotImplementedError()
+
+ # IMessageStoreWriter
+
+ def write_messages(self, store):
+ """
+ Write the message documents in this MemoryStore to a different store.
+ """
+ # XXX pass if it's writing (ie, the queue is not empty...)
+ # See how to make the writing_flag aware of the queue state...
+ print "writing messages to producer..."
+
+ with set_bool_flag(self, self.WRITING_FLAG):
+ for msg_wrapper in self.all_msg_iter():
+ self.producer.push(msg_wrapper)
+
+ # MemoryStore specific methods.
+
+ def get_uids(self, mbox):
+ """
+ Get all uids for a given mbox.
+ """
+ all_keys = self._msg_store.keys()
+ return [uid for m, uid in all_keys if m == mbox]
+
+    def get_last_uid(self, mbox):
+        """
+        Get the highest UID for a given mbox.
+        """
+        uids = self.get_uids(mbox)
+        if not uids:
+            return 0
+        return max(uids)
+
+    def count_new_mbox(self, mbox):
+        """
+        Count the new messages for a given mailbox.
+        """
+        return len([(m, uid) for m, uid in self._new if m == mbox])
+
+ def count_new(self):
+ """
+ Count all the new messages in the MemoryStore.
+ """
+ return len(self._new)
+
+ def get_by_phash(self, phash):
+ """
+ Return a content-document by its payload-hash.
+ """
+ doc = self._phash_store.get(phash, None)
+
+ # XXX have to keep a mapping between phash and its linkage
+        # info, to know if this payload has already been saved or not.
+ # We will be able to get this from the linkage-docs,
+ # not yet implemented.
+ new = True
+ dirty = False
+ return MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.cdoc,
+ content=doc)
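+
+        # Illustrative sketch (the payload hash is hypothetical; it should be
+        # the phash of a cdoc previously added through create_message):
+        #
+        #   >>> part = memstore.get_by_phash('deadbeef')
+        #   >>> part.part == MessagePartType.cdoc
+        #   True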
+
+ def all_msg_iter(self):
+ """
+        Return a generator that iterates through all messages in the store.
+ """
+ return (self.get_message(*key)
+ for key in sorted(self._msg_store.keys()))
+
+ def _get_new_dirty_state(self, key):
+ """
+ Return `new` and `dirty` flags for a given message.
+ """
+ return map(lambda _set: key in _set, (self._new, self._dirty))
+
+ @property
+ def is_writing(self):
+ """
+ Property that returns whether the store is currently writing its
+ internal state to a permanent storage.
+
+        Used to evaluate whether the CHECK command can report that it is
+        clear to proceed, or whether it needs to wait until the pending
+        write operations are complete.
+
+ :rtype: bool
+ """
+ # XXX this should probably return a deferred !!!
+ return getattr(self, self.WRITING_FLAG)
+
+    def put_part(self, part_type, value):
+        """
+        Put the passed part into this IMessageStore.
+        `part_type` should be one of: fdoc, hdoc, cdoc.
+        """
+        # XXX turn that into an enum
+        raise NotImplementedError()
+
+ # Memory management.
+
+ def get_size(self):
+ """
+ Return the size of the internal storage.
+        Used to calculate the limit beyond which we should flush the store.
+ """
+ return size.get_size(self._msg_store)
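+
+        # Illustrative sketch of a flush check built on top of get_size();
+        # the threshold is hypothetical and not part of this module:
+        #
+        #   >>> MAX_MEMSTORE_BYTES = 5 * 1024 * 1024
+        #   >>> if memstore.get_size() > MAX_MEMSTORE_BYTES:
+        #   ...     memstore.write_messages(permanent_store)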