From ac43a4fff07950fa16a2e8f6c4948bc78f1af3c5 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 29 Jan 2014 00:54:20 -0400 Subject: Fix copy and deletion problems * reorganize and simplify STORE command processing * add the notification after the processing of the whole sequence --- mail/src/leap/mail/imap/mailbox.py | 24 +---- mail/src/leap/mail/imap/memorystore.py | 20 +++- mail/src/leap/mail/imap/messages.py | 156 ++++++++++++++++++-------------- mail/src/leap/mail/imap/server.py | 26 +++--- mail/src/leap/mail/imap/soledadstore.py | 5 +- 5 files changed, 118 insertions(+), 113 deletions(-) diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index a0eb0a97..3a6937f8 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -654,7 +654,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): unseen = self.getUnseenCount() leap_events.signal(IMAP_UNREAD_MAIL, str(unseen)) - @deferred def store(self, messages_asked, flags, mode, uid): """ Sets the flags of one or more messages. @@ -697,28 +696,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): log.msg('read only mailbox!') raise imap4.ReadOnlyMailbox - result = {} - for msg_id in seq_messg: - log.msg("MSG ID = %s" % msg_id) - msg = self.messages.get_msg_by_uid(msg_id) - if not msg: - continue - # We duplicate the set operations here - # to return the result because it's less costly than - # retrieving the flags again. - newflags = set(msg.getFlags()) - - if mode == 1: - msg.addFlags(flags) - newflags = newflags.union(set(flags)) - elif mode == -1: - msg.removeFlags(flags) - newflags.difference_update(flags) - elif mode == 0: - msg.setFlags(flags) - newflags = set(flags) - result[msg_id] = newflags - return result + return self.messages.set_flags(self.mbox, seq_messg, flags, mode) # ISearchableMailbox diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index 2d60b13f..fac66adb 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -357,7 +357,7 @@ class MemoryStore(object): doc_id = fdoc.doc_id return doc_id - def get_message(self, mbox, uid): + def get_message(self, mbox, uid, flags_only=False): """ Get a MessageWrapper for the given mbox and uid combination. @@ -365,17 +365,27 @@ class MemoryStore(object): :type mbox: str or unicode :param uid: the message UID :type uid: int + :param flags_only: whether the message should carry only a reference + to the flags document. + :type flags_only: bool :return: MessageWrapper or None """ key = mbox, uid + FDOC = MessagePartType.fdoc.key + msg_dict = self._msg_store.get(key, None) if empty(msg_dict): return None new, dirty = self._get_new_dirty_state(key) - return MessageWrapper(from_dict=msg_dict, - new=new, dirty=dirty, - memstore=weakref.proxy(self)) + if flags_only: + return MessageWrapper(fdoc=msg_dict[FDOC], + new=new, dirty=dirty, + memstore=weakref.proxy(self)) + else: + return MessageWrapper(from_dict=msg_dict, + new=new, dirty=dirty, + memstore=weakref.proxy(self)) def remove_message(self, mbox, uid): """ @@ -590,7 +600,7 @@ class MemoryStore(object): if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: return None - uid = fdoc.content[fields.UID_KEY] + uid = fdoc[fields.UID_KEY] key = mbox, uid new = key in self._new dirty = key in self._dirty diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 315cdda6..5770868b 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -20,7 +20,6 @@ LeapMessage and MessageCollection. import copy import logging import re -import time import threading import StringIO @@ -97,11 +96,13 @@ class LeapMessage(fields, MailParser, MBoxParser): """ # TODO this has to change. - # Should index primarily by chash, and keep a local-lonly + # Should index primarily by chash, and keep a local-only # UID table. implements(imap4.IMessage) + flags_lock = threading.Lock() + def __init__(self, soledad, uid, mbox, collection=None, container=None): """ Initializes a LeapMessage. @@ -111,7 +112,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :param uid: the UID for the message. :type uid: int or basestring :param mbox: the mbox this message belongs to - :type mbox: basestring + :type mbox: str or unicode :param collection: a reference to the parent collection object :type collection: MessageCollection :param container: a IMessageContainer implementor instance @@ -216,23 +217,17 @@ class LeapMessage(fields, MailParser, MBoxParser): flags = map(str, flags) return tuple(flags) - # setFlags, addFlags, removeFlags are not in the interface spec - # but we use them with store command. + # setFlags not in the interface spec but we use it with store command. - def setFlags(self, flags): + def setFlags(self, flags, mode): """ Sets the flags for this message - Returns a SoledadDocument that needs to be updated by the caller. - :param flags: the flags to update in the message. :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument + :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. + :type mode: int """ - # XXX Move logic to memory store ... - leap_assert(isinstance(flags, tuple), "flags need to be a tuple") log.msg('setting flags: %s (%s)' % (self._uid, flags)) @@ -242,51 +237,36 @@ class LeapMessage(fields, MailParser, MBoxParser): "Could not find FDOC for %s:%s while setting flags!" % (self._mbox, self._uid)) return - doc.content[self.FLAGS_KEY] = flags - doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - - if self._collection.memstore is not None: - log.msg("putting message in collection") - self._collection.memstore.put_message( - self._mbox, self._uid, - MessageWrapper(fdoc=doc.content, new=False, dirty=True, - docs_id={'fdoc': doc.doc_id})) - else: - # fallback for non-memstore initializations. - self._soledad.put_doc(doc) - - def addFlags(self, flags): - """ - Adds flags to this message. - - Returns a SoledadDocument that needs to be updated by the caller. - - :param flags: the flags to add to the message. - :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument - """ - leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - oldflags = self.getFlags() - self.setFlags(tuple(set(flags + oldflags))) - - def removeFlags(self, flags): - """ - Remove flags from this message. - - Returns a SoledadDocument that needs to be updated by the caller. - :param flags: the flags to be removed from the message. - :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument - """ - leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - oldflags = self.getFlags() - self.setFlags(tuple(set(oldflags) - set(flags))) + APPEND = 1 + REMOVE = -1 + SET = 0 + + with self.flags_lock: + current = doc.content[self.FLAGS_KEY] + if mode == APPEND: + newflags = tuple(set(tuple(current) + flags)) + elif mode == REMOVE: + newflags = tuple(set(current).difference(set(flags))) + elif mode == SET: + newflags = flags + + # We could defer this, but I think it's better + # to put it under the lock... + doc.content[self.FLAGS_KEY] = newflags + doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags + + if self._collection.memstore is not None: + log.msg("putting message in collection") + self._collection.memstore.put_message( + self._mbox, self._uid, + MessageWrapper(fdoc=doc.content, new=False, dirty=True, + docs_id={'fdoc': doc.doc_id})) + else: + # fallback for non-memstore initializations. + self._soledad.put_doc(doc) + return map(str, newflags) def getInternalDate(self): """ @@ -1022,6 +1002,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): def unset_recent_flags(self, uids): """ Unset Recent flag for a sequence of uids. + + :param uids: the uids to unset + :type uid: sequence """ with self._rdoc_property_lock: self.recent_flags.difference_update( @@ -1032,6 +1015,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): def unset_recent_flag(self, uid): """ Unset Recent flag for a given uid. + + :param uid: the uid to unset + :type uid: int """ with self._rdoc_property_lock: self.recent_flags.difference_update( @@ -1040,6 +1026,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): def set_recent_flag(self, uid): """ Set Recent flag for a given uid. + + :param uid: the uid to set + :type uid: int """ with self._rdoc_property_lock: self.recent_flags = self.recent_flags.union( @@ -1099,31 +1088,64 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # and we cannot find it otherwise. This seems to be enough. # XXX do a deferLater instead ?? - # FIXME this won't be needed after the CHECK command is implemented. - time.sleep(0.3) + # XXX is this working? return self._get_uid_from_msgidCb(msgid) + def set_flags(self, mbox, messages, flags, mode): + """ + Set flags for a sequence of messages. + + :param mbox: the mbox this message belongs to + :type mbox: str or unicode + :param messages: the messages to iterate through + :type messages: sequence + :flags: the flags to be set + :type flags: tuple + :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. + :type mode: int + """ + result = {} + for msg_id in messages: + log.msg("MSG ID = %s" % msg_id) + msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) + if not msg: + continue + result[msg_id] = msg.setFlags(flags, mode) + + return result + # getters: generic for a mailbox - def get_msg_by_uid(self, uid): + def get_msg_by_uid(self, uid, mem_only=False, flags_only=False): """ Retrieves a LeapMessage by UID. This is used primarity in the Mailbox fetch and store methods. :param uid: the message uid to query by :type uid: int + :param mem_only: a flag that indicates whether this Message should + pass a reference to soledad to retrieve missing pieces + or not. + :type mem_only: bool + :param flags_only: whether the message should carry only a reference + to the flags document. + :type flags_only: bool :return: A LeapMessage instance matching the query, or None if not found. :rtype: LeapMessage """ - msg_container = self.memstore.get_message(self.mbox, uid) + msg_container = self.memstore.get_message(self.mbox, uid, flags_only) if msg_container is not None: - # We pass a reference to soledad just to be able to retrieve - # missing parts that cannot be found in the container, like - # the content docs after a copy. - msg = LeapMessage(self._soledad, uid, self.mbox, collection=self, - container=msg_container) + if mem_only: + msg = LeapMessage(None, uid, self.mbox, collection=self, + container=msg_container) + else: + # We pass a reference to soledad just to be able to retrieve + # missing parts that cannot be found in the container, like + # the content docs after a copy. + msg = LeapMessage(self._soledad, uid, self.mbox, + collection=self, container=msg_container) else: msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) if not msg.does_exist(): @@ -1159,7 +1181,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): def all_uid_iter(self): """ - Return an iterator trhough the UIDs of all messages, sorted in + Return an iterator through the UIDs of all messages, sorted in ascending order. """ # XXX we should get this from the uid table, local-only diff --git a/mail/src/leap/mail/imap/server.py b/mail/src/leap/mail/imap/server.py index b77678a4..7bca39db 100644 --- a/mail/src/leap/mail/imap/server.py +++ b/mail/src/leap/mail/imap/server.py @@ -99,10 +99,9 @@ class LeapIMAPServer(imap4.IMAP4Server): Overwritten fetch dispatcher to use the fast fetch_flags method """ - from twisted.internet import reactor if not query: self.sendPositiveResponse(tag, 'FETCH complete') - return # XXX ??? + return cbFetch = self._IMAP4Server__cbFetch ebFetch = self._IMAP4Server__ebFetch @@ -131,16 +130,19 @@ class LeapIMAPServer(imap4.IMAP4Server): ).addCallback( cbFetch, tag, query, uid ).addErrback( - ebFetch, tag) - - # XXX should be a callback - deferLater(reactor, - 2, self.mbox.unset_recent_flags, messages) - deferLater(reactor, 1, self.mbox.signal_unread_to_ui) + ebFetch, tag + ).addCallback( + self.on_fetch_finished, messages) select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, imap4.IMAP4Server.arg_fetchatt) + def on_fetch_finished(self, _, messages): + from twisted.internet import reactor + deferLater(reactor, 0, self.notifyNew) + deferLater(reactor, 0, self.mbox.unset_recent_flags, messages) + deferLater(reactor, 0, self.mbox.signal_unread_to_ui) + def on_copy_finished(self, defers): d = defer.gatherResults(filter(None, defers)) d.addCallback(self.notifyNew) @@ -156,7 +158,7 @@ class LeapIMAPServer(imap4.IMAP4Server): select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset, imap4.IMAP4Server.arg_astring) - def notifyNew(self, ignored): + def notifyNew(self, ignored=None): """ Notify new messages to listeners. """ @@ -203,10 +205,4 @@ class LeapIMAPServer(imap4.IMAP4Server): """ # TODO return the output of _memstore.is_writing # XXX and that should return a deferred! - - # XXX fake a delayed operation, to debug problem with messages getting - # back to the source mailbox... - print "faking checkpoint..." - import time - time.sleep(5) return None diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index f64ed233..ae5c583e 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -26,6 +26,7 @@ from u1db import errors as u1db_errors from zope.interface import implements from leap.common.check import leap_assert_type +from leap.mail.decorators import deferred from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import RecentFlagsDoc @@ -191,6 +192,7 @@ class SoledadStore(ContentDedup): # IMessageConsumer + @deferred def consume(self, queue): """ Creates a new document in soledad db. @@ -297,9 +299,6 @@ class SoledadStore(ContentDedup): # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): if item.part == MessagePartType.fdoc: - - # FIXME add content duplication for HEADERS too! - # (only 1 chash per mailbox!) yield dict(item.content), call elif item.part == MessagePartType.hdoc: -- cgit v1.2.3