# -*- coding: utf-8 -*-
# messages.py
# Copyright (C) 2013 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
LeapMessage and MessageCollection.
"""
import copy
import logging
import StringIO
from collections import namedtuple
from twisted.mail import imap4
from twisted.python import log
from u1db import errors as u1db_errors
from zope.interface import implements
from zope.proxy import sameProxiedObjects
from leap.common.check import leap_assert, leap_assert_type
from leap.common.mail import get_email_charset
from leap.mail.decorators import deferred
from leap.mail.imap.account import SoledadBackedAccount
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
from leap.mail.imap.parser import MailParser, MBoxParser
from leap.mail.messageflow import IMessageConsumer, MessageProducer
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class LeapMessage(fields, MailParser, MBoxParser):
    """
    A single IMAP message backed by soledad documents.

    The message state is split in two documents: a flags document, looked
    up by (type, mailbox, uid), and a content document holding the raw
    message, headers, date and size.
    """

    implements(imap4.IMessage)

    def __init__(self, soledad, uid, mbox):
        """
        Initializes a LeapMessage.

        :param soledad: a Soledad instance
        :type soledad: Soledad
        :param uid: the UID for the message.
        :type uid: int or basestring
        :param mbox: the mbox this message belongs to
        :type mbox: basestring
        """
        MailParser.__init__(self)
        self._soledad = soledad
        self._uid = int(uid)
        self._mbox = self._parse_mailbox_name(mbox)
        # NOTE: `_chash` is a read-only property derived from the flags
        # document, so it must not be assigned here (that would raise
        # AttributeError).
        self.__cdoc = None

    @property
    def _fdoc(self):
        """
        An accessor to the flags document.

        It is queried from soledad on every access (not cached).
        """
        return self._get_flags_doc()

    @property
    def _cdoc(self):
        """
        An accessor to the content document.

        The document is cached after the first successful lookup.
        """
        if self.__cdoc is None:
            self.__cdoc = self._get_content_doc()
        return self.__cdoc

    @property
    def _chash(self):
        """
        An accessor to the content hash for this message.

        :return: the content hash stored in the flags doc, or None.
        """
        # Fetch the flags doc once instead of querying soledad twice.
        fdoc = self._fdoc
        if not fdoc:
            return None
        return fdoc.content.get(fields.CONTENT_HASH_KEY, None)

    # IMessage implementation

    def getUID(self):
        """
        Retrieve the unique identifier associated with this message

        :return: uid for this message
        :rtype: int
        """
        return self._uid

    def getFlags(self):
        """
        Retrieve the flags associated with this message

        :return: The flags, represented as strings
        :rtype: tuple
        """
        flag_doc = self._fdoc
        if not flag_doc:
            return tuple()
        # A stored None (or a missing key) means "no flags".
        flags = flag_doc.content.get(self.FLAGS_KEY, None) or []
        return tuple(map(str, flags))

    # setFlags, addFlags, removeFlags are not in the interface spec
    # but we use them with store command.

    def setFlags(self, flags):
        """
        Sets the flags for this message.

        The updated flags document is written to soledad here, and also
        returned to the caller.

        :param flags: the flags to update in the message.
        :type flags: tuple of str
        :return: the updated SoledadDocument, or None if this message has
                 no flags document.
        :rtype: SoledadDocument
        """
        leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
        log.msg('setting flags: %s' % (self._uid))

        doc = self._fdoc
        if not doc:
            logger.warning(
                "No flags doc found for uid %s in %s", self._uid, self._mbox)
            return None
        doc.content[self.FLAGS_KEY] = flags
        doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
        doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags
        self._soledad.put_doc(doc)
        return doc

    def addFlags(self, flags):
        """
        Adds flags to this message.

        :param flags: the flags to add to the message.
        :type flags: tuple of str
        :return: the updated SoledadDocument (see setFlags)
        :rtype: SoledadDocument
        """
        leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
        oldflags = self.getFlags()
        return self.setFlags(tuple(set(flags + oldflags)))

    def removeFlags(self, flags):
        """
        Remove flags from this message.

        :param flags: the flags to be removed from the message.
        :type flags: tuple of str
        :return: the updated SoledadDocument (see setFlags)
        :rtype: SoledadDocument
        """
        leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
        oldflags = self.getFlags()
        return self.setFlags(tuple(set(oldflags) - set(flags)))

    def getInternalDate(self):
        """
        Retrieve the date internally associated with this message

        :rtype: C{str}
        :return: An RFC822-formatted date string, or '' when unknown.
        """
        cdoc = self._cdoc
        if not cdoc:
            return ''
        return str(cdoc.content.get(self.DATE_KEY, ''))

    #
    # IMessagePart
    #

    # XXX we should implement this interface too for the subparts
    # so we allow nested parts...

    def getBodyFile(self):
        """
        Retrieve a file object containing only the body of this message.

        :return: file-like object opened for reading
        :rtype: StringIO
        """
        fd = StringIO.StringIO()
        raw = self._get_raw_msg()
        if isinstance(raw, unicode):
            # Turn text into octets using the message's own charset.
            raw = self._encode_raw(raw)

        msg = self._get_parsed_msg(raw)
        body = msg.get_payload()
        if not isinstance(body, basestring):
            # Multipart: get_payload() returns a list of Message parts,
            # not text. Use the raw text that follows the header block.
            body = self._split_body(raw)
        fd.write(body)
        # XXX SHOULD use a separate BODY FIELD ...
        fd.seek(0)
        return fd

    def _encode_raw(self, raw):
        """
        Encode unicode message text with the charset the message declares.

        :param raw: the raw message text
        :type raw: unicode
        :rtype: str
        """
        charset = get_email_charset(raw) or 'utf-8'
        try:
            return raw.encode(charset)
        except UnicodeEncodeError as e:
            logger.error("Unicode error {0}".format(e))
            return raw.encode(charset, 'replace')
        except LookupError as e:
            # Unknown charset name: fall back to utf-8.
            logger.error("Unicode error {0}".format(e))
            return raw.encode('utf-8', 'replace')

    @staticmethod
    def _split_body(raw):
        """
        Return the part of a raw message that follows the header block.

        The header block ends at the first empty line (CRLF or LF line
        endings). When there is no empty line, the message has no body
        and '' is returned.

        :param raw: the raw message
        :type raw: basestring
        :rtype: basestring
        """
        found = [(raw.find(sep), sep) for sep in ('\r\n\r\n', '\n\n')]
        found = [(idx, sep) for idx, sep in found if idx != -1]
        if not found:
            return raw[:0]
        idx, sep = min(found)
        return raw[idx + len(sep):]

    def getSize(self):
        """
        Return the total size, in octets, of this message.

        :return: size of the message, in octets
        :rtype: int
        """
        cdoc = self._cdoc
        size = cdoc.content.get(self.SIZE_KEY, False) if cdoc else False
        if not size:
            # XXX fallback for docs stored without a size field; should be
            # removed when all docs are migrated. Measure the whole stored
            # message, as add_msg does, rather than just the body.
            size = len(self._stringify(self._get_raw_msg()))
        return size

    def _get_headers(self):
        """
        Return the headers dict stored in this message document.

        :rtype: dict
        """
        # XXX get from the headers doc
        cdoc = self._cdoc
        if not cdoc:
            return {}
        return cdoc.content.get(self.HEADERS_KEY, {})

    def getHeaders(self, negate, *names):
        """
        Retrieve a group of message headers.

        Header names are compared case-insensitively.

        :param names: The names of the headers to retrieve or omit.
        :type names: tuple of str
        :param negate: If True, indicates that the headers listed in names
                       should be omitted from the return value, rather
                       than included.
        :type negate: bool

        :return: A mapping of header field names to header field values
        :rtype: dict
        """
        headers = self._get_headers()
        wanted = set(name.upper() for name in names)

        if negate:
            cond = lambda key: key.upper() not in wanted
        else:
            cond = lambda key: key.upper() in wanted

        # unpack and filter original dict by negate-condition
        return dict(
            map(str, (key, val))
            for key, val in headers.items()
            if cond(key))

    def isMultipart(self):
        """
        Return True if this message is multipart.

        The multipart marker is stored in the flags document (see
        MessageCollection.add_msg).

        :rtype: bool
        """
        fdoc = self._fdoc
        if not fdoc:
            return False
        return bool(fdoc.content.get(self.MULTIPART_KEY, False))

    def getSubPart(self, part):
        """
        Retrieve a MIME submessage

        :type part: C{int}
        :param part: The number of the part to retrieve, indexed from 0.
        :raise IndexError: Raised if the specified part does not exist.
        :raise TypeError: Raised if this message is not multipart.
        :rtype: Any object implementing C{IMessagePart}.
        :return: The specified sub-part.
        """
        if not self.isMultipart():
            raise TypeError
        msg = self._get_parsed_msg(self._get_raw_msg())
        # XXX should wrap IMessagePart
        return msg.get_payload()[part]

    #
    # accessors
    #

    def _get_flags_doc(self):
        """
        Return the document that keeps the flags for this
        message, or None if it cannot be found.
        """
        flag_docs = self._soledad.get_from_index(
            SoledadBackedAccount.TYPE_MBOX_UID_IDX,
            fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid))
        return flag_docs[0] if flag_docs else None

    def _get_content_doc(self):
        """
        Return the document that keeps the content for this
        message, or None if it cannot be found.

        The content hash index is tried first. When no hash is known (the
        docs created by MessageCollection.add_msg do not carry one) or that
        lookup finds nothing, fall back to the (type, mbox, uid) index,
        which matches the fields add_msg stores in the content doc.
        """
        cont_docs = None
        chash = self._chash
        if chash is not None:
            # NOTE(review): confirm TYPE_HASH_IDX really indexes
            # (type, hash, uid) in SoledadBackedAccount.INDEXES.
            cont_docs = self._soledad.get_from_index(
                SoledadBackedAccount.TYPE_HASH_IDX,
                fields.TYPE_MESSAGE_VAL, chash, str(self._uid))
        if not cont_docs:
            cont_docs = self._soledad.get_from_index(
                SoledadBackedAccount.TYPE_MBOX_UID_IDX,
                fields.TYPE_MESSAGE_VAL, self._mbox, str(self._uid))
        return cont_docs[0] if cont_docs else None

    def _get_raw_msg(self):
        """
        Return the raw msg, or '' when there is no content document.

        :rtype: basestring
        """
        cdoc = self._cdoc
        if not cdoc:
            return ''
        return cdoc.content.get(self.RAW_KEY, '')

    def __getitem__(self, key):
        """
        Return the content of the message document.

        :param key: The key
        :type key: str

        :return: The content value indexed by C{key} or None
        :rtype: str
        """
        cdoc = self._cdoc
        if not cdoc:
            return None
        return cdoc.content.get(key, None)

    def does_exist(self):
        """
        Return True if there is actually a message for this
        UID and mbox.
        """
        return bool(self._fdoc)

    def remove(self):
        """
        Delete the flags and content documents of this message.

        MessageCollection.remove delegates to this method. Documents
        that cannot be found are skipped.
        """
        # Resolve both docs before deleting anything: the content lookup
        # may depend on the flags doc (through the content hash).
        fdoc = self._fdoc
        cdoc = self._cdoc
        # NOTE(review): add_msg creates one content doc per message; if
        # content docs ever become shared (dedup by hash), stop deleting
        # cdoc here.
        for doc in (fdoc, cdoc):
            if doc:
                self._soledad.delete_doc(doc)
        self.__cdoc = None
# A unit of work for SoledadDocWriter: `mode` selects the soledad call
# (CREATE -> create_doc, PUT -> put_doc) and `payload` is its argument.
SoledadWriterPayload = namedtuple('SoledadWriterPayload', 'mode payload')
SoledadWriterPayload.CREATE, SoledadWriterPayload.PUT = 1, 2
class SoledadDocWriter(object):
    """
    This writer will create docs serially in the local soledad database.
    """

    implements(IMessageConsumer)

    def __init__(self, soledad):
        """
        Initialize the writer.

        :param soledad: the soledad instance
        :type soledad: Soledad
        """
        self._soledad = soledad

    def consume(self, queue):
        """
        Write every pending payload in the queue to the soledad db.

        Each item is a SoledadWriterPayload. Its `mode` selects
        create_doc (CREATE) or put_doc (PUT), and its `payload` is passed
        to that call. Items with an unknown mode are logged and skipped.

        :param queue: queue to get items from, with content of the
                      documents to be written.
        :type queue: Queue
        :raise u1db_errors.RevisionConflict: re-raised after logging.
        """
        # Dispatch table: a mode that is not listed here must not fall
        # through to the callable picked on a previous iteration.
        calls = {
            SoledadWriterPayload.CREATE: self._soledad.create_doc,
            SoledadWriterPayload.PUT: self._soledad.put_doc,
        }
        while not queue.empty():
            item = queue.get()
            call = calls.get(item.mode)
            if call is None:
                logger.error(
                    "Unknown write mode %r, skipping item" % (item.mode,))
                continue
            try:
                call(item.payload)
            except u1db_errors.RevisionConflict as exc:
                logger.error("Error: %r" % (exc,))
                # bare raise keeps the original traceback (Python 2).
                raise
class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
    """
    A collection of messages, surprisingly.

    It is tied to a selected mailbox name that is passed to constructor.
    Implements a filter query over the messages contained in a soledad
    database.
    """
    # XXX this should be able to produce a MessageSet methinks

    # Template for the content document of a message (deep-copied on use).
    EMPTY_MSG = {
        fields.TYPE_KEY: fields.TYPE_MESSAGE_VAL,
        fields.UID_KEY: 1,
        fields.MBOX_KEY: fields.INBOX_VAL,

        fields.SUBJECT_KEY: "",
        fields.DATE_KEY: "",
        fields.RAW_KEY: "",

        # XXX should separate headers into another doc
        fields.HEADERS_KEY: {},
    }

    # Template for the flags document of a message (deep-copied on use).
    EMPTY_FLAGS = {
        fields.TYPE_KEY: fields.TYPE_FLAGS_VAL,
        fields.UID_KEY: 1,
        fields.MBOX_KEY: fields.INBOX_VAL,

        fields.FLAGS_KEY: [],
        fields.SEEN_KEY: False,
        fields.RECENT_KEY: True,
        fields.MULTIPART_KEY: False,
    }

    # get from SoledadBackedAccount the needed index-related constants
    INDEXES = SoledadBackedAccount.INDEXES
    TYPE_IDX = SoledadBackedAccount.TYPE_IDX

    def __init__(self, mbox=None, soledad=None):
        """
        Constructor for MessageCollection.

        :param mbox: the name of the mailbox. It is the name
                     with which we filter the query over the
                     messages database
        :type mbox: str

        :param soledad: Soledad database
        :type soledad: Soledad instance
        """
        MailParser.__init__(self)
        leap_assert(mbox, "Need a mailbox name to initialize")
        # Check the type before calling string methods on mbox.
        leap_assert(isinstance(mbox, (str, unicode)),
                    "mbox needs to be a string")
        leap_assert(mbox.strip() != "", "mbox cannot be blank space")
        leap_assert(soledad, "Need a soledad instance to initialize")

        # okay, all in order, keep going...
        self.mbox = self._parse_mailbox_name(mbox)
        self._soledad = soledad
        self.initialize_db()

        # This producer enqueues the documents to be written serially by
        # the consumer (the SoledadDocWriter). We just need to `put` the
        # new material on its plate.
        self.soledad_writer = MessageProducer(
            SoledadDocWriter(soledad),
            period=0.05)

    def _get_empty_msg(self):
        """
        Returns an empty message.

        :return: a dict containing a default empty message
        :rtype: dict
        """
        return copy.deepcopy(self.EMPTY_MSG)

    def _get_empty_flags_doc(self):
        """
        Returns an empty doc for storing flags.

        :return: a fresh copy of the default flags document
        :rtype: dict
        """
        return copy.deepcopy(self.EMPTY_FLAGS)

    @deferred
    def add_msg(self, raw, subject=None, flags=None, date=None, uid=1):
        """
        Creates a new message document.

        A content document and a flags document are built and enqueued
        for creation by the soledad writer.

        :param raw: the raw message
        :type raw: str

        :param subject: subject of the message. Overrides the Subject
                        header when given.
        :type subject: str

        :param flags: flags
        :type flags: tuple

        :param date: the received date for the message. When not given,
                     the Date header is used if present.
        :type date: str

        :param uid: the message uid for this mailbox
        :type uid: int
        """
        # TODO: split in smaller methods
        logger.debug('adding message')
        if flags is None:
            flags = tuple()
        leap_assert_type(flags, tuple)

        content_doc = self._get_empty_msg()
        flags_doc = self._get_empty_flags_doc()

        content_doc[self.MBOX_KEY] = self.mbox
        flags_doc[self.MBOX_KEY] = self.mbox
        # ...should get a sanity check here.
        content_doc[self.UID_KEY] = uid
        flags_doc[self.UID_KEY] = uid

        if flags:
            flags_doc[self.FLAGS_KEY] = map(self._stringify, flags)
            flags_doc[self.SEEN_KEY] = self.SEEN_FLAG in flags

        msg = self._get_parsed_msg(raw)
        headers = dict(msg)

        logger.debug("adding. is multipart:%s" % msg.is_multipart())
        flags_doc[self.MULTIPART_KEY] = msg.is_multipart()
        # XXX get lower case for keys?
        # XXX get headers doc
        content_doc[self.HEADERS_KEY] = headers
        # set subject based on message headers and eventually replace by
        # subject given as param
        if self.SUBJECT_FIELD in headers:
            content_doc[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD]
        if subject is not None:
            content_doc[self.SUBJECT_KEY] = subject

        # XXX could separate body into its own doc
        # but should also separate multiparts
        # that should be wrapped in MessagePart
        raw_str = self._stringify(raw)
        content_doc[self.RAW_KEY] = raw_str
        # Size of what is actually stored.
        content_doc[self.SIZE_KEY] = len(raw_str)

        # An explicit date wins; otherwise use the Date header. With
        # neither, keep the template's empty date instead of storing None.
        if date:
            content_doc[self.DATE_KEY] = date
        elif self.DATE_FIELD in headers:
            content_doc[self.DATE_KEY] = headers[self.DATE_FIELD]

        logger.debug('enqueuing message for write')

        ptuple = SoledadWriterPayload
        self.soledad_writer.put(ptuple(
            mode=ptuple.CREATE, payload=content_doc))
        self.soledad_writer.put(ptuple(
            mode=ptuple.CREATE, payload=flags_doc))

    def remove(self, msg):
        """
        Removes a message.

        Deletion is delegated to the message object, which must provide
        a remove() method.

        :param msg: a LeapMessage instance
        :type msg: LeapMessage
        """
        msg.remove()

    # getters

    def get_msg_by_uid(self, uid):
        """
        Retrieves a LeapMessage by UID.

        :param uid: the message uid to query by
        :type uid: int

        :return: A LeapMessage instance matching the query,
                 or None if not found.
        :rtype: LeapMessage
        """
        msg = LeapMessage(self._soledad, uid, self.mbox)
        if not msg.does_exist():
            return None
        return msg

    def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL):
        """
        Get all documents for the selected mailbox of the
        passed type. By default, it returns the flag docs.

        If you want access to the content, use __iter__ instead.

        :param _type: the document type value to filter by
        :type _type: str
        :raise TypeError: if _type is not one of the values in `fields`
        :return: a list of u1db documents, sorted by uid
        :rtype: list of SoledadDocument
        """
        if _type not in fields.__dict__.values():
            raise TypeError("Wrong type passed to get_all")

        if sameProxiedObjects(self._soledad, None):
            logger.warning('Tried to get messages but soledad is None!')
            return []

        all_docs = self._soledad.get_from_index(
            SoledadBackedAccount.TYPE_MBOX_IDX,
            _type, self.mbox)

        # inefficient, but first let's grok it and then
        # let's worry about efficiency.
        # XXX FIXINDEX -- should implement order by in soledad
        return sorted(all_docs, key=lambda doc: doc.content[self.UID_KEY])

    def all_msg_iter(self):
        """
        Return an iterator through the UIDs of all messages, sorted in
        ascending order.
        """
        all_uids = (doc.content[self.UID_KEY] for doc in
                    self._soledad.get_from_index(
                        SoledadBackedAccount.TYPE_MBOX_IDX,
                        self.TYPE_FLAGS_VAL, self.mbox))
        return iter(sorted(all_uids))

    def count(self):
        """
        Return the count of messages for this mailbox.

        :rtype: int
        """
        return self._soledad.get_count_from_index(
            SoledadBackedAccount.TYPE_MBOX_IDX,
            fields.TYPE_FLAGS_VAL, self.mbox)

    # unseen messages

    def unseen_iter(self):
        """
        Get an iterator for the message UIDs with no `seen` flag
        for this mailbox.

        :return: iterator through unseen message doc UIDs
        :rtype: iterable
        """
        return (doc.content[self.UID_KEY] for doc in
                self._soledad.get_from_index(
                    SoledadBackedAccount.TYPE_MBOX_SEEN_IDX,
                    self.TYPE_FLAGS_VAL, self.mbox, '0'))

    def count_unseen(self):
        """
        Count all messages without the `Seen` flag.

        :returns: count
        :rtype: int
        """
        return self._soledad.get_count_from_index(
            SoledadBackedAccount.TYPE_MBOX_SEEN_IDX,
            self.TYPE_FLAGS_VAL, self.mbox, '0')

    def get_unseen(self):
        """
        Get all messages without the `Seen` flag.

        :returns: a list of LeapMessages
        :rtype: list
        """
        return [LeapMessage(self._soledad, uid, self.mbox)
                for uid in self.unseen_iter()]

    # recent messages

    def recent_iter(self):
        """
        Get an iterator for the message UIDs with `recent` flag.

        :return: iterator through recent message doc UIDs
        :rtype: iterable
        """
        return (doc.content[self.UID_KEY] for doc in
                self._soledad.get_from_index(
                    SoledadBackedAccount.TYPE_MBOX_RECT_IDX,
                    self.TYPE_FLAGS_VAL, self.mbox, '1'))

    def get_recent(self):
        """
        Get all messages with the `Recent` flag.

        :returns: a list of LeapMessages
        :rtype: list
        """
        return [LeapMessage(self._soledad, uid, self.mbox)
                for uid in self.recent_iter()]

    def count_recent(self):
        """
        Count all messages with the `Recent` flag.

        :returns: count
        :rtype: int
        """
        return self._soledad.get_count_from_index(
            SoledadBackedAccount.TYPE_MBOX_RECT_IDX,
            self.TYPE_FLAGS_VAL, self.mbox, '1')

    def __len__(self):
        """
        Returns the number of messages on this mailbox.

        :rtype: int
        """
        return self.count()

    def __iter__(self):
        """
        Returns an iterator over all messages.

        :returns: iterator of LeapMessages for all messages, by uid.
        :rtype: iterable
        """
        return (LeapMessage(self._soledad, uid, self.mbox)
                for uid in self.all_msg_iter())

    def __repr__(self):
        """
        Representation string for this object.
        """
        return u"<MessageCollection: mbox '%s' (%s)>" % (
            self.mbox, self.count())

    # XXX should implement __eq__ also !!! --- use a hash
    # of content for that, will be used for dedup.