From 7542de6aeb370d647840a0b78b71f69c6fdacf8e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 20 Jan 2015 13:48:21 -0400 Subject: imap: complete FETCH implementation --- src/leap/mail/imap/mailbox.py | 95 ++++++++++++++++++++++++++++-------------- src/leap/mail/imap/messages.py | 23 +++++++++- src/leap/mail/imap/server.py | 3 ++ 3 files changed, 87 insertions(+), 34 deletions(-) (limited to 'src/leap/mail/imap') diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index e1eb6bf..a000133 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -143,9 +143,9 @@ class IMAPMailbox(object): return self._listeners[self.mbox_name] def get_imap_message(self, message): - msg = IMAPMessage(message) - msg.store = self.collection.store - return msg + d = defer.Deferred() + IMAPMessage(message, store=self.collection.store, d=d) + return d # FIXME this grows too crazily when many instances are fired, like # during imaptest stress testing. Should have a queue of limited size @@ -468,7 +468,6 @@ class IMAPMailbox(object): raise imap4.ReadOnlyMailbox return self.collection.delete_all_flagged() - # FIXME -- get last_uid from mbox_indexer def _bound_seq(self, messages_asked): """ Put an upper bound to a messages sequence if this is open. @@ -477,16 +476,18 @@ class IMAPMailbox(object): :type messages_asked: MessageSet :rtype: MessageSet """ + def set_last(last_uid): + messages_asked.last = last_uid + return messages_asked + if not messages_asked.last: try: iter(messages_asked) except TypeError: # looks like we cannot iterate - try: - # XXX fixme, does not exist - messages_asked.last = self.last_uid - except ValueError: - pass + d = self.collection.get_last_uid() + d.addCallback(set_last) + return d return messages_asked def _filter_msg_seq(self, messages_asked): @@ -524,50 +525,64 @@ class IMAPMailbox(object): otherwise. :type uid: bool - :rtype: deferred + :rtype: deferred with a generator that yields... """ # For the moment our UID is sequential, so we # can treat them all the same. # Change this to the flag that twisted expects when we # switch to content-hash based index + local UID table. - sequence = False - # sequence = True if uid == 0 else False + is_sequence = True if uid == 0 else False getmsg = self.collection.get_message_by_uid + getimapmsg = self.get_imap_message - messages_asked = self._bound_seq(messages_asked) - d_sequence = self._filter_msg_seq(messages_asked) + def get_imap_messages_for_sequence(msg_sequence): + + def _get_imap_msg(messages): + d_imapmsg = [] + for msg in messages: + d_imapmsg.append(getimapmsg(msg)) + return defer.gatherResults(d_imapmsg) - def get_imap_messages_for_sequence(sequence): - def _zip_msgid(messages): - return zip( - list(sequence), - map(self.get_imap_message, messages)) + def _zip_msgid(imap_messages): + zipped = zip( + list(msg_sequence), imap_messages) + return (item for item in zipped) def _unset_recent(sequence): reactor.callLater(0, self.unset_recent_flags, sequence) return sequence d_msg = [] - for msgid in sequence: + for msgid in msg_sequence: d_msg.append(getmsg(msgid)) d = defer.gatherResults(d_msg) + d.addCallback(_get_imap_msg) d.addCallback(_zip_msgid) return d # for sequence numbers (uid = 0) - if sequence: + if is_sequence: logger.debug("Getting msg by index: INEFFICIENT call!") # TODO --- implement sequences in mailbox indexer raise NotImplementedError else: - d_sequence.addCallback(get_imap_messages_for_sequence) + d = self._get_sequence_of_messages(messages_asked) + d.addCallback(get_imap_messages_for_sequence) # TODO -- call signal_to_ui # d.addCallback(self.cb_signal_unread_to_ui) - return d_sequence + return d + + def _get_sequence_of_messages(self, messages_asked): + def get_sequence(messages_asked): + return self._filter_msg_seq(messages_asked) + + d = defer.maybeDeferred(self._bound_seq, messages_asked) + d.addCallback(get_sequence) + return d def fetch_flags(self, messages_asked, uid): """ @@ -611,8 +626,8 @@ class IMAPMailbox(object): :param d: deferred whose callback will be called with result. :type d: Deferred - :rtype: A tuple of two-tuples of message sequence numbers and - flagsPart + :rtype: A generator that yields two-tuples of message sequence numbers + and flagsPart """ class flagsPart(object): def __init__(self, uid, flags): @@ -625,14 +640,30 @@ class IMAPMailbox(object): def getFlags(self): return map(str, self.flags) - messages_asked = self._bound_seq(messages_asked) - seq_messg = self._filter_msg_seq(messages_asked) + def pack_flags(result): + #if result is None: + #print "No result" + #return + _uid, _flags = result + return _uid, flagsPart(_uid, _flags) - # FIXME use deferreds here - all_flags = self.collection.get_all_flags(self.mbox_name) - result = ((msgid, flagsPart( - msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) - d.callback(result) + def get_flags_for_seq(sequence): + d_all_flags = [] + for msgid in sequence: + d_flags_per_uid = self.collection.get_flags_by_uid(msgid) + d_flags_per_uid.addCallback(pack_flags) + d_all_flags.append(d_flags_per_uid) + gotflags = defer.gatherResults(d_all_flags) + gotflags.addCallback(get_uid_flag_generator) + return gotflags + + def get_uid_flag_generator(result): + generator = (item for item in result) + d.callback(generator) + + d_seq = self._get_sequence_of_messages(messages_asked) + d_seq.addCallback(get_flags_for_seq) + return d_seq def fetch_headers(self, messages_asked, uid): """ diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 9b00162..d4b5d1f 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -19,6 +19,7 @@ IMAPMessage and IMAPMessageCollection. """ import logging from twisted.mail import imap4 +from twisted.internet import defer from zope.interface import implements from leap.common.check import leap_assert, leap_assert_type @@ -40,11 +41,17 @@ class IMAPMessage(object): implements(imap4.IMessage) - def __init__(self, message): + def __init__(self, message, prefetch_body=True, + store=None, d=defer.Deferred()): """ Initializes a LeapMessage. """ self.message = message + self.__body_fd = None + self.store = store + if prefetch_body: + gotbody = self.__prefetch_body_file() + gotbody.addCallback(lambda _: d.callback(self)) # IMessage implementation @@ -109,14 +116,26 @@ class IMAPMessage(object): # # IMessagePart # + def __prefetch_body_file(self): + def assign_body_fd(fd): + self.__body_fd = fd + return fd + d = self.getBodyFile() + d.addCallback(assign_body_fd) + return d def getBodyFile(self, store=None): """ Retrieve a file object containing only the body of this message. :return: file-like object opened for reading - :rtype: StringIO + :rtype: a deferred that will fire with a StringIO object. """ + if self.__body_fd is not None: + fd = self.__body_fd + fd.seek(0) + return fd + if store is None: store = self.store return self.message.get_body_file(store) diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 32c921d..38a3fd4 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -112,6 +112,7 @@ class LEAPIMAPServer(imap4.IMAP4Server): ebFetch = self._IMAP4Server__ebFetch if len(query) == 1 and str(query[0]) == "flags": + print ">>>>>>>>> fetching flags" self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator maybeDeferred( @@ -121,6 +122,7 @@ class LEAPIMAPServer(imap4.IMAP4Server): ).addErrback(ebFetch, tag) elif len(query) == 1 and str(query[0]) == "rfc822.header": + print ">>>>>>>> fetching headers" self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator maybeDeferred( @@ -129,6 +131,7 @@ class LEAPIMAPServer(imap4.IMAP4Server): cbFetch, tag, query, uid ).addErrback(ebFetch, tag) else: + print ">>>>>>> Fetching other" self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator maybeDeferred( -- cgit v1.2.3