diff options
Diffstat (limited to 'mail')
-rw-r--r-- | mail/changes/feature_5095_flush-data-to-disk-when-stopping | 1 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 7 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 63 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/messageparts.py | 17 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/service/imap.py | 31 | ||||
-rwxr-xr-x | mail/src/leap/mail/imap/tests/getmail | 2 | ||||
-rwxr-xr-x | mail/src/leap/mail/imap/tests/regressions | 451 | ||||
-rw-r--r-- | mail/src/leap/mail/messageflow.py | 11 |
8 files changed, 553 insertions, 30 deletions
diff --git a/mail/changes/feature_5095_flush-data-to-disk-when-stopping b/mail/changes/feature_5095_flush-data-to-disk-when-stopping new file mode 100644 index 00000000..d7c1ce7e --- /dev/null +++ b/mail/changes/feature_5095_flush-data-to-disk-when-stopping @@ -0,0 +1 @@ + o Flush IMAP data to disk when stopping. Closes #5095. diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index c6825786..d8af0a50 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -484,9 +484,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): d.addCallback(self._close_cb) return d - def _expunge_cb(self, result): - return result - def expunge(self): """ Remove all messages flagged \\Deleted @@ -494,9 +491,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): if not self.isWriteable(): raise imap4.ReadOnlyMailbox d = defer.Deferred() - return self._memstore.expunge(self.mbox, d) - self._memstore.expunge(self.mbox) - d.addCallback(self._expunge_cb, d) + self._memstore.expunge(self.mbox, d) return d def _bound_seq(self, messages_asked): diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index a99148f3..ed2b3f20 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -236,10 +236,11 @@ class MemoryStore(object): self._add_message(mbox, uid, message, notify_on_disk) self._new.add(key) - def log_add(result): - log.msg("message save: %s" % result) - return result - observer.addCallback(log_add) + # XXX use this while debugging the callback firing, + # remove after unittesting this. + #def log_add(result): + #return result + #observer.addCallback(log_add) if notify_on_disk: # We store this deferred so we can keep track of the pending @@ -887,35 +888,54 @@ class MemoryStore(object): self.remove_message(mbox, uid) return mem_deleted + def stop_and_flush(self): + """ + Stop the write loop and trigger a write to the producer. + """ + self._stop_write_loop() + if self._permanent_store is not None: + self.write_messages(self._permanent_store) + self.producer.flush() + def expunge(self, mbox, observer): """ Remove all messages flagged \\Deleted, from the Memory Store and from the permanent store also. + It first queues up a last write, and wait for the deferreds to be done + before continuing. + :param mbox: the mailbox :type mbox: str or unicode :param observer: a deferred that will be fired when expunge is done :type observer: Deferred - :return: a list of UIDs - :rtype: list """ - # TODO expunge should add itself as a callback to the ongoing - # writes. soledad_store = self._permanent_store - all_deleted = [] - try: - # 1. Stop the writing call - self._stop_write_loop() - # 2. Enqueue a last write. - #self.write_messages(soledad_store) - # 3. Should wait on the writebacks to finish ??? - # FIXME wait for this, and add all the rest of the method - # as a callback!!! + # Stop and trigger last write + self.stop_and_flush() + # Wait on the writebacks to finish + pending_deferreds = (self._new_deferreds.get(mbox, []) + + self._dirty_deferreds.get(mbox, [])) + d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) + d1.addCallback( + self._delete_from_soledad_and_memory, mbox, observer) except Exception as exc: logger.exception(exc) - # Now, we...: + def _delete_from_soledad_and_memory(self, result, mbox, observer): + """ + Remove all messages marked as deleted from soledad and memory. + + :param result: ignored. the result of the deferredList that triggers + this as a callback from `expunge`. + :param mbox: the mailbox + :type mbox: str or unicode + :param observer: a deferred that will be fired when expunge is done + :type observer: Deferred + """ + all_deleted = [] + soledad_store = self._permanent_store try: # 1. Delete all messages marked as deleted in soledad. @@ -940,8 +960,7 @@ class MemoryStore(object): logger.exception(exc) finally: self._start_write_loop() - observer.callback(True) - return all_deleted + observer.callback(all_deleted) # Dump-to-disk controls. @@ -962,6 +981,10 @@ class MemoryStore(object): # are done (gatherResults) return getattr(self, self.WRITING_FLAG) + @property + def permanent_store(self): + return self._permanent_store + # Memory management. def get_size(self): diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index b07681bf..b1f333a5 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -397,7 +397,9 @@ class MessagePart(object): logger.warning("Could not find phash for this subpart!") payload = "" else: - payload = self._get_payload_from_document(phash) + payload = self._get_payload_from_document_memoized(phash) + if empty(payload): + payload = self._get_payload_from_document(phash) else: logger.warning("Message with no part_map!") @@ -424,13 +426,24 @@ class MessagePart(object): # TODO should memory-bound this memoize!!! @memoized_method + def _get_payload_from_document_memoized(self, phash): + """ + Memoized method call around the regular method, to be able + to call the non-memoized method in case we got a None. + + :param phash: the payload hash to retrieve by. + :type phash: str or unicode + :rtype: str or unicode or None + """ + return self._get_payload_from_document(phash) + def _get_payload_from_document(self, phash): """ Return the message payload from the content document. :param phash: the payload hash to retrieve by. :type phash: str or unicode - :rtype: str or unicode + :rtype: str or unicode or None """ cdocs = self._soledad.get_from_index( fields.TYPE_P_HASH_IDX, diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 5487cfc8..93df51d1 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -19,7 +19,9 @@ Imap service initialization """ import logging import os +import time +from twisted.internet import defer, threads from twisted.internet.protocol import ServerFactory from twisted.internet.error import CannotListenError from twisted.mail import imap4 @@ -122,6 +124,35 @@ class LeapIMAPFactory(ServerFactory): imapProtocol.factory = self return imapProtocol + def doStop(self, cv): + """ + Stops imap service (fetcher, factory and port). + + :param cv: A condition variable to which we can signal when imap + indeed stops. + :type cv: threading.Condition + :return: a Deferred that stops and flushes the in memory store data to + disk in another thread. + :rtype: Deferred + """ + ServerFactory.doStop(self) + + def _stop_imap_cb(): + logger.debug('Stopping in memory store.') + self._memstore.stop_and_flush() + while not self._memstore.producer.is_queue_empty(): + logger.debug('Waiting for queue to be empty.') + # TODO use a gatherResults over the new/dirty deferred list, + # as in memorystore's expunge() method. + time.sleep(1) + # notify that service has stopped + logger.debug('Notifying that service has stopped.') + cv.acquire() + cv.notify() + cv.release() + + return threads.deferToThread(_stop_imap_cb) + def run_service(*args, **kwargs): """ diff --git a/mail/src/leap/mail/imap/tests/getmail b/mail/src/leap/mail/imap/tests/getmail index 17e195c8..0fb00d21 100755 --- a/mail/src/leap/mail/imap/tests/getmail +++ b/mail/src/leap/mail/imap/tests/getmail @@ -5,8 +5,6 @@ # Modifications by LEAP Developers 2014 to fit # Bitmask configuration settings. - - """ Simple IMAP4 client which displays the subjects of all messages in a particular mailbox. diff --git a/mail/src/leap/mail/imap/tests/regressions b/mail/src/leap/mail/imap/tests/regressions new file mode 100755 index 00000000..0a433980 --- /dev/null +++ b/mail/src/leap/mail/imap/tests/regressions @@ -0,0 +1,451 @@ +#!/usr/bin/env python + +# -*- coding: utf-8 -*- +# regressions +# Copyright (C) 2014 LEAP +# Copyright (c) Twisted Matrix Laboratories. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Simple Regression Tests using IMAP4 client. + +Iterates trough all mails under a given folder and tries to APPEND them to +the server being tested. After FETCHING the pushed message, it compares +the received version with the one that was saved, and exits with an error +code if they do not match. +""" +import os +import StringIO +import sys + +from email.parser import Parser + +from twisted.internet import protocol +from twisted.internet import ssl +from twisted.internet import defer +from twisted.internet import stdio +from twisted.mail import imap4 +from twisted.protocols import basic +from twisted.python import log + + +REGRESSIONS_FOLDER = "regressions_test" + +parser = Parser() + + +def get_msg_parts(raw): + """ + Return a representation of the parts of a message suitable for + comparison. + + :param raw: string for the message + :type raw: str + """ + m = parser.parsestr(raw) + return [dict(part.items()) + if part.is_multipart() + else part.get_payload() + for part in m.walk()] + + +def compare_msg_parts(a, b): + """ + Compare two sequences of parts of messages. + + :param a: part sequence for message a + :param b: part sequence for message b + + :return: True if both message sequences are equivalent. + :rtype: bool + """ + # XXX This could be smarter and show the differences in the + # different parts when/where they differ. + #import pprint; pprint.pprint(a[0]) + #import pprint; pprint.pprint(b[0]) + + def lowerkey(d): + return dict((k.lower(), v.replace('\r', '')) + for k, v in d.iteritems()) + + def eq(x, y): + # For dicts, we compare a variation with their keys + # in lowercase, and \r removed from their values + if all(map(lambda i: isinstance(i, dict), (x, y))): + x, y = map(lowerkey, (x, y)) + return x == y + + compare_vector = map(lambda tup: eq(tup[0], tup[1]), zip(a, b)) + all_match = all(compare_vector) + + if not all_match: + print "PARTS MISMATCH!" + print "vector: ", compare_vector + index = compare_vector.index(False) + from pprint import pprint + print "Expected:" + pprint(a[index]) + print ("***") + print "Found:" + pprint(b[index]) + print + + + return all_match + + +def get_fd(string): + """ + Return a file descriptor with the passed string + as content. + """ + fd = StringIO.StringIO() + fd.write(string) + fd.seek(0) + return fd + + +class TrivialPrompter(basic.LineReceiver): + promptDeferred = None + + def prompt(self, msg): + assert self.promptDeferred is None + self.display(msg) + self.promptDeferred = defer.Deferred() + return self.promptDeferred + + def display(self, msg): + self.transport.write(msg) + + def lineReceived(self, line): + if self.promptDeferred is None: + return + d, self.promptDeferred = self.promptDeferred, None + d.callback(line) + + +class SimpleIMAP4Client(imap4.IMAP4Client): + """ + A client with callbacks for greeting messages from an IMAP server. + """ + greetDeferred = None + + def serverGreeting(self, caps): + self.serverCapabilities = caps + if self.greetDeferred is not None: + d, self.greetDeferred = self.greetDeferred, None + d.callback(self) + + +class SimpleIMAP4ClientFactory(protocol.ClientFactory): + usedUp = False + protocol = SimpleIMAP4Client + + def __init__(self, username, onConn): + self.ctx = ssl.ClientContextFactory() + + self.username = username + self.onConn = onConn + + def buildProtocol(self, addr): + """ + Initiate the protocol instance. Since we are building a simple IMAP + client, we don't bother checking what capabilities the server has. We + just add all the authenticators twisted.mail has. Note: Gmail no + longer uses any of the methods below, it's been using XOAUTH since + 2010. + """ + assert not self.usedUp + self.usedUp = True + + p = self.protocol(self.ctx) + p.factory = self + p.greetDeferred = self.onConn + + p.registerAuthenticator(imap4.PLAINAuthenticator(self.username)) + p.registerAuthenticator(imap4.LOGINAuthenticator(self.username)) + p.registerAuthenticator( + imap4.CramMD5ClientAuthenticator(self.username)) + + return p + + def clientConnectionFailed(self, connector, reason): + d, self.onConn = self.onConn, None + d.errback(reason) + + +def cbServerGreeting(proto, username, password): + """ + Initial callback - invoked after the server sends us its greet message. + """ + # Hook up stdio + tp = TrivialPrompter() + stdio.StandardIO(tp) + + # And make it easily accessible + proto.prompt = tp.prompt + proto.display = tp.display + + # Try to authenticate securely + return proto.authenticate( + password).addCallback( + cbAuthentication, + proto).addErrback( + ebAuthentication, proto, username, password + ) + + +def ebConnection(reason): + """ + Fallback error-handler. If anything goes wrong, log it and quit. + """ + log.startLogging(sys.stdout) + log.err(reason) + return reason + + +def cbAuthentication(result, proto): + """ + Callback after authentication has succeeded. + + Lists a bunch of mailboxes. + """ + return proto.select( + REGRESSIONS_FOLDER + ).addCallback( + cbSelectMbox, proto + ).addErrback( + ebSelectMbox, proto, REGRESSIONS_FOLDER) + + +def ebAuthentication(failure, proto, username, password): + """ + Errback invoked when authentication fails. + + If it failed because no SASL mechanisms match, offer the user the choice + of logging in insecurely. + + If you are trying to connect to your Gmail account, you will be here! + """ + failure.trap(imap4.NoSupportedAuthentication) + return InsecureLogin(proto, username, password) + + +def InsecureLogin(proto, username, password): + """ + Raise insecure-login error. + """ + return proto.login( + username, password + ).addCallback( + cbAuthentication, proto) + + +def cbSelectMbox(result, proto): + """ + Callback invoked when select command finishes successfully. + + If any message is in the test folder, it will flag them as deleted and + expunge. + If no messages found, it will start with the APPEND tests. + """ + print "SELECT: %s EXISTS " % result.get("EXISTS", "??") + + if result["EXISTS"] != 0: + # Flag as deleted, expunge, and do an examine again. + #print "There is mail here, will delete..." + return cbDeleteAndExpungeTestFolder(proto) + + else: + return cbAppendNextMessage(proto) + + +def ebSelectMbox(failure, proto, folder): + """ + Errback invoked when the examine command fails. + + Creates the folder. + """ + print failure.getTraceback() + log.msg("Folder %r does not exist. Creating..." % (folder,)) + return proto.create(folder).addCallback(cbAuthentication, proto) + + +def cbDeleteAndExpungeTestFolder(proto): + """ + Callback invoked fom cbExamineMbox when the number of messages in the + mailbox is not zero. It flags all messages as deleted and expunge the + mailbox. + """ + return proto.setFlags( + "1:*", ("\\Deleted",) + ).addCallback( + lambda r: proto.expunge() + ).addCallback( + cbExpunge, proto) + + +def cbExpunge(result, proto): + return proto.select( + REGRESSIONS_FOLDER + ).addCallback( + cbSelectMbox, proto + ).addErrback(ebSettingDeleted, proto) + + +def ebSettingDeleted(failure, proto): + """ + Report errors during deletion of messages in the mailbox. + """ + print failure.getTraceback() + + +def cbAppendNextMessage(proto): + """ + Appends the next message in the global queue to the test folder. + """ + # 1. Get the next test message from global tuple. + try: + next_sample = SAMPLES.pop() + except IndexError: + # we're done! + return proto.logout() + + print "\nAPPEND %s" % (next_sample,) + raw = open(next_sample).read() + msg = get_fd(raw) + return proto.append( + REGRESSIONS_FOLDER, msg + ).addCallback( + lambda r: proto.examine(REGRESSIONS_FOLDER) + ).addCallback( + cbAppend, proto, raw + ).addErrback( + ebAppend, proto, raw) + + +def cbAppend(result, proto, orig_msg): + """ + Fetches the message right after an append. + """ + # XXX keep account of highest UID + uid = "1:*" + + return proto.fetchSpecific( + '%s' % uid, + headerType='', + headerArgs=['BODY.PEEK[]'], + ).addCallback( + cbCompareMessage, proto, orig_msg + ).addErrback(ebAppend, proto, orig_msg) + + +def ebAppend(failure, proto, raw): + """ + Errorback for the append operation + """ + print "ERROR WHILE APPENDING!" + print failure.getTraceback() + + +def cbPickMessage(result, proto): + """ + Pick a message. + """ + return proto.fetchSpecific( + '%s' % result, + headerType='', + headerArgs=['BODY.PEEK[]'], + ).addCallback(cbCompareMessage, proto) + + +def cbCompareMessage(result, proto, raw): + """ + Display message and compare it with the original one. + """ + parts_orig = get_msg_parts(raw) + + if result: + keys = result.keys() + keys.sort() + + latest = max(keys) + + fetched_msg = result[latest][0][2] + parts_fetched = get_msg_parts(fetched_msg) + + equal = compare_msg_parts( + parts_orig, + parts_fetched) + + if equal: + print "[+] MESSAGES MATCH" + return cbAppendNextMessage(proto) + else: + print "[-] ERROR: MESSAGES DO NOT MATCH !!!" + print " ABORTING COMPARISON..." + # FIXME logout and print the subject ... + return proto.logout() + + +def cbClose(result): + """ + Close the connection when we finish everything. + """ + from twisted.internet import reactor + reactor.stop() + + +def main(): + import glob + import sys + + if len(sys.argv) != 4: + print "Usage: regressions <user> <pass> <samples-folder>" + sys.exit() + + hostname = "localhost" + port = "1984" + username = sys.argv[1] + password = sys.argv[2] + + samplesdir = sys.argv[3] + + if not os.path.isdir(samplesdir): + print ("Could not find samples folder! " + "Make sure of copying mail_breaker contents there.") + sys.exit() + + samples = glob.glob(samplesdir + '/*') + + global SAMPLES + SAMPLES = [] + SAMPLES += samples + + onConn = defer.Deferred( + ).addCallback( + cbServerGreeting, username, password + ).addErrback( + ebConnection + ).addBoth(cbClose) + + factory = SimpleIMAP4ClientFactory(username, onConn) + + from twisted.internet import reactor + reactor.connectTCP(hostname, int(port), factory) + reactor.run() + + +if __name__ == '__main__': + main() diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py index b7fc0304..80121c85 100644 --- a/mail/src/leap/mail/messageflow.py +++ b/mail/src/leap/mail/messageflow.py @@ -64,6 +64,11 @@ class IMessageProducer(Interface): Stop producing items. """ + def flush(self): + """ + Flush queued messages to consumer. + """ + class DummyMsgConsumer(object): @@ -162,6 +167,12 @@ class MessageProducer(object): if self._loop.running: self._loop.stop() + def flush(self): + """ + Flush queued messages to consumer. + """ + self._check_for_new() + if __name__ == "__main__": from twisted.internet import reactor |