summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/service/imap.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/service/imap.py')
-rw-r--r--src/leap/mail/imap/service/imap.py185
1 files changed, 107 insertions, 78 deletions
diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py
index 8756ddc..10ba32a 100644
--- a/src/leap/mail/imap/service/imap.py
+++ b/src/leap/mail/imap/service/imap.py
@@ -17,23 +17,26 @@
"""
Imap service initialization
"""
-from copy import copy
-
import logging
+import os
+import time
+from twisted.internet import defer, threads
from twisted.internet.protocol import ServerFactory
from twisted.internet.error import CannotListenError
from twisted.mail import imap4
from twisted.python import log
-from twisted import cred
logger = logging.getLogger(__name__)
from leap.common import events as leap_events
from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import KeyManager
-from leap.mail.imap.server import SoledadBackedAccount
+from leap.mail.imap.account import SoledadBackedAccount
from leap.mail.imap.fetch import LeapIncomingMail
+from leap.mail.imap.memorystore import MemoryStore
+from leap.mail.imap.server import LeapIMAPServer
+from leap.mail.imap.soledadstore import SoledadStore
from leap.soledad.client import Soledad
# The default port in which imap service will run
@@ -45,75 +48,38 @@ INCOMING_CHECK_PERIOD = 60
from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED
from leap.common.events.events_pb2 import IMAP_SERVICE_FAILED_TO_START
-from leap.common.events.events_pb2 import IMAP_CLIENT_LOGIN
-
-
-class LeapIMAPServer(imap4.IMAP4Server):
- """
- An IMAP4 Server with mailboxes backed by soledad
- """
- def __init__(self, *args, **kwargs):
- # pop extraneous arguments
- soledad = kwargs.pop('soledad', None)
- uuid = kwargs.pop('uuid', None)
- userid = kwargs.pop('userid', None)
- leap_assert(soledad, "need a soledad instance")
- leap_assert_type(soledad, Soledad)
- leap_assert(uuid, "need a user in the initialization")
-
- self._userid = userid
- # initialize imap server!
- imap4.IMAP4Server.__init__(self, *args, **kwargs)
+######################################################
+# Temporary workaround for RecursionLimit when using
+# qt4reactor. Do remove when we move to poll or select
+# reactor, which do not show those problems. See #4974
+import resource
+import sys
- # we should initialize the account here,
- # but we move it to the factory so we can
- # populate the test account properly (and only once
- # per session)
+try:
+ sys.setrecursionlimit(10**7)
+except Exception:
+ print "Error setting recursion limit"
+try:
+ # Increase max stack size from 8MB to 256MB
+ resource.setrlimit(resource.RLIMIT_STACK, (2**28, -1))
+except Exception:
+ print "Error setting stack size"
- # theAccount = SoledadBackedAccount(
- # user, soledad=soledad)
+######################################################
- # ---------------------------------
- # XXX pre-populate acct for tests!!
- # populate_test_account(theAccount)
- # ---------------------------------
- #self.theAccount = theAccount
+DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None)
+if DO_MANHOLE:
+ from leap.mail.imap.service import manhole
- def lineReceived(self, line):
- """
- Attempt to parse a single line from the server.
+DO_PROFILE = os.environ.get("LEAP_PROFILE", None)
+if DO_PROFILE:
+ import cProfile
+ log.msg("Starting PROFILING...")
- :param line: the line from the server, without the line delimiter.
- :type line: str
- """
- if "login" in line.lower():
- # avoid to log the pass, even though we are using a dummy auth
- # by now.
- msg = line[:7] + " [...]"
- else:
- msg = copy(line)
- log.msg('rcv: %s' % msg)
- imap4.IMAP4Server.lineReceived(self, line)
-
- def authenticateLogin(self, username, password):
- """
- Lookup the account with the given parameters, and deny
- the improper combinations.
-
- :param username: the username that is attempting authentication.
- :type username: str
- :param password: the password to authenticate with.
- :type password: str
- """
- # XXX this should use portal:
- # return portal.login(cred.credentials.UsernamePassword(user, pass)
- if username != self._userid:
- # bad username, reject.
- raise cred.error.UnauthorizedLogin()
- # any dummy password is allowed so far. use realm instead!
- leap_events.signal(IMAP_CLIENT_LOGIN, "1")
- return imap4.IAccount, self.theAccount, lambda: None
+ PROFILE_DAT = "/tmp/leap_mail_profile.pstats"
+ pr = cProfile.Profile()
+ pr.enable()
class IMAPAuthRealm(object):
@@ -148,13 +114,23 @@ class LeapIMAPFactory(ServerFactory):
self._uuid = uuid
self._userid = userid
self._soledad = soledad
+ self._memstore = MemoryStore(
+ permanent_store=SoledadStore(soledad))
theAccount = SoledadBackedAccount(
- uuid, soledad=soledad)
+ uuid, soledad=soledad,
+ memstore=self._memstore)
self.theAccount = theAccount
+ # XXX how to pass the store along?
+
def buildProtocol(self, addr):
- "Return a protocol suitable for the job."
+ """
+ Return a protocol suitable for the job.
+
+ :param addr: remote ip address
+ :type addr: str
+ """
imapProtocol = LeapIMAPServer(
uuid=self._uuid,
userid=self._userid,
@@ -163,6 +139,42 @@ class LeapIMAPFactory(ServerFactory):
imapProtocol.factory = self
return imapProtocol
+ def doStop(self, cv=None):
+ """
+ Stops imap service (fetcher, factory and port).
+
+ :param cv: A condition variable to which we can signal when imap
+ indeed stops.
+ :type cv: threading.Condition
+ :return: a Deferred that stops and flushes the in memory store data to
+ disk in another thread.
+ :rtype: Deferred
+ """
+ if DO_PROFILE:
+ log.msg("Stopping PROFILING")
+ pr.disable()
+ pr.dump_stats(PROFILE_DAT)
+
+ ServerFactory.doStop(self)
+
+ if cv is not None:
+ def _stop_imap_cb():
+ logger.debug('Stopping in memory store.')
+ self._memstore.stop_and_flush()
+ while not self._memstore.producer.is_queue_empty():
+ logger.debug('Waiting for queue to be empty.')
+ # TODO use a gatherResults over the new/dirty
+ # deferred list,
+ # as in memorystore's expunge() method.
+ time.sleep(1)
+ # notify that service has stopped
+ logger.debug('Notifying that service has stopped.')
+ cv.acquire()
+ cv.notify()
+ cv.release()
+
+ return threads.deferToThread(_stop_imap_cb)
+
def run_service(*args, **kwargs):
"""
@@ -173,6 +185,11 @@ def run_service(*args, **kwargs):
the reactor when starts listening, and the factory for
the protocol.
"""
+ from twisted.internet import reactor
+ # it looks like qtreactor does not honor this,
+ # but other reactors should.
+ reactor.suggestThreadPoolSize(20)
+
leap_assert(len(args) == 2)
soledad, keymanager = args
leap_assert_type(soledad, Soledad)
@@ -182,21 +199,23 @@ def run_service(*args, **kwargs):
check_period = kwargs.get('check_period', INCOMING_CHECK_PERIOD)
userid = kwargs.get('userid', None)
leap_check(userid is not None, "need an user id")
+ offline = kwargs.get('offline', False)
uuid = soledad._get_uuid()
factory = LeapIMAPFactory(uuid, userid, soledad)
- from twisted.internet import reactor
-
try:
tport = reactor.listenTCP(port, factory,
interface="localhost")
- fetcher = LeapIncomingMail(
- keymanager,
- soledad,
- factory.theAccount,
- check_period,
- userid)
+ if not offline:
+ fetcher = LeapIncomingMail(
+ keymanager,
+ soledad,
+ factory.theAccount,
+ check_period,
+ userid)
+ else:
+ fetcher = None
except CannotListenError:
logger.error("IMAP Service failed to start: "
"cannot listen in port %s" % (port,))
@@ -204,7 +223,17 @@ def run_service(*args, **kwargs):
logger.error("Error launching IMAP service: %r" % (exc,))
else:
# all good.
- fetcher.start_loop()
+ # (the caller has still to call fetcher.start_loop)
+
+ if DO_MANHOLE:
+ # TODO get pass from env var.too.
+ manhole_factory = manhole.getManholeFactory(
+ {'f': factory,
+ 'a': factory.theAccount,
+ 'gm': factory.theAccount.getMailbox},
+ "boss", "leap")
+ reactor.listenTCP(manhole.MANHOLE_PORT, manhole_factory,
+ interface="127.0.0.1")
logger.debug("IMAP4 Server is RUNNING in port %s" % (port,))
leap_events.signal(IMAP_SERVICE_STARTED, str(port))
return fetcher, tport, factory