From 0f5eb825c5654a718c1f4f2fd3a9ac2f0efad395 Mon Sep 17 00:00:00 2001 From: "Kali Kaneko (leap communications)" Date: Thu, 22 Dec 2016 15:40:52 +0100 Subject: [test] add script for checking the mail --- tests/e2e/e2e-test.sh | 7 +- tests/e2e/getmail | 358 ++++++++++++++++++++++++++++++++++++ tests/integration/mail/imap/getmail | 344 ---------------------------------- 3 files changed, 363 insertions(+), 346 deletions(-) create mode 100755 tests/e2e/getmail delete mode 100755 tests/integration/mail/imap/getmail diff --git a/tests/e2e/e2e-test.sh b/tests/e2e/e2e-test.sh index 3e6d2bb..96c211d 100755 --- a/tests/e2e/e2e-test.sh +++ b/tests/e2e/e2e-test.sh @@ -52,9 +52,12 @@ while [[ $imap_pw == *"None"* ]]; do imap_pw=$(echo "$response" | head -n 1 | sed 's/ */ /g' | cut -d' ' -f 2) done -#echo "IMAP/SMTP PASSWD: $imap_pw" - $SWAKS $FROM_EXTERNAL_OPTS +echo "IMAP/SMTP PASSWD: $imap_pw" + + +# XXX get mail we just sent. +./getmail --mailbox INBOX --subject "my_unique_subject" $user $imap_pw diff --git a/tests/e2e/getmail b/tests/e2e/getmail new file mode 100755 index 0000000..367d79d --- /dev/null +++ b/tests/e2e/getmail @@ -0,0 +1,358 @@ +#!/usr/bin/env python + +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE in twisted for details. + +# Modifications by LEAP Developers 2014 to fit +# Bitmask configuration settings. +""" +Simple IMAP4 client which displays the subjects of all messages in a +particular mailbox. +""" + +import os +import sys + +from twisted.internet import defer +from twisted.internet import protocol +from twisted.internet import reactor +from twisted.internet import ssl +from twisted.internet import stdio +from twisted.mail import imap4 +from twisted.protocols import basic +from twisted.python import log + +# Global options stored here from main +_opts = {} + +EXITCODE = 0 + + +class TrivialPrompter(basic.LineReceiver): + from os import linesep as delimiter + + promptDeferred = None + + def prompt(self, msg): + assert self.promptDeferred is None + self.display(msg) + self.promptDeferred = defer.Deferred() + return self.promptDeferred + + def display(self, msg): + self.transport.write(msg) + + def lineReceived(self, line): + if self.promptDeferred is None: + return + d, self.promptDeferred = self.promptDeferred, None + d.callback(line) + + +class SimpleIMAP4Client(imap4.IMAP4Client): + """ + A client with callbacks for greeting messages from an IMAP server. + """ + greetDeferred = None + + def serverGreeting(self, caps): + self.serverCapabilities = caps + if self.greetDeferred is not None: + d, self.greetDeferred = self.greetDeferred, None + d.callback(self) + + +class SimpleIMAP4ClientFactory(protocol.ClientFactory): + usedUp = False + + protocol = SimpleIMAP4Client + + def __init__(self, username, onConn): + self.ctx = ssl.ClientContextFactory() + + self.username = username + self.onConn = onConn + + def buildProtocol(self, addr): + """ + Initiate the protocol instance. Since we are building a simple IMAP + client, we don't bother checking what capabilities the server has. We + just add all the authenticators twisted.mail has. + """ + assert not self.usedUp + self.usedUp = True + + p = self.protocol(self.ctx) + p.factory = self + p.greetDeferred = self.onConn + + p.registerAuthenticator(imap4.PLAINAuthenticator(self.username)) + p.registerAuthenticator(imap4.LOGINAuthenticator(self.username)) + p.registerAuthenticator( + imap4.CramMD5ClientAuthenticator(self.username)) + + return p + + def clientConnectionFailed(self, connector, reason): + d, self.onConn = self.onConn, None + d.errback(reason) + + +def cbServerGreeting(proto, username, password): + """ + Initial callback - invoked after the server sends us its greet message. + """ + # Hook up stdio + tp = TrivialPrompter() + stdio.StandardIO(tp) + + # And make it easily accessible + proto.prompt = tp.prompt + proto.display = tp.display + + # Try to authenticate securely + return proto.authenticate( + password).addCallback( + cbAuthentication, + proto).addErrback( + ebAuthentication, proto, username, password + ) + + +def ebConnection(reason): + """ + Fallback error-handler. If anything goes wrong, log it and quit. + """ + log.startLogging(sys.stdout) + log.err(reason) + return reason + + +def cbAuthentication(result, proto): + """ + Callback after authentication has succeeded. + + Lists a bunch of mailboxes. + """ + return proto.list("", "*" + ).addCallback(cbMailboxList, proto + ) + + +def ebAuthentication(failure, proto, username, password): + """ + Errback invoked when authentication fails. + + If it failed because no SASL mechanisms match, offer the user the choice + of logging in insecurely. + + If you are trying to connect to your Gmail account, you will be here! + """ + failure.trap(imap4.NoSupportedAuthentication) + return InsecureLogin(proto, username, password) + + +def InsecureLogin(proto, username, password): + """ + insecure-login. + """ + return proto.login(username, password + ).addCallback(cbAuthentication, proto + ) + + +def cbMailboxList(result, proto): + """ + Callback invoked when a list of mailboxes has been retrieved. + If we have a selected mailbox in the global options, we directly pick it. + Otherwise, we offer a prompt to let user choose one. + """ + all_mbox_list = [e[2] for e in result] + s = '\n'.join(['%d. %s' % (n + 1, m) for (n, m) in zip(range(len(all_mbox_list)), all_mbox_list)]) + if not s: + return defer.fail(Exception("No mailboxes exist on server!")) + + selected_mailbox = _opts.get('mailbox') + + if not selected_mailbox: + return proto.prompt(s + "\nWhich mailbox? [1] " + ).addCallback(cbPickMailbox, proto, all_mbox_list + ) + else: + mboxes_lower = map(lambda s: s.lower(), all_mbox_list) + index = mboxes_lower.index(selected_mailbox.lower()) + 1 + return cbPickMailbox(index, proto, all_mbox_list) + + +def cbPickMailbox(result, proto, mboxes): + """ + When the user selects a mailbox, "examine" it. + """ + mbox = mboxes[int(result or '1') - 1] + return proto.examine(mbox + ).addCallback(cbExamineMbox, proto + ) + + +def cbExamineMbox(result, proto): + """ + Callback invoked when examine command completes. + + Retrieve the subject header of every message in the mailbox. + """ + return proto.fetchSpecific('1:*', + headerType='HEADER.FIELDS', + headerArgs=['SUBJECT'], + ).addCallback(cbFetch, proto, + ) + + +def cbFetch(result, proto): + """ + Display a listing of the messages in the mailbox, based on the collected + headers. + """ + selected_subject = _opts.get('subject', None) + index = None + + if result: + keys = result.keys() + keys.sort() + + if selected_subject: + for k in keys: + # remove 'Subject: ' preffix plus eol + subject = result[k][0][2][9:].rstrip('\r\n') + if subject.lower() == selected_subject.lower(): + index = k + break + else: + for k in keys: + proto.display('%s %s' % (k, result[k][0][2])) + else: + print "Hey, an empty mailbox!" + + if not index: + if selected_subject: + global EXITCODE + EXITCODE=42 + print "NO SUCH MAIL" + return reactor.stop() + + return proto.prompt("\nWhich message? [1] (Q quits) " + ).addCallback(cbPickMessage, proto) + else: + return cbPickMessage(index, proto) + + +def cbPickMessage(result, proto): + """ + Pick a message. + """ + if result == "Q": + print "Bye!" + return proto.logout() + + return proto.fetchSpecific( + '%s' % result, + headerType='', + headerArgs=['BODY.PEEK[]'], + ).addCallback(cbShowmessage, proto) + + +def cbShowmessage(result, proto): + """ + Display message. + """ + if result: + keys = result.keys() + keys.sort() + for k in keys: + proto.display('%s %s' % (k, result[k][0][2])) + else: + print "Hey, an empty message!" + + return proto.logout() + + +def cbClose(result): + """ + Close the connection when we finish everything. + """ + from twisted.internet import reactor + reactor.stop() + + +def main(): + import argparse + import ConfigParser + import sys + from twisted.internet import reactor + + global EXITCODE + + description = ( + 'Get messages from a LEAP IMAP Proxy.\nThis is a ' + 'debugging tool, do not use this to retrieve any sensitive ' + 'information, or we will send ninjas to your house!') + epilog = ( + 'In case you want to automate the usage of this utility ' + 'you can place your credentials in a file pointed by ' + 'BITMASK_CREDENTIALS. You need to have a [Credentials] ' + 'section, with username= and password fields') + + parser = argparse.ArgumentParser(description=description, epilog=epilog) + credentials = os.environ.get('BITMASK_CREDENTIALS') + + if credentials: + try: + config = ConfigParser.ConfigParser() + config.read(credentials) + username = config.get('Credentials', 'username') + password = config.get('Credentials', 'password') + except Exception, e: + print "Error reading credentials file: {0}".format(e) + sys.exit() + else: + parser.add_argument('username', type=str) + parser.add_argument('password', type=str) + + parser.add_argument( + '--mailbox', dest='mailbox', default=None, + help='Which mailbox to retrieve. Empty for interactive prompt.') + parser.add_argument( + '--subject', dest='subject', default=None, + help='A subject for retrieve a mail that matches. Empty for interactive prompt.') + + ns = parser.parse_args() + + if not credentials: + username = ns.username + password = ns.password + + _opts['mailbox'] = ns.mailbox + _opts['subject'] = ns.subject + + hostname = "localhost" + port = "1984" + + onConn = defer.Deferred( + ).addCallback(cbServerGreeting, username, password + ).addErrback(ebConnection + ).addBoth(cbClose) + + factory = SimpleIMAP4ClientFactory(username, onConn) + + if port == '993': + reactor.connectSSL( + hostname, int(port), factory, ssl.ClientContextFactory()) + else: + if not port: + port = 143 + reactor.connectTCP(hostname, int(port), factory) + reactor.run() + sys.exit(EXITCODE) + + +if __name__ == '__main__': + main() diff --git a/tests/integration/mail/imap/getmail b/tests/integration/mail/imap/getmail deleted file mode 100755 index dd3fa0b..0000000 --- a/tests/integration/mail/imap/getmail +++ /dev/null @@ -1,344 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE in twisted for details. - -# Modifications by LEAP Developers 2014 to fit -# Bitmask configuration settings. -""" -Simple IMAP4 client which displays the subjects of all messages in a -particular mailbox. -""" - -import os -import sys - -from twisted.internet import protocol -from twisted.internet import ssl -from twisted.internet import defer -from twisted.internet import stdio -from twisted.mail import imap4 -from twisted.protocols import basic -from twisted.python import log - -# Global options stored here from main -_opts = {} - - -class TrivialPrompter(basic.LineReceiver): - from os import linesep as delimiter - - promptDeferred = None - - def prompt(self, msg): - assert self.promptDeferred is None - self.display(msg) - self.promptDeferred = defer.Deferred() - return self.promptDeferred - - def display(self, msg): - self.transport.write(msg) - - def lineReceived(self, line): - if self.promptDeferred is None: - return - d, self.promptDeferred = self.promptDeferred, None - d.callback(line) - - -class SimpleIMAP4Client(imap4.IMAP4Client): - """ - A client with callbacks for greeting messages from an IMAP server. - """ - greetDeferred = None - - def serverGreeting(self, caps): - self.serverCapabilities = caps - if self.greetDeferred is not None: - d, self.greetDeferred = self.greetDeferred, None - d.callback(self) - - -class SimpleIMAP4ClientFactory(protocol.ClientFactory): - usedUp = False - - protocol = SimpleIMAP4Client - - def __init__(self, username, onConn): - self.ctx = ssl.ClientContextFactory() - - self.username = username - self.onConn = onConn - - def buildProtocol(self, addr): - """ - Initiate the protocol instance. Since we are building a simple IMAP - client, we don't bother checking what capabilities the server has. We - just add all the authenticators twisted.mail has. - """ - assert not self.usedUp - self.usedUp = True - - p = self.protocol(self.ctx) - p.factory = self - p.greetDeferred = self.onConn - - p.registerAuthenticator(imap4.PLAINAuthenticator(self.username)) - p.registerAuthenticator(imap4.LOGINAuthenticator(self.username)) - p.registerAuthenticator( - imap4.CramMD5ClientAuthenticator(self.username)) - - return p - - def clientConnectionFailed(self, connector, reason): - d, self.onConn = self.onConn, None - d.errback(reason) - - -def cbServerGreeting(proto, username, password): - """ - Initial callback - invoked after the server sends us its greet message. - """ - # Hook up stdio - tp = TrivialPrompter() - stdio.StandardIO(tp) - - # And make it easily accessible - proto.prompt = tp.prompt - proto.display = tp.display - - # Try to authenticate securely - return proto.authenticate( - password).addCallback( - cbAuthentication, - proto).addErrback( - ebAuthentication, proto, username, password - ) - - -def ebConnection(reason): - """ - Fallback error-handler. If anything goes wrong, log it and quit. - """ - log.startLogging(sys.stdout) - log.err(reason) - return reason - - -def cbAuthentication(result, proto): - """ - Callback after authentication has succeeded. - - Lists a bunch of mailboxes. - """ - return proto.list("", "*" - ).addCallback(cbMailboxList, proto - ) - - -def ebAuthentication(failure, proto, username, password): - """ - Errback invoked when authentication fails. - - If it failed because no SASL mechanisms match, offer the user the choice - of logging in insecurely. - - If you are trying to connect to your Gmail account, you will be here! - """ - failure.trap(imap4.NoSupportedAuthentication) - return InsecureLogin(proto, username, password) - - -def InsecureLogin(proto, username, password): - """ - insecure-login. - """ - return proto.login(username, password - ).addCallback(cbAuthentication, proto - ) - - -def cbMailboxList(result, proto): - """ - Callback invoked when a list of mailboxes has been retrieved. - If we have a selected mailbox in the global options, we directly pick it. - Otherwise, we offer a prompt to let user choose one. - """ - all_mbox_list = [e[2] for e in result] - s = '\n'.join(['%d. %s' % (n + 1, m) for (n, m) in zip(range(len(all_mbox_list)), all_mbox_list)]) - if not s: - return defer.fail(Exception("No mailboxes exist on server!")) - - selected_mailbox = _opts.get('mailbox') - - if not selected_mailbox: - return proto.prompt(s + "\nWhich mailbox? [1] " - ).addCallback(cbPickMailbox, proto, all_mbox_list - ) - else: - mboxes_lower = map(lambda s: s.lower(), all_mbox_list) - index = mboxes_lower.index(selected_mailbox.lower()) + 1 - return cbPickMailbox(index, proto, all_mbox_list) - - -def cbPickMailbox(result, proto, mboxes): - """ - When the user selects a mailbox, "examine" it. - """ - mbox = mboxes[int(result or '1') - 1] - return proto.examine(mbox - ).addCallback(cbExamineMbox, proto - ) - - -def cbExamineMbox(result, proto): - """ - Callback invoked when examine command completes. - - Retrieve the subject header of every message in the mailbox. - """ - return proto.fetchSpecific('1:*', - headerType='HEADER.FIELDS', - headerArgs=['SUBJECT'], - ).addCallback(cbFetch, proto, - ) - - -def cbFetch(result, proto): - """ - Display a listing of the messages in the mailbox, based on the collected - headers. - """ - selected_subject = _opts.get('subject', None) - index = None - - if result: - keys = result.keys() - keys.sort() - - if selected_subject: - for k in keys: - # remove 'Subject: ' preffix plus eol - subject = result[k][0][2][9:].rstrip('\r\n') - if subject.lower() == selected_subject.lower(): - index = k - break - else: - for k in keys: - proto.display('%s %s' % (k, result[k][0][2])) - else: - print "Hey, an empty mailbox!" - - if not index: - return proto.prompt("\nWhich message? [1] (Q quits) " - ).addCallback(cbPickMessage, proto) - else: - return cbPickMessage(index, proto) - - -def cbPickMessage(result, proto): - """ - Pick a message. - """ - if result == "Q": - print "Bye!" - return proto.logout() - - return proto.fetchSpecific( - '%s' % result, - headerType='', - headerArgs=['BODY.PEEK[]'], - ).addCallback(cbShowmessage, proto) - - -def cbShowmessage(result, proto): - """ - Display message. - """ - if result: - keys = result.keys() - keys.sort() - for k in keys: - proto.display('%s %s' % (k, result[k][0][2])) - else: - print "Hey, an empty message!" - - return proto.logout() - - -def cbClose(result): - """ - Close the connection when we finish everything. - """ - from twisted.internet import reactor - reactor.stop() - - -def main(): - import argparse - import ConfigParser - import sys - from twisted.internet import reactor - - description = ( - 'Get messages from a LEAP IMAP Proxy.\nThis is a ' - 'debugging tool, do not use this to retrieve any sensitive ' - 'information, or we will send ninjas to your house!') - epilog = ( - 'In case you want to automate the usage of this utility ' - 'you can place your credentials in a file pointed by ' - 'BITMASK_CREDENTIALS. You need to have a [Credentials] ' - 'section, with username= and password fields') - - parser = argparse.ArgumentParser(description=description, epilog=epilog) - credentials = os.environ.get('BITMASK_CREDENTIALS') - - if credentials: - try: - config = ConfigParser.ConfigParser() - config.read(credentials) - username = config.get('Credentials', 'username') - password = config.get('Credentials', 'password') - except Exception, e: - print "Error reading credentials file: {0}".format(e) - sys.exit() - else: - parser.add_argument('username', type=str) - parser.add_argument('password', type=str) - - parser.add_argument('--mailbox', dest='mailbox', default=None, - help='Which mailbox to retrieve. Empty for interactive prompt.') - parser.add_argument('--subject', dest='subject', default=None, - help='A subject for retrieve a mail that matches. Empty for interactive prompt.') - - ns = parser.parse_args() - - if not credentials: - username = ns.username - password = ns.password - - _opts['mailbox'] = ns.mailbox - _opts['subject'] = ns.subject - - hostname = "localhost" - port = "1984" - - onConn = defer.Deferred( - ).addCallback(cbServerGreeting, username, password - ).addErrback(ebConnection - ).addBoth(cbClose) - - factory = SimpleIMAP4ClientFactory(username, onConn) - - if port == '993': - reactor.connectSSL( - hostname, int(port), factory, ssl.ClientContextFactory()) - else: - if not port: - port = 143 - reactor.connectTCP(hostname, int(port), factory) - reactor.run() - - -if __name__ == '__main__': - main() -- cgit v1.2.3