diff options
| author | Tomás Touceda <chiiph@leap.se> | 2014-01-16 10:37:38 -0300 | 
|---|---|---|
| committer | Tomás Touceda <chiiph@leap.se> | 2014-01-16 10:37:38 -0300 | 
| commit | 06eebf54ab572ebaf6730f2095a062cd9549f12e (patch) | |
| tree | 35c66cbeae132418dfbfc1524b36b55482080dcb /src/leap/mail/imap/messages.py | |
| parent | df5dfdb2d8dd407ed9ed0d2b043e0da299b9c341 (diff) | |
| parent | 1069e7b9470fb63f70a43fa72407fcd4276d550d (diff) | |
Merge remote-tracking branch 'refs/remotes/kali/bug/bug_4963_fix-bulky-fetch' into develop
Diffstat (limited to 'src/leap/mail/imap/messages.py')
| -rw-r--r-- | src/leap/mail/imap/messages.py | 192 | 
1 file changed, 107 insertions, 85 deletions
| diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index a3d29d6..22de356 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -20,7 +20,6 @@ LeapMessage and MessageCollection.  import copy  import logging  import re -import threading  import time  import StringIO @@ -46,15 +45,11 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer  logger = logging.getLogger(__name__) -read_write_lock = threading.Lock() -  # TODO ------------------------------------------------------------  # [ ] Add linked-from info.  # [ ] Delete incoming mail only after successful write!  # [ ] Remove UID from syncable db. Store only those indexes locally. -# [ ] Send patch to twisted for bug in imap4.py:5717  (content-type can be -#     none? lower-case?)  def lowerdict(_dict): @@ -659,10 +654,18 @@ class LeapMessage(fields, MailParser, MBoxParser):          Return the document that keeps the flags for this          message.          """ -        flag_docs = self._soledad.get_from_index( -            fields.TYPE_MBOX_UID_IDX, -            fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) -        return first(flag_docs) +        result = {} +        try: +            flag_docs = self._soledad.get_from_index( +                fields.TYPE_MBOX_UID_IDX, +                fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) +            result = first(flag_docs) +        except Exception as exc: +            # ugh! Something's broken down there! +            logger.warning("ERROR while getting flags for UID: %s" % self._uid) +            logger.exception(exc) +        finally: +            return result      def _get_headers_doc(self):          """ @@ -772,6 +775,51 @@ class LeapMessage(fields, MailParser, MBoxParser):          return self._fdoc is not None +class ContentDedup(object): +    """ +    Message deduplication. 
+ +    We do a query for the content hashes before writing to our beloved +    sqlcipher backend of Soledad. This means, by now, that: + +    1. We will not store the same attachment twice, only the hash of it. +    2. We will not store the same message body twice, only the hash of it. + +    The first case is useful if you are always receiving the same old memes +    from unwary friends that still have not discovered that 4chan is the +    generator of the internet. The second will save your day if you have +    initiated session with the same account in two different machines. I also +    wonder why would you do that, but let's respect each other choices, like +    with the religious celebrations, and assume that one day we'll be able +    to run Bitmask in completely free phones. Yes, I mean that, the whole GSM +    Stack. +    """ + +    def _content_does_exist(self, doc): +        """ +        Check whether we already have a content document for a payload +        with this hash in our database. + +        :param doc: tentative body document +        :type doc: dict +        :returns: True if that happens, False otherwise. +        """ +        if not doc: +            return False +        phash = doc[fields.PAYLOAD_HASH_KEY] +        attach_docs = self._soledad.get_from_index( +            fields.TYPE_P_HASH_IDX, +            fields.TYPE_CONTENT_VAL, str(phash)) +        if not attach_docs: +            return False + +        if len(attach_docs) != 1: +            logger.warning("Found more than one copy of phash %s!" +                           % (phash,)) +        logger.debug("Found attachment doc with that hash! Skipping save!") +        return True + +  SoledadWriterPayload = namedtuple(      'SoledadWriterPayload', ['mode', 'payload']) @@ -783,6 +831,13 @@ SoledadWriterPayload.PUT = 2  SoledadWriterPayload.CONTENT_CREATE = 3 +""" +SoledadDocWriter was used to avoid writing to the db from multiple threads. 
+Its use here has been deprecated in favor of a local rw_lock in the client. +But we might want to reuse in in the near future to implement priority queues. +""" + +  class SoledadDocWriter(object):      """      This writer will create docs serially in the local soledad database. @@ -854,51 +909,9 @@ class SoledadDocWriter(object):              empty = queue.empty() -    """ -    Message deduplication. -    We do a query for the content hashes before writing to our beloved -    sqlcipher backend of Soledad. This means, by now, that: - -    1. We will not store the same attachment twice, only the hash of it. -    2. We will not store the same message body twice, only the hash of it. - -    The first case is useful if you are always receiving the same old memes -    from unwary friends that still have not discovered that 4chan is the -    generator of the internet. The second will save your day if you have -    initiated session with the same account in two different machines. I also -    wonder why would you do that, but let's respect each other choices, like -    with the religious celebrations, and assume that one day we'll be able -    to run Bitmask in completely free phones. Yes, I mean that, the whole GSM -    Stack. -    """ - -    def _content_does_exist(self, doc): -        """ -        Check whether we already have a content document for a payload -        with this hash in our database. - -        :param doc: tentative body document -        :type doc: dict -        :returns: True if that happens, False otherwise. -        """ -        if not doc: -            return False -        phash = doc[fields.PAYLOAD_HASH_KEY] -        attach_docs = self._soledad.get_from_index( -            fields.TYPE_P_HASH_IDX, -            fields.TYPE_CONTENT_VAL, str(phash)) -        if not attach_docs: -            return False - -        if len(attach_docs) != 1: -            logger.warning("Found more than one copy of phash %s!" 
-                           % (phash,)) -        logger.debug("Found attachment doc with that hash! Skipping save!") -        return True - - -class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): +class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, +                        ContentDedup):      """      A collection of messages, surprisingly. @@ -1147,24 +1160,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              hd[key] = parts_map[key]          del parts_map -        docs = [fd, hd] -        cdocs = walk.get_raw_docs(msg, parts) -          # Saving -        logger.debug('enqueuing message docs for write') -        ptuple = SoledadWriterPayload -        with read_write_lock: -            # first, regular docs: flags and headers -            for doc in docs: -                self.soledad_writer.put(ptuple( -                    mode=ptuple.CREATE, payload=doc)) +        # first, regular docs: flags and headers +        self._soledad.create_doc(fd) -            # and last, but not least, try to create -            # content docs if not already there. -            for cd in cdocs: -                self.soledad_writer.put(ptuple( -                    mode=ptuple.CONTENT_CREATE, payload=cd)) +        # XXX should check for content duplication on headers too +        # but with chash. !!! +        self._soledad.create_doc(hd) + +        # and last, but not least, try to create +        # content docs if not already there. 
+        cdocs = walk.get_raw_docs(msg, parts) +        for cdoc in cdocs: +            if not self._content_does_exist(cdoc): +                self._soledad.create_doc(cdoc)      def _remove_cb(self, result):          return result @@ -1219,21 +1229,20 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):      def _get_uid_from_msgidCb(self, msgid):          hdoc = None -        with read_write_lock: -            try: -                query = self._soledad.get_from_index( -                    fields.TYPE_MSGID_IDX, -                    fields.TYPE_HEADERS_VAL, msgid) -                if query: -                    if len(query) > 1: -                        logger.warning( -                            "More than one hdoc found for this msgid, " -                            "we got a duplicate!!") -                        # XXX we could take action, like trigger a background -                        # process to kill dupes. -                    hdoc = query.pop() -            except Exception as exc: -                logger.exception("Unhandled error %r" % exc) +        try: +            query = self._soledad.get_from_index( +                fields.TYPE_MSGID_IDX, +                fields.TYPE_HEADERS_VAL, msgid) +            if query: +                if len(query) > 1: +                    logger.warning( +                        "More than one hdoc found for this msgid, " +                        "we got a duplicate!!") +                    # XXX we could take action, like trigger a background +                    # process to kill dupes. 
+                hdoc = query.pop() +        except Exception as exc: +            logger.exception("Unhandled error %r" % exc)          if hdoc is None:              logger.warning("Could not find hdoc for msgid %s" @@ -1316,17 +1325,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # XXX FIXINDEX -- should implement order by in soledad          return sorted(all_docs, key=lambda item: item.content['uid']) -    def all_msg_iter(self): +    def all_uid_iter(self):          """          Return an iterator trhough the UIDs of all messages, sorted in          ascending order.          """ +        # XXX we should get this from the uid table, local-only          all_uids = (doc.content[self.UID_KEY] for doc in                      self._soledad.get_from_index(                          fields.TYPE_MBOX_IDX,                          fields.TYPE_FLAGS_VAL, self.mbox))          return (u for u in sorted(all_uids)) +    def all_flags(self): +        """ +        Return a dict with all flags documents for this mailbox. +        """ +        all_flags = dict((( +            doc.content[self.UID_KEY], +            doc.content[self.FLAGS_KEY]) for doc in +            self._soledad.get_from_index( +                fields.TYPE_MBOX_IDX, +                fields.TYPE_FLAGS_VAL, self.mbox))) +        return all_flags +      def count(self):          """          Return the count of messages for this mailbox. @@ -1451,7 +1473,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :rtype: iterable          """          return (LeapMessage(self._soledad, docuid, self.mbox) -                for docuid in self.all_msg_iter()) +                for docuid in self.all_uid_iter())      def __repr__(self):          """ | 
