diff options
author | Tomás Touceda <chiiph@leap.se> | 2014-01-16 10:37:38 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2014-01-16 10:37:38 -0300 |
commit | 06eebf54ab572ebaf6730f2095a062cd9549f12e (patch) | |
tree | 35c66cbeae132418dfbfc1524b36b55482080dcb | |
parent | df5dfdb2d8dd407ed9ed0d2b043e0da299b9c341 (diff) | |
parent | 1069e7b9470fb63f70a43fa72407fcd4276d550d (diff) |
Merge remote-tracking branch 'refs/remotes/kali/bug/bug_4963_fix-bulky-fetch' into develop
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | src/leap/mail/imap/mailbox.py | 103 | ||||
-rw-r--r-- | src/leap/mail/imap/messages.py | 192 | ||||
-rw-r--r-- | src/leap/mail/imap/service/imap-server.tac | 182 | ||||
-rw-r--r-- | src/leap/mail/imap/service/imap.py | 37 | ||||
-rw-r--r-- | src/leap/mail/messageflow.py | 2 |
6 files changed, 358 insertions, 159 deletions
@@ -19,3 +19,4 @@ lib/ local/ share/ MANIFEST +twistd.pid diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index ccbf5c2..94070ac 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -20,13 +20,13 @@ Soledad Mailbox. import copy import threading import logging -import time import StringIO import cStringIO from collections import defaultdict from twisted.internet import defer +from twisted.internet.task import deferLater from twisted.python import log from twisted.mail import imap4 @@ -39,7 +39,6 @@ from leap.mail.decorators import deferred from leap.mail.imap.fields import WithMsgFields, fields from leap.mail.imap.messages import MessageCollection from leap.mail.imap.parser import MBoxParser -from leap.mail.utils import first logger = logging.getLogger(__name__) @@ -60,7 +59,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): imap4.IMessageCopier) # XXX should finish the implementation of IMailboxListener - # XXX should implement ISearchableMailbox too + # XXX should completely implement ISearchableMailbox too messages = None _closed = False @@ -78,6 +77,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): CMD_UNSEEN = "UNSEEN" _listeners = defaultdict(set) + next_uid_lock = threading.Lock() def __init__(self, mbox, soledad=None, rw=1): @@ -161,7 +161,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): if query: return query.pop() except Exception as exc: - logger.error("Unhandled error %r" % exc) + logger.exception("Unhandled error %r" % exc) def getFlags(self): """ @@ -226,6 +226,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: bool """ mbox = self._get_mbox() + if not mbox: + logger.error("We could not get a mbox!") + # XXX It looks like it has been corrupted. + # We need to be able to survive this. + return None return mbox.content.get(self.LAST_UID_KEY, 1) def _set_last_uid(self, uid): @@ -462,15 +467,16 @@ class SoledadMailbox(WithMsgFields, MBoxParser): return d @deferred - def fetch(self, messages, uid): + def fetch(self, messages_asked, uid): """ Retrieve one or more messages in this mailbox. from rfc 3501: The data items to be fetched can be either a single atom or a parenthesized list. - :param messages: IDs of the messages to retrieve information about - :type messages: MessageSet + :param messages_asked: IDs of the messages to retrieve information + about + :type messages_asked: MessageSet :param uid: If true, the IDs are UIDs. They are message sequence IDs otherwise. @@ -479,7 +485,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: A tuple of two-tuples of message sequence numbers and LeapMessage """ - result = [] + from twisted.internet import reactor # For the moment our UID is sequential, so we # can treat them all the same. @@ -489,12 +495,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser): sequence = False #sequence = True if uid == 0 else False - if not messages.last: + if not messages_asked.last: try: - iter(messages) + iter(messages_asked) except TypeError: # looks like we cannot iterate - messages.last = self.last_uid + messages_asked.last = self.last_uid + + set_asked = set(messages_asked) + set_exist = set(self.messages.all_uid_iter()) + seq_messg = set_asked.intersection(set_exist) + getmsg = lambda msgid: self.messages.get_msg_by_uid(msgid) # for sequence numbers (uid = 0) if sequence: @@ -502,20 +513,68 @@ class SoledadMailbox(WithMsgFields, MBoxParser): raise NotImplementedError else: - for msg_id in messages: - msg = self.messages.get_msg_by_uid(msg_id) - if msg: - result.append((msg_id, msg)) - else: - logger.debug("fetch %s, no msg found!!!" % msg_id) + result = ((msgid, getmsg(msgid)) for msgid in seq_messg) if self.isWriteable(): + deferLater(reactor, 30, self._unset_recent_flag) + # XXX I should rewrite the scheduler so it handles a + # set of queues with different priority. self._unset_recent_flag() - self._signal_unread_to_ui() - # XXX workaround for hangs in thunderbird - #return tuple(result[:100]) # --- doesn't show all!! - return tuple(result) + # this should really be called as a final callback of + # the do_FETCH method... + deferLater(reactor, 1, self._signal_unread_to_ui) + return result + + @deferred + def fetch_flags(self, messages_asked, uid): + """ + A fast method to fetch all flags, tricking just the + needed subset of the MIME interface that's needed to satisfy + a generic FLAGS query. + Given how LEAP Mail is supposed to work without local cache, + this query is going to be quite common, and also we expect + it to be in the form 1:* at the beginning of a session, so + it's not bad to fetch all the flags doc at once. + + :param messages_asked: IDs of the messages to retrieve information + about + :type messages_asked: MessageSet + + :param uid: If true, the IDs are UIDs. They are message sequence IDs + otherwise. + :type uid: bool + + :return: A tuple of two-tuples of message sequence numbers and + flagsPart, which is a only a partial implementation of + MessagePart. + :rtype: tuple + """ + class flagsPart(object): + def __init__(self, uid, flags): + self.uid = uid + self.flags = flags + + def getUID(self): + return self.uid + + def getFlags(self): + return map(str, self.flags) + + if not messages_asked.last: + try: + iter(messages_asked) + except TypeError: + # looks like we cannot iterate + messages_asked.last = self.last_uid + + set_asked = set(messages_asked) + set_exist = set(self.messages.all_uid_iter()) + seq_messg = set_asked.intersection(set_exist) + all_flags = self.messages.all_flags() + result = ((msgid, flagsPart( + msgid, all_flags[msgid])) for msgid in seq_messg) + return result @deferred def _unset_recent_flag(self): @@ -544,8 +603,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # 3. Route it through a queue with lesser priority than the # regularar writer. - # hmm let's try 2. in a quickndirty way... - time.sleep(1) log.msg('unsetting recent flags...') for msg in self.messages.get_recent(): msg.removeFlags((fields.RECENT_FLAG,)) diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index a3d29d6..22de356 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -20,7 +20,6 @@ LeapMessage and MessageCollection. import copy import logging import re -import threading import time import StringIO @@ -46,15 +45,11 @@ from leap.mail.messageflow import IMessageConsumer, MessageProducer logger = logging.getLogger(__name__) -read_write_lock = threading.Lock() - # TODO ------------------------------------------------------------ # [ ] Add linked-from info. # [ ] Delete incoming mail only after successful write! # [ ] Remove UID from syncable db. Store only those indexes locally. -# [ ] Send patch to twisted for bug in imap4.py:5717 (content-type can be -# none? lower-case?) def lowerdict(_dict): @@ -659,10 +654,18 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the document that keeps the flags for this message. """ - flag_docs = self._soledad.get_from_index( - fields.TYPE_MBOX_UID_IDX, - fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) - return first(flag_docs) + result = {} + try: + flag_docs = self._soledad.get_from_index( + fields.TYPE_MBOX_UID_IDX, + fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) + result = first(flag_docs) + except Exception as exc: + # ugh! Something's broken down there! + logger.warning("ERROR while getting flags for UID: %s" % self._uid) + logger.exception(exc) + finally: + return result def _get_headers_doc(self): """ @@ -772,6 +775,51 @@ class LeapMessage(fields, MailParser, MBoxParser): return self._fdoc is not None +class ContentDedup(object): + """ + Message deduplication. + + We do a query for the content hashes before writing to our beloved + sqlcipher backend of Soledad. This means, by now, that: + + 1. We will not store the same attachment twice, only the hash of it. + 2. We will not store the same message body twice, only the hash of it. + + The first case is useful if you are always receiving the same old memes + from unwary friends that still have not discovered that 4chan is the + generator of the internet. The second will save your day if you have + initiated session with the same account in two different machines. I also + wonder why would you do that, but let's respect each other choices, like + with the religious celebrations, and assume that one day we'll be able + to run Bitmask in completely free phones. Yes, I mean that, the whole GSM + Stack. + """ + + def _content_does_exist(self, doc): + """ + Check whether we already have a content document for a payload + with this hash in our database. + + :param doc: tentative body document + :type doc: dict + :returns: True if that happens, False otherwise. + """ + if not doc: + return False + phash = doc[fields.PAYLOAD_HASH_KEY] + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + if not attach_docs: + return False + + if len(attach_docs) != 1: + logger.warning("Found more than one copy of phash %s!" + % (phash,)) + logger.debug("Found attachment doc with that hash! Skipping save!") + return True + + SoledadWriterPayload = namedtuple( 'SoledadWriterPayload', ['mode', 'payload']) @@ -783,6 +831,13 @@ SoledadWriterPayload.PUT = 2 SoledadWriterPayload.CONTENT_CREATE = 3 +""" +SoledadDocWriter was used to avoid writing to the db from multiple threads. +Its use here has been deprecated in favor of a local rw_lock in the client. +But we might want to reuse in in the near future to implement priority queues. +""" + + class SoledadDocWriter(object): """ This writer will create docs serially in the local soledad database. @@ -854,51 +909,9 @@ class SoledadDocWriter(object): empty = queue.empty() - """ - Message deduplication. - We do a query for the content hashes before writing to our beloved - sqlcipher backend of Soledad. This means, by now, that: - - 1. We will not store the same attachment twice, only the hash of it. - 2. We will not store the same message body twice, only the hash of it. - - The first case is useful if you are always receiving the same old memes - from unwary friends that still have not discovered that 4chan is the - generator of the internet. The second will save your day if you have - initiated session with the same account in two different machines. I also - wonder why would you do that, but let's respect each other choices, like - with the religious celebrations, and assume that one day we'll be able - to run Bitmask in completely free phones. Yes, I mean that, the whole GSM - Stack. - """ - - def _content_does_exist(self, doc): - """ - Check whether we already have a content document for a payload - with this hash in our database. - - :param doc: tentative body document - :type doc: dict - :returns: True if that happens, False otherwise. - """ - if not doc: - return False - phash = doc[fields.PAYLOAD_HASH_KEY] - attach_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - if not attach_docs: - return False - - if len(attach_docs) != 1: - logger.warning("Found more than one copy of phash %s!" - % (phash,)) - logger.debug("Found attachment doc with that hash! Skipping save!") - return True - - -class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): +class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, + ContentDedup): """ A collection of messages, surprisingly. @@ -1147,24 +1160,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): hd[key] = parts_map[key] del parts_map - docs = [fd, hd] - cdocs = walk.get_raw_docs(msg, parts) - # Saving - logger.debug('enqueuing message docs for write') - ptuple = SoledadWriterPayload - with read_write_lock: - # first, regular docs: flags and headers - for doc in docs: - self.soledad_writer.put(ptuple( - mode=ptuple.CREATE, payload=doc)) + # first, regular docs: flags and headers + self._soledad.create_doc(fd) - # and last, but not least, try to create - # content docs if not already there. - for cd in cdocs: - self.soledad_writer.put(ptuple( - mode=ptuple.CONTENT_CREATE, payload=cd)) + # XXX should check for content duplication on headers too + # but with chash. !!! + self._soledad.create_doc(hd) + + # and last, but not least, try to create + # content docs if not already there. + cdocs = walk.get_raw_docs(msg, parts) + for cdoc in cdocs: + if not self._content_does_exist(cdoc): + self._soledad.create_doc(cdoc) def _remove_cb(self, result): return result @@ -1219,21 +1229,20 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): def _get_uid_from_msgidCb(self, msgid): hdoc = None - with read_write_lock: - try: - query = self._soledad.get_from_index( - fields.TYPE_MSGID_IDX, - fields.TYPE_HEADERS_VAL, msgid) - if query: - if len(query) > 1: - logger.warning( - "More than one hdoc found for this msgid, " - "we got a duplicate!!") - # XXX we could take action, like trigger a background - # process to kill dupes. - hdoc = query.pop() - except Exception as exc: - logger.exception("Unhandled error %r" % exc) + try: + query = self._soledad.get_from_index( + fields.TYPE_MSGID_IDX, + fields.TYPE_HEADERS_VAL, msgid) + if query: + if len(query) > 1: + logger.warning( + "More than one hdoc found for this msgid, " + "we got a duplicate!!") + # XXX we could take action, like trigger a background + # process to kill dupes. + hdoc = query.pop() + except Exception as exc: + logger.exception("Unhandled error %r" % exc) if hdoc is None: logger.warning("Could not find hdoc for msgid %s" @@ -1316,17 +1325,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # XXX FIXINDEX -- should implement order by in soledad return sorted(all_docs, key=lambda item: item.content['uid']) - def all_msg_iter(self): + def all_uid_iter(self): """ Return an iterator trhough the UIDs of all messages, sorted in ascending order. """ + # XXX we should get this from the uid table, local-only all_uids = (doc.content[self.UID_KEY] for doc in self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox)) return (u for u in sorted(all_uids)) + def all_flags(self): + """ + Return a dict with all flags documents for this mailbox. + """ + all_flags = dict((( + doc.content[self.UID_KEY], + doc.content[self.FLAGS_KEY]) for doc in + self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox))) + return all_flags + def count(self): """ Return the count of messages for this mailbox. @@ -1451,7 +1473,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: iterable """ return (LeapMessage(self._soledad, docuid, self.mbox) - for docuid in self.all_msg_iter()) + for docuid in self.all_uid_iter()) def __repr__(self): """ diff --git a/src/leap/mail/imap/service/imap-server.tac b/src/leap/mail/imap/service/imap-server.tac index da72cae..b65bb17 100644 --- a/src/leap/mail/imap/service/imap-server.tac +++ b/src/leap/mail/imap/service/imap-server.tac @@ -1,69 +1,157 @@ +# -*- coding: utf-8 -*- +# imap-server.tac +# Copyright (C) 2013,2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +TAC file for initialization of the imap service using twistd. + +Use this for debugging and testing the imap server using a native reactor. + +For now, and for debugging/testing purposes, you need +to pass a config file with the following structure: + +[leap_mail] +userid = "user@provider" +uuid = "deadbeefdeadabad" +passwd = "supersecret" # optional, will get prompted if not found. +""" import ConfigParser +import getpass import os +import sys -from leap.soledad.client import Soledad +from leap.keymanager import KeyManager from leap.mail.imap.service import imap -from leap.common.config import get_path_prefix - - -config = ConfigParser.ConfigParser() -config.read([os.path.expanduser('~/.config/leap/mail/mail.conf')]) - -userID = config.get('mail', 'address') -privkey = open(os.path.expanduser('~/.config/leap/mail/privkey')).read() -nickserver_url = "" +from leap.soledad.client import Soledad -d = {} +from twisted.application import service, internet -for key in ('uid', 'passphrase', 'server', 'pemfile', 'token'): - d[key] = config.get('mail', key) +# TODO should get this initializers from some authoritative mocked source +# We might want to put them the soledad itself. -def initialize_soledad_mailbox(user_uuid, soledad_pass, server_url, - server_pemfile, token): +def initialize_soledad(uuid, email, passwd, + secrets, localdb, + gnupg_home, tempdir): """ Initializes soledad by hand - :param user_uuid: - :param soledad_pass: - :param server_url: - :param server_pemfile: - :param token: - + :param email: ID for the user + :param gnupg_home: path to home used by gnupg + :param tempdir: path to temporal dir :rtype: Soledad instance """ + # XXX TODO unify with an authoritative source of mocks + # for soledad (or partial initializations). + # This is copied from the imap tests. + + server_url = "http://provider" + cert_file = "" - base_config = get_path_prefix() + class Mock(object): + def __init__(self, return_value=None): + self._return = return_value - secret_path = os.path.join( - base_config, "leap", "soledad", "%s.secret" % user_uuid) - soledad_path = os.path.join( - base_config, "leap", "soledad", "%s-mailbox.db" % user_uuid) + def __call__(self, *args, **kwargs): + return self._return - _soledad = Soledad( - user_uuid, - soledad_pass, - secret_path, - soledad_path, + class MockSharedDB(object): + + get_doc = Mock() + put_doc = Mock() + lock = Mock(return_value=('atoken', 300)) + unlock = Mock(return_value=True) + + def __call__(self): + return self + + Soledad._shared_db = MockSharedDB() + soledad = Soledad( + uuid, + passwd, + secrets, + localdb, server_url, - server_pemfile, - token) + cert_file) + + return soledad + +###################################################################### +# Remember to set your config files, see module documentation above! +###################################################################### + +print "[+] Running LEAP IMAP Service" + + +bmconf = os.environ.get("LEAP_MAIL_CONF", "") +if not bmconf: + print "[-] Please set LEAP_MAIL_CONF environment variable pointing to your config." + sys.exit(1) +SECTION = "leap_mail" +cp = ConfigParser.ConfigParser() +cp.read(bmconf) + +userid = cp.get(SECTION, "userid") +uuid = cp.get(SECTION, "uuid") +passwd = unicode(cp.get(SECTION, "passwd")) + +# XXX get this right from the environment variable !!! +port = 1984 + +if not userid or not uuid: + print "[-] Config file missing userid or uuid field" + sys.exit(1) + +if not passwd: + passwd = unicode(getpass.getpass("Soledad passphrase: ")) + + +secrets = os.path.expanduser("~/.config/leap/soledad/%s.secret" % (uuid,)) +localdb = os.path.expanduser("~/.config/leap/soledad/%s.db" % (uuid,)) + +# XXX Is this really used? Should point it to user var dirs defined in xdg? +gnupg_home = "/tmp/" +tempdir = "/tmp/" + +################################################### + +# Ad-hoc soledad/keymanager initialization. + +soledad = initialize_soledad(uuid, userid, passwd, secrets, localdb, gnupg_home, tempdir) +km_args = (userid, "https://localhost", soledad) +km_kwargs = { + "session_id": "", + "ca_cert_path": "", + "api_uri": "", + "api_version": "", + "uid": uuid, + "gpgbinary": "/usr/bin/gpg" +} +keymanager = KeyManager(*km_args, **km_kwargs) + +################################################## - return _soledad +# Ok, let's expose the application object for the twistd application +# framework to pick up from here... -soledad = initialize_soledad_mailbox( - d['uid'], - d['passphrase'], - d['server'], - d['pemfile'], - d['token']) -# import the private key ---- should sync it from remote! -from leap.common.keymanager.openpgp import OpenPGPScheme -opgp = OpenPGPScheme(soledad) -opgp.put_ascii_key(privkey) +def getIMAPService(): + factory = imap.LeapIMAPFactory(uuid, userid, soledad) + return internet.TCPServer(port, factory, interface="localhost") -from leap.common.keymanager import KeyManager -keymanager = KeyManager(userID, nickserver_url, soledad, d['token']) -imap.run_service(soledad, keymanager) +application = service.Application("LEAP IMAP Application") +service = getIMAPService() +service.setServiceParent(application) diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index c48e5c5..e877869 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -22,6 +22,7 @@ from copy import copy import logging from twisted.internet.protocol import ServerFactory +from twisted.internet.defer import maybeDeferred from twisted.internet.error import CannotListenError from twisted.mail import imap4 from twisted.python import log @@ -78,7 +79,6 @@ class LeapIMAPServer(imap4.IMAP4Server): :param line: the line from the server, without the line delimiter. :type line: str """ - print "RECV: STATE (%s)" % self.state if self.theAccount.closed is True and self.state != "unauth": log.msg("Closing the session. State: unauth") self.state = "unauth" @@ -89,7 +89,7 @@ class LeapIMAPServer(imap4.IMAP4Server): msg = line[:7] + " [...]" else: msg = copy(line) - log.msg('rcv: %s' % msg) + log.msg('rcv (%s): %s' % (self.state, msg)) imap4.IMAP4Server.lineReceived(self, line) def authenticateLogin(self, username, password): @@ -111,6 +111,39 @@ class LeapIMAPServer(imap4.IMAP4Server): leap_events.signal(IMAP_CLIENT_LOGIN, "1") return imap4.IAccount, self.theAccount, lambda: None + def do_FETCH(self, tag, messages, query, uid=0): + """ + Overwritten fetch dispatcher to use the fast fetch_flags + method + """ + log.msg("LEAP Overwritten fetch...") + if not query: + self.sendPositiveResponse(tag, 'FETCH complete') + return # XXX ??? + + cbFetch = self._IMAP4Server__cbFetch + ebFetch = self._IMAP4Server__ebFetch + + if str(query[0]) == "flags": + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch_flags, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) + else: + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) + + select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, + imap4.IMAP4Server.arg_fetchatt) + class IMAPAuthRealm(object): """ diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index a0a571d..ac26e45 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -121,8 +121,6 @@ class MessageProducer(object): """ if not self._loop.running: self._loop.start(self._period, now=True) - else: - print "was running..., not starting" def stop(self): """ |