summaryrefslogtreecommitdiff
path: root/mail
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-21 19:22:09 -0400
committerKali Kaneko <kali@leap.se>2014-01-28 19:38:09 -0400
commit24dda6a4a30c9afef3be475d2bd91c8a1ba8ad6b (patch)
treec10a7baabdac1f9400084ef8bfce46175503568c /mail
parenta25a93d99d9c10b49279ec5b3568639db8ead67a (diff)
memory store for append/fetch/copy
Diffstat (limited to 'mail')
-rw-r--r--mail/src/leap/mail/imap/account.py13
-rw-r--r--mail/src/leap/mail/imap/interfaces.py93
-rw-r--r--mail/src/leap/mail/imap/mailbox.py62
-rw-r--r--mail/src/leap/mail/imap/memorystore.py478
-rw-r--r--mail/src/leap/mail/imap/messages.py206
-rw-r--r--mail/src/leap/mail/imap/service/imap.py20
-rw-r--r--mail/src/leap/mail/messageflow.py39
-rw-r--r--mail/src/leap/mail/size.py57
8 files changed, 884 insertions, 84 deletions
diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py
index ce83079..7641ea8 100644
--- a/mail/src/leap/mail/imap/account.py
+++ b/mail/src/leap/mail/imap/account.py
@@ -48,7 +48,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
selected = None
closed = False
- def __init__(self, account_name, soledad=None):
+ def __init__(self, account_name, soledad=None, memstore=None):
"""
Creates a SoledadAccountIndex that keeps track of the mailboxes
and subscriptions handled by this account.
@@ -57,7 +57,9 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
:type acct_name: str
:param soledad: a Soledad instance.
- :param soledad: Soledad
+ :type soledad: Soledad
+ :param memstore: a MemoryStore instance.
+ :type memstore: MemoryStore
"""
leap_assert(soledad, "Need a soledad instance to initialize")
leap_assert_type(soledad, Soledad)
@@ -67,6 +69,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
self._account_name = self._parse_mailbox_name(account_name)
self._soledad = soledad
+ self._memstore = memstore
self.initialize_db()
@@ -131,7 +134,8 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
if name not in self.mailboxes:
raise imap4.MailboxException("No such mailbox: %r" % name)
- return SoledadMailbox(name, soledad=self._soledad)
+ return SoledadMailbox(name, soledad=self._soledad,
+ memstore=self._memstore)
##
## IAccount
@@ -221,8 +225,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
self.selected = name
return SoledadMailbox(
- name, rw=readwrite,
- soledad=self._soledad)
+ name, self._soledad, self._memstore, readwrite)
def delete(self, name, force=False):
"""
diff --git a/mail/src/leap/mail/imap/interfaces.py b/mail/src/leap/mail/imap/interfaces.py
new file mode 100644
index 0000000..585165a
--- /dev/null
+++ b/mail/src/leap/mail/imap/interfaces.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+# interfaces.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Interfaces for the IMAP module.
+"""
+from zope.interface import Interface, Attribute
+
+
+class IMessageContainer(Interface):
+ """
+ I am a container around the different documents that a message
+ is split into.
+ """
+ fdoc = Attribute('The flags document for this message, if any.')
+ hdoc = Attribute('The headers document for this message, if any.')
+ cdocs = Attribute('The dict of content documents for this message, '
+ 'if any.')
+
+ def walk(self):
+ """
+ Return an iterator to the docs for all the parts.
+
+ :rtype: iterator
+ """
+
+
+class IMessageStore(Interface):
+ """
+ I represent a generic storage for LEAP Messages.
+ """
+
+ def create_message(self, mbox, uid, message):
+ """
+ Put the passed message into this IMessageStore.
+
+ :param mbox: the mbox this message belongs.
+ :param uid: the UID that identifies this message in this mailbox.
+ :param message: a IMessageContainer implementor.
+ """
+
+ def put_message(self, mbox, uid, message):
+ """
+ Put the passed message into this IMessageStore.
+
+ :param mbox: the mbox this message belongs.
+ :param uid: the UID that identifies this message in this mailbox.
+ :param message: a IMessageContainer implementor.
+ """
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove the given message from this IMessageStore.
+
+ :param mbox: the mbox this message belongs.
+ :param uid: the UID that identifies this message in this mailbox.
+ """
+
+ def get_message(self, mbox, uid):
+ """
+ Get a IMessageContainer for the given mbox and uid combination.
+
+ :param mbox: the mbox this message belongs.
+ :param uid: the UID that identifies this message in this mailbox.
+ """
+
+
+class IMessageStoreWriter(Interface):
+ """
+ I represent a storage that is able to write its contents to another
+ different IMessageStore.
+ """
+
+ def write_messages(self, store):
+ """
+ Write the documents in this IMessageStore to a different
+ storage. Usually this will be done from a MemoryStorage to a DbStorage.
+
+ :param store: another IMessageStore implementor.
+ """
diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py
index 0131ce0..9babe6b 100644
--- a/mail/src/leap/mail/imap/mailbox.py
+++ b/mail/src/leap/mail/imap/mailbox.py
@@ -37,6 +37,7 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
from leap.mail.decorators import deferred
from leap.mail.imap.fields import WithMsgFields, fields
+from leap.mail.imap.memorystore import MessageDict
from leap.mail.imap.messages import MessageCollection
from leap.mail.imap.parser import MBoxParser
@@ -80,7 +81,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
next_uid_lock = threading.Lock()
- def __init__(self, mbox, soledad=None, rw=1):
+ def __init__(self, mbox, soledad, memstore, rw=1):
"""
SoledadMailbox constructor. Needs to get passed a name, plus a
Soledad instance.
@@ -91,9 +92,13 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param soledad: a Soledad instance.
:type soledad: Soledad
- :param rw: read-and-write flags
+ :param memstore: a MemoryStore instance
+ :type memstore: MemoryStore
+
+ :param rw: read-and-write flag for this mailbox
:type rw: int
"""
+ print "got memstore: ", memstore
leap_assert(mbox, "Need a mailbox name to initialize")
leap_assert(soledad, "Need a soledad instance to initialize")
@@ -105,9 +110,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
self.rw = rw
self._soledad = soledad
+ self._memstore = memstore
self.messages = MessageCollection(
- mbox=mbox, soledad=self._soledad)
+ mbox=mbox, soledad=self._soledad, memstore=self._memstore)
if not self.getFlags():
self.setFlags(self.INIT_FLAGS)
@@ -231,7 +237,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# XXX It looks like it has been corrupted.
# We need to be able to survive this.
return None
- return mbox.content.get(self.LAST_UID_KEY, 1)
+ last = mbox.content.get(self.LAST_UID_KEY, 1)
+ if self._memstore:
+ last = max(last, self._memstore.get_last_uid(mbox))
+ return last
def _set_last_uid(self, uid):
"""
@@ -259,6 +268,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
value = count
mbox.content[key] = value
+ # XXX this should be set in the memorystore instead!!!
self._soledad.put_doc(mbox)
last_uid = property(
@@ -532,12 +542,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# can treat them all the same.
# Change this to the flag that twisted expects when we
# switch to content-hash based index + local UID table.
+ print
+ print "FETCHING..."
sequence = False
#sequence = True if uid == 0 else False
messages_asked = self._bound_seq(messages_asked)
+ print "asked: ", messages_asked
seq_messg = self._filter_msg_seq(messages_asked)
+
+ print "seq: ", seq_messg
getmsg = lambda uid: self.messages.get_msg_by_uid(uid)
# for sequence numbers (uid = 0)
@@ -769,36 +784,41 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
uid_next = self.getUIDNext()
msg = messageObject
- # XXX DEBUG ----------------------------------------
- #print "copying MESSAGE from %s (%s) to %s (%s)" % (
- #msg._mbox, msg._uid, self.mbox, uid_next)
-
# XXX should use a public api instead
fdoc = msg._fdoc
+ hdoc = msg._hdoc
if not fdoc:
logger.debug("Tried to copy a MSG with no fdoc")
return
+ #old_mbox = fdoc.content[self.MBOX_KEY]
+ #old_uid = fdoc.content[self.UID_KEY]
+ #old_key = old_mbox, old_uid
+ #print "copying from OLD MBOX ", old_mbox
+
+ # XXX bit doubt... to duplicate in memory
+ # or not to...?
+ # I think it should be ok to duplicate as long as we're
+ # careful at the hour of writes...
+ # We could use also proxies, but it will break when
+ # the original mailbox is flushed.
+
+ # XXX DEBUG ----------------------------------------
+ #print "copying MESSAGE from %s (%s) to %s (%s)" % (
+ #msg._mbox, msg._uid, self.mbox, uid_next)
+
new_fdoc = copy.deepcopy(fdoc.content)
new_fdoc[self.UID_KEY] = uid_next
new_fdoc[self.MBOX_KEY] = self.mbox
- self._do_add_doc(new_fdoc)
+ self._memstore.put(self.mbox, uid_next, MessageDict(
+ new_fdoc, hdoc.content))
- # XXX should use a public api instead
- hdoc = msg._hdoc
- self.messages.add_hdocset_docid(hdoc.doc_id)
+ # XXX use memory store
+ if hasattr(hdoc, 'doc_id'):
+ self.messages.add_hdocset_docid(hdoc.doc_id)
deferLater(reactor, 1, self.notify_new)
- def _do_add_doc(self, doc):
- """
- Defer the adding of a new doc.
-
- :param doc: document to be created in soledad.
- :type doc: dict
- """
- self._soledad.create_doc(doc)
-
# convenience fun
def deleteAllDocs(self):
diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py
new file mode 100644
index 0000000..b8829e0
--- /dev/null
+++ b/mail/src/leap/mail/imap/memorystore.py
@@ -0,0 +1,478 @@
+# -*- coding: utf-8 -*-
+# memorystore.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+In-memory transient store for a LEAPIMAPServer.
+"""
+import contextlib
+import logging
+import weakref
+
+from collections import namedtuple
+
+from twisted.internet.task import LoopingCall
+from zope.interface import implements
+
+from leap.mail import size
+from leap.mail.messageflow import MessageProducer
+from leap.mail.messageparts import MessagePartType
+from leap.mail.imap import interfaces
+from leap.mail.imap.fields import fields
+
+logger = logging.getLogger(__name__)
+
+
+"""
+A MessagePartDoc is a light wrapper around the dictionary-like
+data that we pass along for message parts. It can be used almost everywhere
+that you would expect a SoledadDocument, since it has a dict under the
+`content` attribute.
+
+We also keep some metadata on it, relative in part to the message as a whole,
+and sometimes to a part in particular only.
+
+* `new` indicates that the document has just been created. SoledadStore
+ should just create a new doc for all the related message parts.
+* `store` indicates the type of store a given MessagePartDoc lives in.
+ We currently use this to indicate that the document comes from memeory,
+ but we should probably get rid of it as soon as we extend the use of the
+ SoledadStore interface along LeapMessage, MessageCollection and Mailbox.
+* `part` is one of the MessagePartType enums.
+
+* `dirty` indicates that, while we already have the document in Soledad,
+ we have modified its state in memory, so we need to put_doc instead while
+ dumping the MemoryStore contents.
+ `dirty` attribute would only apply to flags-docs and linkage-docs.
+
+
+ XXX this is still not implemented!
+
+"""
+
+MessagePartDoc = namedtuple(
+ 'MessagePartDoc',
+ ['new', 'dirty', 'part', 'store', 'content'])
+
+
+class ReferenciableDict(dict):
+ """
+ A dict that can be weak-referenced.
+
+ Some builtin objects are not weak-referenciable unless
+ subclassed. So we do.
+
+ Used to return pointers to the items in the MemoryStore.
+ """
+
+
+class MessageWrapper(object):
+ """
+ A simple nested dictionary container around the different message subparts.
+ """
+ implements(interfaces.IMessageContainer)
+
+ FDOC = "fdoc"
+ HDOC = "hdoc"
+ CDOCS = "cdocs"
+
+ # XXX can use this to limit the memory footprint,
+ # or is it too premature to optimize?
+ # Does it work well together with the interfaces.implements?
+
+ #__slots__ = ["_dict", "_new", "_dirty", "memstore"]
+
+ def __init__(self, fdoc=None, hdoc=None, cdocs=None,
+ from_dict=None, memstore=None,
+ new=True, dirty=False):
+ self._dict = {}
+
+ self._new = new
+ self._dirty = dirty
+ self.memstore = memstore
+
+ if from_dict is not None:
+ self.from_dict(from_dict)
+ else:
+ if fdoc is not None:
+ self._dict[self.FDOC] = ReferenciableDict(fdoc)
+ if hdoc is not None:
+ self._dict[self.HDOC] = ReferenciableDict(hdoc)
+ if cdocs is not None:
+ self._dict[self.CDOCS] = ReferenciableDict(cdocs)
+
+ # properties
+
+ @property
+ def new(self):
+ return self._new
+
+ def set_new(self, value=True):
+ self._new = value
+
+ @property
+ def dirty(self):
+ return self._dirty
+
+ def set_dirty(self, value=True):
+ self._dirty = value
+
+ # IMessageContainer
+
+ @property
+ def fdoc(self):
+ _fdoc = self._dict.get(self.FDOC, None)
+ if _fdoc:
+ content_ref = weakref.proxy(_fdoc)
+ else:
+ logger.warning("NO FDOC!!!")
+ content_ref = {}
+ return MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.fdoc,
+ content=content_ref)
+
+ @property
+ def hdoc(self):
+ _hdoc = self._dict.get(self.HDOC, None)
+ if _hdoc:
+ content_ref = weakref.proxy(_hdoc)
+ else:
+ logger.warning("NO HDOC!!!!")
+ content_ref = {}
+ return MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.hdoc,
+ content=content_ref)
+
+ @property
+ def cdocs(self):
+ _cdocs = self._dict.get(self.CDOCS, None)
+ if _cdocs:
+ return weakref.proxy(_cdocs)
+ else:
+ return {}
+
+ def walk(self):
+ """
+ Generator that iterates through all the parts, returning
+ MessagePartDoc.
+ """
+ yield self.fdoc
+ yield self.hdoc
+ for cdoc in self.cdocs.values():
+ # XXX this will break ----
+ content_ref = weakref.proxy(cdoc)
+ yield MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.cdoc,
+ content=content_ref)
+
+ # i/o
+
+ def as_dict(self):
+ """
+ Return a dict representation of the parts contained.
+ """
+ return self._dict
+
+ def from_dict(self, msg_dict):
+ """
+ Populate MessageWrapper parts from a dictionary.
+ It expects the same format that we use in a
+ MessageWrapper.
+ """
+ fdoc, hdoc, cdocs = map(
+ lambda part: msg_dict.get(part, None),
+ [self.FDOC, self.HDOC, self.CDOCS])
+ self._dict[self.FDOC] = fdoc
+ self._dict[self.HDOC] = hdoc
+ self._dict[self.CDOCS] = cdocs
+
+
+@contextlib.contextmanager
+def set_bool_flag(obj, att):
+ """
+ Set a boolean flag to True while we're doing our thing.
+ Just to let the world know.
+ """
+ setattr(obj, att, True)
+ try:
+ yield True
+ except RuntimeError as exc:
+ logger.exception(exc)
+ finally:
+ setattr(obj, att, False)
+
+
+class MemoryStore(object):
+ """
+ An in-memory store to where we can write the different parts that
+ we split the messages into and buffer them until we write them to the
+ permanent storage.
+
+ It uses MessageWrapper instances to represent the message-parts, which are
+ indexed by mailbox name and UID.
+
+ It also can be passed a permanent storage as a paremeter (any implementor
+ of IMessageStore, in this case a SoledadStore). In this case, a periodic
+ dump of the messages stored in memory will be done. The period of the
+ writes to the permanent storage is controled by the write_period parameter
+ in the constructor.
+ """
+ implements(interfaces.IMessageStore)
+ implements(interfaces.IMessageStoreWriter)
+
+ producer = None
+
+ # TODO We will want to index by chash when we transition to local-only
+ # UIDs.
+ # TODO should store RECENT-FLAGS too
+ # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass
+ # TODO do use dirty flag (maybe use namedtuples for that) so we can use it
+ # also as a read-cache.
+
+ WRITING_FLAG = "_writing"
+
+ def __init__(self, permanent_store=None, write_period=60):
+ """
+ Initialize a MemoryStore.
+
+ :param permanent_store: a IMessageStore implementor to dump
+ messages to.
+ :type permanent_store: IMessageStore
+ :param write_period: the interval to dump messages to disk, in seconds.
+ :type write_period: int
+ """
+ self._permanent_store = permanent_store
+ self._write_period = write_period
+
+ # Internal Storage
+ self._msg_store = {}
+ self._phash_store = {}
+
+ # TODO ----------------- implement mailbox-level flags store too! ----
+ self._rflags_store = {}
+ self._hdocset_store = {}
+ # TODO ----------------- implement mailbox-level flags store too! ----
+
+ # New and dirty flags, to set MessageWrapper State.
+ self._new = set([])
+ self._dirty = set([])
+
+ # Flag for signaling we're busy writing to the disk storage.
+ setattr(self, self.WRITING_FLAG, False)
+
+ if self._permanent_store is not None:
+ # this producer spits its messages to the permanent store
+ # consumer using a queue. We will use that to put
+ # our messages to be written.
+ self.producer = MessageProducer(permanent_store,
+ period=0.1)
+ # looping call for dumping to SoledadStore
+ self._write_loop = LoopingCall(self.write_messages,
+ permanent_store)
+
+ # We can start the write loop right now, why wait?
+ self._start_write_loop()
+
+ def _start_write_loop(self):
+ """
+ Start loop for writing to disk database.
+ """
+ if not self._write_loop.running:
+ self._write_loop.start(self._write_period, now=True)
+
+ def _stop_write_loop(self):
+ """
+ Stop loop for writing to disk database.
+ """
+ if self._write_loop.running:
+ self._write_loop.stop()
+
+ # IMessageStore
+
+ # XXX this would work well for whole message operations.
+ # We would have to add a put_flags operation to modify only
+ # the flags doc (and set the dirty flag accordingly)
+
+ def create_message(self, mbox, uid, message):
+ """
+ Create the passed message into this MemoryStore.
+
+ By default we consider that any message is a new message.
+ """
+ print "adding new doc to memstore %s (%s)" % (mbox, uid)
+ key = mbox, uid
+ self._new.add(key)
+
+ msg_dict = message.as_dict()
+ self._msg_store[key] = msg_dict
+
+ cdocs = message.cdocs
+
+ dirty = key in self._dirty
+ new = key in self._new
+
+ # XXX should capture this in log...
+
+ for cdoc_key in cdocs.keys():
+ print "saving cdoc"
+ cdoc = self._msg_store[key]['cdocs'][cdoc_key]
+
+ # XXX this should be done in the MessageWrapper constructor
+ # instead...
+ # first we make it weak-referenciable
+ referenciable_cdoc = ReferenciableDict(cdoc)
+ self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.cdoc,
+ content=referenciable_cdoc)
+ phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)
+ if not phash:
+ continue
+ self._phash_store[phash] = weakref.proxy(referenciable_cdoc)
+
+ def put_message(self, mbox, uid, msg):
+ """
+ Put an existing message.
+ """
+ return NotImplementedError()
+
+ def get_message(self, mbox, uid):
+ """
+ Get a MessageWrapper for the given mbox and uid combination.
+
+ :return: MessageWrapper or None
+ """
+ key = mbox, uid
+ msg_dict = self._msg_store.get(key, None)
+ if msg_dict:
+ new, dirty = self._get_new_dirty_state(key)
+ return MessageWrapper(from_dict=msg_dict,
+ memstore=weakref.proxy(self))
+ else:
+ return None
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove a Message from this MemoryStore.
+ """
+ raise NotImplementedError()
+
+ # IMessageStoreWriter
+
+ def write_messages(self, store):
+ """
+ Write the message documents in this MemoryStore to a different store.
+ """
+ # XXX pass if it's writing (ie, the queue is not empty...)
+ # See how to make the writing_flag aware of the queue state...
+ print "writing messages to producer..."
+
+ with set_bool_flag(self, self.WRITING_FLAG):
+ for msg_wrapper in self.all_msg_iter():
+ self.producer.push(msg_wrapper)
+
+ # MemoryStore specific methods.
+
+ def get_uids(self, mbox):
+ """
+ Get all uids for a given mbox.
+ """
+ all_keys = self._msg_store.keys()
+ return [uid for m, uid in all_keys if m == mbox]
+
+ def get_last_uid(self, mbox):
+ """
+ Get the highest UID for a given mbox.
+ """
+ # XXX should get from msg_store keys instead!
+ if not self._new:
+ return 0
+ return max(self.get_uids(mbox))
+
+ def count_new_mbox(self, mbox):
+ """
+ Count the new messages by inbox.
+ """
+ return len([(m, uid) for m, uid in self._new if mbox == mbox])
+
+ def count_new(self):
+ """
+ Count all the new messages in the MemoryStore.
+ """
+ return len(self._new)
+
+ def get_by_phash(self, phash):
+ """
+ Return a content-document by its payload-hash.
+ """
+ doc = self._phash_store.get(phash, None)
+
+ # XXX have to keep a mapping between phash and its linkage
+ # info, to know if this payload is been already saved or not.
+ # We will be able to get this from the linkage-docs,
+ # not yet implemented.
+ new = True
+ dirty = False
+ return MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.cdoc,
+ content=doc)
+
+ def all_msg_iter(self):
+ """
+ Return generator that iterates through all messages in the store.
+ """
+ return (self.get_message(*key)
+ for key in sorted(self._msg_store.keys()))
+
+ def _get_new_dirty_state(self, key):
+ """
+ Return `new` and `dirty` flags for a given message.
+ """
+ return map(lambda _set: key in _set, (self._new, self._dirty))
+
+ @property
+ def is_writing(self):
+ """
+ Property that returns whether the store is currently writing its
+ internal state to a permanent storage.
+
+ Used to evaluate whether the CHECK command can inform that the field
+ is clear to proceed, or waiting for the write operations to complete
+ is needed instead.
+
+ :rtype: bool
+ """
+ # XXX this should probably return a deferred !!!
+ return getattr(self, self.WRITING_FLAG)
+
+ def put_part(self, part_type, value):
+ """
+ Put the passed part into this IMessageStore.
+ `part` should be one of: fdoc, hdoc, cdoc
+ """
+ # XXX turn that into a enum
+
+ # Memory management.
+
+ def get_size(self):
+ """
+ Return the size of the internal storage.
+ Use for calculating the limit beyond which we should flush the store.
+ """
+ return size.get_size(self._msg_store)
diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py
index 34304ea..ef0b0a1 100644
--- a/mail/src/leap/mail/imap/messages.py
+++ b/mail/src/leap/mail/imap/messages.py
@@ -42,6 +42,7 @@ from leap.mail.utils import first, find_charset
from leap.mail.decorators import deferred
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
+from leap.mail.imap.memorystore import MessageDict
from leap.mail.imap.parser import MailParser, MBoxParser
from leap.mail.messageflow import IMessageConsumer
@@ -49,11 +50,20 @@ logger = logging.getLogger(__name__)
# TODO ------------------------------------------------------------
+# [ ] Add ref to incoming message during add_msg
# [ ] Add linked-from info.
# [ ] Delete incoming mail only after successful write!
# [ ] Remove UID from syncable db. Store only those indexes locally.
+# XXX no longer needed, since i'm using proxies instead of direct weakrefs
+def maybe_call(thing):
+ """
+ Return the same thing, or the result of its invocation if it is a callable.
+ """
+ return thing() if callable(thing) else thing
+
+
def lowerdict(_dict):
"""
Return a dict with the keys in lowercase.
@@ -333,7 +343,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
implements(imap4.IMessage)
- def __init__(self, soledad, uid, mbox, collection=None):
+ def __init__(self, soledad, uid, mbox, collection=None, container=None):
"""
Initializes a LeapMessage.
@@ -345,12 +355,15 @@ class LeapMessage(fields, MailParser, MBoxParser):
:type mbox: basestring
:param collection: a reference to the parent collection object
:type collection: MessageCollection
+ :param container: a IMessageContainer implementor instance
+ :type container: IMessageContainer
"""
MailParser.__init__(self)
self._soledad = soledad
self._uid = int(uid)
self._mbox = self._parse_mailbox_name(mbox)
self._collection = collection
+ self._container = container
self.__chash = None
self.__bdoc = None
@@ -361,13 +374,29 @@ class LeapMessage(fields, MailParser, MBoxParser):
An accessor to the flags document.
"""
if all(map(bool, (self._uid, self._mbox))):
- fdoc = self._get_flags_doc()
+ fdoc = None
+ if self._container is not None:
+ fdoc = self._container.fdoc
+ if not fdoc:
+ fdoc = self._get_flags_doc()
if fdoc:
- self.__chash = fdoc.content.get(
+ fdoc_content = maybe_call(fdoc.content)
+ self.__chash = fdoc_content.get(
fields.CONTENT_HASH_KEY, None)
return fdoc
@property
+ def _hdoc(self):
+ """
+ An accessor to the headers document.
+ """
+ if self._container is not None:
+ hdoc = self._container.hdoc
+ if hdoc:
+ return hdoc
+ return self._get_headers_doc()
+
+ @property
def _chash(self):
"""
An accessor to the content hash for this message.
@@ -375,18 +404,11 @@ class LeapMessage(fields, MailParser, MBoxParser):
if not self._fdoc:
return None
if not self.__chash and self._fdoc:
- self.__chash = self._fdoc.content.get(
+ self.__chash = maybe_call(self._fdoc.content).get(
fields.CONTENT_HASH_KEY, None)
return self.__chash
@property
- def _hdoc(self):
- """
- An accessor to the headers document.
- """
- return self._get_headers_doc()
-
- @property
def _bdoc(self):
"""
An accessor to the body document.
@@ -422,7 +444,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
flags = []
fdoc = self._fdoc
if fdoc:
- flags = fdoc.content.get(self.FLAGS_KEY, None)
+ flags = maybe_call(fdoc.content).get(self.FLAGS_KEY, None)
msgcol = self._collection
@@ -449,6 +471,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: a SoledadDocument instance
:rtype: SoledadDocument
"""
+ # XXX use memory store ...!
+
leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
log.msg('setting flags: %s (%s)' % (self._uid, flags))
@@ -461,7 +485,9 @@ class LeapMessage(fields, MailParser, MBoxParser):
doc.content[self.FLAGS_KEY] = flags
doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
- self._soledad.put_doc(doc)
+
+ if getattr(doc, 'store', None) != "mem":
+ self._soledad.put_doc(doc)
def addFlags(self, flags):
"""
@@ -521,18 +547,26 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
# TODO refactor with getBodyFile in MessagePart
fd = StringIO.StringIO()
- bdoc = self._bdoc
- if bdoc:
- body = self._bdoc.content.get(self.RAW_KEY, "")
- content_type = bdoc.content.get('content-type', "")
+ if self._bdoc is not None:
+ bdoc_content = self._bdoc.content
+ body = bdoc_content.get(self.RAW_KEY, "")
+ content_type = bdoc_content.get('content-type', "")
charset = find_charset(content_type)
+ logger.debug('got charset from content-type: %s' % charset)
if charset is None:
charset = self._get_charset(body)
try:
body = body.encode(charset)
except UnicodeError as e:
- logger.error("Unicode error, using 'replace'. {0!r}".format(e))
- body = body.encode(charset, 'replace')
+ logger.error("Unicode error {0}".format(e))
+ logger.debug("Attempted to encode with: %s" % charset)
+ try:
+ body = body.encode(charset, 'replace')
+ except UnicodeError as e:
+ try:
+ body = body.encode('utf-8', 'replace')
+ except:
+ pass
# We are still returning funky characters from here.
else:
@@ -567,7 +601,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
size = None
if self._fdoc:
- size = self._fdoc.content.get(self.SIZE_KEY, False)
+ fdoc_content = maybe_call(self._fdoc.content)
+ size = fdoc_content.get(self.SIZE_KEY, False)
else:
logger.warning("No FLAGS doc for %s:%s" % (self._mbox,
self._uid))
@@ -632,7 +667,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the headers dict for this message.
"""
if self._hdoc is not None:
- headers = self._hdoc.content.get(self.HEADERS_KEY, {})
+ hdoc_content = maybe_call(self._hdoc.content)
+ headers = hdoc_content.get(self.HEADERS_KEY, {})
return headers
else:
@@ -646,7 +682,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return True if this message is multipart.
"""
if self._fdoc:
- is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False)
+ fdoc_content = maybe_call(self._fdoc.content)
+ is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)
return is_multipart
else:
logger.warning(
@@ -688,7 +725,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
logger.warning("Tried to get part but no HDOC found!")
return None
- pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {})
+ hdoc_content = maybe_call(self._hdoc.content)
+ pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
return pmap[str(part)]
def _get_flags_doc(self):
@@ -724,16 +762,33 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the document that keeps the body for this
message.
"""
- body_phash = self._hdoc.content.get(
+ hdoc_content = maybe_call(self._hdoc.content)
+ body_phash = hdoc_content.get(
fields.BODY_KEY, None)
if not body_phash:
logger.warning("No body phash for this document!")
return None
- body_docs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(body_phash))
- return first(body_docs)
+ # XXX get from memstore too...
+ # if memstore: memstore.get_phrash
+ # memstore should keep a dict with weakrefs to the
+ # phash doc...
+
+ if self._container is not None:
+ bdoc = self._container.memstore.get_by_phash(body_phash)
+ if bdoc:
+ return bdoc
+ else:
+ print "no doc for that phash found!"
+
+ # no memstore or no doc found there
+ if self._soledad:
+ body_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(body_phash))
+ return first(body_docs)
+ else:
+ logger.error("No phash in container, and no soledad found!")
def __getitem__(self, key):
"""
@@ -746,7 +801,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: The content value indexed by C{key} or None
:rtype: str
"""
- return self._fdoc.content.get(key, None)
+ return maybe_call(self._fdoc.content).get(key, None)
# setters
@@ -790,6 +845,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
# until we think about a good way of deorphaning.
# Maybe a crawler of unreferenced docs.
+ # XXX remove from memory store!!!
+
# XXX implement elijah's idea of using a PUT document as a
# token to ensure consistency in the removal.
@@ -957,7 +1014,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
"""
A collection of messages, surprisingly.
- It is tied to a selected mailbox name that is passed to constructor.
+ It is tied to a selected mailbox name that is passed to its constructor.
Implements a filter query over the messages contained in a soledad
database.
"""
@@ -1058,7 +1115,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
_hdocset_lock = threading.Lock()
_hdocset_property_lock = threading.Lock()
- def __init__(self, mbox=None, soledad=None):
+ def __init__(self, mbox=None, soledad=None, memstore=None):
"""
Constructor for MessageCollection.
@@ -1068,13 +1125,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
MessageCollection for each mailbox, instead of treating them
as a property of each message.
+ We are passed an instance of MemoryStore, the same for the
+ SoledadBackedAccount, that we use as a read cache and a buffer
+ for writes.
+
:param mbox: the name of the mailbox. It is the name
with which we filter the query over the
- messages database
+ messages database.
:type mbox: str
-
:param soledad: Soledad database
:type soledad: Soledad instance
+ :param memstore: a MemoryStore instance
+ :type memstore: MemoryStore
"""
MailParser.__init__(self)
leap_assert(mbox, "Need a mailbox name to initialize")
@@ -1086,6 +1148,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# okay, all in order, keep going...
self.mbox = self._parse_mailbox_name(mbox)
self._soledad = soledad
+ self._memstore = memstore
+
self.__rflags = None
self.__hdocset = None
self.initialize_db()
@@ -1241,6 +1305,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# when all the processing is done.
# TODO add the linked-from info !
+ # TODO add reference to the original message
logger.debug('adding message')
if flags is None:
@@ -1273,24 +1338,29 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
hd[key] = parts_map[key]
del parts_map
- # Saving ----------------------------------------
- self.set_recent_flag(uid)
+ # The MessageContainer expects a dict, zero-indexed
+ # XXX review-me
+ cdocs = dict((index, doc) for index, doc in
+ enumerate(walk.get_raw_docs(msg, parts)))
+ print "cdocs is", cdocs
- # first, regular docs: flags and headers
- self._soledad.create_doc(fd)
+ # Saving ----------------------------------------
# XXX should check for content duplication on headers too
# but with chash. !!!
- hdoc = self._soledad.create_doc(hd)
+
+ # XXX adapt hdocset to use memstore
+ #hdoc = self._soledad.create_doc(hd)
# We add the newly created hdoc to the fast-access set of
# headers documents associated with the mailbox.
- self.add_hdocset_docid(hdoc.doc_id)
+ #self.add_hdocset_docid(hdoc.doc_id)
- # and last, but not least, try to create
- # content docs if not already there.
- cdocs = walk.get_raw_docs(msg, parts)
- for cdoc in cdocs:
- if not self._content_does_exist(cdoc):
- self._soledad.create_doc(cdoc)
+ # XXX move to memory store too
+ # self.set_recent_flag(uid)
+
+ # TODO ---- add reference to original doc, to be deleted
+ # after writes are done.
+ msg_container = MessageDict(fd, hd, cdocs)
+ self._memstore.put(self.mbox, uid, msg_container)
def _remove_cb(self, result):
return result
@@ -1321,6 +1391,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
#
# recent flags
+ # XXX FIXME -------------------------------------
+ # This should be rewritten to use memory store.
def _get_recent_flags(self):
"""
@@ -1390,6 +1462,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# headers-docs-set
+ # XXX FIXME -------------------------------------
+ # This should be rewritten to use memory store.
+
def _get_hdocset(self):
"""
An accessor for the hdocs-set for this mailbox.
@@ -1532,7 +1607,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
or None if not found.
:rtype: LeapMessage
"""
- msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
+ print "getting msg by id!"
+ msg_container = self._memstore.get(self.mbox, uid)
+ print "msg container", msg_container
+ if msg_container is not None:
+ print "getting LeapMessage (from memstore)"
+ msg = LeapMessage(None, uid, self.mbox, collection=self,
+ container=msg_container)
+ print "got msg:", msg
+ else:
+ msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
if not msg.does_exist():
return None
return msg
@@ -1570,11 +1654,19 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
ascending order.
"""
# XXX we should get this from the uid table, local-only
- all_uids = (doc.content[self.UID_KEY] for doc in
- self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox))
- return (u for u in sorted(all_uids))
+ # XXX FIXME -------------
+ # This should be cached in the memstoretoo
+ db_uids = set([doc.content[self.UID_KEY] for doc in
+ self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_FLAGS_VAL, self.mbox)])
+ if self._memstore is not None:
+ mem_uids = self._memstore.get_uids(self.mbox)
+ uids = db_uids.union(set(mem_uids))
+ else:
+ uids = db_uids
+
+ return (u for u in sorted(uids))
def reset_last_uid(self, param):
"""
@@ -1592,12 +1684,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
"""
Return a dict with all flags documents for this mailbox.
"""
+ # XXX get all from memstore and cahce it there
all_flags = dict(((
doc.content[self.UID_KEY],
doc.content[self.FLAGS_KEY]) for doc in
self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)))
+ if self._memstore is not None:
+ # XXX
+ uids = self._memstore.get_uids(self.mbox)
+ fdocs = [(uid, self._memstore.get(self.mbox, uid).fdoc)
+ for uid in uids]
+ for uid, doc in fdocs:
+ all_flags[uid] = doc.content[self.FLAGS_KEY]
+
return all_flags
def all_flags_chash(self):
@@ -1630,9 +1731,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
:rtype: int
"""
+ # XXX We could cache this in memstore too until next write...
count = self._soledad.get_count_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)
+ if self._memstore is not None:
+ count += self._memstore.count_new()
return count
# unseen messages
diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py
index ad22da6..71b9950 100644
--- a/mail/src/leap/mail/imap/service/imap.py
+++ b/mail/src/leap/mail/imap/service/imap.py
@@ -36,6 +36,7 @@ from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import KeyManager
from leap.mail.imap.account import SoledadBackedAccount
from leap.mail.imap.fetch import LeapIncomingMail
+from leap.mail.imap.memorystore import MemoryStore
from leap.soledad.client import Soledad
# The default port in which imap service will run
@@ -69,6 +70,8 @@ except Exception:
######################################################
+# TODO move this to imap.server
+
class LeapIMAPServer(imap4.IMAP4Server):
"""
An IMAP4 Server with mailboxes backed by soledad
@@ -256,11 +259,15 @@ class LeapIMAPFactory(ServerFactory):
self._uuid = uuid
self._userid = userid
self._soledad = soledad
+ self._memstore = MemoryStore()
theAccount = SoledadBackedAccount(
- uuid, soledad=soledad)
+ uuid, soledad=soledad,
+ memstore=self._memstore)
self.theAccount = theAccount
+ # XXX how to pass the store along?
+
def buildProtocol(self, addr):
"Return a protocol suitable for the job."
imapProtocol = LeapIMAPServer(
@@ -323,3 +330,14 @@ def run_service(*args, **kwargs):
# not ok, signal error.
leap_events.signal(IMAP_SERVICE_FAILED_TO_START, str(port))
+
+ def checkpoint(self):
+ """
+ Called when the client issues a CHECK command.
+
+ This should perform any checkpoint operations required by the server.
+ It may be a long running operation, but may not block. If it returns
+ a deferred, the client will only be informed of success (or failure)
+ when the deferred's callback (or errback) is invoked.
+ """
+ return None
diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py
index ac26e45..ed6abcd 100644
--- a/mail/src/leap/mail/messageflow.py
+++ b/mail/src/leap/mail/messageflow.py
@@ -25,12 +25,15 @@ from zope.interface import Interface, implements
class IMessageConsumer(Interface):
+ """
+ I consume messages from a queue.
+ """
def consume(self, queue):
"""
Consumes the passed item.
- :param item: q queue where we put the object to be consumed.
+ :param item: a queue where we put the object to be consumed.
:type item: object
"""
# TODO we could add an optional type to be passed
@@ -40,6 +43,28 @@ class IMessageConsumer(Interface):
# the queue, maybe wrapped in an object with a retries attribute.
+class IMessageProducer(Interface):
+ """
+ I produce messages and put them in a store to be consumed by other
+ entities.
+ """
+
+ def push(self, item):
+ """
+ Push a new item in the queue.
+ """
+
+ def start(self):
+ """
+ Start producing items.
+ """
+
+ def stop(self):
+ """
+ Stop producing items.
+ """
+
+
class DummyMsgConsumer(object):
implements(IMessageConsumer)
@@ -62,6 +87,8 @@ class MessageProducer(object):
deferred chain and leave further processing detached from the calling loop,
as in the case of smtp.
"""
+ implements(IMessageProducer)
+
# TODO this can be seen as a first step towards properly implementing
# components that implement IPushProducer / IConsumer interfaces.
# However, I need to think more about how to pause the streaming.
@@ -92,7 +119,7 @@ class MessageProducer(object):
def _check_for_new(self):
"""
- Checks for new items in the internal queue, and calls the consume
+ Check for new items in the internal queue, and calls the consume
method in the consumer.
If the queue is found empty, the loop is stopped. It will be started
@@ -102,11 +129,11 @@ class MessageProducer(object):
if self._queue.empty():
self.stop()
- # public methods
+ # public methods: IMessageProducer
- def put(self, item):
+ def push(self, item):
"""
- Puts a new item in the queue.
+ Push a new item in the queue.
If the queue was empty, we will start the loop again.
"""
@@ -117,7 +144,7 @@ class MessageProducer(object):
def start(self):
"""
- Starts polling for new items.
+ Start polling for new items.
"""
if not self._loop.running:
self._loop.start(self._period, now=True)
diff --git a/mail/src/leap/mail/size.py b/mail/src/leap/mail/size.py
new file mode 100644
index 0000000..4880d71
--- /dev/null
+++ b/mail/src/leap/mail/size.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+# size.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Recursively get size of objects.
+"""
+from gc import collect
+from itertools import chain
+from sys import getsizeof
+
+
+def _get_size(item, seen):
+ known_types = {dict: lambda d: chain.from_iterable(d.items())}
+ default_size = getsizeof(0)
+
+ def size_walk(item):
+ if id(item) in seen:
+ return 0
+ seen.add(id(item))
+ s = getsizeof(item, default_size)
+ for _type, fun in known_types.iteritems():
+ if isinstance(item, _type):
+ s += sum(map(size_walk, fun(item)))
+ break
+ return s
+
+ return size_walk(item)
+
+
+def get_size(item):
+ """
+ Return the cumulative size of a given object.
+
+ Currently it supports only dictionaries, and seemingly leaks
+ some memory, so use with care.
+
+ :param item: the item which size wants to be computed
+ """
+ seen = set()
+ size = _get_size(item, seen)
+ #print "len(seen) ", len(seen)
+ del seen
+ collect()
+ return size