summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/messages.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-23 02:32:52 -0400
committerKali Kaneko <kali@leap.se>2014-01-28 19:38:44 -0400
commiteaa4bcb241d5d55c4fd2458cb05c74fcdc79368c (patch)
tree8a246acfd0faec57a087ced77dbe9804464feada /src/leap/mail/imap/messages.py
parentf3e23e9f3f41bc30d92e88b4ed7eb56b2aeb40ff (diff)
split messageparts
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r--src/leap/mail/imap/messages.py423
1 files changed, 24 insertions, 399 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index ef0b0a1..67e5a41 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -24,13 +24,12 @@ import time
import threading
import StringIO
-from collections import defaultdict, namedtuple
+from collections import defaultdict
from functools import partial
from twisted.mail import imap4
from twisted.internet import defer
from twisted.python import log
-from u1db import errors as u1db_errors
from zope.interface import implements
from zope.proxy import sameProxiedObjects
@@ -38,13 +37,12 @@ from leap.common.check import leap_assert, leap_assert_type
from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
from leap.mail import walk
-from leap.mail.utils import first, find_charset
+from leap.mail.utils import first, find_charset, lowerdict
from leap.mail.decorators import deferred
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
from leap.mail.imap.memorystore import MessageDict
from leap.mail.imap.parser import MailParser, MBoxParser
-from leap.mail.messageflow import IMessageConsumer
logger = logging.getLogger(__name__)
@@ -52,29 +50,18 @@ logger = logging.getLogger(__name__)
# [ ] Add ref to incoming message during add_msg
# [ ] Add linked-from info.
+# * Need a new type of documents: linkage info.
+# * HDOCS are linked from FDOCs (ref to chash)
+# * CDOCS are linked from HDOCS (ref to chash)
+
# [ ] Delete incoming mail only after successful write!
# [ ] Remove UID from syncable db. Store only those indexes locally.
+CHARSET_PATTERN = r"""charset=([\w-]+)"""
+MSGID_PATTERN = r"""<([\w@.]+)>"""
-# XXX no longer needed, since i'm using proxies instead of direct weakrefs
-def maybe_call(thing):
- """
- Return the same thing, or the result of its invocation if it is a callable.
- """
- return thing() if callable(thing) else thing
-
-
-def lowerdict(_dict):
- """
- Return a dict with the keys in lowercase.
-
- :param _dict: the dict to convert
- :rtype: dict
- """
- # TODO should properly implement a CaseInsensitive dict.
- # Look into requests code.
- return dict((key.lower(), value)
- for key, value in _dict.items())
+CHARSET_RE = re.compile(CHARSET_PATTERN, re.IGNORECASE)
+MSGID_RE = re.compile(MSGID_PATTERN)
def try_unique_query(curried):
@@ -102,232 +89,6 @@ def try_unique_query(curried):
except Exception as exc:
logger.exception("Unhandled error %r" % exc)
-MSGID_PATTERN = r"""<([\w@.]+)>"""
-MSGID_RE = re.compile(MSGID_PATTERN)
-
-
-class MessagePart(object):
- """
- IMessagePart implementor.
- It takes a subpart message and is able to find
- the inner parts.
-
- Excusatio non petita: see the interface documentation.
- """
-
- implements(imap4.IMessagePart)
-
- def __init__(self, soledad, part_map):
- """
- Initializes the MessagePart.
-
- :param part_map: a dictionary containing the parts map for this
- message
- :type part_map: dict
- """
- # TODO
- # It would be good to pass the uid/mailbox also
- # for references while debugging.
-
- # We have a problem on bulk moves, and is
- # that when the fetch on the new mailbox is done
- # the parts maybe are not complete.
- # So we should be able to fail with empty
- # docs until we solve that. The ideal would be
- # to gather the results of the deferred operations
- # to signal the operation is complete.
- #leap_assert(part_map, "part map dict cannot be null")
- self._soledad = soledad
- self._pmap = part_map
-
- def getSize(self):
- """
- Return the total size, in octets, of this message part.
-
- :return: size of the message, in octets
- :rtype: int
- """
- if not self._pmap:
- return 0
- size = self._pmap.get('size', None)
- if not size:
- logger.error("Message part cannot find size in the partmap")
- return size
-
- def getBodyFile(self):
- """
- Retrieve a file object containing only the body of this message.
-
- :return: file-like object opened for reading
- :rtype: StringIO
- """
- fd = StringIO.StringIO()
- if self._pmap:
- multi = self._pmap.get('multi')
- if not multi:
- phash = self._pmap.get("phash", None)
- else:
- pmap = self._pmap.get('part_map')
- first_part = pmap.get('1', None)
- if first_part:
- phash = first_part['phash']
-
- if not phash:
- logger.warning("Could not find phash for this subpart!")
- payload = str("")
- else:
- payload = self._get_payload_from_document(phash)
-
- else:
- logger.warning("Message with no part_map!")
- payload = str("")
-
- if payload:
- content_type = self._get_ctype_from_document(phash)
- charset = find_charset(content_type)
- logger.debug("Got charset from header: %s" % (charset,))
- if charset is None:
- charset = self._get_charset(payload)
- logger.debug("Got charset: %s" % (charset,))
- try:
- payload = payload.encode(charset)
- except (UnicodeEncodeError, UnicodeDecodeError) as e:
- logger.error("Unicode error, using 'replace'. {0!r}".format(e))
- payload = payload.encode(charset, 'replace')
-
- fd.write(payload)
- fd.seek(0)
- return fd
-
- # TODO cache the phash retrieval
- def _get_payload_from_document(self, phash):
- """
- Gets the message payload from the content document.
-
- :param phash: the payload hash to retrieve by.
- :type phash: basestring
- """
- cdocs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
-
- cdoc = first(cdocs)
- if not cdoc:
- logger.warning(
- "Could not find the content doc "
- "for phash %s" % (phash,))
- payload = cdoc.content.get(fields.RAW_KEY, "")
- return payload
-
- # TODO cache the pahash retrieval
- def _get_ctype_from_document(self, phash):
- """
- Gets the content-type from the content document.
-
- :param phash: the payload hash to retrieve by.
- :type phash: basestring
- """
- cdocs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
-
- cdoc = first(cdocs)
- if not cdoc:
- logger.warning(
- "Could not find the content doc "
- "for phash %s" % (phash,))
- ctype = cdoc.content.get('ctype', "")
- return ctype
-
- @memoized_method
- def _get_charset(self, stuff):
- # TODO put in a common class with LeapMessage
- """
- Gets (guesses?) the charset of a payload.
-
- :param stuff: the stuff to guess about.
- :type stuff: basestring
- :returns: charset
- """
- # XXX existential doubt 2. shouldn't we make the scope
- # of the decorator somewhat more persistent?
- # ah! yes! and put memory bounds.
- return get_email_charset(unicode(stuff))
-
- def getHeaders(self, negate, *names):
- """
- Retrieve a group of message headers.
-
- :param names: The names of the headers to retrieve or omit.
- :type names: tuple of str
-
- :param negate: If True, indicates that the headers listed in names
- should be omitted from the return value, rather
- than included.
- :type negate: bool
-
- :return: A mapping of header field names to header field values
- :rtype: dict
- """
- if not self._pmap:
- logger.warning("No pmap in Subpart!")
- return {}
- headers = dict(self._pmap.get("headers", []))
-
- # twisted imap server expects *some* headers to be lowercase
- # We could use a CaseInsensitiveDict here...
- headers = dict(
- (str(key), str(value)) if key.lower() != "content-type"
- else (str(key.lower()), str(value))
- for (key, value) in headers.items())
-
- names = map(lambda s: s.upper(), names)
- if negate:
- cond = lambda key: key.upper() not in names
- else:
- cond = lambda key: key.upper() in names
-
- # unpack and filter original dict by negate-condition
- filter_by_cond = [
- map(str, (key, val)) for
- key, val in headers.items()
- if cond(key)]
- filtered = dict(filter_by_cond)
- return filtered
-
- def isMultipart(self):
- """
- Return True if this message is multipart.
- """
- if not self._pmap:
- logger.warning("Could not get part map!")
- return False
- multi = self._pmap.get("multi", False)
- return multi
-
- def getSubPart(self, part):
- """
- Retrieve a MIME submessage
-
- :type part: C{int}
- :param part: The number of the part to retrieve, indexed from 0.
- :raise IndexError: Raised if the specified part does not exist.
- :raise TypeError: Raised if this message is not multipart.
- :rtype: Any object implementing C{IMessagePart}.
- :return: The specified sub-part.
- """
- if not self.isMultipart():
- raise TypeError
- sub_pmap = self._pmap.get("part_map", {})
- try:
- part_map = sub_pmap[str(part + 1)]
- except KeyError:
- logger.debug("getSubpart for %s: KeyError" % (part,))
- raise IndexError
-
- # XXX check for validity
- return MessagePart(self._soledad, part_map)
-
class LeapMessage(fields, MailParser, MBoxParser):
"""
@@ -380,7 +141,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
if not fdoc:
fdoc = self._get_flags_doc()
if fdoc:
- fdoc_content = maybe_call(fdoc.content)
+ fdoc_content = fdoc.content
self.__chash = fdoc_content.get(
fields.CONTENT_HASH_KEY, None)
return fdoc
@@ -404,7 +165,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
if not self._fdoc:
return None
if not self.__chash and self._fdoc:
- self.__chash = maybe_call(self._fdoc.content).get(
+ self.__chash = self._fdoc.content.get(
fields.CONTENT_HASH_KEY, None)
return self.__chash
@@ -444,7 +205,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
flags = []
fdoc = self._fdoc
if fdoc:
- flags = maybe_call(fdoc.content).get(self.FLAGS_KEY, None)
+ flags = fdoc.content.get(self.FLAGS_KEY, None)
msgcol = self._collection
@@ -557,12 +318,12 @@ class LeapMessage(fields, MailParser, MBoxParser):
charset = self._get_charset(body)
try:
body = body.encode(charset)
- except UnicodeError as e:
- logger.error("Unicode error {0}".format(e))
+ except UnicodeError as exc:
+ logger.error("Unicode error {0}".format(exc))
logger.debug("Attempted to encode with: %s" % charset)
try:
body = body.encode(charset, 'replace')
- except UnicodeError as e:
+ except UnicodeError as exc:
try:
body = body.encode('utf-8', 'replace')
except:
@@ -601,7 +362,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
size = None
if self._fdoc:
- fdoc_content = maybe_call(self._fdoc.content)
+ fdoc_content = self._fdoc.content
size = fdoc_content.get(self.SIZE_KEY, False)
else:
logger.warning("No FLAGS doc for %s:%s" % (self._mbox,
@@ -667,7 +428,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the headers dict for this message.
"""
if self._hdoc is not None:
- hdoc_content = maybe_call(self._hdoc.content)
+ hdoc_content = self._hdoc.content
headers = hdoc_content.get(self.HEADERS_KEY, {})
return headers
@@ -682,7 +443,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return True if this message is multipart.
"""
if self._fdoc:
- fdoc_content = maybe_call(self._fdoc.content)
+ fdoc_content = self._fdoc.content
is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)
return is_multipart
else:
@@ -725,7 +486,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
logger.warning("Tried to get part but no HDOC found!")
return None
- hdoc_content = maybe_call(self._hdoc.content)
+ hdoc_content = self._hdoc.content
pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
return pmap[str(part)]
@@ -762,7 +523,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
Return the document that keeps the body for this
message.
"""
- hdoc_content = maybe_call(self._hdoc.content)
+ hdoc_content = self._hdoc.content
body_phash = hdoc_content.get(
fields.BODY_KEY, None)
if not body_phash:
@@ -801,7 +562,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: The content value indexed by C{key} or None
:rtype: str
"""
- return maybe_call(self._fdoc.content).get(key, None)
+ return self._fdoc.content.get(key, None)
# setters
@@ -874,143 +635,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
return self._fdoc is not None
-class ContentDedup(object):
- """
- Message deduplication.
-
- We do a query for the content hashes before writing to our beloved
- sqlcipher backend of Soledad. This means, by now, that:
-
- 1. We will not store the same attachment twice, only the hash of it.
- 2. We will not store the same message body twice, only the hash of it.
-
- The first case is useful if you are always receiving the same old memes
- from unwary friends that still have not discovered that 4chan is the
- generator of the internet. The second will save your day if you have
- initiated session with the same account in two different machines. I also
- wonder why would you do that, but let's respect each other choices, like
- with the religious celebrations, and assume that one day we'll be able
- to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
- Stack.
- """
-
- def _content_does_exist(self, doc):
- """
- Check whether we already have a content document for a payload
- with this hash in our database.
-
- :param doc: tentative body document
- :type doc: dict
- :returns: True if that happens, False otherwise.
- """
- if not doc:
- return False
- phash = doc[fields.PAYLOAD_HASH_KEY]
- attach_docs = self._soledad.get_from_index(
- fields.TYPE_P_HASH_IDX,
- fields.TYPE_CONTENT_VAL, str(phash))
- if not attach_docs:
- return False
-
- if len(attach_docs) != 1:
- logger.warning("Found more than one copy of phash %s!"
- % (phash,))
- logger.debug("Found attachment doc with that hash! Skipping save!")
- return True
-
-
-SoledadWriterPayload = namedtuple(
- 'SoledadWriterPayload', ['mode', 'payload'])
-
-# TODO we could consider using enum here:
-# https://pypi.python.org/pypi/enum
-
-SoledadWriterPayload.CREATE = 1
-SoledadWriterPayload.PUT = 2
-SoledadWriterPayload.CONTENT_CREATE = 3
-
-
-"""
-SoledadDocWriter was used to avoid writing to the db from multiple threads.
-Its use here has been deprecated in favor of a local rw_lock in the client.
-But we might want to reuse in in the near future to implement priority queues.
-"""
-
-
-class SoledadDocWriter(object):
- """
- This writer will create docs serially in the local soledad database.
- """
-
- implements(IMessageConsumer)
-
- def __init__(self, soledad):
- """
- Initialize the writer.
-
- :param soledad: the soledad instance
- :type soledad: Soledad
- """
- self._soledad = soledad
-
- def _get_call_for_item(self, item):
- """
- Return the proper call type for a given item.
-
- :param item: one of the types defined under the
- attributes of SoledadWriterPayload
- :type item: int
- """
- call = None
- payload = item.payload
-
- if item.mode == SoledadWriterPayload.CREATE:
- call = self._soledad.create_doc
- elif (item.mode == SoledadWriterPayload.CONTENT_CREATE
- and not self._content_does_exist(payload)):
- call = self._soledad.create_doc
- elif item.mode == SoledadWriterPayload.PUT:
- call = self._soledad.put_doc
- return call
-
- def _process(self, queue):
- """
- Return the item and the proper call type for the next
- item in the queue if any.
-
- :param queue: the queue from where we'll pick item.
- :type queue: Queue
- """
- item = queue.get()
- call = self._get_call_for_item(item)
- return item, call
-
- def consume(self, queue):
- """
- Creates a new document in soledad db.
-
- :param queue: queue to get item from, with content of the document
- to be inserted.
- :type queue: Queue
- """
- empty = queue.empty()
- while not empty:
- item, call = self._process(queue)
-
- if call:
- # XXX should handle the delete case
- # should handle errors
- try:
- call(item.payload)
- except u1db_errors.RevisionConflict as exc:
- logger.error("Error: %r" % (exc,))
- raise exc
-
- empty = queue.empty()
-
-
-class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
- ContentDedup):
+class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
"""
A collection of messages, surprisingly.
@@ -1360,7 +985,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
# TODO ---- add reference to original doc, to be deleted
# after writes are done.
msg_container = MessageDict(fd, hd, cdocs)
- self._memstore.put(self.mbox, uid, msg_container)
+ self._memstore.create_message(self.mbox, uid, msg_container)
def _remove_cb(self, result):
return result