summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/leap/mail/adaptors/soledad.py13
-rw-r--r--src/leap/mail/constants.py14
-rw-r--r--src/leap/mail/imap/account.py306
-rw-r--r--src/leap/mail/imap/fields.py51
-rw-r--r--src/leap/mail/imap/interfaces.py96
-rw-r--r--src/leap/mail/imap/mailbox.py472
-rw-r--r--src/leap/mail/imap/memorystore.py1340
-rw-r--r--src/leap/mail/imap/messageparts.py586
-rw-r--r--src/leap/mail/imap/messages.py1007
-rw-r--r--src/leap/mail/imap/soledadstore.py617
-rw-r--r--src/leap/mail/mail.py93
-rw-r--r--src/leap/mail/messageflow.py200
12 files changed, 522 insertions, 4273 deletions
diff --git a/src/leap/mail/adaptors/soledad.py b/src/leap/mail/adaptors/soledad.py
index 0b97869..bf8f7e9 100644
--- a/src/leap/mail/adaptors/soledad.py
+++ b/src/leap/mail/adaptors/soledad.py
@@ -513,9 +513,13 @@ class MailboxWrapper(SoledadDocumentWrapper):
type_ = "mbox"
mbox = INBOX_NAME
flags = []
+ recent = []
+ created = 1
closed = False
subscribed = False
- rw = True
+
+ # I think we don't need to store this one.
+ # rw = True
class __meta__(object):
index = "mbox"
@@ -655,6 +659,7 @@ class SoledadMailAdaptor(SoledadIndexMixin):
assert(MessageClass is not None)
return MessageClass(MessageWrapper(mdoc, fdoc, hdoc, cdocs))
+ # XXX pass UID too?
def _get_msg_from_variable_doc_list(self, doc_list, msg_class):
if len(doc_list) == 2:
fdoc, hdoc = doc_list
@@ -664,12 +669,14 @@ class SoledadMailAdaptor(SoledadIndexMixin):
cdocs = dict(enumerate(doc_list[2:], 1))
return self.get_msg_from_docs(msg_class, fdoc, hdoc, cdocs)
+ # XXX pass UID too ?
def get_msg_from_mdoc_id(self, MessageClass, store, doc_id,
get_cdocs=False):
metamsg_id = doc_id
def wrap_meta_doc(doc):
cls = MetaMsgDocWrapper
+ # XXX pass UID?
return cls(doc_id=doc.doc_id, **doc.content)
def get_part_docs_from_mdoc_wrapper(wrapper):
@@ -692,8 +699,8 @@ class SoledadMailAdaptor(SoledadIndexMixin):
return constants.FDOCID.format(mbox=mbox, chash=chash)
d_docs = []
- fdoc_id = _get_fdoc_id_from_mdoc_id(doc_id)
- hdoc_id = _get_hdoc_id_from_mdoc_id(doc_id)
+ fdoc_id = _get_fdoc_id_from_mdoc_id()
+ hdoc_id = _get_hdoc_id_from_mdoc_id()
d_docs.append(store.get_doc(fdoc_id))
d_docs.append(store.get_doc(hdoc_id))
d = defer.gatherResults(d_docs)
diff --git a/src/leap/mail/constants.py b/src/leap/mail/constants.py
index bf1db7f..d76e652 100644
--- a/src/leap/mail/constants.py
+++ b/src/leap/mail/constants.py
@@ -36,3 +36,17 @@ HDOCID_RE = "H\-[0-9a-fA-F]+"
CDOCID = "C-{phash}"
CDOCID_RE = "C\-[0-9a-fA-F]+"
+
+
+class MessageFlags(object):
+ """
+ Flags used in Message and Mailbox.
+ """
+ SEEN_FLAG = "\\Seen"
+ RECENT_FLAG = "\\Recent"
+ ANSWERED_FLAG = "\\Answered"
+ FLAGGED_FLAG = "\\Flagged" # yo dawg
+ DELETED_FLAG = "\\Deleted"
+ DRAFT_FLAG = "\\Draft"
+ NOSELECT_FLAG = "\\Noselect"
+ LIST_FLAG = "List" # is this OK? (no \. ie, no system flag)
diff --git a/src/leap/mail/imap/account.py b/src/leap/mail/imap/account.py
index 7dfbbd1..0baf078 100644
--- a/src/leap/mail/imap/account.py
+++ b/src/leap/mail/imap/account.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# account.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013-2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,12 +15,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-Soledad Backed Account.
+Soledad Backed IMAP Account.
"""
-import copy
import logging
import os
import time
+from functools import partial
from twisted.internet import defer
from twisted.mail import imap4
@@ -29,9 +29,9 @@ from zope.interface import implements
from leap.common.check import leap_assert, leap_assert_type
+from leap.mail.constants import MessageFlags
from leap.mail.mail import Account
-from leap.mail.imap.fields import WithMsgFields
-from leap.mail.imap.mailbox import SoledadMailbox, normalize_mailbox
+from leap.mail.imap.mailbox import IMAPMailbox, normalize_mailbox
from leap.soledad.client import Soledad
logger = logging.getLogger(__name__)
@@ -49,9 +49,10 @@ if PROFILE_CMD:
# Soledad IMAP Account
#######################################
-# TODO remove MsgFields too
+# XXX watchout, account needs to be ready... so we should maybe return
+# a deferred to the IMAP service when it's initialized
-class IMAPAccount(WithMsgFields):
+class IMAPAccount(object):
"""
An implementation of an imap4 Account
that is backed by Soledad Encrypted Documents.
@@ -72,37 +73,20 @@ class IMAPAccount(WithMsgFields):
:param store: a Soledad instance.
:type store: Soledad
"""
- # XXX assert a generic store interface instead, so that we
- # can plug the memory store wrapper seamlessly.
leap_assert(store, "Need a store instance to initialize")
leap_assert_type(store, Soledad)
- # XXX SHOULD assert too that the name matches the user/uuid with which
+ # TODO assert too that the name matches the user/uuid with which
# soledad has been initialized.
self.user_id = user_id
self.account = Account(store)
- # XXX should hide this in the adaptor...
- def _get_mailbox_by_name(self, name):
- """
- Return an mbox document by name.
-
- :param name: the name of the mailbox
- :type name: str
-
- :rtype: SoledadDocument
- """
- def get_first_if_any(docs):
- return docs[0] if docs else None
-
- d = self._store.get_from_index(
- self.TYPE_MBOX_IDX, self.MBOX_KEY,
- normalize_mailbox(name))
- d.addCallback(get_first_if_any)
- return d
+ def _return_mailbox_from_collection(self, collection, readwrite=1):
+ if collection is None:
+ return None
+ return IMAPMailbox(collection, rw=readwrite)
- # XXX move to Account?
- # XXX needed?
+ # XXX Where's this used from? -- self.delete...
def getMailbox(self, name):
"""
Return a Mailbox with that name, without selecting it.
@@ -110,31 +94,25 @@ class IMAPAccount(WithMsgFields):
:param name: name of the mailbox
:type name: str
- :returns: a a SoledadMailbox instance
- :rtype: SoledadMailbox
+ :returns: an IMAPMailbox instance
+ :rtype: IMAPMailbox
"""
name = normalize_mailbox(name)
- if name not in self.account.mailboxes:
- raise imap4.MailboxException("No such mailbox: %r" % name)
+ def check_it_exists(mailboxes):
+ if name not in mailboxes:
+ raise imap4.MailboxException("No such mailbox: %r" % name)
- # XXX Does mailbox really need reference to soledad?
- return SoledadMailbox(name, self._store)
+ d = self.account.list_all_mailbox_names()
+ d.addCallback(check_it_exists)
+ d.addCallback(lambda _: self.account.get_collection_by_mailbox, name)
+ d.addCallbacK(self._return_mailbox_from_collection)
+ return d
#
# IAccount
#
- def _get_empty_mailbox(self):
- """
- Returns an empty mailbox.
-
- :rtype: dict
- """
- # XXX move to mailbox module
- return copy.deepcopy(mailbox.EMPTY_MBOX)
-
- # TODO use mail.Account.add_mailbox
def addMailbox(self, name, creation_ts=None):
"""
Add a mailbox to the account.
@@ -154,8 +132,9 @@ class IMAPAccount(WithMsgFields):
leap_assert(name, "Need a mailbox name to create a mailbox")
- if name in self.mailboxes:
- raise imap4.MailboxCollision(repr(name))
+ def check_it_does_not_exist(mailboxes):
+ if name in mailboxes:
+ raise imap4.MailboxCollision(repr(name))
if creation_ts is None:
# by default, we pass an int value
@@ -164,21 +143,18 @@ class IMAPAccount(WithMsgFields):
# mailbox-uidvalidity.
creation_ts = int(time.time() * 10E2)
- mbox = self._get_empty_mailbox()
- mbox[self.MBOX_KEY] = name
- mbox[self.CREATED_KEY] = creation_ts
-
- def load_mbox_cache(result):
- d = self._load_mailboxes()
- d.addCallback(lambda _: result)
+ def set_mbox_creation_ts(collection):
+ d = collection.set_mbox_attr("created")
+ d.addCallback(lambda _: collection)
return d
- d = self._store.create_doc(mbox)
- d.addCallback(load_mbox_cache)
+ d = self.account.list_all_mailbox_names()
+ d.addCallback(check_it_does_not_exist)
+ d.addCallback(lambda _: self.account.get_collection_by_mailbox, name)
+ d.addCallback(set_mbox_creation_ts)
+ d.addCallback(self._return_mailbox_from_collection)
return d
- # TODO use mail.Account.create_mailbox?
- # Watch out, imap specific exceptions raised here.
def create(self, pathspec):
"""
Create a new mailbox from the given hierarchical name.
@@ -204,9 +180,10 @@ class IMAPAccount(WithMsgFields):
for accum in range(1, len(paths)):
try:
- partial = sep.join(paths[:accum])
- d = self.addMailbox(partial)
+ partial_path = sep.join(paths[:accum])
+ d = self.addMailbox(partial_path)
subs.append(d)
+ # XXX should this be handled by the deferred?
except imap4.MailboxCollision:
pass
try:
@@ -222,21 +199,13 @@ class IMAPAccount(WithMsgFields):
def all_good(result):
return all(result)
- def load_mbox_cache(result):
- d = self._load_mailboxes()
- d.addCallback(lambda _: result)
- return d
-
if subs:
d1 = defer.gatherResults(subs, consumeErrors=True)
- d1.addCallback(load_mbox_cache)
d1.addCallback(all_good)
else:
d1 = defer.succeed(False)
- d1.addCallback(load_mbox_cache)
return d1
- # TODO use mail.Account.get_collection_by_mailbox
def select(self, name, readwrite=1):
"""
Selects a mailbox.
@@ -250,15 +219,28 @@ class IMAPAccount(WithMsgFields):
:rtype: SoledadMailbox
"""
name = normalize_mailbox(name)
- if name not in self.mailboxes:
- logger.warning("No such mailbox!")
- return None
- self.selected = name
- sm = SoledadMailbox(name, self._store, readwrite)
- return sm
+ def check_it_exists(mailboxes):
+ if name not in mailboxes:
+ logger.warning("SELECT: No such mailbox!")
+ return None
+ return name
+
+ def set_selected(_):
+ self.selected = name
+
+ def get_collection(name):
+ if name is None:
+ return None
+ return self.account.get_collection_by_mailbox(name)
+
+ d = self.account.list_all_mailbox_names()
+ d.addCallback(check_it_exists)
+ d.addCallback(get_collection)
+ d.addCallback(partial(
+ self._return_mailbox_from_collection, readwrite=readwrite))
+ return d
- # TODO use mail.Account.delete_mailbox
def delete(self, name, force=False):
"""
Deletes a mailbox.
@@ -276,37 +258,52 @@ class IMAPAccount(WithMsgFields):
:rtype: Deferred
"""
name = normalize_mailbox(name)
+ _mboxes = []
- if name not in self.mailboxes:
- err = imap4.MailboxException("No such mailbox: %r" % name)
- return defer.fail(err)
- mbox = self.getMailbox(name)
+ def check_it_exists(mailboxes):
+ # FIXME works? -- pass variable ref to outer scope
+ _mboxes = mailboxes
+ if name not in mailboxes:
+ err = imap4.MailboxException("No such mailbox: %r" % name)
+ return defer.fail(err)
- if not force:
+ def get_mailbox(_):
+ return self.getMailbox(name)
+
+ def destroy_mailbox(mbox):
+ return mbox.destroy()
+
+ def check_can_be_deleted(mbox):
# See if this box is flagged \Noselect
- # XXX use mbox.flags instead?
mbox_flags = mbox.getFlags()
- if self.NOSELECT_FLAG in mbox_flags:
+ if MessageFlags.NOSELECT_FLAG in mbox_flags:
# Check for hierarchically inferior mailboxes with this one
# as part of their root.
- for others in self.mailboxes:
+ for others in _mboxes:
if others != name and others.startswith(name):
err = imap4.MailboxException(
"Hierarchically inferior mailboxes "
"exist and \\Noselect is set")
return defer.fail(err)
- self.__mailboxes.discard(name)
- return mbox.destroy()
+ return mbox
- # XXX FIXME --- not honoring the inferior names...
+ d = self.account.list_all_mailbox_names()
+ d.addCallback(check_it_exists)
+ d.addCallback(get_mailbox)
+ if not force:
+ d.addCallback(check_can_be_deleted)
+ d.addCallback(destroy_mailbox)
+ return d
+ # FIXME --- not honoring the inferior names...
# if there are no hierarchically inferior names, we will
# delete it from our ken.
+ # XXX is this right?
# if self._inferiorNames(name) > 1:
- # ??! -- can this be rite?
- # self._index.removeMailbox(name)
+ # self._index.removeMailbox(name)
# TODO use mail.Account.rename_mailbox
+ # TODO finish conversion to deferreds
def rename(self, oldname, newname):
"""
Renames a mailbox.
@@ -320,6 +317,9 @@ class IMAPAccount(WithMsgFields):
oldname = normalize_mailbox(oldname)
newname = normalize_mailbox(newname)
+ # FIXME check that scope works (test)
+ _mboxes = []
+
if oldname not in self.mailboxes:
raise imap4.NoSuchMailbox(repr(oldname))
@@ -327,34 +327,19 @@ class IMAPAccount(WithMsgFields):
inferiors = [(o, o.replace(oldname, newname, 1)) for o in inferiors]
for (old, new) in inferiors:
- if new in self.mailboxes:
+ if new in _mboxes:
raise imap4.MailboxCollision(repr(new))
rename_deferreds = []
- def load_mbox_cache(result):
- d = self._load_mailboxes()
- d.addCallback(lambda _: result)
- return d
-
- def update_mbox_doc_name(mbox, oldname, newname, update_deferred):
- mbox.content[self.MBOX_KEY] = newname
- d = self._soledad.put_doc(mbox)
- d.addCallback(lambda r: update_deferred.callback(True))
-
for (old, new) in inferiors:
- self.__mailboxes.discard(old)
- self._memstore.rename_fdocs_mailbox(old, new)
-
- d0 = defer.Deferred()
- d = self._get_mailbox_by_name(old)
- d.addCallback(update_mbox_doc_name, old, new, d0)
- rename_deferreds.append(d0)
+ d = self.account.rename_mailbox(old, new)
+ rename_deferreds.append(d)
d1 = defer.gatherResults(rename_deferreds, consumeErrors=True)
- d1.addCallback(load_mbox_cache)
return d1
+ # FIXME use deferreds (list_all_mailbox_names, etc)
def _inferiorNames(self, name):
"""
Return hierarchically inferior mailboxes.
@@ -387,16 +372,15 @@ class IMAPAccount(WithMsgFields):
:type wildcard: str
"""
# XXX use wildcard in index query
- ref = self._inferiorNames(normalize_mailbox(ref))
+ # TODO get deferreds
wildcard = imap4.wildcardToRegexp(wildcard, '/')
+ ref = self._inferiorNames(normalize_mailbox(ref))
return [(i, self.getMailbox(i)) for i in ref if wildcard.match(i)]
#
# The rest of the methods are specific for leap.mail.imap.account.Account
#
- # TODO ------------------ can we preserve the attr?
- # maybe add to memory store.
def isSubscribed(self, name):
"""
Returns True if user is subscribed to this mailbox.
@@ -406,63 +390,13 @@ class IMAPAccount(WithMsgFields):
:rtype: Deferred (will fire with bool)
"""
- # TODO use Flags class
- subscribed = self.SUBSCRIBED_KEY
-
- def is_subscribed(mbox):
- subs_bool = bool(mbox.content.get(subscribed, False))
- return subs_bool
-
- d = self._get_mailbox_by_name(name)
- d.addCallback(is_subscribed)
- return d
-
- # TODO ------------------ can we preserve the property?
- # maybe add to memory store.
-
- def _get_subscriptions(self):
- """
- Return a list of the current subscriptions for this account.
-
- :returns: A deferred that will fire with the subscriptions.
- :rtype: Deferred
- """
- def get_docs_content(docs):
- return [doc.content[self.MBOX_KEY] for doc in docs]
-
- d = self._store.get_from_index(
- self.TYPE_SUBS_IDX, self.MBOX_KEY, '1')
- d.addCallback(get_docs_content)
- return d
-
- def _set_subscription(self, name, value):
- """
- Sets the subscription value for a given mailbox
-
- :param name: the mailbox
- :type name: str
-
- :param value: the boolean value
- :type value: bool
- """
- # XXX Note that this kind of operation has
- # no guarantees of atomicity. We should not be accessing mbox
- # documents concurrently.
-
- subscribed = self.SUBSCRIBED_KEY
+ name = normalize_mailbox(name)
- def update_subscribed_value(mbox):
- mbox.content[subscribed] = value
- return self._store.put_doc(mbox)
+ def get_subscribed(mbox):
+ return mbox.get_mbox_attr("subscribed")
- # maybe we should store subscriptions in another
- # document...
- if name not in self.mailboxes:
- d = self.addMailbox(name)
- d.addCallback(lambda v: self._get_mailbox_by_name(name))
- else:
- d = self._get_mailbox_by_name(name)
- d.addCallback(update_subscribed_value)
+ d = self.getMailbox(name)
+ d.addCallback(get_subscribed)
return d
def subscribe(self, name):
@@ -475,11 +409,11 @@ class IMAPAccount(WithMsgFields):
"""
name = normalize_mailbox(name)
- def check_and_subscribe(subscriptions):
- if name not in subscriptions:
- return self._set_subscription(name, True)
- d = self._get_subscriptions()
- d.addCallback(check_and_subscribe)
+ def set_subscribed(mbox):
+ return mbox.set_mbox_attr("subscribed", True)
+
+ d = self.getMailbox(name)
+ d.addCallback(set_subscribed)
return d
def unsubscribe(self, name):
@@ -492,17 +426,17 @@ class IMAPAccount(WithMsgFields):
"""
name = normalize_mailbox(name)
- def check_and_unsubscribe(subscriptions):
- if name not in subscriptions:
- raise imap4.MailboxException(
- "Not currently subscribed to %r" % name)
- return self._set_subscription(name, False)
- d = self._get_subscriptions()
- d.addCallback(check_and_unsubscribe)
+ def set_unsubscribed(mbox):
+ return mbox.set_mbox_attr("subscribed", False)
+
+ d = self.getMailbox(name)
+ d.addCallback(set_unsubscribed)
return d
+ # TODO -- get__all_mboxes, return tuple
+ # with ... name? and subscribed bool...
def getSubscriptions(self):
- return self._get_subscriptions()
+ raise NotImplementedError()
#
# INamespacePresenter
@@ -517,20 +451,6 @@ class IMAPAccount(WithMsgFields):
def getOtherNamespaces(self):
return None
- # extra, for convenience
-
- def deleteAllMessages(self, iknowhatiamdoing=False):
- """
- Deletes all messages from all mailboxes.
- Danger! high voltage!
-
- :param iknowhatiamdoing: confirmation parameter, needs to be True
- to proceed.
- """
- if iknowhatiamdoing is True:
- for mbox in self.mailboxes:
- self.delete(mbox, force=True)
-
def __repr__(self):
"""
Representation string for this object.
diff --git a/src/leap/mail/imap/fields.py b/src/leap/mail/imap/fields.py
deleted file mode 100644
index a751c6d..0000000
--- a/src/leap/mail/imap/fields.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# -*- coding: utf-8 -*-
-# fields.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Fields for Mailbox and Message.
-"""
-
-# TODO deprecate !!! (move all to constants maybe?)
-# Flags -> foo
-
-
-class WithMsgFields(object):
- """
- Container class for class-attributes to be shared by
- several message-related classes.
- """
- # Mailbox specific keys
- CREATED_KEY = "created" # used???
-
- RECENTFLAGS_KEY = "rct"
- HDOCS_SET_KEY = "hdocset"
-
- # Flags in Mailbox and Message
- SEEN_FLAG = "\\Seen"
- RECENT_FLAG = "\\Recent"
- ANSWERED_FLAG = "\\Answered"
- FLAGGED_FLAG = "\\Flagged" # yo dawg
- DELETED_FLAG = "\\Deleted"
- DRAFT_FLAG = "\\Draft"
- NOSELECT_FLAG = "\\Noselect"
- LIST_FLAG = "List" # is this OK? (no \. ie, no system flag)
-
- # Fields in mail object
- SUBJECT_FIELD = "Subject"
- DATE_FIELD = "Date"
-
-
-fields = WithMsgFields # alias for convenience
diff --git a/src/leap/mail/imap/interfaces.py b/src/leap/mail/imap/interfaces.py
deleted file mode 100644
index f8f25fa..0000000
--- a/src/leap/mail/imap/interfaces.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# -*- coding: utf-8 -*-
-# interfaces.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Interfaces for the IMAP module.
-"""
-from zope.interface import Interface, Attribute
-
-
-# TODO remove ----------------
-class IMessageContainer(Interface):
- """
- I am a container around the different documents that a message
- is split into.
- """
- fdoc = Attribute('The flags document for this message, if any.')
- hdoc = Attribute('The headers document for this message, if any.')
- cdocs = Attribute('The dict of content documents for this message, '
- 'if any.')
-
- def walk(self):
- """
- Return an iterator to the docs for all the parts.
-
- :rtype: iterator
- """
-
-
-# TODO remove --------------------
-class IMessageStore(Interface):
- """
- I represent a generic storage for LEAP Messages.
- """
-
- def create_message(self, mbox, uid, message):
- """
- Put the passed message into this IMessageStore.
-
- :param mbox: the mbox this message belongs.
- :param uid: the UID that identifies this message in this mailbox.
- :param message: a IMessageContainer implementor.
- """
-
- def put_message(self, mbox, uid, message):
- """
- Put the passed message into this IMessageStore.
-
- :param mbox: the mbox this message belongs.
- :param uid: the UID that identifies this message in this mailbox.
- :param message: a IMessageContainer implementor.
- """
-
- def remove_message(self, mbox, uid):
- """
- Remove the given message from this IMessageStore.
-
- :param mbox: the mbox this message belongs.
- :param uid: the UID that identifies this message in this mailbox.
- """
-
- def get_message(self, mbox, uid):
- """
- Get a IMessageContainer for the given mbox and uid combination.
-
- :param mbox: the mbox this message belongs.
- :param uid: the UID that identifies this message in this mailbox.
- :return: IMessageContainer
- """
-
-
-class IMessageStoreWriter(Interface):
- """
- I represent a storage that is able to write its contents to another
- different IMessageStore.
- """
-
- def write_messages(self, store):
- """
- Write the documents in this IMessageStore to a different
- storage. Usually this will be done from a MemoryStorage to a DbStorage.
-
- :param store: another IMessageStore implementor.
- """
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index ea54d33..faeba9d 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -1,6 +1,6 @@
# *- coding: utf-8 -*-
# mailbox.py
-# Copyright (C) 2013, 2014 LEAP
+# Copyright (C) 2013-2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,11 +15,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-Soledad Mailbox.
+IMAP Mailbox.
"""
-import copy
import re
-import threading
import logging
import StringIO
import cStringIO
@@ -29,7 +27,6 @@ from collections import defaultdict
from twisted.internet import defer
from twisted.internet import reactor
-from twisted.internet.task import deferLater
from twisted.python import log
from twisted.mail import imap4
@@ -38,17 +35,15 @@ from zope.interface import implements
from leap.common import events as leap_events
from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
-from leap.mail.constants import INBOX_NAME
-from leap.mail.decorators import deferred_to_thread
-from leap.mail.utils import empty
-from leap.mail.imap.fields import WithMsgFields, fields
-from leap.mail.imap.messages import MessageCollection
-from leap.mail.imap.messageparts import MessageWrapper
+from leap.mail.constants import INBOX_NAME, MessageFlags
logger = logging.getLogger(__name__)
-# TODO
+# TODO LIST
# [ ] Restore profile_cmd instrumentation
+# [ ] finish the implementation of IMailboxListener
+# [ ] implement the rest of ISearchableMailbox
+
"""
If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid
@@ -75,16 +70,20 @@ if PROFILE_CMD:
d.addCallback(_debugProfiling, name, time.time())
d.addErrback(lambda f: log.msg(f.getTraceback()))
+INIT_FLAGS = (MessageFlags.SEEN_FLAG, MessageFlags.ANSWERED_FLAG,
+ MessageFlags.FLAGGED_FLAG, MessageFlags.DELETED_FLAG,
+ MessageFlags.DRAFT_FLAG, MessageFlags.RECENT_FLAG,
+ MessageFlags.LIST_FLAG)
+
-# TODO Rename to Mailbox
-# TODO Remove WithMsgFields
-class SoledadMailbox(WithMsgFields):
+class IMAPMailbox(object):
"""
A Soledad-backed IMAP mailbox.
Implements the high-level method needed for the Mailbox interfaces.
- The low-level database methods are contained in MessageCollection class,
- which we instantiate and make accessible in the `messages` attribute.
+ The low-level database methods are contained in IMAPMessageCollection
+ class, which we instantiate and make accessible in the `messages`
+ attribute.
"""
implements(
imap4.IMailbox,
@@ -93,17 +92,7 @@ class SoledadMailbox(WithMsgFields):
imap4.ISearchableMailbox,
imap4.IMessageCopier)
- # XXX should finish the implementation of IMailboxListener
- # XXX should completely implement ISearchableMailbox too
-
- messages = None
- _closed = False
-
- INIT_FLAGS = (WithMsgFields.SEEN_FLAG, WithMsgFields.ANSWERED_FLAG,
- WithMsgFields.FLAGGED_FLAG, WithMsgFields.DELETED_FLAG,
- WithMsgFields.DRAFT_FLAG, WithMsgFields.RECENT_FLAG,
- WithMsgFields.LIST_FLAG)
- flags = None
+ init_flags = INIT_FLAGS
CMD_MSG = "MESSAGES"
CMD_RECENT = "RECENT"
@@ -111,58 +100,31 @@ class SoledadMailbox(WithMsgFields):
CMD_UIDVALIDITY = "UIDVALIDITY"
CMD_UNSEEN = "UNSEEN"
- # FIXME we should turn this into a datastructure with limited capacity
+ # TODO we should turn this into a datastructure with limited capacity
_listeners = defaultdict(set)
- next_uid_lock = threading.Lock()
- last_uid_lock = threading.Lock()
-
- # TODO unify all the `primed` dicts
- _fdoc_primed = {}
- _last_uid_primed = {}
- _known_uids_primed = {}
-
- # TODO pass the collection to the constructor
- # TODO pass the mbox_doc too
- def __init__(self, mbox, store, rw=1):
+ def __init__(self, collection, rw=1):
"""
SoledadMailbox constructor. Needs to get passed a name, plus a
Soledad instance.
- :param mbox: the mailbox name
- :type mbox: str
-
- :param store:
- :type store: Soledad
+ :param collection: instance of IMAPMessageCollection
+ :type collection: IMAPMessageCollection
:param rw: read-and-write flag for this mailbox
:type rw: int
"""
- leap_assert(mbox, "Need a mailbox name to initialize")
- leap_assert(store, "Need a store instance to initialize")
-
- self.mbox = normalize_mailbox(mbox)
self.rw = rw
- self.store = store
-
- self.messages = MessageCollection(mbox=mbox, soledad=store)
self._uidvalidity = None
+ self.collection = collection
- # XXX careful with this get/set (it would be
- # hitting db unconditionally, move to memstore too)
- # Now it's returning a fixed amount of flags from mem
- # as a workaround.
if not self.getFlags():
- self.setFlags(self.INIT_FLAGS)
-
- if self._memstore:
- self.prime_known_uids_to_memstore()
- self.prime_last_uid_to_memstore()
- self.prime_flag_docs_to_memstore()
+ self.setFlags(self.init_flags)
- # purge memstore from empty fdocs.
- self._memstore.purge_fdoc_store(mbox)
+ @property
+ def mbox_name(self):
+ return self.collection.mbox_name
@property
def listeners(self):
@@ -175,11 +137,12 @@ class SoledadMailbox(WithMsgFields):
:rtype: set
"""
- return self._listeners[self.mbox]
+ return self._listeners[self.mbox_name]
- # TODO this grows too crazily when many instances are fired, like
+ # FIXME this grows too crazily when many instances are fired, like
# during imaptest stress testing. Should have a queue of limited size
# instead.
+
def addListener(self, listener):
"""
Add a listener to the listeners queue.
@@ -204,16 +167,6 @@ class SoledadMailbox(WithMsgFields):
"""
self.listeners.remove(listener)
- def _get_mbox_doc(self):
- """
- Return mailbox document.
-
- :return: A SoledadDocument containing this mailbox, or None if
- the query failed.
- :rtype: SoledadDocument or None.
- """
- return self._memstore.get_mbox_doc(self.mbox)
-
def getFlags(self):
"""
Returns the flags defined for this mailbox.
@@ -221,10 +174,11 @@ class SoledadMailbox(WithMsgFields):
:returns: tuple of flags for this mailbox
:rtype: tuple of str
"""
- flags = self._memstore.get_mbox_flags(self.mbox)
+ flags = self.collection.mbox_wrapper.flags
if not flags:
- flags = self.INIT_FLAGS
- return map(str, flags)
+ flags = self.init_flags
+ flags_str = map(str, flags)
+ return flags_str
def setFlags(self, flags):
"""
@@ -234,98 +188,31 @@ class SoledadMailbox(WithMsgFields):
:type flags: tuple of str
"""
# XXX this is setting (overriding) old flags.
+ # Better pass a mode flag
leap_assert(isinstance(flags, tuple),
"flags expected to be a tuple")
- self._memstore.set_mbox_flags(self.mbox, flags)
-
- # XXX SHOULD BETTER IMPLEMENT ADD_FLAG, REMOVE_FLAG.
+ return self.collection.set_mbox_attr("flags", flags)
- def _get_closed(self):
+ @property
+ def is_closed(self):
"""
Return the closed attribute for this mailbox.
:return: True if the mailbox is closed
:rtype: bool
"""
- return self._memstore.get_mbox_closed(self.mbox)
+ return self.collection.get_mbox_attr("closed")
- def _set_closed(self, closed):
+ def set_closed(self, closed):
"""
Set the closed attribute for this mailbox.
:param closed: the state to be set
:type closed: bool
- """
- self._memstore.set_mbox_closed(self.mbox, closed)
-
- closed = property(
- _get_closed, _set_closed, doc="Closed attribute.")
-
- def _get_last_uid(self):
- """
- Return the last uid for this mailbox.
- If we have a memory store, the last UID will be the highest
- recorded UID in the message store, or a counter cached from
- the mailbox document in soledad if this is higher.
-
- :return: the last uid for messages in this mailbox
- :rtype: int
- """
- last = self._memstore.get_last_uid(self.mbox)
- logger.debug("last uid for %s: %s (from memstore)" % (
- repr(self.mbox), last))
- return last
-
- last_uid = property(
- _get_last_uid, doc="Last_UID attribute.")
- def prime_last_uid_to_memstore(self):
- """
- Prime memstore with last_uid value
- """
- primed = self._last_uid_primed.get(self.mbox, False)
- if not primed:
- mbox = self._get_mbox_doc()
- if mbox is None:
- # memory-only store
- return
- last = mbox.content.get('lastuid', 0)
- logger.info("Priming Soledad last_uid to %s" % (last,))
- self._memstore.set_last_soledad_uid(self.mbox, last)
- self._last_uid_primed[self.mbox] = True
-
- def prime_known_uids_to_memstore(self):
- """
- Prime memstore with the set of all known uids.
-
- We do this to be able to filter the requests efficiently.
- """
- primed = self._known_uids_primed.get(self.mbox, False)
- # XXX handle the maybeDeferred
-
- def set_primed(known_uids):
- self._memstore.set_known_uids(self.mbox, known_uids)
- self._known_uids_primed[self.mbox] = True
-
- if not primed:
- d = self.messages.all_soledad_uid_iter()
- d.addCallback(set_primed)
- return d
-
- def prime_flag_docs_to_memstore(self):
- """
- Prime memstore with all the flags documents.
+ :rtype: Deferred
"""
- primed = self._fdoc_primed.get(self.mbox, False)
-
- def set_flag_docs(flag_docs):
- self._memstore.load_flag_docs(self.mbox, flag_docs)
- self._fdoc_primed[self.mbox] = True
-
- if not primed:
- d = self.messages.get_all_soledad_flag_docs()
- d.addCallback(set_flag_docs)
- return d
+ return self.collection.set_mbox_attr("closed", closed)
def getUIDValidity(self):
"""
@@ -334,12 +221,7 @@ class SoledadMailbox(WithMsgFields):
:return: unique validity identifier
:rtype: int
"""
- if self._uidvalidity is None:
- mbox = self._get_mbox_doc()
- if mbox is None:
- return 0
- self._uidvalidity = mbox.content.get(self.CREATED_KEY, 1)
- return self._uidvalidity
+ return self.collection.get_mbox_attr("created")
def getUID(self, message):
"""
@@ -354,9 +236,9 @@ class SoledadMailbox(WithMsgFields):
:rtype: int
"""
- msg = self.messages.get_msg_by_uid(message)
- if msg is not None:
- return msg.getUID()
+ d = self.collection.get_msg_by_uid(message)
+ d.addCallback(lambda m: m.getUID())
+ return d
def getUIDNext(self):
"""
@@ -364,23 +246,20 @@ class SoledadMailbox(WithMsgFields):
mailbox. Currently it returns the higher UID incremented by
one.
- We increment the next uid *each* time this function gets called.
- In this way, there will be gaps if the message with the allocated
- uid cannot be saved. But that is preferable to having race conditions
- if we get to parallel message adding.
-
- :rtype: int
+ :return: deferred with int
+ :rtype: Deferred
"""
- with self.next_uid_lock:
- return self.last_uid + 1
+ d = self.collection.get_uid_next()
+ return d
def getMessageCount(self):
"""
Returns the total count of messages in this mailbox.
- :rtype: int
+ :return: deferred with int
+ :rtype: Deferred
"""
- return self.messages.count()
+ return self.collection.count()
def getUnseenCount(self):
"""
@@ -389,7 +268,7 @@ class SoledadMailbox(WithMsgFields):
:return: count of messages flagged `unseen`
:rtype: int
"""
- return self.messages.count_unseen()
+ return self.collection.count_unseen()
def getRecentCount(self):
"""
@@ -398,7 +277,7 @@ class SoledadMailbox(WithMsgFields):
:return: count of messages flagged `recent`
:rtype: int
"""
- return self.messages.count_recent()
+ return self.collection.count_recent()
def isWriteable(self):
"""
@@ -407,6 +286,8 @@ class SoledadMailbox(WithMsgFields):
:return: 1 if mailbox is read-writeable, 0 otherwise.
:rtype: int
"""
+ # XXX We don't need to store it in the mbox doc, do we?
+ # return int(self.collection.get_mbox_attr('rw'))
return self.rw
def getHierarchicalDelimiter(self):
@@ -431,14 +312,14 @@ class SoledadMailbox(WithMsgFields):
if self.CMD_RECENT in names:
r[self.CMD_RECENT] = self.getRecentCount()
if self.CMD_UIDNEXT in names:
- r[self.CMD_UIDNEXT] = self.last_uid + 1
+ r[self.CMD_UIDNEXT] = self.getUIDNext()
if self.CMD_UIDVALIDITY in names:
r[self.CMD_UIDVALIDITY] = self.getUIDValidity()
if self.CMD_UNSEEN in names:
r[self.CMD_UNSEEN] = self.getUnseenCount()
return defer.succeed(r)
- def addMessage(self, message, flags, date=None, notify_on_disk=False):
+ def addMessage(self, message, flags, date=None):
"""
Adds a message to this mailbox.
@@ -464,10 +345,8 @@ class SoledadMailbox(WithMsgFields):
else:
flags = tuple(str(flag) for flag in flags)
- d = self._do_add_message(message, flags=flags, date=date,
- notify_on_disk=notify_on_disk)
- #if PROFILE_CMD:
- #do_profile_cmd(d, "APPEND")
+ # if PROFILE_CMD:
+ # do_profile_cmd(d, "APPEND")
# XXX should review now that we're not using qtreactor.
# A better place for this would be the COPY/APPEND dispatcher
@@ -478,19 +357,11 @@ class SoledadMailbox(WithMsgFields):
reactor.callLater(0, self.notify_new)
return x
+ d = self.collection.add_message(flags=flags, date=date)
d.addCallback(notifyCallback)
d.addErrback(lambda f: log.msg(f.getTraceback()))
return d
- def _do_add_message(self, message, flags, date, notify_on_disk=False):
- """
- Calls to the messageCollection add_msg method.
- Invoked from addMessage.
- """
- d = self.messages.add_msg(message, flags=flags, date=date,
- notify_on_disk=notify_on_disk)
- return d
-
def notify_new(self, *args):
"""
Notify of new messages to all the listeners.
@@ -502,26 +373,34 @@ class SoledadMailbox(WithMsgFields):
def cbNotifyNew(result):
exists, recent = result
- for l in self.listeners:
- l.newMessages(exists, recent)
+ for listener in self.listeners:
+ listener.newMessages(exists, recent)
+
d = self._get_notify_count()
d.addCallback(cbNotifyNew)
d.addCallback(self.cb_signal_unread_to_ui)
- @deferred_to_thread
def _get_notify_count(self):
"""
Get message count and recent count for this mailbox
Executed in a separate thread. Called from notify_new.
- :return: number of messages and number of recent messages.
- :rtype: tuple
+ :return: a deferred that will fire with a tuple, with number of
+ messages and number of recent messages.
+ :rtype: Deferred
"""
- exists = self.getMessageCount()
- recent = self.getRecentCount()
- logger.debug("NOTIFY (%r): there are %s messages, %s recent" % (
- self.mbox, exists, recent))
- return exists, recent
+ d_exists = self.getMessageCount()
+ d_recent = self.getRecentCount()
+ d_list = [d_exists, d_recent]
+
+ def log_num_msg(result):
+ exists, recent = result
+ logger.debug("NOTIFY (%r): there are %s messages, %s recent" % (
+ self.mbox_name, exists, recent))
+
+ d = defer.gatherResults(d_list)
+ d.addCallback(log_num_msg)
+ return d
# commands, do not rename methods
@@ -533,27 +412,18 @@ class SoledadMailbox(WithMsgFields):
on the mailbox.
"""
- # XXX this will overwrite all the existing flags!
+ # XXX this will overwrite all the existing flags
# should better simply addFlag
- self.setFlags((self.NOSELECT_FLAG,))
-
- # XXX removing the mailbox in situ for now,
- # we should postpone the removal
-
- def remove_mbox_doc(ignored):
- # XXX move to memory store??
+ self.setFlags((MessageFlags.NOSELECT_FLAG,))
- def _remove_mbox_doc(doc):
- if doc is None:
- # memory-only store!
- return defer.succeed(True)
- return self._soledad.delete_doc(doc)
-
- doc = self._get_mbox_doc()
- return _remove_mbox_doc(doc)
+ def remove_mbox(_):
+ # FIXME collection does not have a delete_mbox method,
+ # it's in account.
+ # XXX should take care of deleting the uid table too.
+ return self.collection.delete_mbox(self.mbox_name)
d = self.deleteAllDocs()
- d.addCallback(remove_mbox_doc)
+ d.addCallback(remove_mbox)
return d
def _close_cb(self, result):
@@ -574,9 +444,11 @@ class SoledadMailbox(WithMsgFields):
if not self.isWriteable():
raise imap4.ReadOnlyMailbox
d = defer.Deferred()
- self._memstore.expunge(self.mbox, d)
+ # FIXME actually broken.
+ # Iterate through index, and do a expunge.
return d
+ # FIXME -- get last_uid from mbox_indexer
def _bound_seq(self, messages_asked):
"""
Put an upper bound to a messages sequence if this is open.
@@ -596,6 +468,7 @@ class SoledadMailbox(WithMsgFields):
pass
return messages_asked
+ # TODO -- needed? --- we can get the sequence from the indexer.
def _filter_msg_seq(self, messages_asked):
"""
Filter a message sequence returning only the ones that do exist in the
@@ -627,29 +500,6 @@ class SoledadMailbox(WithMsgFields):
:rtype: deferred
"""
- d = defer.Deferred()
-
- # XXX do not need no thread...
- reactor.callInThread(self._do_fetch, messages_asked, uid, d)
- d.addCallback(self.cb_signal_unread_to_ui)
- return d
-
- # called in thread
- def _do_fetch(self, messages_asked, uid, d):
- """
- :param messages_asked: IDs of the messages to retrieve information
- about
- :type messages_asked: MessageSet
-
- :param uid: If true, the IDs are UIDs. They are message sequence IDs
- otherwise.
- :type uid: bool
- :param d: deferred whose callback will be called with result.
- :type d: Deferred
-
- :rtype: A tuple of two-tuples of message sequence numbers and
- LeapMessage
- """
# For the moment our UID is sequential, so we
# can treat them all the same.
# Change this to the flag that twisted expects when we
@@ -660,18 +510,23 @@ class SoledadMailbox(WithMsgFields):
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
- getmsg = lambda uid: self.messages.get_msg_by_uid(uid)
+ getmsg = self.collection.get_msg_by_uid
# for sequence numbers (uid = 0)
if sequence:
logger.debug("Getting msg by index: INEFFICIENT call!")
+ # TODO --- implement sequences in mailbox indexer
raise NotImplementedError
else:
got_msg = ((msgid, getmsg(msgid)) for msgid in seq_messg)
result = ((msgid, msg) for msgid, msg in got_msg
if msg is not None)
- self.reactor.callLater(0, self.unset_recent_flags, seq_messg)
- self.reactor.callFromThread(d.callback, result)
+ reactor.callLater(0, self.unset_recent_flags, seq_messg)
+
+ # TODO -- call signal_to_ui
+ # d.addCallback(self.cb_signal_unread_to_ui)
+
+ return result
def fetch_flags(self, messages_asked, uid):
"""
@@ -698,12 +553,11 @@ class SoledadMailbox(WithMsgFields):
:rtype: tuple
"""
d = defer.Deferred()
- self.reactor.callInThread(self._do_fetch_flags, messages_asked, uid, d)
+ reactor.callLater(0, self._do_fetch_flags, messages_asked, uid, d)
if PROFILE_CMD:
do_profile_cmd(d, "FETCH-ALL-FLAGS")
return d
- # called in thread
def _do_fetch_flags(self, messages_asked, uid, d):
"""
:param messages_asked: IDs of the messages to retrieve information
@@ -733,10 +587,11 @@ class SoledadMailbox(WithMsgFields):
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
- all_flags = self._memstore.all_flags(self.mbox)
+ # FIXME use deferreds here
+ all_flags = self.collection.get_all_flags(self.mbox_name)
result = ((msgid, flagsPart(
msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)
- self.reactor.callFromThread(d.callback, result)
+ d.callback(result)
def fetch_headers(self, messages_asked, uid):
"""
@@ -843,8 +698,8 @@ class SoledadMailbox(WithMsgFields):
raise imap4.ReadOnlyMailbox
d = defer.Deferred()
- self.reactor.callLater(0, self._do_store, messages_asked, flags,
- mode, uid, d)
+ reactor.callLater(0, self._do_store, messages_asked, flags,
+ mode, uid, d)
if PROFILE_CMD:
do_profile_cmd(d, "STORE")
d.addCallback(self.cb_signal_unread_to_ui)
@@ -853,7 +708,7 @@ class SoledadMailbox(WithMsgFields):
def _do_store(self, messages_asked, flags, mode, uid, observer):
"""
- Helper method, invoke set_flags method in the MessageCollection.
+ Helper method, invoke set_flags method in the IMAPMessageCollection.
See the documentation for the `store` method for the parameters.
@@ -869,7 +724,8 @@ class SoledadMailbox(WithMsgFields):
flags = tuple(flags)
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
- self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer)
+ self.collection.set_flags(
+ self.mbox_name, seq_messg, flags, mode, observer)
# ISearchableMailbox
@@ -908,6 +764,7 @@ class SoledadMailbox(WithMsgFields):
msgid = str(query[3]).strip()
logger.debug("Searching for %s" % (msgid,))
d = self.messages._get_uid_from_msgid(str(msgid))
+ # XXX remove gatherResults
d1 = defer.gatherResults([d])
# we want a list, so return it all the same
return d1
@@ -928,94 +785,18 @@ class SoledadMailbox(WithMsgFields):
uid when the copy succeed.
:rtype: Deferred
"""
- d = defer.Deferred()
if PROFILE_CMD:
do_profile_cmd(d, "COPY")
# A better place for this would be the COPY/APPEND dispatcher
# in server.py, but qtreactor hangs when I do that, so this seems
# to work fine for now.
- d.addCallback(lambda r: self.reactor.callLater(0, self.notify_new))
- deferLater(self.reactor, 0, self._do_copy, message, d)
- return d
+ #d.addCallback(lambda r: self.reactor.callLater(0, self.notify_new))
+ #deferLater(self.reactor, 0, self._do_copy, message, d)
+ #return d
- def _do_copy(self, message, observer):
- """
- Call invoked from the deferLater in `copy`. This will
- copy the flags and header documents, and pass them to the
- `create_message` method in the MemoryStore, together with
- the observer deferred that we've been passed along.
-
- :param message: an IMessage implementor
- :type message: LeapMessage
- :param observer: the deferred that will fire with the
- UID of the message
- :type observer: Deferred
- """
- memstore = self._memstore
-
- def createCopy(result):
- exist, new_fdoc = result
- if exist:
- # Should we signal error on the callback?
- logger.warning("Destination message already exists!")
-
- # XXX I'm not sure if we should raise the
- # errback. This actually rases an ugly warning
- # in some muas like thunderbird.
- # UID 0 seems a good convention for no uid.
- observer.callback(0)
- else:
- mbox = self.mbox
- uid_next = memstore.increment_last_soledad_uid(mbox)
-
- new_fdoc[self.UID_KEY] = uid_next
- new_fdoc[self.MBOX_KEY] = mbox
-
- flags = list(new_fdoc[self.FLAGS_KEY])
- flags.append(fields.RECENT_FLAG)
- new_fdoc[self.FLAGS_KEY] = tuple(set(flags))
-
- # FIXME set recent!
-
- self._memstore.create_message(
- self.mbox, uid_next,
- MessageWrapper(new_fdoc),
- observer=observer,
- notify_on_disk=False)
-
- d = self._get_msg_copy(message)
- d.addCallback(createCopy)
- d.addErrback(lambda f: log.msg(f.getTraceback()))
-
- #@deferred_to_thread
- def _get_msg_copy(self, message):
- """
- Get a copy of the fdoc for this message, and check whether
- it already exists.
-
- :param message: an IMessage implementor
- :type message: LeapMessage
- :return: exist, new_fdoc
- :rtype: tuple
- """
- # XXX for clarity, this could be delegated to a
- # MessageCollection mixin that implements copy too, and
- # moved out of here.
- msg = message
- memstore = self._memstore
-
- if empty(msg.fdoc):
- logger.warning("Tried to copy a MSG with no fdoc")
- return
- new_fdoc = copy.deepcopy(msg.fdoc.content)
- fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
-
- dest_fdoc = memstore.get_fdoc_from_chash(
- fdoc_chash, self.mbox)
-
- exist = not empty(dest_fdoc)
- return exist, new_fdoc
+ # FIXME not implemented !!! ---
+ return self.collection.copy_msg(message, self.mbox_name)
# convenience fun
@@ -1023,29 +804,25 @@ class SoledadMailbox(WithMsgFields):
"""
Delete all docs in this mailbox
"""
- def del_all_docs(docs):
- todelete = []
- for doc in docs:
- d = self.messages._soledad.delete_doc(doc)
- todelete.append(d)
- return defer.gatherResults(todelete)
-
- d = self.messages.get_all_docs()
- d.addCallback(del_all_docs)
- return d
+ # FIXME not implemented
+ return self.collection.delete_all_docs()
def unset_recent_flags(self, uid_seq):
"""
Unset Recent flag for a sequence of UIDs.
"""
- self.messages.unset_recent_flags(uid_seq)
+ # FIXME not implemented
+ return self.collection.unset_recent_flags(uid_seq)
def __repr__(self):
"""
Representation string for this mailbox.
"""
- return u"<SoledadMailbox: mbox '%s' (%s)>" % (
- self.mbox, self.messages.count())
+ return u"<IMAPMailbox: mbox '%s' (%s)>" % (
+ self.mbox_name, self.messages.count())
+
+
+_INBOX_RE = re.compile(INBOX_NAME, re.IGNORECASE)
def normalize_mailbox(name):
@@ -1060,7 +837,8 @@ def normalize_mailbox(name):
:rtype: unicode
"""
- _INBOX_RE = re.compile(INBOX_NAME, re.IGNORECASE)
+ # XXX maybe it would make sense to normalize common folders too:
+ # trash, sent, drafts, etc...
if _INBOX_RE.match(name):
# ensure inital INBOX is uppercase
return INBOX_NAME + name[len(INBOX_NAME):]
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
deleted file mode 100644
index eda5b96..0000000
--- a/src/leap/mail/imap/memorystore.py
+++ /dev/null
@@ -1,1340 +0,0 @@
-
-# memorystore.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-In-memory transient store for a LEAPIMAPServer.
-"""
-import contextlib
-import logging
-import threading
-import weakref
-
-from collections import defaultdict
-from copy import copy
-
-from enum import Enum
-from twisted.internet import defer
-from twisted.internet import reactor
-from twisted.internet.task import LoopingCall
-from twisted.python import log
-from zope.interface import implements
-
-from leap.common.check import leap_assert_type
-from leap.mail import size
-from leap.mail.utils import empty, phash_iter
-from leap.mail.messageflow import MessageProducer
-from leap.mail.imap import interfaces
-from leap.mail.imap.fields import fields
-from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc
-from leap.mail.imap.messageparts import RecentFlagsDoc
-from leap.mail.imap.messageparts import MessageWrapper
-from leap.mail.imap.messageparts import ReferenciableDict
-
-from leap.mail.decorators import deferred_to_thread
-
-logger = logging.getLogger(__name__)
-
-
-# The default period to do writebacks to the permanent
-# soledad storage, in seconds.
-SOLEDAD_WRITE_PERIOD = 15
-
-FDOC = MessagePartType.fdoc.name
-HDOC = MessagePartType.hdoc.name
-CDOCS = MessagePartType.cdocs.name
-DOCS_ID = MessagePartType.docs_id.name
-
-
-@contextlib.contextmanager
-def set_bool_flag(obj, att):
- """
- Set a boolean flag to True while we're doing our thing.
- Just to let the world know.
- """
- setattr(obj, att, True)
- try:
- yield True
- except RuntimeError as exc:
- logger.exception(exc)
- finally:
- setattr(obj, att, False)
-
-
-DirtyState = Enum("DirtyState", "none dirty new")
-
-
-class MemoryStore(object):
- """
- An in-memory store to where we can write the different parts that
- we split the messages into and buffer them until we write them to the
- permanent storage.
-
- It uses MessageWrapper instances to represent the message-parts, which are
- indexed by mailbox name and UID.
-
- It also can be passed a permanent storage as a paremeter (any implementor
- of IMessageStore, in this case a SoledadStore). In this case, a periodic
- dump of the messages stored in memory will be done. The period of the
- writes to the permanent storage is controled by the write_period parameter
- in the constructor.
- """
- implements(interfaces.IMessageStore,
- interfaces.IMessageStoreWriter)
-
- # TODO We will want to index by chash when we transition to local-only
- # UIDs.
-
- WRITING_FLAG = "_writing"
- _last_uid_lock = threading.Lock()
- _fdoc_docid_lock = threading.Lock()
-
- def __init__(self, permanent_store=None,
- write_period=SOLEDAD_WRITE_PERIOD):
- """
- Initialize a MemoryStore.
-
- :param permanent_store: a IMessageStore implementor to dump
- messages to.
- :type permanent_store: IMessageStore
- :param write_period: the interval to dump messages to disk, in seconds.
- :type write_period: int
- """
- self._permanent_store = permanent_store
- self._write_period = write_period
-
- if permanent_store is None:
- self._mbox_closed = defaultdict(lambda: False)
-
- # Internal Storage: messages
- """
- flags document store.
- _fdoc_store[mbox][uid] = { 'content': 'aaa' }
- """
- self._fdoc_store = defaultdict(lambda: defaultdict(
- lambda: ReferenciableDict({})))
-
- # Sizes
- """
- {'mbox, uid': <int>}
- """
- self._sizes = {}
-
- # Internal Storage: payload-hash
- """
- fdocs:doc-id store, stores document IDs for putting
- the dirty flags-docs.
- """
- self._fdoc_id_store = defaultdict(lambda: defaultdict(
- lambda: ''))
-
- # Internal Storage: content-hash:hdoc
- """
- hdoc-store keeps references to
- the header-documents indexed by content-hash.
-
- {'chash': { dict-stuff }
- }
- """
- self._hdoc_store = defaultdict(lambda: ReferenciableDict({}))
-
- # Internal Storage: payload-hash:cdoc
- """
- content-docs stored by payload-hash
- {'phash': { dict-stuff } }
- """
- self._cdoc_store = defaultdict(lambda: ReferenciableDict({}))
-
- # Internal Storage: content-hash:fdoc
- """
- chash-fdoc-store keeps references to
- the flag-documents indexed by content-hash.
-
- {'chash': {'mbox-a': weakref.proxy(dict),
- 'mbox-b': weakref.proxy(dict)}
- }
- """
- self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None))
-
- # Internal Storage: recent-flags store
- """
- recent-flags store keeps one dict per mailbox,
- with the document-id of the u1db document
- and the set of the UIDs that have the recent flag.
-
- {'mbox-a': {'doc_id': 'deadbeef',
- 'set': {1,2,3,4}
- }
- }
- """
- # TODO this will have to transition to content-hash
- # indexes after we move to local-only UIDs.
-
- self._rflags_store = defaultdict(
- lambda: {'doc_id': None, 'set': set([])})
-
- """
- last-uid store keeps the count of the highest UID
- per mailbox.
-
- {'mbox-a': 42,
- 'mbox-b': 23}
- """
- self._last_uid = defaultdict(lambda: 0)
-
- """
- known-uids keeps a count of the uids that soledad knows for a given
- mailbox
-
- {'mbox-a': set([1,2,3])}
- """
- self._known_uids = defaultdict(set)
-
- """
- mbox-flags is a dict containing flags for each mailbox. this is
- modified from mailbox.getFlags / mailbox.setFlags
- """
- self._mbox_flags = defaultdict(set)
-
- # New and dirty flags, to set MessageWrapper State.
- self._new = set([])
- self._new_queue = set([])
- self._new_deferreds = {}
-
- self._dirty = set([])
- self._dirty_queue = set([])
- self._dirty_deferreds = {}
-
- self._rflags_dirty = set([])
-
- # Flag for signaling we're busy writing to the disk storage.
- setattr(self, self.WRITING_FLAG, False)
-
- if self._permanent_store is not None:
- # this producer spits its messages to the permanent store
- # consumer using a queue. We will use that to put
- # our messages to be written.
- self.producer = MessageProducer(permanent_store,
- period=0.1)
- # looping call for dumping to SoledadStore
- self._write_loop = LoopingCall(self.write_messages,
- permanent_store)
-
- # We can start the write loop right now, why wait?
- self._start_write_loop()
- else:
- # We have a memory-only store.
- self.producer = None
- self._write_loop = None
-
- # TODO -- remove
- def _start_write_loop(self):
- """
- Start loop for writing to disk database.
- """
- if self._write_loop is None:
- return
- if not self._write_loop.running:
- self._write_loop.start(self._write_period, now=True)
-
- # TODO -- remove
- def _stop_write_loop(self):
- """
- Stop loop for writing to disk database.
- """
- if self._write_loop is None:
- return
- if self._write_loop.running:
- self._write_loop.stop()
-
- # IMessageStore
-
- # XXX this would work well for whole message operations.
- # We would have to add a put_flags operation to modify only
- # the flags doc (and set the dirty flag accordingly)
-
- def create_message(self, mbox, uid, message, observer,
- notify_on_disk=True):
- """
- Create the passed message into this MemoryStore.
-
- By default we consider that any message is a new message.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the UID for the message
- :type uid: int
- :param message: a message to be added
- :type message: MessageWrapper
- :param observer:
- the deferred that will fire with the UID of the message. If
- notify_on_disk is True, this will happen when the message is
- written to Soledad. Otherwise it will fire as soon as we've added
- the message to the memory store.
- :type observer: Deferred
- :param notify_on_disk:
- whether the `observer` deferred should wait until the message is
- written to disk to be fired.
- :type notify_on_disk: bool
- """
- # TODO -- return a deferred
- log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid))
- key = mbox, uid
-
- self._add_message(mbox, uid, message, notify_on_disk)
- self._new.add(key)
-
- if observer is not None:
- if notify_on_disk:
- # We store this deferred so we can keep track of the pending
- # operations internally.
- # TODO this should fire with the UID !!! -- change that in
- # the soledad store code.
- self._new_deferreds[key] = observer
-
- else:
- # Caller does not care, just fired and forgot, so we pass
- # a defer that will inmediately have its callback triggered.
- reactor.callFromThread(observer.callback, uid)
-
- def put_message(self, mbox, uid, message, notify_on_disk=True):
- """
- Put an existing message.
-
- This will also set the dirty flag on the MemoryStore.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the UID for the message
- :type uid: int
- :param message: a message to be added
- :type message: MessageWrapper
- :param notify_on_disk: whether the deferred that is returned should
- wait until the message is written to disk to
- be fired.
- :type notify_on_disk: bool
-
- :return: a Deferred. if notify_on_disk is True, will be fired
- when written to the db on disk.
- Otherwise will fire inmediately
- :rtype: Deferred
- """
- key = mbox, uid
- d = defer.Deferred()
- d.addCallback(lambda result: log.msg("message PUT save: %s" % result))
-
- self._dirty.add(key)
- self._dirty_deferreds[key] = d
- self._add_message(mbox, uid, message, notify_on_disk)
- return d
-
- def _add_message(self, mbox, uid, message, notify_on_disk=True):
- """
- Helper method, called by both create_message and put_message.
- See those for parameter documentation.
- """
- msg_dict = message.as_dict()
-
- fdoc = msg_dict.get(FDOC, None)
- if fdoc is not None:
- fdoc_store = self._fdoc_store[mbox][uid]
- fdoc_store.update(fdoc)
- chash_fdoc_store = self._chash_fdoc_store
-
- # content-hash indexing
- chash = fdoc.get(fields.CONTENT_HASH_KEY)
- chash_fdoc_store[chash][mbox] = weakref.proxy(
- self._fdoc_store[mbox][uid])
-
- hdoc = msg_dict.get(HDOC, None)
- if hdoc is not None:
- chash = hdoc.get(fields.CONTENT_HASH_KEY)
- hdoc_store = self._hdoc_store[chash]
- hdoc_store.update(hdoc)
-
- cdocs = message.cdocs
- for cdoc in cdocs.values():
- phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)
- if not phash:
- continue
- cdoc_store = self._cdoc_store[phash]
- cdoc_store.update(cdoc)
-
- # Update memory store size
- # XXX this should use [mbox][uid]
- # TODO --- this has to be deferred to thread,
- # TODO add hdoc and cdocs sizes too
- # it's slowing things down here.
- # key = mbox, uid
- # self._sizes[key] = size.get_size(self._fdoc_store[key])
-
- def purge_fdoc_store(self, mbox):
- """
- Purge the empty documents from a fdoc store.
- Called during initialization of the SoledadMailbox
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- """
- # XXX This is really a workaround until I find the conditions
- # that are making the empty items remain there.
- # This happens, for instance, after running several times
- # the regression test, that issues a store deleted + expunge + select
- # The items are being correclty deleted, but in succesive appends
- # the empty items with previously deleted uids reappear as empty
- # documents. I suspect it's a timing condition with a previously
- # evaluated sequence being used after the items has been removed.
-
- for uid, value in self._fdoc_store[mbox].items():
- if empty(value):
- del self._fdoc_store[mbox][uid]
-
- def get_docid_for_fdoc(self, mbox, uid):
- """
- Return Soledad document id for the flags-doc for a given mbox and uid,
- or None of no flags document could be found.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the message UID
- :type uid: int
- :rtype: unicode or None
- """
- with self._fdoc_docid_lock:
- doc_id = self._fdoc_id_store[mbox][uid]
-
- if empty(doc_id):
- fdoc = self._permanent_store.get_flags_doc(mbox, uid)
- if empty(fdoc) or empty(fdoc.content):
- return None
- doc_id = fdoc.doc_id
- self._fdoc_id_store[mbox][uid] = doc_id
-
- return doc_id
-
- def get_message(self, mbox, uid, dirtystate=DirtyState.none,
- flags_only=False):
- """
- Get a MessageWrapper for the given mbox and uid combination.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the message UID
- :type uid: int
- :param dirtystate: DirtyState enum: one of `dirty`, `new`
- or `none` (default)
- :type dirtystate: enum
- :param flags_only: whether the message should carry only a reference
- to the flags document.
- :type flags_only: bool
- :
-
- :return: MessageWrapper or None
- """
- # TODO -- return deferred
- if dirtystate == DirtyState.dirty:
- flags_only = True
-
- key = mbox, uid
-
- fdoc = self._fdoc_store[mbox][uid]
- if empty(fdoc):
- return None
-
- new, dirty = False, False
- if dirtystate == DirtyState.none:
- new, dirty = self._get_new_dirty_state(key)
- if dirtystate == DirtyState.dirty:
- new, dirty = False, True
- if dirtystate == DirtyState.new:
- new, dirty = True, False
-
- if flags_only:
- return MessageWrapper(fdoc=fdoc,
- new=new, dirty=dirty,
- memstore=weakref.proxy(self))
- else:
- chash = fdoc.get(fields.CONTENT_HASH_KEY)
- hdoc = self._hdoc_store[chash]
- if empty(hdoc):
- # XXX this will be a deferred
- hdoc = self._permanent_store.get_headers_doc(chash)
- if empty(hdoc):
- return None
- if not empty(hdoc.content):
- self._hdoc_store[chash] = hdoc.content
- hdoc = hdoc.content
- cdocs = None
-
- pmap = hdoc.get(fields.PARTS_MAP_KEY, None)
- if new and pmap is not None:
- # take the different cdocs for write...
- cdoc_store = self._cdoc_store
- cdocs_list = phash_iter(hdoc)
- cdocs = dict(enumerate(
- [cdoc_store[phash] for phash in cdocs_list], 1))
-
- return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs,
- new=new, dirty=dirty,
- memstore=weakref.proxy(self))
-
- def remove_message(self, mbox, uid):
- """
- Remove a Message from this MemoryStore.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the message UID
- :type uid: int
- """
- # XXX For the moment we are only removing the flags and headers
- # docs. The rest we leave there polluting your hard disk,
- # until we think about a good way of deorphaning.
-
- # XXX implement elijah's idea of using a PUT document as a
- # token to ensure consistency in the removal.
-
- try:
- del self._fdoc_store[mbox][uid]
- except KeyError:
- pass
-
- try:
- key = mbox, uid
- self._new.discard(key)
- self._dirty.discard(key)
- if key in self._sizes:
- del self._sizes[key]
- self._known_uids[mbox].discard(uid)
- except KeyError:
- pass
- except Exception as exc:
- logger.error("error while removing message!")
- logger.exception(exc)
- try:
- with self._fdoc_docid_lock:
- del self._fdoc_id_store[mbox][uid]
- except KeyError:
- pass
- except Exception as exc:
- logger.error("error while removing message!")
- logger.exception(exc)
-
- # IMessageStoreWriter
-
- # TODO -- I think we don't need this anymore.
- # instead, we can have
- def write_messages(self, store):
- """
- Write the message documents in this MemoryStore to a different store.
-
- :param store: the IMessageStore to write to
- :rtype: False if queue is not empty, None otherwise.
- """
- # For now, we pass if the queue is not empty, to avoid duplicate
- # queuing.
- # We would better use a flag to know when we've already enqueued an
- # item.
-
- # XXX this could return the deferred for all the enqueued operations
-
- if not self.producer.is_queue_empty():
- return False
-
- if any(map(lambda i: not empty(i), (self._new, self._dirty))):
- logger.info("Writing messages to Soledad...")
-
- # TODO change for lock, and make the property access
- # is accquired
- with set_bool_flag(self, self.WRITING_FLAG):
- for rflags_doc_wrapper in self.all_rdocs_iter():
- self.producer.push(rflags_doc_wrapper,
- state=self.producer.STATE_DIRTY)
- for msg_wrapper in self.all_new_msg_iter():
- self.producer.push(msg_wrapper,
- state=self.producer.STATE_NEW)
- for msg_wrapper in self.all_dirty_msg_iter():
- self.producer.push(msg_wrapper,
- state=self.producer.STATE_DIRTY)
-
- # MemoryStore specific methods.
-
- def get_uids(self, mbox):
- """
- Get all uids for a given mbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: list
- """
- return self._fdoc_store[mbox].keys()
-
- def get_soledad_known_uids(self, mbox):
- """
- Get all uids that soledad knows about, from the memory cache.
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: list
- """
- return self._known_uids.get(mbox, [])
-
- # last_uid
-
- def get_last_uid(self, mbox):
- """
- Return the highest UID for a given mbox.
- It will be the highest between the highest uid in the message store for
- the mailbox, and the soledad integer cache.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: int
- """
- uids = self.get_uids(mbox)
- last_mem_uid = uids and max(uids) or 0
- last_soledad_uid = self.get_last_soledad_uid(mbox)
- return max(last_mem_uid, last_soledad_uid)
-
- def get_last_soledad_uid(self, mbox):
- """
- Get last uid for a given mbox from the soledad integer cache.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- """
- return self._last_uid.get(mbox, 0)
-
- def set_last_soledad_uid(self, mbox, value):
- """
- Set last uid for a given mbox in the soledad integer cache.
- SoledadMailbox should prime this value during initialization.
- Other methods (during message adding) SHOULD call
- `increment_last_soledad_uid` instead.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param value: the value to set
- :type value: int
- """
- # can be long???
- # leap_assert_type(value, int)
- logger.info("setting last soledad uid for %s to %s" %
- (mbox, value))
- # if we already have a value here, don't do anything
- with self._last_uid_lock:
- if not self._last_uid.get(mbox, None):
- self._last_uid[mbox] = value
-
- def set_known_uids(self, mbox, value):
- """
- Set the value fo the known-uids set for this mbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param value: a sequence of integers to be added to the set.
- :type value: tuple
- """
- current = self._known_uids[mbox]
- self._known_uids[mbox] = current.union(set(value))
-
- def increment_last_soledad_uid(self, mbox):
- """
- Increment by one the soledad integer cache for the last_uid for
- this mbox, and fire a defer-to-thread to update the soledad value.
- The caller should lock the call tho this method.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- """
- with self._last_uid_lock:
- self._last_uid[mbox] += 1
- value = self._last_uid[mbox]
- reactor.callInThread(self.write_last_uid, mbox, value)
- return value
-
- def write_last_uid(self, mbox, value):
- """
- Increment the soledad integer cache for the highest uid value.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param value: the value to set
- :type value: int
- """
- leap_assert_type(value, int)
- if self._permanent_store:
- self._permanent_store.write_last_uid(mbox, value)
-
- def load_flag_docs(self, mbox, flag_docs):
- """
- Load the flag documents for the given mbox.
- Used during initial flag docs prefetch.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param flag_docs: a dict with the content for the flag docs, indexed
- by uid.
- :type flag_docs: dict
- """
- # We can do direct assignments cause we know this will only
- # be called during initialization of the mailbox.
- # TODO could hook here a sanity-check
- # for duplicates
-
- fdoc_store = self._fdoc_store[mbox]
- chash_fdoc_store = self._chash_fdoc_store
- for uid in flag_docs:
- rdict = ReferenciableDict(flag_docs[uid])
- fdoc_store[uid] = rdict
- # populate chash dict too, to avoid fdoc duplication
- chash = flag_docs[uid]["chash"]
- chash_fdoc_store[chash][mbox] = weakref.proxy(
- self._fdoc_store[mbox][uid])
-
- def update_flags(self, mbox, uid, fdoc):
- """
- Update the flag document for a given mbox and uid combination,
- and set the dirty flag.
- We could use put_message, but this is faster.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the uid of the message
- :type uid: int
-
- :param fdoc: a dict with the content for the flag docs
- :type fdoc: dict
- """
- key = mbox, uid
- self._fdoc_store[mbox][uid].update(fdoc)
- self._dirty.add(key)
-
- def load_header_docs(self, header_docs):
- """
- Load the flag documents for the given mbox.
- Used during header docs prefetch, and during cache after
- a read from soledad if the hdoc property in message did not
- find its value in here.
-
- :param flag_docs: a dict with the content for the flag docs.
- :type flag_docs: dict
- """
- hdoc_store = self._hdoc_store
- for chash in header_docs:
- hdoc_store[chash] = ReferenciableDict(header_docs[chash])
-
- def all_flags(self, mbox):
- """
- Return a dictionary with all the flags for a given mbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: dict
- """
- fdict = {}
- uids = self.get_uids(mbox)
- fstore = self._fdoc_store[mbox]
-
- for uid in uids:
- try:
- fdict[uid] = fstore[uid][fields.FLAGS_KEY]
- except KeyError:
- continue
- return fdict
-
- def all_headers(self, mbox):
- """
- Return a dictionary with all the header docs for a given mbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: dict
- """
- headers_dict = {}
- uids = self.get_uids(mbox)
- fdoc_store = self._fdoc_store[mbox]
- hdoc_store = self._hdoc_store
-
- for uid in uids:
- try:
- chash = fdoc_store[uid][fields.CONTENT_HASH_KEY]
- hdoc = hdoc_store[chash]
- if not empty(hdoc):
- headers_dict[uid] = hdoc
- except KeyError:
- continue
- return headers_dict
-
- # Counting sheeps...
-
- def count_new_mbox(self, mbox):
- """
- Count the new messages by mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: number of new messages
- :rtype: int
- """
- return len([(m, uid) for m, uid in self._new if mbox == mbox])
-
- # XXX used at all?
- def count_new(self):
- """
- Count all the new messages in the MemoryStore.
-
- :rtype: int
- """
- return len(self._new)
-
- def count(self, mbox):
- """
- Return the count of messages for a given mbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: number of messages
- :rtype: int
- """
- return len(self._fdoc_store[mbox])
-
- def unseen_iter(self, mbox):
- """
- Get an iterator for the message UIDs with no `seen` flag
- for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: iterator through unseen message doc UIDs
- :rtype: iterable
- """
- fdocs = self._fdoc_store[mbox]
-
- return [uid for uid, value
- in fdocs.items()
- if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])]
-
- def get_cdoc_from_phash(self, phash):
- """
- Return a content-document by its payload-hash.
-
- :param phash: the payload hash to check against
- :type phash: str or unicode
- :rtype: MessagePartDoc
- """
- doc = self._cdoc_store.get(phash, None)
-
- # XXX return None for consistency?
-
- # XXX have to keep a mapping between phash and its linkage
- # info, to know if this payload is been already saved or not.
- # We will be able to get this from the linkage-docs,
- # not yet implemented.
- new = True
- dirty = False
- return MessagePartDoc(
- new=new, dirty=dirty, store="mem",
- part=MessagePartType.cdoc,
- content=doc,
- doc_id=None)
-
- def get_fdoc_from_chash(self, chash, mbox):
- """
- Return a flags-document by its content-hash and a given mailbox.
- Used during content-duplication detection while copying or adding a
- message.
-
- :param chash: the content hash to check against
- :type chash: str or unicode
- :param mbox: the mailbox
- :type mbox: str or unicode
-
- :return: MessagePartDoc. It will return None if the flags document
- has empty content or it is flagged as \\Deleted.
- """
- fdoc = self._chash_fdoc_store[chash][mbox]
-
- # a couple of special cases.
- # 1. We might have a doc with empty content...
- if empty(fdoc):
- return None
-
- # 2. ...Or the message could exist, but being flagged for deletion.
- # We want to create a new one in this case.
- # Hmmm what if the deletion is un-done?? We would end with a
- # duplicate...
- if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []):
- return None
-
- uid = fdoc[fields.UID_KEY]
- key = mbox, uid
- new = key in self._new
- dirty = key in self._dirty
-
- return MessagePartDoc(
- new=new, dirty=dirty, store="mem",
- part=MessagePartType.fdoc,
- content=fdoc,
- doc_id=None)
-
- def iter_fdoc_keys(self):
- """
- Return a generator through all the mbox, uid keys in the flags-doc
- store.
- """
- fdoc_store = self._fdoc_store
- for mbox in fdoc_store:
- for uid in fdoc_store[mbox]:
- yield mbox, uid
-
- def all_new_msg_iter(self):
- """
- Return generator that iterates through all new messages.
-
- :return: generator of MessageWrappers
- :rtype: generator
- """
- gm = self.get_message
- # need to freeze, set can change during iteration
- new = [gm(*key, dirtystate=DirtyState.new) for key in tuple(self._new)]
- # move content from new set to the queue
- self._new_queue.update(self._new)
- self._new.difference_update(self._new)
- return new
-
- def all_dirty_msg_iter(self):
- """
- Return generator that iterates through all dirty messages.
-
- :return: generator of MessageWrappers
- :rtype: generator
- """
- gm = self.get_message
- # need to freeze, set can change during iteration
- dirty = [gm(*key, flags_only=True, dirtystate=DirtyState.dirty)
- for key in tuple(self._dirty)]
- # move content from new and dirty sets to the queue
-
- self._dirty_queue.update(self._dirty)
- self._dirty.difference_update(self._dirty)
- return dirty
-
- def all_deleted_uid_iter(self, mbox):
- """
- Return a list with the UIDs for all messags
- with deleted flag in a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: list of integers
- :rtype: list
- """
- # This *needs* to return a fixed sequence. Otherwise the dictionary len
- # will change during iteration, when we modify it
- fdocs = self._fdoc_store[mbox]
- return [uid for uid, value
- in fdocs.items()
- if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])]
-
- # new, dirty flags
-
- def _get_new_dirty_state(self, key):
- """
- Return `new` and `dirty` flags for a given message.
-
- :param key: the key for the message, in the form mbox, uid
- :type key: tuple
- :return: tuple of bools
- :rtype: tuple
- """
- # TODO change indexing of sets to [mbox][key] too.
- # XXX should return *first* the news, and *then* the dirty...
-
- # TODO should query in queues too , true?
- #
- return map(lambda _set: key in _set, (self._new, self._dirty))
-
- def set_new_queued(self, key):
- """
- Add the key value to the `new-queue` set.
-
- :param key: the key for the message, in the form mbox, uid
- :type key: tuple
- """
- self._new_queue.add(key)
-
- def unset_new_queued(self, key):
- """
- Remove the key value from the `new-queue` set.
-
- :param key: the key for the message, in the form mbox, uid
- :type key: tuple
- """
- self._new_queue.discard(key)
- deferreds = self._new_deferreds
- d = deferreds.get(key, None)
- if d:
- # XXX use a namedtuple for passing the result
- # when we check it in the other side.
- d.callback('%s, ok' % str(key))
- deferreds.pop(key)
-
- def set_dirty_queued(self, key):
- """
- Add the key value to the `dirty-queue` set.
-
- :param key: the key for the message, in the form mbox, uid
- :type key: tuple
- """
- self._dirty_queue.add(key)
-
- def unset_dirty_queued(self, key):
- """
- Remove the key value from the `dirty-queue` set.
-
- :param key: the key for the message, in the form mbox, uid
- :type key: tuple
- """
- self._dirty_queue.discard(key)
- deferreds = self._dirty_deferreds
- d = deferreds.get(key, None)
- if d:
- # XXX use a namedtuple for passing the result
- # when we check it in the other side.
- d.callback('%s, ok' % str(key))
- deferreds.pop(key)
-
- # Recent Flags
-
- def set_recent_flag(self, mbox, uid):
- """
- Set the `Recent` flag for a given mailbox and UID.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the message UID
- :type uid: int
- """
- self._rflags_dirty.add(mbox)
- self._rflags_store[mbox]['set'].add(uid)
-
- # TODO --- nice but unused
- def unset_recent_flag(self, mbox, uid):
- """
- Unset the `Recent` flag for a given mailbox and UID.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the message UID
- :type uid: int
- """
- self._rflags_store[mbox]['set'].discard(uid)
-
- def set_recent_flags(self, mbox, value):
- """
- Set the value for the set of the recent flags.
- Used from the property in the MessageCollection.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param value: a sequence of flags to set
- :type value: sequence
- """
- self._rflags_dirty.add(mbox)
- self._rflags_store[mbox]['set'] = set(value)
-
- def load_recent_flags(self, mbox, flags_doc):
- """
- Load the passed flags document in the recent flags store, for a given
- mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param flags_doc: A dictionary containing the `doc_id` of the Soledad
- flags-document for this mailbox, and the `set`
- of uids marked with that flag.
- """
- self._rflags_store[mbox] = flags_doc
-
- def get_recent_flags(self, mbox):
- """
- Return the set of UIDs with the `Recent` flag for this mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: set, or None
- """
- rflag_for_mbox = self._rflags_store.get(mbox, None)
- if not rflag_for_mbox:
- return None
- return self._rflags_store[mbox]['set']
-
- # XXX -- remove
- def all_rdocs_iter(self):
- """
- Return an iterator through all in-memory recent flag dicts, wrapped
- under a RecentFlagsDoc namedtuple.
- Used for saving to disk.
-
- :return: a generator of RecentFlagDoc
- :rtype: generator
- """
- # XXX use enums
- DOC_ID = "doc_id"
- SET = "set"
-
- rflags_store = self._rflags_store
-
- def get_rdoc(mbox, rdict):
- mbox_rflag_set = rdict[SET]
- recent_set = copy(mbox_rflag_set)
- # zero it!
- mbox_rflag_set.difference_update(mbox_rflag_set)
- return RecentFlagsDoc(
- doc_id=rflags_store[mbox][DOC_ID],
- content={
- fields.TYPE_KEY: fields.TYPE_RECENT_VAL,
- fields.MBOX_KEY: mbox,
- fields.RECENTFLAGS_KEY: list(recent_set)
- })
-
- return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items()
- if not empty(rdict[SET]))
-
- # Methods that mirror the IMailbox interface
-
- def remove_all_deleted(self, mbox):
- """
- Remove all messages flagged \\Deleted from this Memory Store only.
- Called from `expunge`
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: a list of UIDs
- :rtype: list
- """
- mem_deleted = self.all_deleted_uid_iter(mbox)
- for uid in mem_deleted:
- self.remove_message(mbox, uid)
- return mem_deleted
-
- # TODO -- remove
- def stop_and_flush(self):
- """
- Stop the write loop and trigger a write to the producer.
- """
- self._stop_write_loop()
- if self._permanent_store is not None:
- # XXX we should check if we did get a True value on this
- # operation. If we got False we should retry! (queue was not empty)
- self.write_messages(self._permanent_store)
- self.producer.flush()
-
- def expunge(self, mbox, observer):
- """
- Remove all messages flagged \\Deleted, from the Memory Store
- and from the permanent store also.
-
- It first queues up a last write, and wait for the deferreds to be done
- before continuing.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param observer: a deferred that will be fired when expunge is done
- :type observer: Deferred
- """
- soledad_store = self._permanent_store
- if soledad_store is None:
- # just-in memory store, easy then.
- self._delete_from_memory(mbox, observer)
- return
-
- # We have a soledad storage.
- try:
- # Stop and trigger last write
- self.stop_and_flush()
- # Wait on the writebacks to finish
-
- # XXX what if pending deferreds is empty?
- pending_deferreds = (self._new_deferreds.get(mbox, []) +
- self._dirty_deferreds.get(mbox, []))
- d1 = defer.gatherResults(pending_deferreds, consumeErrors=True)
- d1.addCallback(
- self._delete_from_soledad_and_memory, mbox, observer)
- except Exception as exc:
- logger.exception(exc)
-
- def _delete_from_memory(self, mbox, observer):
- """
- Remove all messages marked as deleted from soledad and memory.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param observer: a deferred that will be fired when expunge is done
- :type observer: Deferred
- """
- mem_deleted = self.remove_all_deleted(mbox)
- # TODO return a DeferredList
- observer.callback(mem_deleted)
-
- def _delete_from_soledad_and_memory(self, result, mbox, observer):
- """
- Remove all messages marked as deleted from soledad and memory.
-
- :param result: ignored. the result of the deferredList that triggers
- this as a callback from `expunge`.
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param observer: a deferred that will be fired when expunge is done
- :type observer: Deferred
- """
- all_deleted = []
- soledad_store = self._permanent_store
-
- try:
- # 1. Delete all messages marked as deleted in soledad.
- logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,))
- sol_deleted = soledad_store.remove_all_deleted(mbox)
-
- try:
- self._known_uids[mbox].difference_update(set(sol_deleted))
- except Exception as exc:
- logger.exception(exc)
-
- # 2. Delete all messages marked as deleted in memory.
- logger.debug("DELETING FROM MEM ALL FOR %r" % (mbox,))
- mem_deleted = self.remove_all_deleted(mbox)
-
- all_deleted = set(mem_deleted).union(set(sol_deleted))
- logger.debug("deleted %r" % all_deleted)
- except Exception as exc:
- logger.exception(exc)
- finally:
- self._start_write_loop()
-
- observer.callback(all_deleted)
-
- # Mailbox documents and attributes
-
- # This could be also be cached in memstore, but proxying directly
- # to soledad since it's not too performance-critical.
-
- def get_mbox_doc(self, mbox):
- """
- Return the soledad document for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: SoledadDocument or None.
- """
- if self.permanent_store is not None:
- return self.permanent_store.get_mbox_document(mbox)
- else:
- return None
-
- def get_mbox_closed(self, mbox):
- """
- Return the closed attribute for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: bool
- """
- if self.permanent_store is not None:
- return self.permanent_store.get_mbox_closed(mbox)
- else:
- return self._mbox_closed[mbox]
-
- def set_mbox_closed(self, mbox, closed):
- """
- Set the closed attribute for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- """
- if self.permanent_store is not None:
- self.permanent_store.set_mbox_closed(mbox, closed)
- else:
- self._mbox_closed[mbox] = closed
-
- def get_mbox_flags(self, mbox):
- """
- Get the flags for a given mbox.
- :rtype: list
- """
- return sorted(self._mbox_flags[mbox])
-
- def set_mbox_flags(self, mbox, flags):
- """
- Set the mbox flags
- """
- self._mbox_flags[mbox] = set(flags)
- # TODO
- # This should write to the permanent store!!!
-
- # Rename flag-documents
-
- def rename_fdocs_mailbox(self, old_mbox, new_mbox):
- """
- Change the mailbox name for all flag documents in a given mailbox.
- Used from account.rename
-
- :param old_mbox: name for the old mbox
- :type old_mbox: str or unicode
- :param new_mbox: name for the new mbox
- :type new_mbox: str or unicode
- """
- fs = self._fdoc_store
- keys = fs[old_mbox].keys()
- for k in keys:
- fdoc = fs[old_mbox][k]
- fdoc['mbox'] = new_mbox
- fs[new_mbox][k] = fdoc
- fs[old_mbox].pop(k)
- self._dirty.add((new_mbox, k))
-
- # Dump-to-disk controls.
-
- @property
- def is_writing(self):
- """
- Property that returns whether the store is currently writing its
- internal state to a permanent storage.
-
- Used to evaluate whether the CHECK command can inform that the field
- is clear to proceed, or waiting for the write operations to complete
- is needed instead.
-
- :rtype: bool
- """
- # FIXME this should return a deferred !!!
- # TODO this should be moved to soledadStore instead
- # (all pending deferreds)
- return getattr(self, self.WRITING_FLAG)
-
- @property
- def permanent_store(self):
- return self._permanent_store
-
- # Memory management.
-
- def get_size(self):
- """
- Return the size of the internal storage.
- Use for calculating the limit beyond which we should flush the store.
-
- :rtype: int
- """
- return reduce(lambda x, y: x + y, self._sizes, 0)
diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py
deleted file mode 100644
index fb1d75a..0000000
--- a/src/leap/mail/imap/messageparts.py
+++ /dev/null
@@ -1,586 +0,0 @@
-# messageparts.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-MessagePart implementation. Used from LeapMessage.
-"""
-import logging
-import StringIO
-import weakref
-
-from collections import namedtuple
-
-from enum import Enum
-from zope.interface import implements
-from twisted.mail import imap4
-
-from leap.common.decorators import memoized_method
-from leap.common.mail import get_email_charset
-from leap.mail.imap import interfaces
-from leap.mail.imap.fields import fields
-from leap.mail.utils import empty, first, find_charset
-
-MessagePartType = Enum("MessagePartType", "hdoc fdoc cdoc cdocs docs_id")
-
-
-logger = logging.getLogger(__name__)
-
-
-"""
-A MessagePartDoc is a light wrapper around the dictionary-like
-data that we pass along for message parts. It can be used almost everywhere
-that you would expect a SoledadDocument, since it has a dict under the
-`content` attribute.
-
-We also keep some metadata on it, relative in part to the message as a whole,
-and sometimes to a part in particular only.
-
-* `new` indicates that the document has just been created. SoledadStore
- should just create a new doc for all the related message parts.
-* `store` indicates the type of store a given MessagePartDoc lives in.
- We currently use this to indicate that the document comes from memeory,
- but we should probably get rid of it as soon as we extend the use of the
- SoledadStore interface along LeapMessage, MessageCollection and Mailbox.
-* `part` is one of the MessagePartType enums.
-
-* `dirty` indicates that, while we already have the document in Soledad,
- we have modified its state in memory, so we need to put_doc instead while
- dumping the MemoryStore contents.
- `dirty` attribute would only apply to flags-docs and linkage-docs.
-* `doc_id` is the identifier for the document in the u1db database, if any.
-
-"""
-
-MessagePartDoc = namedtuple(
- 'MessagePartDoc',
- ['new', 'dirty', 'part', 'store', 'content', 'doc_id'])
-
-"""
-A RecentFlagsDoc is used to send the recent-flags document payload to the
-SoledadWriter during dumps.
-"""
-RecentFlagsDoc = namedtuple(
- 'RecentFlagsDoc',
- ['content', 'doc_id'])
-
-
-class ReferenciableDict(dict):
- """
- A dict that can be weak-referenced.
-
- Some builtin objects are not weak-referenciable unless
- subclassed. So we do.
-
- Used to return pointers to the items in the MemoryStore.
- """
-
-
-class MessageWrapper(object):
- """
- A simple nested dictionary container around the different message subparts.
- """
- implements(interfaces.IMessageContainer)
-
- FDOC = "fdoc"
- HDOC = "hdoc"
- CDOCS = "cdocs"
- DOCS_ID = "docs_id"
-
- # Using slots to limit some the memory use,
- # Add your attribute here.
-
- __slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"]
-
- def __init__(self, fdoc=None, hdoc=None, cdocs=None,
- from_dict=None, memstore=None,
- new=True, dirty=False, docs_id={}):
- """
- Initialize a MessageWrapper.
- """
- # TODO add optional reference to original message in the incoming
- self._dict = {}
- self.memstore = memstore
-
- self._new = new
- self._dirty = dirty
-
- self._storetype = "mem"
-
- if from_dict is not None:
- self.from_dict(from_dict)
- else:
- if fdoc is not None:
- self._dict[self.FDOC] = ReferenciableDict(fdoc)
- if hdoc is not None:
- self._dict[self.HDOC] = ReferenciableDict(hdoc)
- if cdocs is not None:
- self._dict[self.CDOCS] = ReferenciableDict(cdocs)
-
- # This will keep references to the doc_ids to be able to put
- # messages to soledad. It will be populated during the walk() to avoid
- # the overhead of reading from the db.
-
- # XXX it really *only* make sense for the FDOC, the other parts
- # should not be "dirty", just new...!!!
- self._dict[self.DOCS_ID] = docs_id
-
- # properties
-
- # TODO Could refactor new and dirty properties together.
-
- def _get_new(self):
- """
- Get the value for the `new` flag.
-
- :rtype: bool
- """
- return self._new
-
- def _set_new(self, value=False):
- """
- Set the value for the `new` flag, and propagate it
- to the memory store if any.
-
- :param value: the value to set
- :type value: bool
- """
- self._new = value
- if self.memstore:
- mbox = self.fdoc.content.get('mbox', None)
- uid = self.fdoc.content.get('uid', None)
- if not mbox or not uid:
- logger.warning("Malformed fdoc")
- return
- key = mbox, uid
- fun = [self.memstore.unset_new_queued,
- self.memstore.set_new_queued][int(value)]
- fun(key)
- else:
- logger.warning("Could not find a memstore referenced from this "
- "MessageWrapper. The value for new will not be "
- "propagated")
-
- new = property(_get_new, _set_new,
- doc="The `new` flag for this MessageWrapper")
-
- def _get_dirty(self):
- """
- Get the value for the `dirty` flag.
-
- :rtype: bool
- """
- return self._dirty
-
- def _set_dirty(self, value=True):
- """
- Set the value for the `dirty` flag, and propagate it
- to the memory store if any.
-
- :param value: the value to set
- :type value: bool
- """
- self._dirty = value
- if self.memstore:
- mbox = self.fdoc.content.get('mbox', None)
- uid = self.fdoc.content.get('uid', None)
- if not mbox or not uid:
- logger.warning("Malformed fdoc")
- return
- key = mbox, uid
- fun = [self.memstore.unset_dirty_queued,
- self.memstore.set_dirty_queued][int(value)]
- fun(key)
- else:
- logger.warning("Could not find a memstore referenced from this "
- "MessageWrapper. The value for new will not be "
- "propagated")
-
- dirty = property(_get_dirty, _set_dirty)
-
- # IMessageContainer
-
- @property
- def fdoc(self):
- """
- Return a MessagePartDoc wrapping around a weak reference to
- the flags-document in this MemoryStore, if any.
-
- :rtype: MessagePartDoc
- """
- _fdoc = self._dict.get(self.FDOC, None)
- if _fdoc:
- content_ref = weakref.proxy(_fdoc)
- else:
- logger.warning("NO FDOC!!!")
- content_ref = {}
-
- return MessagePartDoc(new=self.new, dirty=self.dirty,
- store=self._storetype,
- part=MessagePartType.fdoc,
- content=content_ref,
- doc_id=self._dict[self.DOCS_ID].get(
- self.FDOC, None))
-
- @property
- def hdoc(self):
- """
- Return a MessagePartDoc wrapping around a weak reference to
- the headers-document in this MemoryStore, if any.
-
- :rtype: MessagePartDoc
- """
- _hdoc = self._dict.get(self.HDOC, None)
- if _hdoc:
- content_ref = weakref.proxy(_hdoc)
- else:
- content_ref = {}
- return MessagePartDoc(new=self.new, dirty=self.dirty,
- store=self._storetype,
- part=MessagePartType.hdoc,
- content=content_ref,
- doc_id=self._dict[self.DOCS_ID].get(
- self.HDOC, None))
-
- @property
- def cdocs(self):
- """
- Return a weak reference to a zero-indexed dict containing
- the content-documents, or an empty dict if none found.
- If you want access to the MessagePartDoc for the individual
- parts, use the generator returned by `walk` instead.
-
- :rtype: dict
- """
- _cdocs = self._dict.get(self.CDOCS, None)
- if _cdocs:
- return weakref.proxy(_cdocs)
- else:
- return {}
-
- def walk(self):
- """
- Generator that iterates through all the parts, returning
- MessagePartDoc. Used for writing to SoledadStore.
-
- :rtype: generator
- """
- if self._dirty:
- try:
- mbox = self.fdoc.content[fields.MBOX_KEY]
- uid = self.fdoc.content[fields.UID_KEY]
- docid_dict = self._dict[self.DOCS_ID]
- docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc(
- mbox, uid)
- except Exception as exc:
- logger.debug("Error while walking message...")
- logger.exception(exc)
-
- if not empty(self.fdoc.content) and 'uid' in self.fdoc.content:
- yield self.fdoc
- if not empty(self.hdoc.content):
- yield self.hdoc
- for cdoc in self.cdocs.values():
- if not empty(cdoc):
- content_ref = weakref.proxy(cdoc)
- yield MessagePartDoc(new=self.new, dirty=self.dirty,
- store=self._storetype,
- part=MessagePartType.cdoc,
- content=content_ref,
- doc_id=None)
-
- # i/o
-
- def as_dict(self):
- """
- Return a dict representation of the parts contained.
-
- :rtype: dict
- """
- return self._dict
-
- def from_dict(self, msg_dict):
- """
- Populate MessageWrapper parts from a dictionary.
- It expects the same format that we use in a
- MessageWrapper.
-
-
- :param msg_dict: a dictionary containing the parts to populate
- the MessageWrapper from
- :type msg_dict: dict
- """
- fdoc, hdoc, cdocs = map(
- lambda part: msg_dict.get(part, None),
- [self.FDOC, self.HDOC, self.CDOCS])
-
- for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc),
- (self.CDOCS, cdocs)):
- self._dict[t] = ReferenciableDict(doc) if doc else None
-
-
-class MessagePart(object):
- """
- IMessagePart implementor, to be passed to several methods
- of the IMAP4Server.
- It takes a subpart message and is able to find
- the inner parts.
-
- See the interface documentation.
- """
-
- implements(imap4.IMessagePart)
-
- def __init__(self, soledad, part_map):
- """
- Initializes the MessagePart.
-
- :param soledad: Soledad instance.
- :type soledad: Soledad
- :param part_map: a dictionary containing the parts map for this
- message
- :type part_map: dict
- """
- # TODO
- # It would be good to pass the uid/mailbox also
- # for references while debugging.
-
- # We have a problem on bulk moves, and is
- # that when the fetch on the new mailbox is done
- # the parts maybe are not complete.
- # So we should be able to fail with empty
- # docs until we solve that. The ideal would be
- # to gather the results of the deferred operations
- # to signal the operation is complete.
- #leap_assert(part_map, "part map dict cannot be null")
-
- self._soledad = soledad
- self._pmap = part_map
-
- def getSize(self):
- """
- Return the total size, in octets, of this message part.
-
- :return: size of the message, in octets
- :rtype: int
- """
- if empty(self._pmap):
- return 0
- size = self._pmap.get('size', None)
- if size is None:
- logger.error("Message part cannot find size in the partmap")
- size = 0
- return size
-
- def getBodyFile(self):
- """
- Retrieve a file object containing only the body of this message.
-
- :return: file-like object opened for reading
- :rtype: StringIO
- """
- fd = StringIO.StringIO()
- if not empty(self._pmap):
- multi = self._pmap.get('multi')
- if not multi:
- phash = self._pmap.get("phash", None)
- else:
- pmap = self._pmap.get('part_map')
- first_part = pmap.get('1', None)
- if not empty(first_part):
- phash = first_part['phash']
- else:
- phash = None
-
- if phash is None:
- logger.warning("Could not find phash for this subpart!")
- payload = ""
- else:
- payload = self._get_payload_from_document_memoized(phash)
- if empty(payload):
- payload = self._get_payload_from_document(phash)
-
- else:
- logger.warning("Message with no part_map!")
- payload = ""
-
- if payload:
- content_type = self._get_ctype_from_document(phash)
- charset = find_charset(content_type)
- if charset is None:
- charset = self._get_charset(payload)
- try:
- if isinstance(payload, unicode):
- payload = payload.encode(charset)
- except UnicodeError as exc:
- logger.error(
- "Unicode error, using 'replace'. {0!r}".format(exc))
- payload = payload.encode(charset, 'replace')
-
- fd.write(payload)
- fd.seek(0)
- return fd
-
- # TODO should memory-bound this memoize!!!
- @memoized_method
- def _get_payload_from_document_memoized(self, phash):
- """
- Memoized method call around the regular method, to be able
- to call the non-memoized method in case we got a None.
-
- :param phash: the payload hash to retrieve by.
- :type phash: str or unicode
- :rtype: str or unicode or None
- """
- return self._get_payload_from_document(phash)
-
- def _get_payload_from_document(self, phash):
- """
- Return the message payload from the content document.
-
- :param phash: the payload hash to retrieve by.
- :type phash: str or unicode
- :rtype: str or unicode or None
- """
- cdocs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
-
- cdoc = first(cdocs)
- if cdoc is None:
- logger.warning(
- "Could not find the content doc "
- "for phash %s" % (phash,))
- payload = ""
- else:
- payload = cdoc.content.get(fields.RAW_KEY, "")
- return payload
-
- # TODO should memory-bound this memoize!!!
- @memoized_method
- def _get_ctype_from_document(self, phash):
- """
- Reeturn the content-type from the content document.
-
- :param phash: the payload hash to retrieve by.
- :type phash: str or unicode
- :rtype: str or unicode
- """
- cdocs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
-
- cdoc = first(cdocs)
- if not cdoc:
- logger.warning(
- "Could not find the content doc "
- "for phash %s" % (phash,))
- ctype = cdoc.content.get('ctype', "")
- return ctype
-
- @memoized_method
- def _get_charset(self, stuff):
- # TODO put in a common class with LeapMessage
- """
- Gets (guesses?) the charset of a payload.
-
- :param stuff: the stuff to guess about.
- :type stuff: str or unicode
- :return: charset
- :rtype: unicode
- """
- # XXX existential doubt 2. shouldn't we make the scope
- # of the decorator somewhat more persistent?
- # ah! yes! and put memory bounds.
- return get_email_charset(stuff)
-
- def getHeaders(self, negate, *names):
- """
- Retrieve a group of message headers.
-
- :param names: The names of the headers to retrieve or omit.
- :type names: tuple of str
-
- :param negate: If True, indicates that the headers listed in names
- should be omitted from the return value, rather
- than included.
- :type negate: bool
-
- :return: A mapping of header field names to header field values
- :rtype: dict
- """
- # XXX refactor together with MessagePart method
- if not self._pmap:
- logger.warning("No pmap in Subpart!")
- return {}
- headers = dict(self._pmap.get("headers", []))
-
- names = map(lambda s: s.upper(), names)
- if negate:
- cond = lambda key: key.upper() not in names
- else:
- cond = lambda key: key.upper() in names
-
- # default to most likely standard
- charset = find_charset(headers, "utf-8")
- headers2 = dict()
- for key, value in headers.items():
- # twisted imap server expects *some* headers to be lowercase
- # We could use a CaseInsensitiveDict here...
- if key.lower() == "content-type":
- key = key.lower()
-
- if not isinstance(key, str):
- key = key.encode(charset, 'replace')
- if not isinstance(value, str):
- value = value.encode(charset, 'replace')
-
- # filter original dict by negate-condition
- if cond(key):
- headers2[key] = value
- return headers2
-
- def isMultipart(self):
- """
- Return True if this message is multipart.
- """
- if empty(self._pmap):
- logger.warning("Could not get part map!")
- return False
- multi = self._pmap.get("multi", False)
- return multi
-
- def getSubPart(self, part):
- """
- Retrieve a MIME submessage
-
- :type part: C{int}
- :param part: The number of the part to retrieve, indexed from 0.
- :raise IndexError: Raised if the specified part does not exist.
- :raise TypeError: Raised if this message is not multipart.
- :rtype: Any object implementing C{IMessagePart}.
- :return: The specified sub-part.
- """
- if not self.isMultipart():
- raise TypeError
-
- sub_pmap = self._pmap.get("part_map", {})
- try:
- part_map = sub_pmap[str(part + 1)]
- except KeyError:
- logger.debug("getSubpart for %s: KeyError" % (part,))
- raise IndexError
-
- # XXX check for validity
- return MessagePart(self._soledad, part_map)
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index d47c8eb..7e0f973 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# messages.py
-# Copyright (C) 2013, 2014 LEAP
+# imap/messages.py
+# Copyright (C) 2013-2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,85 +15,41 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-LeapMessage and MessageCollection.
+IMAPMessage and IMAPMessageCollection.
"""
-import copy
import logging
-import threading
-import StringIO
-
-from collections import defaultdict
-from functools import partial
-
+# import StringIO
from twisted.mail import imap4
-from twisted.internet import reactor
from zope.interface import implements
-from zope.proxy import sameProxiedObjects
from leap.common.check import leap_assert, leap_assert_type
from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
-from leap.mail.adaptors import soledad_indexes as indexes
-from leap.mail.constants import INBOX_NAME
-from leap.mail.utils import find_charset, empty
-from leap.mail.imap.index import IndexedDB
-from leap.mail.imap.fields import fields, WithMsgFields
-from leap.mail.imap.messageparts import MessagePart, MessagePartDoc
-from leap.mail.imap.parser import MBoxParser
-
-logger = logging.getLogger(__name__)
-
-# TODO ------------------------------------------------------------
-# [ ] Add ref to incoming message during add_msg
-# [ ] Add linked-from info.
-# * Need a new type of documents: linkage info.
-# * HDOCS are linked from FDOCs (ref to chash)
-# * CDOCS are linked from HDOCS (ref to chash)
+from leap.mail.utils import find_charset
-# [ ] Delete incoming mail only after successful write!
-# [ ] Remove UID from syncable db. Store only those indexes locally.
+from leap.mail.imap.messageparts import MessagePart
+# from leap.mail.imap.messagepargs import MessagePartDoc
+logger = logging.getLogger(__name__)
-def try_unique_query(curried):
- """
- Try to execute a query that is expected to have a
- single outcome, and log a warning if more than one document found.
-
- :param curried: a curried function
- :type curried: callable
- """
- # XXX FIXME ---------- convert to deferreds
- leap_assert(callable(curried), "A callable is expected")
- try:
- query = curried()
- if query:
- if len(query) > 1:
- # TODO we could take action, like trigger a background
- # process to kill dupes.
- name = getattr(curried, 'expected', 'doc')
- logger.warning(
- "More than one %s found for this mbox, "
- "we got a duplicate!!" % (name,))
- return query.pop()
- else:
- return None
- except Exception as exc:
- logger.exception("Unhandled error %r" % exc)
-
+# TODO ------------------------------------------------------------
-# FIXME remove-me
-#fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock()))
+# [ ] Add ref to incoming message during add_msg.
+# [ ] Delete incoming mail only after successful write.
-class IMAPMessage(fields, MBoxParser):
+class IMAPMessage(object):
"""
The main representation of a message.
"""
implements(imap4.IMessage)
- def __init__(self, soledad, uid, mbox):
+ # TODO ---- see what should we pass here instead
+ # where's UID added to the message?
+ # def __init__(self, soledad, uid, mbox):
+ def __init__(self, message, collection):
"""
Initializes a LeapMessage.
@@ -103,81 +59,14 @@ class IMAPMessage(fields, MBoxParser):
:type uid: int or basestring
:param mbox: the mbox this message belongs to
:type mbox: str or unicode
- :param collection: a reference to the parent collection object
- :type collection: MessageCollection
- :param container: a IMessageContainer implementor instance
- :type container: IMessageContainer
"""
- self._soledad = soledad
- self._uid = int(uid) if uid is not None else None
- self._mbox = self._parse_mailbox_name(mbox)
-
- self.__chash = None
- self.__bdoc = None
-
- # TODO collection and container are deprecated.
-
- # TODO move to adaptor
-
- #@property
- #def fdoc(self):
- #"""
- #An accessor to the flags document.
- #"""
- #if all(map(bool, (self._uid, self._mbox))):
- #fdoc = None
- #if self._container is not None:
- #fdoc = self._container.fdoc
- #if not fdoc:
- #fdoc = self._get_flags_doc()
- #if fdoc:
- #fdoc_content = fdoc.content
- #self.__chash = fdoc_content.get(
- #fields.CONTENT_HASH_KEY, None)
- #return fdoc
-#
- #@property
- #def hdoc(self):
- #"""
- #An accessor to the headers document.
- #"""
- #container = self._container
- #if container is not None:
- #hdoc = self._container.hdoc
- #if hdoc and not empty(hdoc.content):
- #return hdoc
- #hdoc = self._get_headers_doc()
-#
- #if container and not empty(hdoc.content):
- # mem-cache it
- #hdoc_content = hdoc.content
- #chash = hdoc_content.get(fields.CONTENT_HASH_KEY)
- #hdocs = {chash: hdoc_content}
- #container.memstore.load_header_docs(hdocs)
- #return hdoc
-#
- #@property
- #def chash(self):
- #"""
- #An accessor to the content hash for this message.
- #"""
- #if not self.fdoc:
- #return None
- #if not self.__chash and self.fdoc:
- #self.__chash = self.fdoc.content.get(
- #fields.CONTENT_HASH_KEY, None)
- #return self.__chash
-
- #@property
- #def bdoc(self):
- #"""
- #An accessor to the body document.
- #"""
- #if not self.hdoc:
- #return None
- #if not self.__bdoc:
- #self.__bdoc = self._get_body_doc()
- #return self.__bdoc
+ #self._uid = int(uid) if uid is not None else None
+ #self._mbox = normalize_mailbox(mbox)
+
+ self.message = message
+
+ # TODO maybe not needed, see setFlags below
+ self.collection = collection
# IMessage implementation
@@ -188,12 +77,7 @@ class IMAPMessage(fields, MBoxParser):
:return: uid for this message
:rtype: int
"""
- # TODO ----> return lookup in local sqlcipher table.
- return self._uid
-
- # --------------------------------------------------------------
- # TODO -- from here on, all the methods should be proxied to the
- # instance of leap.mail.mail.Message
+ return self.message.get_uid()
def getFlags(self):
"""
@@ -202,24 +86,14 @@ class IMAPMessage(fields, MBoxParser):
:return: The flags, represented as strings
:rtype: tuple
"""
- uid = self._uid
-
- flags = set([])
- fdoc = self.fdoc
- if fdoc:
- flags = set(fdoc.content.get(self.FLAGS_KEY, None))
+ return self.message.get_flags()
- msgcol = self._collection
+ # setFlags not in the interface spec but we use it with store command.
- # We treat the recent flag specially: gotten from
- # a mailbox-level document.
- if msgcol and uid in msgcol.recent_flags:
- flags.add(fields.RECENT_FLAG)
- if flags:
- flags = map(str, flags)
- return tuple(flags)
+ # XXX if we can move it to a collection method, we don't need to pass
+ # collection to the IMAPMessage
- # setFlags not in the interface spec but we use it with store command.
+ # lookup method? IMAPMailbox?
def setFlags(self, flags, mode):
"""
@@ -231,32 +105,11 @@ class IMAPMessage(fields, MBoxParser):
:type mode: int
"""
leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- mbox, uid = self._mbox, self._uid
-
- APPEND = 1
- REMOVE = -1
- SET = 0
-
- doc = self.fdoc
- if not doc:
- logger.warning(
- "Could not find FDOC for %r:%s while setting flags!" %
- (mbox, uid))
- return
- current = doc.content[self.FLAGS_KEY]
- if mode == APPEND:
- newflags = tuple(set(tuple(current) + flags))
- elif mode == REMOVE:
- newflags = tuple(set(current).difference(set(flags)))
- elif mode == SET:
- newflags = flags
- new_fdoc = {
- self.FLAGS_KEY: newflags,
- self.SEEN_KEY: self.SEEN_FLAG in newflags,
- self.DEL_KEY: self.DELETED_FLAG in newflags}
- self._collection.memstore.update_flags(mbox, uid, new_fdoc)
-
- return map(str, newflags)
+ # XXX
+ # return new flags
+ # map to str
+ #self.message.set_flags(flags, mode)
+ self.collection.update_flags(self.message, flags, mode)
def getInternalDate(self):
"""
@@ -273,8 +126,7 @@ class IMAPMessage(fields, MBoxParser):
:return: An RFC822-formatted date string.
:rtype: str
"""
- date = self.hdoc.content.get(fields.DATE_KEY, '')
- return date
+ return self.message.get_internal_date()
#
# IMessagePart
@@ -290,42 +142,40 @@ class IMAPMessage(fields, MBoxParser):
:return: file-like object opened for reading
:rtype: StringIO
"""
- def write_fd(body):
- fd.write(body)
- fd.seek(0)
- return fd
-
+ #def write_fd(body):
+ #fd.write(body)
+ #fd.seek(0)
+ #return fd
+#
# TODO refactor with getBodyFile in MessagePart
-
- fd = StringIO.StringIO()
-
- if self.bdoc is not None:
- bdoc_content = self.bdoc.content
- if empty(bdoc_content):
- logger.warning("No BDOC content found for message!!!")
- return write_fd("")
-
- body = bdoc_content.get(self.RAW_KEY, "")
- content_type = bdoc_content.get('content-type', "")
- charset = find_charset(content_type)
- if charset is None:
- charset = self._get_charset(body)
- try:
- if isinstance(body, unicode):
- body = body.encode(charset)
- except UnicodeError as exc:
- logger.error(
- "Unicode error, using 'replace'. {0!r}".format(exc))
- logger.debug("Attempted to encode with: %s" % charset)
- body = body.encode(charset, 'replace')
- finally:
- return write_fd(body)
-
- # We are still returning funky characters from here.
- else:
- logger.warning("No BDOC found for message.")
- return write_fd("")
-
+#
+ #fd = StringIO.StringIO()
+#
+ #if self.bdoc is not None:
+ #bdoc_content = self.bdoc.content
+ #if empty(bdoc_content):
+ #logger.warning("No BDOC content found for message!!!")
+ #return write_fd("")
+#
+ #body = bdoc_content.get(self.RAW_KEY, "")
+ #content_type = bdoc_content.get('content-type', "")
+ #charset = find_charset(content_type)
+ #if charset is None:
+ #charset = self._get_charset(body)
+ #try:
+ #if isinstance(body, unicode):
+ #body = body.encode(charset)
+ #except UnicodeError as exc:
+ #logger.error(
+ #"Unicode error, using 'replace'. {0!r}".format(exc))
+ #logger.debug("Attempted to encode with: %s" % charset)
+ #body = body.encode(charset, 'replace')
+ #finally:
+ #return write_fd(body)
+
+ return self.message.get_body_file()
+
+ # TODO move to mail.mail
@memoized_method
def _get_charset(self, stuff):
"""
@@ -337,7 +187,7 @@ class IMAPMessage(fields, MBoxParser):
"""
# XXX shouldn't we make the scope
# of the decorator somewhat more persistent?
- # ah! yes! and put memory bounds.
+ # and put memory bounds.
return get_email_charset(stuff)
def getSize(self):
@@ -347,17 +197,11 @@ class IMAPMessage(fields, MBoxParser):
:return: size of the message, in octets
:rtype: int
"""
- size = None
- if self.fdoc is not None:
- fdoc_content = self.fdoc.content
- size = fdoc_content.get(self.SIZE_KEY, False)
- else:
- logger.warning("No FLAGS doc for %s:%s" % (self._mbox,
- self._uid))
- #if not size:
- # XXX fallback, should remove when all migrated.
- #size = self.getBodyFile().len
- return size
+ #size = None
+ #fdoc_content = self.fdoc.content
+ #size = fdoc_content.get(self.SIZE_KEY, False)
+ #return size
+ return self.message.get_size()
def getHeaders(self, negate, *names):
"""
@@ -374,10 +218,10 @@ class IMAPMessage(fields, MBoxParser):
:return: A mapping of header field names to header field values
:rtype: dict
"""
- # TODO split in smaller methods
+ # TODO split in smaller methods -- format_headers()?
# XXX refactor together with MessagePart method
- headers = self._get_headers()
+ headers = self.message.get_headers()
# XXX keep this in the imap imessage implementation,
# because the server impl. expects content-type to be present.
@@ -417,34 +261,15 @@ class IMAPMessage(fields, MBoxParser):
headers2[key] = value
return headers2
- def _get_headers(self):
- """
- Return the headers dict for this message.
- """
- if self.hdoc is not None:
- hdoc_content = self.hdoc.content
- headers = hdoc_content.get(self.HEADERS_KEY, {})
- return headers
-
- else:
- logger.warning(
- "No HEADERS doc for msg %s:%s" % (
- self._mbox,
- self._uid))
-
def isMultipart(self):
"""
Return True if this message is multipart.
"""
- if self.fdoc:
- fdoc_content = self.fdoc.content
- is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)
- return is_multipart
- else:
- logger.warning(
- "No FLAGS doc for msg %s:%s" % (
- self._mbox,
- self._uid))
+ #fdoc_content = self.fdoc.content
+ #is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)
+ #return is_multipart
+
+ return self.message.fdoc.is_multi
def getSubPart(self, part):
"""
@@ -463,12 +288,16 @@ class IMAPMessage(fields, MBoxParser):
pmap_dict = self._get_part_from_parts_map(part + 1)
except KeyError:
raise IndexError
+
+ # TODO move access to adaptor ----
return MessagePart(self._soledad, pmap_dict)
#
# accessors
#
+ # FIXME
+ # -- move to wrapper/adaptor
def _get_part_from_parts_map(self, part):
"""
Get a part map from the headers doc
@@ -476,100 +305,44 @@ class IMAPMessage(fields, MBoxParser):
:raises: KeyError if key does not exist
:rtype: dict
"""
- if not self.hdoc:
- logger.warning("Tried to get part but no HDOC found!")
- return None
-
- hdoc_content = self.hdoc.content
- pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
+ raise NotImplementedError()
+ #hdoc_content = self.hdoc.content
+ #pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
+#
# remember, lads, soledad is using strings in its keys,
# not integers!
- return pmap[str(part)]
+ #return pmap[str(part)]
- # XXX moved to memory store
- # move the rest too. ------------------------------------------
- def _get_flags_doc(self):
- """
- Return the document that keeps the flags for this
- message.
- """
- def get_first_if_any(docs):
- result = first(docs)
- return result if result else {}
-
- d = self._soledad.get_from_index(
- fields.TYPE_MBOX_UID_IDX,
- fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid))
- d.addCallback(get_first_if_any)
- return d
-
- # TODO move to soledadstore instead of accessing soledad directly
- def _get_headers_doc(self):
- """
- Return the document that keeps the headers for this
- message.
- """
- d = self._soledad.get_from_index(
- fields.TYPE_C_HASH_IDX,
- fields.TYPE_HEADERS_VAL, str(self.chash))
- d.addCallback(lambda docs: first(docs))
- return d
-
- # TODO move to soledadstore instead of accessing soledad directly
+ # TODO move to wrapper/adaptor
def _get_body_doc(self):
"""
Return the document that keeps the body for this
message.
"""
- # XXX FIXME --- this might need a maybedeferred
- # on the receiving side...
- hdoc_content = self.hdoc.content
- body_phash = hdoc_content.get(
- fields.BODY_KEY, None)
- if not body_phash:
- logger.warning("No body phash for this document!")
- return None
-
- # XXX get from memstore too...
- # if memstore: memstore.get_phrash
- # memstore should keep a dict with weakrefs to the
- # phash doc...
-
- if self._container is not None:
- bdoc = self._container.memstore.get_cdoc_from_phash(body_phash)
- if not empty(bdoc) and not empty(bdoc.content):
- return bdoc
-
+ # FIXME
+ # -- just get the body and retrieve the cdoc P-<phash>
+ #hdoc_content = self.hdoc.content
+ #body_phash = hdoc_content.get(
+ #fields.BODY_KEY, None)
+ #if not body_phash:
+ #logger.warning("No body phash for this document!")
+ #return None
+#
+ #if self._container is not None:
+ #bdoc = self._container.memstore.get_cdoc_from_phash(body_phash)
+ #if not empty(bdoc) and not empty(bdoc.content):
+ #return bdoc
+#
# no memstore, or no body doc found there
- d = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(body_phash))
- d.addCallback(lambda docs: first(docs))
- return d
-
- def __getitem__(self, key):
- """
- Return an item from the content of the flags document,
- for convenience.
-
- :param key: The key
- :type key: str
-
- :return: The content value indexed by C{key} or None
- :rtype: str
- """
- return self.fdoc.content.get(key, None)
-
- def does_exist(self):
- """
- Return True if there is actually a flags document for this
- UID and mbox.
- """
- return not empty(self.fdoc)
+ #d = self._soledad.get_from_index(
+ #fields.TYPE_P_HASH_IDX,
+ #fields.TYPE_CONTENT_VAL, str(body_phash))
+ #d.addCallback(lambda docs: first(docs))
+ #return d
-class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
+class IMAPMessageCollection(object):
"""
A collection of messages, surprisingly.
@@ -578,9 +351,15 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
database.
"""
- # XXX this should be able to produce a MessageSet methinks
- # could validate these kinds of objects turning them
- # into a template for the class.
+ messageklass = IMAPMessage
+
+ # TODO
+ # [ ] Add RECENT flags docs to mailbox-doc attributes (list-of-uids)
+ # [ ] move Query for all the headers documents to Collection
+
+ # TODO this should be able to produce a MessageSet methinks
+ # TODO --- reimplement, review and prune documentation below.
+
FLAGS_DOC = "FLAGS"
HEADERS_DOC = "HEADERS"
CONTENT_DOC = "CONTENT"
@@ -604,145 +383,40 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
"""
HDOCS_SET_DOC = "HDOCS_SET"
- templates = {
-
- # Mailbox Level
-
- RECENT_DOC: {
- "type": indexes.RECENT,
- "mbox": INBOX_NAME,
- fields.RECENTFLAGS_KEY: [],
- },
-
- HDOCS_SET_DOC: {
- "type": indexes.HDOCS_SET,
- "mbox": INBOX_NAME,
- fields.HDOCS_SET_KEY: [],
- }
-
-
- }
-
- # Different locks for wrapping both the u1db document getting/setting
- # and the property getting/settting in an atomic operation.
-
- # TODO --- deprecate ! --- use SoledadDocumentWrapper + locks
- _rdoc_lock = defaultdict(lambda: threading.Lock())
- _rdoc_write_lock = defaultdict(lambda: threading.Lock())
- _rdoc_read_lock = defaultdict(lambda: threading.Lock())
- _rdoc_property_lock = defaultdict(lambda: threading.Lock())
-
- _initialized = {}
-
- def __init__(self, mbox=None, soledad=None, memstore=None):
- """
- Constructor for MessageCollection.
-
- On initialization, we ensure that we have a document for
- storing the recent flags. The nature of this flag make us wanting
- to store the set of the UIDs with this flag at the level of the
- MessageCollection for each mailbox, instead of treating them
- as a property of each message.
-
- We are passed an instance of MemoryStore, the same for the
- SoledadBackedAccount, that we use as a read cache and a buffer
- for writes.
-
- :param mbox: the name of the mailbox. It is the name
- with which we filter the query over the
- messages database.
- :type mbox: str
- :param soledad: Soledad database
- :type soledad: Soledad instance
- :param memstore: a MemoryStore instance
- :type memstore: MemoryStore
+ def __init__(self, collection):
"""
- leap_assert(mbox, "Need a mailbox name to initialize")
- leap_assert(mbox.strip() != "", "mbox cannot be blank space")
- leap_assert(isinstance(mbox, (str, unicode)),
- "mbox needs to be a string")
- leap_assert(soledad, "Need a soledad instance to initialize")
+ Constructor for IMAPMessageCollection.
- # okay, all in order, keep going...
-
- self.mbox = self._parse_mailbox_name(mbox)
-
- # XXX get a SoledadStore passed instead
- self._soledad = soledad
- self.memstore = memstore
-
- self.__rflags = None
-
- if not self._initialized.get(mbox, False):
- try:
- self.initialize_db()
- # ensure that we have a recent-flags doc
- self._get_or_create_rdoc()
- except Exception:
- logger.debug("Error initializing %r" % (mbox,))
- else:
- self._initialized[mbox] = True
-
- def _get_empty_doc(self, _type=FLAGS_DOC):
- """
- Returns an empty doc for storing different message parts.
- Defaults to returning a template for a flags document.
- :return: a dict with the template
- :rtype: dict
- """
- if _type not in self.templates.keys():
- raise TypeError("Improper type passed to _get_empty_doc")
- return copy.deepcopy(self.templates[_type])
-
- def _get_or_create_rdoc(self):
- """
- Try to retrieve the recent-flags doc for this MessageCollection,
- and create one if not found.
+ :param collection: an instance of a MessageCollection
+ :type collection: MessageCollection
"""
- # XXX should move this to memstore too
- with self._rdoc_write_lock[self.mbox]:
- rdoc = self._get_recent_doc_from_soledad()
- if rdoc is None:
- rdoc = self._get_empty_doc(self.RECENT_DOC)
- if self.mbox != fields.INBOX_VAL:
- rdoc[fields.MBOX_KEY] = self.mbox
- self._soledad.create_doc(rdoc)
-
- # --------------------------------------------------------------------
+ leap_assert(
+ collection.is_mailbox_collection(),
+ "Need a mailbox name to initialize")
+ mbox_name = collection.mbox_name
+ leap_assert(mbox_name.strip() != "", "mbox cannot be blank space")
+ leap_assert(isinstance(mbox_name, (str, unicode)),
+ "mbox needs to be a string")
+ self.collection = collection
- # -----------------------------------------------------------------------
+ # XXX this has to be done in IMAPAccount
+ # (Where the collection must be instantiated and passed to us)
+ # self.mbox = normalize_mailbox(mbox)
- def _fdoc_already_exists(self, chash):
+ @property
+ def mbox_name(self):
"""
- Check whether we can find a flags doc for this mailbox with the
- given content-hash. It enforces that we can only have the same maessage
- listed once for a a given mailbox.
-
- :param chash: the content-hash to check about.
- :type chash: basestring
- :return: False, if it does not exist, or UID.
+ Return the string that identifies this mailbox.
"""
- exist = False
- exist = self.memstore.get_fdoc_from_chash(chash, self.mbox)
+ return self.collection.mbox_name
- if not exist:
- exist = self._get_fdoc_from_chash(chash)
- if exist and exist.content is not None:
- return exist.content.get(fields.UID_KEY, "unknown-uid")
- else:
- return False
-
- def add_msg(self, raw, subject=None, flags=None, date=None,
- notify_on_disk=False):
+ def add_msg(self, raw, flags=None, date=None):
"""
Creates a new message document.
:param raw: the raw message
:type raw: str
- :param subject: subject of the message.
- :type subject: str
-
:param flags: flags
:type flags: list
@@ -756,212 +430,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
if flags is None:
flags = tuple()
leap_assert_type(flags, tuple)
+ return self.collection.add_msg(raw, flags, date)
- # TODO ---- proxy to MessageCollection addMessage
-
- #observer = defer.Deferred()
- #d = self._do_parse(raw)
- #d.addCallback(lambda result: reactor.callInThread(
- #self._do_add_msg, result, flags, subject, date,
- #notify_on_disk, observer))
- #return observer
-
- # TODO ---------------------------------------------------
- # move this to leap.mail.adaptors.soledad
-
- def _do_add_msg(self, parse_result, flags, subject,
- date, notify_on_disk, observer):
- """
- """
- msg, parts, chash, size, multi = parse_result
-
- # XXX move to SoledadAdaptor write operation ... ???
- # check for uniqueness --------------------------------
- # Watch out! We're reserving a UID right after this!
- existing_uid = self._fdoc_already_exists(chash)
- if existing_uid:
- msg = self.get_msg_by_uid(existing_uid)
- reactor.callFromThread(observer.callback, existing_uid)
- msg.setFlags((fields.DELETED_FLAG,), -1)
- return
-
- # TODO move UID autoincrement to MessageCollection.addMessage(mailbox)
- # TODO S2 -- get FUCKING UID from autoincremental table
- #uid = self.memstore.increment_last_soledad_uid(self.mbox)
- #self.set_recent_flag(uid)
-
-
- # ------------------------------------------------------------
-
- #
- # getters: specific queries
- #
-
- # recent flags
-
- def _get_recent_flags(self):
- """
- An accessor for the recent-flags set for this mailbox.
+ def get_msg_by_uid(self, uid, absolute=True):
"""
- # XXX check if we should remove this
- if self.__rflags is not None:
- return self.__rflags
-
- if self.memstore is not None:
- with self._rdoc_lock[self.mbox]:
- rflags = self.memstore.get_recent_flags(self.mbox)
- if not rflags:
- # not loaded in the memory store yet.
- # let's fetch them from soledad...
- rdoc = self._get_recent_doc_from_soledad()
- if rdoc is None:
- return set([])
- rflags = set(rdoc.content.get(
- fields.RECENTFLAGS_KEY, []))
- # ...and cache them now.
- self.memstore.load_recent_flags(
- self.mbox,
- {'doc_id': rdoc.doc_id, 'set': rflags})
- return rflags
-
- def _set_recent_flags(self, value):
- """
- Setter for the recent-flags set for this mailbox.
- """
- if self.memstore is not None:
- self.memstore.set_recent_flags(self.mbox, value)
-
- recent_flags = property(
- _get_recent_flags, _set_recent_flags,
- doc="Set of UIDs with the recent flag for this mailbox.")
-
- def _get_recent_doc_from_soledad(self):
- """
- Get recent-flags document from Soledad for this mailbox.
- :rtype: SoledadDocument or None
- """
- # FIXME ----- use deferreds.
- curried = partial(
- self._soledad.get_from_index,
- fields.TYPE_MBOX_IDX,
- fields.TYPE_RECENT_VAL, self.mbox)
- curried.expected = "rdoc"
- with self._rdoc_read_lock[self.mbox]:
- return try_unique_query(curried)
-
- # Property-set modification (protected by a different
- # lock to give atomicity to the read/write operation)
-
- def unset_recent_flags(self, uids):
- """
- Unset Recent flag for a sequence of uids.
-
- :param uids: the uids to unset
- :type uid: sequence
- """
- # FIXME ----- use deferreds.
- with self._rdoc_property_lock[self.mbox]:
- self.recent_flags.difference_update(
- set(uids))
-
- # Individual flags operations
-
- def unset_recent_flag(self, uid):
- """
- Unset Recent flag for a given uid.
-
- :param uid: the uid to unset
- :type uid: int
- """
- # FIXME ----- use deferreds.
- with self._rdoc_property_lock[self.mbox]:
- self.recent_flags.difference_update(
- set([uid]))
-
- def set_recent_flag(self, uid):
- """
- Set Recent flag for a given uid.
+ Retrieves a IMAPMessage by UID.
+ This is used primarity in the Mailbox fetch and store methods.
- :param uid: the uid to set
+ :param uid: the message uid to query by
:type uid: int
- """
- # FIXME ----- use deferreds.
- with self._rdoc_property_lock[self.mbox]:
- self.recent_flags = self.recent_flags.union(
- set([uid]))
-
- # individual doc getters, message layer.
- def _get_fdoc_from_chash(self, chash):
+ :rtype: IMAPMessage
"""
- Return a flags document for this mailbox with a given chash.
+ def make_imap_msg(msg):
+ kls = self.messageklass
+ # TODO --- remove ref to collection
+ return kls(msg, self.collection)
- :return: A SoledadDocument containing the Flags Document, or None if
- the query failed.
- :rtype: SoledadDocument or None.
- """
- # USED from:
- # [ ] duplicated fdoc detection
- # [ ] _get_uid_from_msgidCb
-
- # FIXME ----- use deferreds.
- curried = partial(
- self._soledad.get_from_index,
- fields.TYPE_MBOX_C_HASH_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox, chash)
- curried.expected = "fdoc"
- fdoc = try_unique_query(curried)
- if fdoc is not None:
- return fdoc
- else:
- # probably this should be the other way round,
- # ie, try fist on memstore...
- cf = self.memstore._chash_fdoc_store
- fdoc = cf[chash][self.mbox]
- # hey, I just needed to wrap fdoc thing into
- # a "content" attribute, look a better way...
- if not empty(fdoc):
- return MessagePartDoc(
- new=None, dirty=None, part=None,
- store=None, doc_id=None,
- content=fdoc)
-
- def _get_uid_from_msgidCb(self, msgid):
- hdoc = None
- curried = partial(
- self._soledad.get_from_index,
- fields.TYPE_MSGID_IDX,
- fields.TYPE_HEADERS_VAL, msgid)
- curried.expected = "hdoc"
- hdoc = try_unique_query(curried)
-
- # XXX this is only a quick hack to avoid regression
- # on the "multiple copies of the draft" issue, but
- # this is currently broken since it's not efficient to
- # look for this. Should lookup better.
- # FIXME!
-
- if hdoc is not None:
- hdoc_dict = hdoc.content
+ d = self.collection.get_msg_by_uid(uid, absolute=absolute)
+ d.addCalback(make_imap_msg)
+ return d
- else:
- hdocstore = self.memstore._hdoc_store
- match = [x for _, x in hdocstore.items() if x['msgid'] == msgid]
- hdoc_dict = first(match)
-
- if hdoc_dict is None:
- logger.warning("Could not find hdoc for msgid %s"
- % (msgid,))
- return None
- msg_chash = hdoc_dict.get(fields.CONTENT_HASH_KEY)
-
- fdoc = self._get_fdoc_from_chash(msg_chash)
- if not fdoc:
- logger.warning("Could not find fdoc for msgid %s"
- % (msgid,))
- return None
- return fdoc.content.get(fields.UID_KEY, None)
+ # TODO -- move this to collection too
+ # Used for the Search (Drafts) queries?
def _get_uid_from_msgid(self, msgid):
"""
Return a UID for a given message-id.
@@ -972,15 +464,10 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
:return: A UID, or None
"""
- # We need to wait a little bit, cause in some of the cases
- # the query is received right after we've saved the document,
- # and we cannot find it otherwise. This seems to be enough.
-
- # XXX do a deferLater instead ??
- # XXX is this working?
return self._get_uid_from_msgidCb(msgid)
- def set_flags(self, mbox, messages, flags, mode, observer):
+ # TODO handle deferreds
+ def set_flags(self, messages, flags, mode):
"""
Set flags for a sequence of messages.
@@ -1000,142 +487,27 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
getmsg = self.get_msg_by_uid
def set_flags(uid, flags, mode):
- msg = getmsg(uid, mem_only=True, flags_only=True)
+ msg = getmsg(uid)
if msg is not None:
+ # XXX IMAPMessage needs access to the collection
+ # to be able to set flags. Better if we make use
+ # of collection... here.
return uid, msg.setFlags(flags, mode)
setted_flags = [set_flags(uid, flags, mode) for uid in messages]
result = dict(filter(None, setted_flags))
+ # XXX return gatherResults or something
+ return result
- # TODO -- remove
- reactor.callFromThread(observer.callback, result)
-
- # getters: generic for a mailbox
-
- def get_msg_by_uid(self, uid, mem_only=False, flags_only=False):
- """
- Retrieves a LeapMessage by UID.
- This is used primarity in the Mailbox fetch and store methods.
-
- :param uid: the message uid to query by
- :type uid: int
- :param mem_only: a flag that indicates whether this Message should
- pass a reference to soledad to retrieve missing pieces
- or not.
- :type mem_only: bool
- :param flags_only: whether the message should carry only a reference
- to the flags document.
- :type flags_only: bool
-
- :return: A LeapMessage instance matching the query,
- or None if not found.
- :rtype: LeapMessage
- """
- msg_container = self.memstore.get_message(
- self.mbox, uid, flags_only=flags_only)
-
- if msg_container is not None:
- if mem_only:
- msg = IMAPMessage(None, uid, self.mbox, collection=self,
- container=msg_container)
- else:
- # We pass a reference to soledad just to be able to retrieve
- # missing parts that cannot be found in the container, like
- # the content docs after a copy.
- msg = IMAPMessage(self._soledad, uid, self.mbox,
- collection=self, container=msg_container)
- else:
- msg = IMAPMessage(self._soledad, uid, self.mbox, collection=self)
-
- if not msg.does_exist():
- return None
- return msg
-
- # FIXME --- used where ? ---------------------------------------------
- #def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL):
- #"""
- #Get all documents for the selected mailbox of the
- #passed type. By default, it returns the flag docs.
-#
- #If you want acess to the content, use __iter__ instead
-#
- #:return: a Deferred, that will fire with a list of u1db documents
- #:rtype: Deferred (promise of list of SoledadDocument)
- #"""
- #if _type not in fields.__dict__.values():
- #raise TypeError("Wrong type passed to get_all_docs")
-#
- # FIXME ----- either raise or return a deferred wrapper.
- #if sameProxiedObjects(self._soledad, None):
- #logger.warning('Tried to get messages but soledad is None!')
- #return []
-#
- #def get_sorted_docs(docs):
- #all_docs = [doc for doc in docs]
- # inneficient, but first let's grok it and then
- # let's worry about efficiency.
- # XXX FIXINDEX -- should implement order by in soledad
- # FIXME ----------------------------------------------
- #return sorted(all_docs, key=lambda item: item.content['uid'])
-#
- #d = self._soledad.get_from_index(
- #fields.TYPE_MBOX_IDX, _type, self.mbox)
- #d.addCallback(get_sorted_docs)
- #return d
-
- def all_soledad_uid_iter(self):
- """
- Return an iterator through the UIDs of all messages, sorted in
- ascending order.
- """
- # XXX FIXME ------ sorted???
-
- def get_uids(docs):
- return set([
- doc.content[self.UID_KEY] for doc in docs if not empty(doc)])
-
- d = self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox)
- d.addCallback(get_uids)
- return d
-
- def all_uid_iter(self):
- """
- Return an iterator through the UIDs of all messages, from memory.
+ def count(self):
"""
- mem_uids = self.memstore.get_uids(self.mbox)
- soledad_known_uids = self.memstore.get_soledad_known_uids(
- self.mbox)
- combined = tuple(set(mem_uids).union(soledad_known_uids))
- return combined
+ Return the count of messages for this mailbox.
- def get_all_soledad_flag_docs(self):
+ :rtype: int
"""
- Return a dict with the content of all the flag documents
- in soledad store for the given mbox.
+ return self.collection.count()
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: dict
- """
- # XXX we really could return a reduced version with
- # just {'uid': (flags-tuple,) since the prefetch is
- # only oriented to get the flag tuples.
-
- def get_content(docs):
- all_docs = [(
- doc.content[self.UID_KEY],
- dict(doc.content))
- for doc in docs
- if not empty(doc.content)]
- all_flags = dict(all_docs)
- return all_flags
-
- d = self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox)
- d.addCallback(get_content)
- return d
+ # headers query
def all_headers(self):
"""
@@ -1144,15 +516,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
:rtype: dict
"""
- return self.memstore.all_headers(self.mbox)
-
- def count(self):
- """
- Return the count of messages for this mailbox.
-
- :rtype: int
- """
- return self.memstore.count(self.mbox)
+ # Use self.collection.mbox_indexer
+ # and derive all the doc_ids for the hdocs
+ raise NotImplementedError()
# unseen messages
@@ -1164,7 +530,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
:return: iterator through unseen message doc UIDs
:rtype: iterable
"""
- return self.memstore.unseen_iter(self.mbox)
+ raise NotImplementedError()
def count_unseen(self):
"""
@@ -1182,13 +548,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
:returns: a list of LeapMessages
:rtype: list
"""
- return [IMAPMessage(self._soledad, docid, self.mbox, collection=self)
- for docid in self.unseen_iter()]
+ raise NotImplementedError()
+ #return [self.messageklass(self._soledad, doc_id, self.mbox)
+ #for doc_id in self.unseen_iter()]
# recent messages
- # XXX take it from memstore
- # XXX Used somewhere?
def count_recent(self):
"""
Count all messages with the `Recent` flag.
@@ -1199,32 +564,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):
:returns: count
:rtype: int
"""
- return len(self.recent_flags)
+ raise NotImplementedError()
+
+ # magic
def __len__(self):
"""
Returns the number of messages on this mailbox.
-
:rtype: int
"""
return self.count()
- def __iter__(self):
- """
- Returns an iterator over all messages.
-
- :returns: iterator of dicts with content for all messages.
- :rtype: iterable
- """
- return (IMAPMessage(self._soledad, docuid, self.mbox, collection=self)
- for docuid in self.all_uid_iter())
-
def __repr__(self):
"""
Representation string for this object.
"""
- return u"<MessageCollection: mbox '%s' (%s)>" % (
- self.mbox, self.count())
+ return u"<IMAPMessageCollection: mbox '%s' (%s)>" % (
+ self.mbox_name, self.count())
- # XXX should implement __eq__ also !!!
- # use chash...
+ # TODO implement __iter__ ?
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
deleted file mode 100644
index fc8ea55..0000000
--- a/src/leap/mail/imap/soledadstore.py
+++ /dev/null
@@ -1,617 +0,0 @@
-# -*- coding: utf-8 -*-
-# soledadstore.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-A MessageStore that writes to Soledad.
-"""
-import logging
-import threading
-
-from collections import defaultdict
-from itertools import chain
-
-from u1db import errors as u1db_errors
-from twisted.python import log
-from zope.interface import implements
-
-from leap.common.check import leap_assert_type, leap_assert
-from leap.mail.decorators import deferred_to_thread
-from leap.mail.imap.messageparts import MessagePartType
-from leap.mail.imap.messageparts import MessageWrapper
-from leap.mail.imap.messageparts import RecentFlagsDoc
-from leap.mail.imap.fields import fields
-from leap.mail.imap.interfaces import IMessageStore
-from leap.mail.messageflow import IMessageConsumer
-from leap.mail.utils import first, empty, accumulator_queue
-
-logger = logging.getLogger(__name__)
-
-
-class ContentDedup(object):
- """
- Message deduplication.
-
- We do a query for the content hashes before writing to our beloved
- sqlcipher backend of Soledad. This means, by now, that:
-
- 1. We will not store the same body/attachment twice, only the hash of it.
- 2. We will not store the same message header twice, only the hash of it.
-
- The first case is useful if you are always receiving the same old memes
- from unwary friends that still have not discovered that 4chan is the
- generator of the internet. The second will save your day if you have
- initiated session with the same account in two different machines. I also
- wonder why would you do that, but let's respect each other choices, like
- with the religious celebrations, and assume that one day we'll be able
- to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
- Stack.
- """
- # TODO refactor using unique_query
-
- def _header_does_exist(self, doc):
- """
- Check whether we already have a header document for this
- content hash in our database.
-
- :param doc: tentative header for document
- :type doc: dict
- :returns: True if it exists, False otherwise.
- """
- if not doc:
- return False
- chash = doc[fields.CONTENT_HASH_KEY]
- header_docs = self._soledad.get_from_index(
- fields.TYPE_C_HASH_IDX,
- fields.TYPE_HEADERS_VAL, str(chash))
- if not header_docs:
- return False
-
- # FIXME enable only to debug this problem.
- #if len(header_docs) != 1:
- #logger.warning("Found more than one copy of chash %s!"
- #% (chash,))
-
- #logger.debug("Found header doc with that hash! Skipping save!")
- return True
-
- def _content_does_exist(self, doc):
- """
- Check whether we already have a content document for a payload
- with this hash in our database.
-
- :param doc: tentative content for document
- :type doc: dict
- :returns: True if it exists, False otherwise.
- """
- if not doc:
- return False
- phash = doc[fields.PAYLOAD_HASH_KEY]
- attach_docs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
- if not attach_docs:
- return False
-
- # FIXME enable only to debug this problem
- #if len(attach_docs) != 1:
- #logger.warning("Found more than one copy of phash %s!"
- #% (phash,))
- #logger.debug("Found attachment doc with that hash! Skipping save!")
- return True
-
-
-class MsgWriteError(Exception):
- """
- Raised if any exception is found while saving message parts.
- """
- pass
-
-
-"""
-A lock per document.
-"""
-# TODO should bound the space of this!!!
-# http://stackoverflow.com/a/2437645/1157664
-# Setting this to twice the number of threads in the threadpool
-# should be safe.
-
-put_locks = defaultdict(lambda: threading.Lock())
-mbox_doc_locks = defaultdict(lambda: threading.Lock())
-
-
-class SoledadStore(ContentDedup):
- """
- This will create docs in the local Soledad database.
- """
- _remove_lock = threading.Lock()
-
- implements(IMessageConsumer, IMessageStore)
-
- def __init__(self, soledad):
- """
- Initialize the permanent store that writes to Soledad database.
-
- :param soledad: the soledad instance
- :type soledad: Soledad
- """
- from twisted.internet import reactor
- self.reactor = reactor
-
- self._soledad = soledad
-
- self._CREATE_DOC_FUN = self._soledad.create_doc
- self._PUT_DOC_FUN = self._soledad.put_doc
- self._GET_DOC_FUN = self._soledad.get_doc
-
- # we instantiate an accumulator to batch the notifications
- self.docs_notify_queue = accumulator_queue(
- lambda item: reactor.callFromThread(self._unset_new_dirty, item),
- 20)
-
- # IMessageStore
-
- # -------------------------------------------------------------------
- # We are not yet using this interface, but it would make sense
- # to implement it.
-
- def create_message(self, mbox, uid, message):
- """
- Create the passed message into this SoledadStore.
-
- :param mbox: the mbox this message belongs.
- :type mbox: str or unicode
- :param uid: the UID that identifies this message in this mailbox.
- :type uid: int
- :param message: a IMessageContainer implementor.
- """
- raise NotImplementedError()
-
- def put_message(self, mbox, uid, message):
- """
- Put the passed existing message into this SoledadStore.
-
- :param mbox: the mbox this message belongs.
- :type mbox: str or unicode
- :param uid: the UID that identifies this message in this mailbox.
- :type uid: int
- :param message: a IMessageContainer implementor.
- """
- raise NotImplementedError()
-
- def remove_message(self, mbox, uid):
- """
- Remove the given message from this SoledadStore.
-
- :param mbox: the mbox this message belongs.
- :type mbox: str or unicode
- :param uid: the UID that identifies this message in this mailbox.
- :type uid: int
- """
- raise NotImplementedError()
-
- def get_message(self, mbox, uid):
- """
- Get a IMessageContainer for the given mbox and uid combination.
-
- :param mbox: the mbox this message belongs.
- :type mbox: str or unicode
- :param uid: the UID that identifies this message in this mailbox.
- :type uid: int
- """
- raise NotImplementedError()
-
- # IMessageConsumer
-
- # TODO should handle the delete case
- # TODO should handle errors better
- # TODO could generalize this method into a generic consumer
- # and only implement `process` here
-
- def consume(self, queue):
- """
- Creates a new document in soledad db.
-
- :param queue: a tuple of queues to get item from, with content of the
- document to be inserted.
- :type queue: tuple of Queues
- """
- new, dirty = queue
- while not new.empty():
- doc_wrapper = new.get()
- self.reactor.callInThread(self._consume_doc, doc_wrapper,
- self.docs_notify_queue)
- while not dirty.empty():
- doc_wrapper = dirty.get()
- self.reactor.callInThread(self._consume_doc, doc_wrapper,
- self.docs_notify_queue)
-
- # Queue empty, flush the notifications queue.
- self.docs_notify_queue(None, flush=True)
-
- def _unset_new_dirty(self, doc_wrapper):
- """
- Unset the `new` and `dirty` flags for this document wrapper in the
- memory store.
-
- :param doc_wrapper: a MessageWrapper instance
- :type doc_wrapper: MessageWrapper
- """
- if isinstance(doc_wrapper, MessageWrapper):
- # XXX still needed for debug quite often
- #logger.info("unsetting new flag!")
- doc_wrapper.new = False
- doc_wrapper.dirty = False
-
- @deferred_to_thread
- def _consume_doc(self, doc_wrapper, notify_queue):
- """
- Consume each document wrapper in a separate thread.
- We pass an instance of an accumulator that handles the notifications
- to the memorystore when the write has been done.
-
- :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
- :type doc_wrapper: MessageWrapper or RecentFlagsDoc
- :param notify_queue: a callable that handles the writeback
- notifications to the memstore.
- :type notify_queue: callable
- """
- def queueNotifyBack(failed, doc_wrapper):
- if failed:
- log.msg("There was an error writing the mesage...")
- else:
- notify_queue(doc_wrapper)
-
- def doSoledadCalls(items):
- # we prime the generator, that should return the
- # message or flags wrapper item in the first place.
- try:
- doc_wrapper = items.next()
- except StopIteration:
- pass
- else:
- failed = self._soledad_write_document_parts(items)
- queueNotifyBack(failed, doc_wrapper)
-
- doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper))
-
- #
- # SoledadStore specific methods.
- #
-
- def _soledad_write_document_parts(self, items):
- """
- Write the document parts to soledad in a separate thread.
-
- :param items: the iterator through the different document wrappers
- payloads.
- :type items: iterator
- :return: whether the write was successful or not
- :rtype: bool
- """
- failed = False
- for item, call in items:
- if empty(item):
- continue
- try:
- self._try_call(call, item)
- except Exception as exc:
- logger.debug("ITEM WAS: %s" % repr(item))
- if hasattr(item, 'content'):
- logger.debug("ITEM CONTENT WAS: %s" %
- repr(item.content))
- logger.exception(exc)
- failed = True
- continue
- return failed
-
- def _iter_wrapper_subparts(self, doc_wrapper):
- """
- Return an iterator that will yield the doc_wrapper in the first place,
- followed by the subparts item and the proper call type for every
- item in the queue, if any.
-
- :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
- :type doc_wrapper: MessageWrapper or RecentFlagsDoc
- """
- if isinstance(doc_wrapper, MessageWrapper):
- return chain((doc_wrapper,),
- self._get_calls_for_msg_parts(doc_wrapper))
- elif isinstance(doc_wrapper, RecentFlagsDoc):
- return chain((doc_wrapper,),
- self._get_calls_for_rflags_doc(doc_wrapper))
- else:
- logger.warning("CANNOT PROCESS ITEM!")
- return (i for i in [])
-
- def _try_call(self, call, item):
- """
- Try to invoke a given call with item as a parameter.
-
- :param call: the function to call
- :type call: callable
- :param item: the payload to pass to the call as argument
- :type item: object
- """
- if call is None:
- return
-
- if call == self._PUT_DOC_FUN:
- doc_id = item.doc_id
- if doc_id is None:
- logger.warning("BUG! Dirty doc but has no doc_id!")
- return
- with put_locks[doc_id]:
- doc = self._GET_DOC_FUN(doc_id)
-
- if doc is None:
- logger.warning("BUG! Dirty doc but could not "
- "find document %s" % (doc_id,))
- return
-
- doc.content = dict(item.content)
-
- item = doc
- try:
- call(item)
- except u1db_errors.RevisionConflict as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
- except Exception as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
-
- else:
- try:
- call(item)
- except u1db_errors.RevisionConflict as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
- except Exception as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
-
- def _get_calls_for_msg_parts(self, msg_wrapper):
- """
- Generator that return the proper call type for a given item.
-
- :param msg_wrapper: A MessageWrapper
- :type msg_wrapper: IMessageContainer
- :return: a generator of tuples with recent-flags doc payload
- and callable
- :rtype: generator
- """
- call = None
-
- if msg_wrapper.new:
- call = self._CREATE_DOC_FUN
-
- # item is expected to be a MessagePartDoc
- for item in msg_wrapper.walk():
- if item.part == MessagePartType.fdoc:
- yield dict(item.content), call
-
- elif item.part == MessagePartType.hdoc:
- if not self._header_does_exist(item.content):
- yield dict(item.content), call
-
- elif item.part == MessagePartType.cdoc:
- if not self._content_does_exist(item.content):
- yield dict(item.content), call
-
- # For now, the only thing that will be dirty is
- # the flags doc.
-
- elif msg_wrapper.dirty:
- call = self._PUT_DOC_FUN
- # item is expected to be a MessagePartDoc
- for item in msg_wrapper.walk():
- # XXX FIXME Give error if dirty and not doc_id !!!
- doc_id = item.doc_id # defend!
- if not doc_id:
- logger.warning("Dirty item but no doc_id!")
- continue
-
- if item.part == MessagePartType.fdoc:
- yield item, call
-
- # XXX also for linkage-doc !!!
- else:
- logger.error("Cannot delete documents yet from the queue...!")
-
- def _get_calls_for_rflags_doc(self, rflags_wrapper):
- """
- We always put these documents.
-
- :param rflags_wrapper: A wrapper around recent flags doc.
- :type rflags_wrapper: RecentFlagsWrapper
- :return: a tuple with recent-flags doc payload and callable
- :rtype: tuple
- """
- call = self._PUT_DOC_FUN
-
- payload = rflags_wrapper.content
- if payload:
- logger.debug("Saving RFLAGS to Soledad...")
- yield rflags_wrapper, call
-
- # Mbox documents and attributes
-
- def get_mbox_document(self, mbox):
- """
- Return mailbox document.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: A SoledadDocument containing this mailbox, or None if
- the query failed.
- :rtype: SoledadDocument or None.
- """
- with mbox_doc_locks[mbox]:
- return self._get_mbox_document(mbox)
-
- def _get_mbox_document(self, mbox):
- """
- Helper for returning the mailbox document.
- """
- try:
- query = self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_MBOX_VAL, mbox)
- if query:
- return query.pop()
- else:
- logger.error("Could not find mbox document for %r" %
- (mbox,))
- except Exception as exc:
- logger.exception("Unhandled error %r" % exc)
-
- def get_mbox_closed(self, mbox):
- """
- Return the closed attribute for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :rtype: bool
- """
- mbox_doc = self.get_mbox_document()
- return mbox_doc.content.get(fields.CLOSED_KEY, False)
-
- def set_mbox_closed(self, mbox, closed):
- """
- Set the closed attribute for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param closed: the value to be set
- :type closed: bool
- """
- leap_assert(isinstance(closed, bool), "closed needs to be boolean")
- with mbox_doc_locks[mbox]:
- mbox_doc = self._get_mbox_document(mbox)
- if mbox_doc is None:
- logger.error(
- "Could not find mbox document for %r" % (mbox,))
- return
- mbox_doc.content[fields.CLOSED_KEY] = closed
- self._soledad.put_doc(mbox_doc)
-
- def write_last_uid(self, mbox, value):
- """
- Write the `last_uid` integer to the proper mailbox document
- in Soledad.
- This is called from the deferred triggered by
- memorystore.increment_last_soledad_uid, which is expected to
- run in a separate thread.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param value: the value to set
- :type value: int
- """
- leap_assert_type(value, int)
- key = fields.LAST_UID_KEY
-
- # XXX use accumulator to reduce number of hits
- with mbox_doc_locks[mbox]:
- mbox_doc = self._get_mbox_document(mbox)
- old_val = mbox_doc.content[key]
- if value > old_val:
- mbox_doc.content[key] = value
- try:
- self._soledad.put_doc(mbox_doc)
- except Exception as exc:
- logger.error("Error while setting last_uid for %r"
- % (mbox,))
- logger.exception(exc)
-
- def get_flags_doc(self, mbox, uid):
- """
- Return the SoledadDocument for the given mbox and uid.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param uid: the UID for the message
- :type uid: int
- :rtype: SoledadDocument or None
- """
- # TODO -- inlineCallbacks
- result = None
- try:
- # TODO -- yield
- flag_docs = self._soledad.get_from_index(
- fields.TYPE_MBOX_UID_IDX,
- fields.TYPE_FLAGS_VAL, mbox, str(uid))
- if len(flag_docs) != 1:
- logger.warning("More than one flag doc for %r:%s" %
- (mbox, uid))
- result = first(flag_docs)
- except Exception as exc:
- # ugh! Something's broken down there!
- logger.warning("ERROR while getting flags for UID: %s" % uid)
- logger.exception(exc)
- finally:
- return result
-
- def get_headers_doc(self, chash):
- """
- Return the document that keeps the headers for a message
- indexed by its content-hash.
-
- :param chash: the content-hash to retrieve the document from.
- :type chash: str or unicode
- :rtype: SoledadDocument or None
- """
- head_docs = self._soledad.get_from_index(
- fields.TYPE_C_HASH_IDX,
- fields.TYPE_HEADERS_VAL, str(chash))
- return first(head_docs)
-
- # deleted messages
-
- def deleted_iter(self, mbox):
- """
- Get an iterator for the the doc_id for SoledadDocuments for messages
- with \\Deleted flag for a given mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: iterator through deleted message docs
- :rtype: iterable
- """
- return [doc.doc_id for doc in self._soledad.get_from_index(
- fields.TYPE_MBOX_DEL_IDX,
- fields.TYPE_FLAGS_VAL, mbox, '1')]
-
- def remove_all_deleted(self, mbox):
- """
- Remove from Soledad all messages flagged as deleted for a given
- mailbox.
-
- :param mbox: the mailbox
- :type mbox: str or unicode
- """
- deleted = []
- for doc_id in self.deleted_iter(mbox):
- with self._remove_lock:
- doc = self._soledad.get_doc(doc_id)
- if doc is not None:
- self._soledad.delete_doc(doc)
- try:
- deleted.append(doc.content[fields.UID_KEY])
- except TypeError:
- # empty content
- pass
- return deleted
diff --git a/src/leap/mail/mail.py b/src/leap/mail/mail.py
index ca07f67..482b64d 100644
--- a/src/leap/mail/mail.py
+++ b/src/leap/mail/mail.py
@@ -20,6 +20,7 @@ Generic Access to Mail objects: Public LEAP Mail API.
from twisted.internet import defer
from leap.mail.constants import INBOX_NAME
+from leap.mail.constants import MessageFlags
from leap.mail.mailbox_indexer import MailboxIndexer
from leap.mail.adaptors.soledad import SoledadMailAdaptor
@@ -61,6 +62,18 @@ class Message(object):
def get_internal_date(self):
"""
+ Retrieve the date internally associated with this message
+
+ According to the spec, this is NOT the date and time in the
+ RFC-822 header, but rather a date and time that reflects when the
+ message was received.
+
+ * In SMTP, date and time of final delivery.
+ * In COPY, internal date/time of the source message.
+ * In APPEND, date/time specified.
+
+ :return: An RFC822-formatted date string.
+ :rtype: str
"""
return self._wrapper.fdoc.date
@@ -99,6 +112,15 @@ class Message(object):
return tuple(self._wrapper.fdoc.tags)
+class Flagsmode(object):
+ """
+ Modes for setting the flags/tags.
+ """
+ APPEND = 1
+ REMOVE = -1
+ SET = 0
+
+
class MessageCollection(object):
"""
A generic collection of messages. It can be messages sharing the same
@@ -132,6 +154,7 @@ class MessageCollection(object):
def __init__(self, adaptor, store, mbox_indexer=None, mbox_wrapper=None):
"""
+ Constructore for a MessageCollection.
"""
self.adaptor = adaptor
self.store = store
@@ -149,6 +172,20 @@ class MessageCollection(object):
"""
return bool(self.mbox_wrapper)
+ @property
+ def mbox_name(self):
+ wrapper = getattr(self, "mbox_wrapper", None)
+ if not wrapper:
+ return None
+ return wrapper.mbox
+
+ def get_mbox_attr(self, attr):
+ return getattr(self.mbox_wrapper, attr)
+
+ def set_mbox_attr(self, attr, value):
+ setattr(self.mbox_wrapper, attr, value)
+ return self.mbox_wrapper.update(self.store)
+
# Get messages
def get_message_by_content_hash(self, chash, get_cdocs=False):
@@ -162,7 +199,7 @@ class MessageCollection(object):
# or use the internal collection of pointers-to-docs.
raise NotImplementedError()
- metamsg_id = _get_mdoc_id(self.mbox_wrapper.mbox, chash)
+ metamsg_id = _get_mdoc_id(self.mbox_name, chash)
return self.adaptor.get_msg_from_mdoc_id(
self.messageklass, self.store,
@@ -181,25 +218,37 @@ class MessageCollection(object):
raise NotImplementedError("Does not support relative ids yet")
def get_msg_from_mdoc_id(doc_id):
+ # XXX pass UID?
return self.adaptor.get_msg_from_mdoc_id(
self.messageklass, self.store,
doc_id, get_cdocs=get_cdocs)
- d = self.mbox_indexer.get_doc_id_from_uid(self.mbox_wrapper.mbox, uid)
+ d = self.mbox_indexer.get_doc_id_from_uid(self.mbox_name, uid)
d.addCallback(get_msg_from_mdoc_id)
return d
def count(self):
"""
Count the messages in this collection.
- :rtype: int
+ :return: a Deferred that will fire with the integer for the count.
+ :rtype: Deferred
"""
if not self.is_mailbox_collection():
raise NotImplementedError()
- return self.mbox_indexer.count(self.mbox_wrapper.mbox)
+ return self.mbox_indexer.count(self.mbox_name)
+
+ def get_uid_next(self):
+ """
+ Get the next integer beyond the highest UID count for this mailbox.
+
+ :return: a Deferred that will fire with the integer for the next uid.
+ :rtype: Deferred
+ """
+ return self.mbox_indexer.get_uid_next(self.mbox_name)
# Manipulate messages
+ # TODO pass flags, date too...
def add_msg(self, raw_msg):
"""
Add a message to this collection.
@@ -208,14 +257,14 @@ class MessageCollection(object):
wrapper = msg.get_wrapper()
if self.is_mailbox_collection():
- mbox = self.mbox_wrapper.mbox
+ mbox = self.mbox_name
wrapper.set_mbox(mbox)
def insert_mdoc_id(_):
# XXX does this work?
doc_id = wrapper.mdoc.doc_id
return self.mbox_indexer.insert_doc(
- self.mbox_wrapper.mbox, doc_id)
+ self.mbox_name, doc_id)
d = wrapper.create(self.store)
d.addCallback(insert_mdoc_id)
@@ -248,31 +297,45 @@ class MessageCollection(object):
# XXX does this work?
doc_id = wrapper.mdoc.doc_id
return self.mbox_indexer.delete_doc_by_hash(
- self.mbox_wrapper.mbox, doc_id)
+ self.mbox_name, doc_id)
d = wrapper.delete(self.store)
d.addCallback(delete_mdoc_id)
return d
# TODO should add a delete-by-uid to collection?
+ def _update_flags_or_tags(self, old, new, mode):
+ if mode == Flagsmode.APPEND:
+ final = list((set(tuple(old) + new)))
+ elif mode == Flagsmode.REMOVE:
+ final = list(set(old).difference(set(new)))
+ elif mode == Flagsmode.SET:
+ final = new
+ return final
+
def udpate_flags(self, msg, flags, mode):
"""
Update flags for a given message.
"""
wrapper = msg.get_wrapper()
- # 1. update the flags in the message wrapper --- stored where???
- # 2. update the special flags in the wrapper (seen, etc)
- # 3. call adaptor.update_msg(store)
- pass
+ current = wrapper.fdoc.flags
+ newflags = self._update_flags_or_tags(current, flags, mode)
+ wrapper.fdoc.flags = newflags
+
+ wrapper.fdoc.seen = MessageFlags.SEEN_FLAG in newflags
+ wrapper.fdoc.deleted = MessageFlags.DELETED_FLAG in newflags
+
+ return self.adaptor.update_msg(self.store, msg)
def update_tags(self, msg, tags, mode):
"""
Update tags for a given message.
"""
wrapper = msg.get_wrapper()
- # 1. update the tags in the message wrapper --- stored where???
- # 2. call adaptor.update_msg(store)
- pass
+ current = wrapper.fdoc.tags
+ newtags = self._update_flags_or_tags(current, tags, mode)
+ wrapper.fdoc.tags = newtags
+ return self.adaptor.update_msg(self.store, msg)
class Account(object):
@@ -382,6 +445,8 @@ class Account(object):
d.addCallback(rename_uid_table_cb)
return d
+ # Get Collections
+
def get_collection_by_mailbox(self, name):
"""
:rtype: MessageCollection
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
deleted file mode 100644
index c8f224c..0000000
--- a/src/leap/mail/messageflow.py
+++ /dev/null
@@ -1,200 +0,0 @@
-# -*- coding: utf-8 -*-
-# messageflow.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Message Producers and Consumers for flow control.
-"""
-import Queue
-
-from twisted.internet.task import LoopingCall
-
-from zope.interface import Interface, implements
-
-
-class IMessageConsumer(Interface):
- """
- I consume messages from a queue.
- """
-
- def consume(self, queue):
- """
- Consumes the passed item.
-
- :param item: a queue where we put the object to be consumed.
- :type item: object
- """
- # TODO we could add an optional type to be passed
- # for doing type check.
-
- # TODO in case of errors, we could return the object to
- # the queue, maybe wrapped in an object with a retries attribute.
-
-
-class IMessageProducer(Interface):
- """
- I produce messages and put them in a store to be consumed by other
- entities.
- """
-
- def push(self, item, state=None):
- """
- Push a new item in the queue.
- """
-
- def start(self):
- """
- Start producing items.
- """
-
- def stop(self):
- """
- Stop producing items.
- """
-
- def flush(self):
- """
- Flush queued messages to consumer.
- """
-
-
-class DummyMsgConsumer(object):
-
- implements(IMessageConsumer)
-
- def consume(self, queue):
- """
- Just prints the passed item.
- """
- if not queue.empty():
- print "got item %s" % queue.get()
-
-
-class MessageProducer(object):
- """
- A Producer class that we can use to temporarily buffer the production
- of messages so that different objects can consume them.
-
- This is useful for serializing the consumption of the messages stream
- in the case of an slow resource (db), or for returning early from a
- deferred chain and leave further processing detached from the calling loop,
- as in the case of smtp.
- """
- implements(IMessageProducer)
-
- # TODO this can be seen as a first step towards properly implementing
- # components that implement IPushProducer / IConsumer interfaces.
- # However, I need to think more about how to pause the streaming.
- # In any case, the differential rate between message production
- # and consumption is not likely (?) to consume huge amounts of memory in
- # our current settings, so the need to pause the stream is not urgent now.
-
- # TODO use enum
- STATE_NEW = 1
- STATE_DIRTY = 2
-
- def __init__(self, consumer, queue=Queue.Queue, period=1):
- """
- Initializes the MessageProducer
-
- :param consumer: an instance of a IMessageConsumer that will consume
- the new messages.
- :param queue: any queue implementation to be used as the temporary
- buffer for new items. Default is a FIFO Queue.
- :param period: the period to check for new items, in seconds.
- """
- # XXX should assert it implements IConsumer / IMailConsumer
- # it should implement a `consume` method
- self._consumer = consumer
-
- self._queue_new = queue()
- self._queue_dirty = queue()
- self._period = period
-
- self._loop = LoopingCall(self._check_for_new)
-
- # private methods
-
- def _check_for_new(self):
- """
- Check for new items in the internal queue, and calls the consume
- method in the consumer.
-
- If the queue is found empty, the loop is stopped. It will be started
- again after the addition of new items.
- """
- self._consumer.consume((self._queue_new, self._queue_dirty))
- if self.is_queue_empty():
- self.stop()
-
- def is_queue_empty(self):
- """
- Return True if queue is empty, False otherwise.
- """
- new = self._queue_new
- dirty = self._queue_dirty
- return new.empty() and dirty.empty()
-
- # public methods: IMessageProducer
-
- def push(self, item, state=None):
- """
- Push a new item in the queue.
-
- If the queue was empty, we will start the loop again.
- """
- # XXX this might raise if the queue does not accept any new
- # items. what to do then?
- queue = self._queue_new
-
- if state == self.STATE_NEW:
- queue = self._queue_new
- if state == self.STATE_DIRTY:
- queue = self._queue_dirty
-
- queue.put(item)
- self.start()
-
- def start(self):
- """
- Start polling for new items.
- """
- if not self._loop.running:
- self._loop.start(self._period, now=True)
-
- def stop(self):
- """
- Stop polling for new items.
- """
- if self._loop.running:
- self._loop.stop()
-
- def flush(self):
- """
- Flush queued messages to consumer.
- """
- self._check_for_new()
-
-
-if __name__ == "__main__":
- from twisted.internet import reactor
- producer = MessageProducer(DummyMsgConsumer())
- producer.start()
-
- for delay, item in ((2, 1), (3, 2), (4, 3),
- (6, 4), (7, 5), (8, 6), (8.2, 7),
- (15, 'a'), (16, 'b'), (17, 'c')):
- reactor.callLater(delay, producer.put, item)
- reactor.run()