From 57b276b6651a5634f025e8ab99f2bdac24b8b336 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Sun, 2 Feb 2014 09:26:37 -0400 Subject: fix missing content after in-memory add because THE KEYS WILL BE STRINGS AFTER ADDED TO SOLEDAD Can I remember that? * Fix copy from local folders * Fix copy when we already have a copy of the message in the inbox, marked as deleted. * Fix also bad deferred.succeed in add_msg when it already exist. --- mail/src/leap/mail/imap/mailbox.py | 5 +- mail/src/leap/mail/imap/memorystore.py | 6 ++- mail/src/leap/mail/imap/messageparts.py | 12 +++-- mail/src/leap/mail/imap/messages.py | 88 ++++++++++++++++++--------------- mail/src/leap/mail/imap/server.py | 13 ++++- mail/src/leap/mail/utils.py | 38 ++++++++++++++ 6 files changed, 110 insertions(+), 52 deletions(-) (limited to 'mail/src') diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 79fb4767..688f941d 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -162,6 +162,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ if not NOTIFY_NEW: return + logger.debug('adding mailbox listener: %s' % listener) self.listeners.add(listener) @@ -801,7 +802,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): from twisted.internet import reactor print "COPY :", message d = defer.Deferred() - # XXX this should not happen ... track it down, # probably to FETCH... if message is None: @@ -810,7 +810,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): deferLater(reactor, 0, self._do_copy, message, d) return d - #@profile def _do_copy(self, message, observer): """ Call invoked from the deferLater in `copy`. This will @@ -851,7 +850,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): logger.warning("Destination message already exists!") # XXX I'm still not clear if we should raise the - # callback. This actually rases an ugly warning + # errback. This actually rases an ugly warning # in some muas like thunderbird. I guess the user does # not deserve that. #observer.errback(MessageCopyError("Already exists!")) diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index 211d2826..542e227c 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -318,7 +318,7 @@ class MemoryStore(object): store[FDOC]) hdoc = msg_dict.get(HDOC, None) - if hdoc: + if hdoc is not None: if not store.get(HDOC, None): store[HDOC] = ReferenciableDict({}) store[HDOC].update(hdoc) @@ -438,7 +438,8 @@ class MemoryStore(object): if not self.producer.is_queue_empty(): return - logger.info("Writing messages to Soledad...") + if any(map(lambda i: not empty(i), (self._new, self._dirty))): + logger.info("Writing messages to Soledad...") # TODO change for lock, and make the property access # is accquired @@ -885,6 +886,7 @@ class MemoryStore(object): # TODO expunge should add itself as a callback to the ongoing # writes. soledad_store = self._permanent_store + all_deleted = [] try: # 1. Stop the writing call diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index 5067263b..b07681bf 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # messageparts.py # Copyright (C) 2014 LEAP # @@ -315,6 +314,7 @@ class MessageWrapper(object): fdoc, hdoc, cdocs = map( lambda part: msg_dict.get(part, None), [self.FDOC, self.HDOC, self.CDOCS]) + for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc), (self.CDOCS, cdocs)): self._dict[t] = ReferenciableDict(doc) if doc else None @@ -390,8 +390,10 @@ class MessagePart(object): first_part = pmap.get('1', None) if not empty(first_part): phash = first_part['phash'] + else: + phash = None - if not phash: + if phash is None: logger.warning("Could not find phash for this subpart!") payload = "" else: @@ -435,11 +437,13 @@ class MessagePart(object): fields.TYPE_CONTENT_VAL, str(phash)) cdoc = first(cdocs) - if not cdoc: + if cdoc is None: logger.warning( "Could not find the content doc " "for phash %s" % (phash,)) - payload = cdoc.content.get(fields.RAW_KEY, "") + payload = "" + else: + payload = cdoc.content.get(fields.RAW_KEY, "") return payload # TODO should memory-bound this memoize!!! diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 4a07ef77..6f822dbb 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -37,6 +37,7 @@ from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail import walk from leap.mail.utils import first, find_charset, lowerdict, empty +from leap.mail.utils import stringify_parts_map from leap.mail.decorators import deferred_to_thread from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields @@ -219,7 +220,6 @@ class LeapMessage(fields, MailParser, MBoxParser): # setFlags not in the interface spec but we use it with store command. - #@profile def setFlags(self, flags, mode): """ Sets the flags for this message @@ -243,30 +243,30 @@ class LeapMessage(fields, MailParser, MBoxParser): REMOVE = -1 SET = 0 - #with self.flags_lock: - current = doc.content[self.FLAGS_KEY] - if mode == APPEND: - newflags = tuple(set(tuple(current) + flags)) - elif mode == REMOVE: - newflags = tuple(set(current).difference(set(flags))) - elif mode == SET: - newflags = flags - - # We could defer this, but I think it's better - # to put it under the lock... - doc.content[self.FLAGS_KEY] = newflags - doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - - if self._collection.memstore is not None: - log.msg("putting message in collection") - self._collection.memstore.put_message( - self._mbox, self._uid, - MessageWrapper(fdoc=doc.content, new=False, dirty=True, - docs_id={'fdoc': doc.doc_id})) - else: - # fallback for non-memstore initializations. - self._soledad.put_doc(doc) + with self.flags_lock: + current = doc.content[self.FLAGS_KEY] + if mode == APPEND: + newflags = tuple(set(tuple(current) + flags)) + elif mode == REMOVE: + newflags = tuple(set(current).difference(set(flags))) + elif mode == SET: + newflags = flags + + # We could defer this, but I think it's better + # to put it under the lock... + doc.content[self.FLAGS_KEY] = newflags + doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags + + if self._collection.memstore is not None: + log.msg("putting message in collection") + self._collection.memstore.put_message( + self._mbox, self._uid, + MessageWrapper(fdoc=doc.content, new=False, dirty=True, + docs_id={'fdoc': doc.doc_id})) + else: + # fallback for non-memstore initializations. + self._soledad.put_doc(doc) return map(str, newflags) def getInternalDate(self): @@ -483,6 +483,9 @@ class LeapMessage(fields, MailParser, MBoxParser): hdoc_content = self._hdoc.content pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) + + # remember, lads, soledad is using strings in its keys, + # not integers! return pmap[str(part)] # XXX moved to memory store @@ -534,10 +537,10 @@ class LeapMessage(fields, MailParser, MBoxParser): if self._container is not None: bdoc = self._container.memstore.get_cdoc_from_phash(body_phash) - if bdoc and bdoc.content is not None: + if not empty(bdoc) and not empty(bdoc.content): return bdoc - # no memstore or no doc found there + # no memstore, or no body doc found there if self._soledad: body_docs = self._soledad.get_from_index( fields.TYPE_P_HASH_IDX, @@ -847,7 +850,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): else: return False - #@profile def add_msg(self, raw, subject=None, flags=None, date=None, uid=None, notify_on_disk=False): """ @@ -881,7 +883,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) return d - @deferred_to_thread + # We SHOULD defer this (or the heavy load here) to the thread pool, + # but it gives troubles with the QSocketNotifier used by Qt... def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): """ Helper that creates a new message document. @@ -907,9 +910,19 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # So we probably should just do an in-memory check and # move the complete check to the soledad writer? # Watch out! We're reserving a UID right after this! - if self._fdoc_already_exists(chash): - logger.warning("We already have that message in this mailbox.") - return defer.succeed('already_exists') + existing_uid = self._fdoc_already_exists(chash) + if existing_uid: + logger.warning("We already have that message in this " + "mailbox, unflagging as deleted") + uid = existing_uid + msg = self.get_msg_by_uid(uid) + msg.setFlags((fields.DELETED_FLAG,), -1) + + # XXX if this is deferred to thread again we should not use + # the callback in the deferred thread, but return and + # call the callback from the caller fun... + observer.callback(uid) + return uid = self.memstore.increment_last_soledad_uid(self.mbox) logger.info("ADDING MSG WITH UID: %s" % uid) @@ -929,17 +942,15 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): hd[key] = parts_map[key] del parts_map + hd = stringify_parts_map(hd) + # The MessageContainer expects a dict, one-indexed # XXX review-me cdocs = dict(((key + 1, doc) for key, doc in enumerate(walk.get_raw_docs(msg, parts)))) self.set_recent_flag(uid) - - # TODO ---- add reference to original doc, to be deleted - # after writes are done. msg_container = MessageWrapper(fd, hd, cdocs) - self.memstore.create_message(self.mbox, uid, msg_container, observer=observer, notify_on_disk=notify_on_disk) @@ -950,7 +961,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # recent flags - #@profile def _get_recent_flags(self): """ An accessor for the recent-flags set for this mailbox. @@ -1004,7 +1014,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): doc="Set of UIDs with the recent flag for this mailbox.") # XXX change naming, indicate soledad query. - #@profile def _get_recent_doc(self): """ Get recent-flags document from Soledad for this mailbox. @@ -1114,7 +1123,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # XXX is this working? return self._get_uid_from_msgidCb(msgid) - #@profile def set_flags(self, mbox, messages, flags, mode, observer): """ Set flags for a sequence of messages. @@ -1220,7 +1228,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # FIXME ---------------------------------------------- return sorted(all_docs, key=lambda item: item.content['uid']) - #@profile def all_soledad_uid_iter(self): """ Return an iterator through the UIDs of all messages, sorted in @@ -1232,7 +1239,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): fields.TYPE_FLAGS_VAL, self.mbox)]) return db_uids - #@profile def all_uid_iter(self): """ Return an iterator through the UIDs of all messages, from memory. diff --git a/mail/src/leap/mail/imap/server.py b/mail/src/leap/mail/imap/server.py index 7bca39db..ba63846b 100644 --- a/mail/src/leap/mail/imap/server.py +++ b/mail/src/leap/mail/imap/server.py @@ -139,14 +139,22 @@ class LeapIMAPServer(imap4.IMAP4Server): def on_fetch_finished(self, _, messages): from twisted.internet import reactor + + print "FETCH FINISHED -- NOTIFY NEW" deferLater(reactor, 0, self.notifyNew) deferLater(reactor, 0, self.mbox.unset_recent_flags, messages) deferLater(reactor, 0, self.mbox.signal_unread_to_ui) def on_copy_finished(self, defers): d = defer.gatherResults(filter(None, defers)) - d.addCallback(self.notifyNew) - d.addCallback(self.mbox.signal_unread_to_ui) + + def when_finished(result): + log.msg("COPY FINISHED") + self.notifyNew() + self.mbox.signal_unread_to_ui() + d.addCallback(when_finished) + #d.addCallback(self.notifyNew) + #d.addCallback(self.mbox.signal_unread_to_ui) def do_COPY(self, tag, messages, mailbox, uid=0): from twisted.internet import reactor @@ -162,6 +170,7 @@ class LeapIMAPServer(imap4.IMAP4Server): """ Notify new messages to listeners. """ + print "TRYING TO NOTIFY NEW" self.mbox.notify_new() def _cbSelectWork(self, mbox, cmdName, tag): diff --git a/mail/src/leap/mail/utils.py b/mail/src/leap/mail/utils.py index 6a1fcdeb..942acfb9 100644 --- a/mail/src/leap/mail/utils.py +++ b/mail/src/leap/mail/utils.py @@ -17,6 +17,7 @@ """ Mail utilities. """ +import copy import json import re import traceback @@ -92,6 +93,43 @@ def lowerdict(_dict): for key, value in _dict.items()) +PART_MAP = "part_map" + + +def _str_dict(d, k): + """ + Convert the dictionary key to string if it was a string. + + :param d: the dict + :type d: dict + :param k: the key + :type k: object + """ + if isinstance(k, int): + val = d[k] + d[str(k)] = val + del(d[k]) + + +def stringify_parts_map(d): + """ + Modify a dictionary making all the nested dicts under "part_map" keys + having strings as keys. + + :param d: the dictionary to modify + :type d: dictionary + :rtype: dictionary + """ + for k in d: + if k == PART_MAP: + pmap = d[k] + for kk in pmap.keys(): + _str_dict(d[k], kk) + for kk in pmap.keys(): + stringify_parts_map(d[k][str(kk)]) + return d + + class CustomJsonScanner(object): """ This class is a context manager definition used to monkey patch the default -- cgit v1.2.3