From 8c65f09a16e4e00452dffa7d72771d9fac21c9c0 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 20 Jan 2015 13:48:21 -0400 Subject: imap: complete FETCH implementation --- src/leap/mail/adaptors/soledad.py | 38 +++++++++++++--- src/leap/mail/imap/mailbox.py | 95 ++++++++++++++++++++++++++------------- src/leap/mail/imap/messages.py | 23 +++++++++- src/leap/mail/imap/server.py | 3 ++ src/leap/mail/mail.py | 49 ++++++++++++++++---- src/leap/mail/mailbox_indexer.py | 14 +++++- 6 files changed, 171 insertions(+), 51 deletions(-) diff --git a/src/leap/mail/adaptors/soledad.py b/src/leap/mail/adaptors/soledad.py index d99f677..46dbe4c 100644 --- a/src/leap/mail/adaptors/soledad.py +++ b/src/leap/mail/adaptors/soledad.py @@ -364,6 +364,12 @@ class FlagsDocWrapper(SoledadDocumentWrapper): self._future_doc_id = new_id self.mbox_uuid = mbox_uuid + def get_flags(self): + """ + Get the flags for this message (as a tuple of strings, not unicode). + """ + return map(str, self.flags) + class HeaderDocWrapper(SoledadDocumentWrapper): @@ -727,11 +733,6 @@ class SoledadMailAdaptor(SoledadIndexMixin): mboxwrapper_klass = MailboxWrapper - def __init__(self): - SoledadIndexMixin.__init__(self) - - mboxwrapper_klass = MailboxWrapper - def __init__(self): SoledadIndexMixin.__init__(self) @@ -792,7 +793,7 @@ class SoledadMailAdaptor(SoledadIndexMixin): fdoc, hdoc = doc_list[:3] cdocs = dict(enumerate(doc_list[3:], 1)) return self.get_msg_from_docs( - msg_class, mdoc, fdoc, hdoc, cdocs, uid=None) + msg_class, mdoc, fdoc, hdoc, cdocs, uid=uid) def get_msg_from_mdoc_id(self, MessageClass, store, mdoc_id, uid=None, get_cdocs=False): @@ -847,6 +848,30 @@ class SoledadMailAdaptor(SoledadIndexMixin): msg_class=MessageClass, uid=uid)) return d + def get_flags_from_mdoc_id(self, store, mdoc_id): + """ + # XXX stuff here... + """ + mbox = re.findall(constants.METAMSGID_MBOX_RE, mdoc_id)[0] + chash = re.findall(constants.METAMSGID_CHASH_RE, mdoc_id)[0] + + def _get_fdoc_id_from_mdoc_id(): + return constants.FDOCID.format(mbox_uuid=mbox, chash=chash) + + fdoc_id = _get_fdoc_id_from_mdoc_id() + + def wrap_fdoc(doc): + cls = FlagsDocWrapper + return cls(doc_id=doc.doc_id, **doc.content) + + def get_flags(fdoc_wrapper): + return fdoc_wrapper.get_flags() + + d = store.get_doc(fdoc_id) + d.addCallback(wrap_fdoc) + d.addCallback(get_flags) + return d + def create_msg(self, store, msg): """ :param store: an instance of soledad, or anything that behaves alike @@ -881,7 +906,6 @@ class SoledadMailAdaptor(SoledadIndexMixin): Delete all messages flagged as deleted. """ def err(f): - print "ERROR GETTING FROM INDEX" f.printTraceback() def delete_fdoc_and_mdoc_flagged(fdocs): diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index e1eb6bf..a000133 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -143,9 +143,9 @@ class IMAPMailbox(object): return self._listeners[self.mbox_name] def get_imap_message(self, message): - msg = IMAPMessage(message) - msg.store = self.collection.store - return msg + d = defer.Deferred() + IMAPMessage(message, store=self.collection.store, d=d) + return d # FIXME this grows too crazily when many instances are fired, like # during imaptest stress testing. Should have a queue of limited size @@ -468,7 +468,6 @@ class IMAPMailbox(object): raise imap4.ReadOnlyMailbox return self.collection.delete_all_flagged() - # FIXME -- get last_uid from mbox_indexer def _bound_seq(self, messages_asked): """ Put an upper bound to a messages sequence if this is open. @@ -477,16 +476,18 @@ class IMAPMailbox(object): :type messages_asked: MessageSet :rtype: MessageSet """ + def set_last(last_uid): + messages_asked.last = last_uid + return messages_asked + if not messages_asked.last: try: iter(messages_asked) except TypeError: # looks like we cannot iterate - try: - # XXX fixme, does not exist - messages_asked.last = self.last_uid - except ValueError: - pass + d = self.collection.get_last_uid() + d.addCallback(set_last) + return d return messages_asked def _filter_msg_seq(self, messages_asked): @@ -524,50 +525,64 @@ class IMAPMailbox(object): otherwise. :type uid: bool - :rtype: deferred + :rtype: deferred with a generator that yields... """ # For the moment our UID is sequential, so we # can treat them all the same. # Change this to the flag that twisted expects when we # switch to content-hash based index + local UID table. - sequence = False - # sequence = True if uid == 0 else False + is_sequence = True if uid == 0 else False getmsg = self.collection.get_message_by_uid + getimapmsg = self.get_imap_message - messages_asked = self._bound_seq(messages_asked) - d_sequence = self._filter_msg_seq(messages_asked) + def get_imap_messages_for_sequence(msg_sequence): + + def _get_imap_msg(messages): + d_imapmsg = [] + for msg in messages: + d_imapmsg.append(getimapmsg(msg)) + return defer.gatherResults(d_imapmsg) - def get_imap_messages_for_sequence(sequence): - def _zip_msgid(messages): - return zip( - list(sequence), - map(self.get_imap_message, messages)) + def _zip_msgid(imap_messages): + zipped = zip( + list(msg_sequence), imap_messages) + return (item for item in zipped) def _unset_recent(sequence): reactor.callLater(0, self.unset_recent_flags, sequence) return sequence d_msg = [] - for msgid in sequence: + for msgid in msg_sequence: d_msg.append(getmsg(msgid)) d = defer.gatherResults(d_msg) + d.addCallback(_get_imap_msg) d.addCallback(_zip_msgid) return d # for sequence numbers (uid = 0) - if sequence: + if is_sequence: logger.debug("Getting msg by index: INEFFICIENT call!") # TODO --- implement sequences in mailbox indexer raise NotImplementedError else: - d_sequence.addCallback(get_imap_messages_for_sequence) + d = self._get_sequence_of_messages(messages_asked) + d.addCallback(get_imap_messages_for_sequence) # TODO -- call signal_to_ui # d.addCallback(self.cb_signal_unread_to_ui) - return d_sequence + return d + + def _get_sequence_of_messages(self, messages_asked): + def get_sequence(messages_asked): + return self._filter_msg_seq(messages_asked) + + d = defer.maybeDeferred(self._bound_seq, messages_asked) + d.addCallback(get_sequence) + return d def fetch_flags(self, messages_asked, uid): """ @@ -611,8 +626,8 @@ class IMAPMailbox(object): :param d: deferred whose callback will be called with result. :type d: Deferred - :rtype: A tuple of two-tuples of message sequence numbers and - flagsPart + :rtype: A generator that yields two-tuples of message sequence numbers + and flagsPart """ class flagsPart(object): def __init__(self, uid, flags): @@ -625,14 +640,30 @@ class IMAPMailbox(object): def getFlags(self): return map(str, self.flags) - messages_asked = self._bound_seq(messages_asked) - seq_messg = self._filter_msg_seq(messages_asked) + def pack_flags(result): + #if result is None: + #print "No result" + #return + _uid, _flags = result + return _uid, flagsPart(_uid, _flags) - # FIXME use deferreds here - all_flags = self.collection.get_all_flags(self.mbox_name) - result = ((msgid, flagsPart( - msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) - d.callback(result) + def get_flags_for_seq(sequence): + d_all_flags = [] + for msgid in sequence: + d_flags_per_uid = self.collection.get_flags_by_uid(msgid) + d_flags_per_uid.addCallback(pack_flags) + d_all_flags.append(d_flags_per_uid) + gotflags = defer.gatherResults(d_all_flags) + gotflags.addCallback(get_uid_flag_generator) + return gotflags + + def get_uid_flag_generator(result): + generator = (item for item in result) + d.callback(generator) + + d_seq = self._get_sequence_of_messages(messages_asked) + d_seq.addCallback(get_flags_for_seq) + return d_seq def fetch_headers(self, messages_asked, uid): """ diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 9b00162..d4b5d1f 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -19,6 +19,7 @@ IMAPMessage and IMAPMessageCollection. """ import logging from twisted.mail import imap4 +from twisted.internet import defer from zope.interface import implements from leap.common.check import leap_assert, leap_assert_type @@ -40,11 +41,17 @@ class IMAPMessage(object): implements(imap4.IMessage) - def __init__(self, message): + def __init__(self, message, prefetch_body=True, + store=None, d=defer.Deferred()): """ Initializes a LeapMessage. """ self.message = message + self.__body_fd = None + self.store = store + if prefetch_body: + gotbody = self.__prefetch_body_file() + gotbody.addCallback(lambda _: d.callback(self)) # IMessage implementation @@ -109,14 +116,26 @@ class IMAPMessage(object): # # IMessagePart # + def __prefetch_body_file(self): + def assign_body_fd(fd): + self.__body_fd = fd + return fd + d = self.getBodyFile() + d.addCallback(assign_body_fd) + return d def getBodyFile(self, store=None): """ Retrieve a file object containing only the body of this message. :return: file-like object opened for reading - :rtype: StringIO + :rtype: a deferred that will fire with a StringIO object. """ + if self.__body_fd is not None: + fd = self.__body_fd + fd.seek(0) + return fd + if store is None: store = self.store return self.message.get_body_file(store) diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 32c921d..38a3fd4 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -112,6 +112,7 @@ class LEAPIMAPServer(imap4.IMAP4Server): ebFetch = self._IMAP4Server__ebFetch if len(query) == 1 and str(query[0]) == "flags": + print ">>>>>>>>> fetching flags" self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator maybeDeferred( @@ -121,6 +122,7 @@ class LEAPIMAPServer(imap4.IMAP4Server): ).addErrback(ebFetch, tag) elif len(query) == 1 and str(query[0]) == "rfc822.header": + print ">>>>>>>> fetching headers" self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator maybeDeferred( @@ -129,6 +131,7 @@ class LEAPIMAPServer(imap4.IMAP4Server): cbFetch, tag, query, uid ).addErrback(ebFetch, tag) else: + print ">>>>>>> Fetching other" self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator maybeDeferred( diff --git a/src/leap/mail/mail.py b/src/leap/mail/mail.py index 8629d0e..976df5a 100644 --- a/src/leap/mail/mail.py +++ b/src/leap/mail/mail.py @@ -155,7 +155,7 @@ class Message(object): Get flags for this message. :rtype: tuple """ - return tuple(self._wrapper.fdoc.flags) + return self._wrapper.fdoc.get_flags() def get_internal_date(self): """ @@ -184,6 +184,7 @@ class Message(object): def get_body_file(self, store): """ + Get a file descriptor with the body content. """ def write_and_rewind_if_found(cdoc): if not cdoc: @@ -367,14 +368,36 @@ class MessageCollection(object): d.addCallback(get_msg_from_mdoc_id) return d - # TODO deprecate ??? --- - def _prime_count(self): - def update_count(count): - self._count = count - d = self.mbox_indexer.count(self.mbox_name) - d.addCallback(update_count) + def get_flags_by_uid(self, uid, absolute=True): + if not absolute: + raise NotImplementedError("Does not support relative ids yet") + + def get_flags_from_mdoc_id(doc_id): + if doc_id is None: # XXX needed? or bug? + return None + return self.adaptor.get_flags_from_mdoc_id( + self.store, doc_id) + + def wrap_in_tuple(flags): + return (uid, flags) + + d = self.mbox_indexer.get_doc_id_from_uid(self.mbox_uuid, uid) + d.addCallback(get_flags_from_mdoc_id) + d.addCallback(wrap_in_tuple) return d + # TODO ------------------------------ FIXME FIXME FIXME implement this! + def set_flags(self, *args, **kw): + pass + + # TODO deprecate ??? --- + #def _prime_count(self): + #def update_count(count): + #self._count = count + #d = self.mbox_indexer.count(self.mbox_name) + #d.addCallback(update_count) + #return d + def count(self): """ Count the messages in this collection. @@ -389,11 +412,13 @@ class MessageCollection(object): def count_recent(self): # FIXME HACK - return 0 + # TODO ------------------------ implement this + return 3 def count_unseen(self): # FIXME hack - return 0 + # TODO ------------------------ implement this + return 3 def get_uid_next(self): """ @@ -404,6 +429,12 @@ class MessageCollection(object): """ return self.mbox_indexer.get_next_uid(self.mbox_uuid) + def get_last_uid(self): + """ + Get the last UID for this mailbox. + """ + return self.mbox_indexer.get_last_uid(self.mbox_uuid) + def all_uid_iter(self): """ Iterator through all the uids for this collection. diff --git a/src/leap/mail/mailbox_indexer.py b/src/leap/mail/mailbox_indexer.py index 22e57d4..43a1f60 100644 --- a/src/leap/mail/mailbox_indexer.py +++ b/src/leap/mail/mailbox_indexer.py @@ -305,12 +305,24 @@ class MailboxIndexer(object): return 1 return uid + 1 + d = self.get_last_uid(mailbox_id) + d.addCallback(increment) + return d + + def get_last_uid(self, mailbox_id): + """ + Get the highest UID for a given mailbox. + """ + check_good_uuid(mailbox_id) sql = ("SELECT MAX(rowid) FROM {preffix}{name} " "LIMIT 1;").format( preffix=self.table_preffix, name=sanitize(mailbox_id)) + def getit(result): + return _maybe_first_query_item(result) + d = self._query(sql) - d.addCallback(increment) + d.addCallback(getit) return d def all_uid_iter(self, mailbox_id): -- cgit v1.2.3