diff options
Diffstat (limited to 'src/leap/mail/imap/service/imap.py')
-rw-r--r-- | src/leap/mail/imap/service/imap.py | 185 |
1 files changed, 107 insertions, 78 deletions
diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index 8756ddc..10ba32a 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -17,23 +17,26 @@ """ Imap service initialization """ -from copy import copy - import logging +import os +import time +from twisted.internet import defer, threads from twisted.internet.protocol import ServerFactory from twisted.internet.error import CannotListenError from twisted.mail import imap4 from twisted.python import log -from twisted import cred logger = logging.getLogger(__name__) from leap.common import events as leap_events from leap.common.check import leap_assert, leap_assert_type, leap_check from leap.keymanager import KeyManager -from leap.mail.imap.server import SoledadBackedAccount +from leap.mail.imap.account import SoledadBackedAccount from leap.mail.imap.fetch import LeapIncomingMail +from leap.mail.imap.memorystore import MemoryStore +from leap.mail.imap.server import LeapIMAPServer +from leap.mail.imap.soledadstore import SoledadStore from leap.soledad.client import Soledad # The default port in which imap service will run @@ -45,75 +48,38 @@ INCOMING_CHECK_PERIOD = 60 from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED from leap.common.events.events_pb2 import IMAP_SERVICE_FAILED_TO_START -from leap.common.events.events_pb2 import IMAP_CLIENT_LOGIN - - -class LeapIMAPServer(imap4.IMAP4Server): - """ - An IMAP4 Server with mailboxes backed by soledad - """ - def __init__(self, *args, **kwargs): - # pop extraneous arguments - soledad = kwargs.pop('soledad', None) - uuid = kwargs.pop('uuid', None) - userid = kwargs.pop('userid', None) - leap_assert(soledad, "need a soledad instance") - leap_assert_type(soledad, Soledad) - leap_assert(uuid, "need a user in the initialization") - - self._userid = userid - # initialize imap server! - imap4.IMAP4Server.__init__(self, *args, **kwargs) +###################################################### +# Temporary workaround for RecursionLimit when using +# qt4reactor. Do remove when we move to poll or select +# reactor, which do not show those problems. See #4974 +import resource +import sys - # we should initialize the account here, - # but we move it to the factory so we can - # populate the test account properly (and only once - # per session) +try: + sys.setrecursionlimit(10**7) +except Exception: + print "Error setting recursion limit" +try: + # Increase max stack size from 8MB to 256MB + resource.setrlimit(resource.RLIMIT_STACK, (2**28, -1)) +except Exception: + print "Error setting stack size" - # theAccount = SoledadBackedAccount( - # user, soledad=soledad) +###################################################### - # --------------------------------- - # XXX pre-populate acct for tests!! - # populate_test_account(theAccount) - # --------------------------------- - #self.theAccount = theAccount +DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None) +if DO_MANHOLE: + from leap.mail.imap.service import manhole - def lineReceived(self, line): - """ - Attempt to parse a single line from the server. +DO_PROFILE = os.environ.get("LEAP_PROFILE", None) +if DO_PROFILE: + import cProfile + log.msg("Starting PROFILING...") - :param line: the line from the server, without the line delimiter. - :type line: str - """ - if "login" in line.lower(): - # avoid to log the pass, even though we are using a dummy auth - # by now. - msg = line[:7] + " [...]" - else: - msg = copy(line) - log.msg('rcv: %s' % msg) - imap4.IMAP4Server.lineReceived(self, line) - - def authenticateLogin(self, username, password): - """ - Lookup the account with the given parameters, and deny - the improper combinations. - - :param username: the username that is attempting authentication. - :type username: str - :param password: the password to authenticate with. - :type password: str - """ - # XXX this should use portal: - # return portal.login(cred.credentials.UsernamePassword(user, pass) - if username != self._userid: - # bad username, reject. - raise cred.error.UnauthorizedLogin() - # any dummy password is allowed so far. use realm instead! - leap_events.signal(IMAP_CLIENT_LOGIN, "1") - return imap4.IAccount, self.theAccount, lambda: None + PROFILE_DAT = "/tmp/leap_mail_profile.pstats" + pr = cProfile.Profile() + pr.enable() class IMAPAuthRealm(object): @@ -148,13 +114,23 @@ class LeapIMAPFactory(ServerFactory): self._uuid = uuid self._userid = userid self._soledad = soledad + self._memstore = MemoryStore( + permanent_store=SoledadStore(soledad)) theAccount = SoledadBackedAccount( - uuid, soledad=soledad) + uuid, soledad=soledad, + memstore=self._memstore) self.theAccount = theAccount + # XXX how to pass the store along? + def buildProtocol(self, addr): - "Return a protocol suitable for the job." + """ + Return a protocol suitable for the job. + + :param addr: remote ip address + :type addr: str + """ imapProtocol = LeapIMAPServer( uuid=self._uuid, userid=self._userid, @@ -163,6 +139,42 @@ class LeapIMAPFactory(ServerFactory): imapProtocol.factory = self return imapProtocol + def doStop(self, cv=None): + """ + Stops imap service (fetcher, factory and port). + + :param cv: A condition variable to which we can signal when imap + indeed stops. + :type cv: threading.Condition + :return: a Deferred that stops and flushes the in memory store data to + disk in another thread. + :rtype: Deferred + """ + if DO_PROFILE: + log.msg("Stopping PROFILING") + pr.disable() + pr.dump_stats(PROFILE_DAT) + + ServerFactory.doStop(self) + + if cv is not None: + def _stop_imap_cb(): + logger.debug('Stopping in memory store.') + self._memstore.stop_and_flush() + while not self._memstore.producer.is_queue_empty(): + logger.debug('Waiting for queue to be empty.') + # TODO use a gatherResults over the new/dirty + # deferred list, + # as in memorystore's expunge() method. + time.sleep(1) + # notify that service has stopped + logger.debug('Notifying that service has stopped.') + cv.acquire() + cv.notify() + cv.release() + + return threads.deferToThread(_stop_imap_cb) + def run_service(*args, **kwargs): """ @@ -173,6 +185,11 @@ def run_service(*args, **kwargs): the reactor when starts listening, and the factory for the protocol. """ + from twisted.internet import reactor + # it looks like qtreactor does not honor this, + # but other reactors should. + reactor.suggestThreadPoolSize(20) + leap_assert(len(args) == 2) soledad, keymanager = args leap_assert_type(soledad, Soledad) @@ -182,21 +199,23 @@ def run_service(*args, **kwargs): check_period = kwargs.get('check_period', INCOMING_CHECK_PERIOD) userid = kwargs.get('userid', None) leap_check(userid is not None, "need an user id") + offline = kwargs.get('offline', False) uuid = soledad._get_uuid() factory = LeapIMAPFactory(uuid, userid, soledad) - from twisted.internet import reactor - try: tport = reactor.listenTCP(port, factory, interface="localhost") - fetcher = LeapIncomingMail( - keymanager, - soledad, - factory.theAccount, - check_period, - userid) + if not offline: + fetcher = LeapIncomingMail( + keymanager, + soledad, + factory.theAccount, + check_period, + userid) + else: + fetcher = None except CannotListenError: logger.error("IMAP Service failed to start: " "cannot listen in port %s" % (port,)) @@ -204,7 +223,17 @@ def run_service(*args, **kwargs): logger.error("Error launching IMAP service: %r" % (exc,)) else: # all good. - fetcher.start_loop() + # (the caller has still to call fetcher.start_loop) + + if DO_MANHOLE: + # TODO get pass from env var.too. + manhole_factory = manhole.getManholeFactory( + {'f': factory, + 'a': factory.theAccount, + 'gm': factory.theAccount.getMailbox}, + "boss", "leap") + reactor.listenTCP(manhole.MANHOLE_PORT, manhole_factory, + interface="127.0.0.1") logger.debug("IMAP4 Server is RUNNING in port %s" % (port,)) leap_events.signal(IMAP_SERVICE_STARTED, str(port)) return fetcher, tport, factory |