diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/leap/mail/adaptors/soledad_indexes.py | 5 | ||||
-rw-r--r-- | src/leap/mail/imap/account.py | 7 | ||||
-rw-r--r-- | src/leap/mail/imap/service/imap.py | 4 | ||||
-rw-r--r-- | src/leap/mail/incoming/service.py | 24 | ||||
-rw-r--r-- | src/leap/mail/mail.py | 33 | ||||
-rw-r--r-- | src/leap/mail/mailbox_indexer.py | 11 | ||||
-rw-r--r-- | src/leap/mail/plugins/__init__.py | 3 | ||||
-rw-r--r-- | src/leap/mail/plugins/soledad_sync_hooks.py | 19 | ||||
-rw-r--r-- | src/leap/mail/sync_hooks.py | 119 |
9 files changed, 194 insertions, 31 deletions
diff --git a/src/leap/mail/adaptors/soledad_indexes.py b/src/leap/mail/adaptors/soledad_indexes.py index d2f8b71..eec7d28 100644 --- a/src/leap/mail/adaptors/soledad_indexes.py +++ b/src/leap/mail/adaptors/soledad_indexes.py @@ -101,9 +101,6 @@ MAIL_INDEXES = { TYPE_MBOX_DEL_IDX: [TYPE, MBOX_UUID, 'bool(deleted)'], # incoming queue - JUST_MAIL_IDX: [INCOMING_KEY, + JUST_MAIL_IDX: ["bool(%s)" % (INCOMING_KEY,), "bool(%s)" % (ERROR_DECRYPTING_KEY,)], - - # the backward-compatible index, will be deprecated at 0.7 - JUST_MAIL_COMPAT_IDX: [INCOMING_KEY], } diff --git a/src/leap/mail/imap/account.py b/src/leap/mail/imap/account.py index ccb4b75..cc56fff 100644 --- a/src/leap/mail/imap/account.py +++ b/src/leap/mail/imap/account.py @@ -58,7 +58,6 @@ class IMAPAccount(object): implements(imap4.IAccount, imap4.INamespacePresenter) selected = None - session_ended = False def __init__(self, user_id, store, d=defer.Deferred()): """ @@ -98,7 +97,11 @@ class IMAPAccount(object): Right now it's called from the client backend. """ # TODO move its use to the service shutdown in leap.mail - self.session_ended = True + self.account.end_session() + + @property + def session_ended(self): + return self.account.session_ended def callWhenReady(self, cb, *args, **kw): """ diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index 370c513..92d05cc 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -32,10 +32,9 @@ from twisted.python import log logger = logging.getLogger(__name__) from leap.common.events import emit, catalog -from leap.common.check import leap_assert_type, leap_check +from leap.common.check import leap_check from leap.mail.imap.account import IMAPAccount from leap.mail.imap.server import LEAPIMAPServer -from leap.soledad.client import Soledad DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None) @@ -91,7 +90,6 @@ class LeapIMAPFactory(ServerFactory): theAccount = IMAPAccount(uuid, soledad) self.theAccount = theAccount - self._connections = defaultdict() # XXX how to pass the store along? diff --git a/src/leap/mail/incoming/service.py b/src/leap/mail/incoming/service.py index be37396..3daf86b 100644 --- a/src/leap/mail/incoming/service.py +++ b/src/leap/mail/incoming/service.py @@ -161,21 +161,15 @@ class IncomingMail(Service): Calls a deferred that will execute the fetch callback in a separate thread """ - def mail_compat(failure): - if failure.check(u1db_errors.InvalidGlobbing): - # It looks like we are a dealing with an outdated - # mx. Fallback to the version of the index - warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", - DeprecationWarning) - return self._soledad.get_from_index( - fields.JUST_MAIL_COMPAT_IDX, "*") - return failure + def _sync_errback(failure): + log.err(failure) def syncSoledadCallback(_): + # XXX this should be moved to adaptors d = self._soledad.get_from_index( - fields.JUST_MAIL_IDX, "*", "0") - d.addErrback(mail_compat) + fields.JUST_MAIL_IDX, "1", "0") d.addCallback(self._process_doclist) + d.addErrback(_sync_errback) return d logger.debug("fetching mail for: %s %s" % ( @@ -212,8 +206,7 @@ class IncomingMail(Service): # synchronize incoming mail def _errback(self, failure): - logger.exception(failure.value) - traceback.print_exc() + log.err(failure) def _sync_soledad(self): """ @@ -246,7 +239,6 @@ class IncomingMail(Service): :returns: doclist :rtype: iterable """ - doclist = first(doclist) # gatherResults pass us a list if doclist: fetched_ts = time.mktime(time.gmtime()) num_mails = len(doclist) if doclist is not None else 0 @@ -305,7 +297,9 @@ class IncomingMail(Service): d.addCallback(self._extract_keys) d.addCallbacks(self._add_message_locally, self._errback) deferreds.append(d) - return defer.gatherResults(deferreds, consumeErrors=True) + d = defer.gatherResults(deferreds, consumeErrors=True) + d.addCallback(lambda _: doclist) + return d # # operations on individual messages diff --git a/src/leap/mail/mail.py b/src/leap/mail/mail.py index 1649d4a..8cb0b4a 100644 --- a/src/leap/mail/mail.py +++ b/src/leap/mail/mail.py @@ -34,6 +34,7 @@ from leap.mail.adaptors.soledad import SoledadMailAdaptor from leap.mail.constants import INBOX_NAME from leap.mail.constants import MessageFlags from leap.mail.mailbox_indexer import MailboxIndexer +from leap.mail.plugins import soledad_sync_hooks from leap.mail.utils import find_charset, CaseInsensitiveDict logger = logging.getLogger(name=__name__) @@ -42,8 +43,6 @@ logger = logging.getLogger(name=__name__) # TODO LIST # [ ] Probably change the name of this module to "api" or "account", mail is # too generic (there's also IncomingMail, and OutgoingMail -# [ ] Change the doc_ids scheme for part-docs: use mailbox UID validity -# identifier, instead of name! (renames are broken!) # [ ] Profile add_msg. def _get_mdoc_id(mbox, chash): @@ -616,8 +615,13 @@ class MessageCollection(object): return defer.succeed("mdoc_id not inserted") # XXX BUG ----------------------------------------- - return self.mbox_indexer.insert_doc( - self.mbox_uuid, doc_id) + # XXX BUG sometimes the table is not yet created, + # so workaround is to make sure we always check for it before + # inserting the doc. I should debug into the real cause. + d = self.mbox_indexer.create_table(self.mbox_uuid) + d.addCallback(lambda _: self.mbox_indexer.insert_doc( + self.mbox_uuid, doc_id)) + return d d = wrapper.create( self.store, @@ -804,10 +808,17 @@ class Account(object): self.adaptor = self.adaptor_class() self.mbox_indexer = MailboxIndexer(self.store) + # This flag is only used from the imap service for the moment. + # In the future, we should prevent any public method to continue if + # this is set to True. Also, it would be good to plug to the + # authentication layer. + self.session_ended = False + self.deferred_initialization = defer.Deferred() self._ready_cb = ready_cb self._init_d = self._initialize_storage() + self._initialize_sync_hooks() def _initialize_storage(self): @@ -836,6 +847,14 @@ class Account(object): self.deferred_initialization.addCallback(cb, *args, **kw) return self.deferred_initialization + # Sync hooks + + def _initialize_sync_hooks(self): + soledad_sync_hooks.post_sync_uid_reindexer.set_account(self) + + def _teardown_sync_hooks(self): + soledad_sync_hooks.post_sync_uid_reindexer.set_account(None) + # # Public API Starts # @@ -948,3 +967,9 @@ class Account(object): :rtype: MessageCollection """ raise NotImplementedError() + + # Session handling + + def end_session(self): + self._teardown_sync_hooks() + self.session_ended = True diff --git a/src/leap/mail/mailbox_indexer.py b/src/leap/mail/mailbox_indexer.py index 664d580..ab0967d 100644 --- a/src/leap/mail/mailbox_indexer.py +++ b/src/leap/mail/mailbox_indexer.py @@ -88,6 +88,10 @@ class MailboxIndexer(object): assert self.store is not None return self.store.raw_sqlcipher_query(*args, **kw) + def _operation(self, *args, **kw): + assert self.store is not None + return self.store.raw_sqlcipher_operation(*args, **kw) + def create_table(self, mailbox_uuid): """ Create the UID table for a given mailbox. @@ -100,7 +104,8 @@ class MailboxIndexer(object): "uid INTEGER PRIMARY KEY AUTOINCREMENT, " "hash TEXT UNIQUE NOT NULL)".format( preffix=self.table_preffix, name=sanitize(mailbox_uuid))) - return self._query(sql) + print "CREATING TABLE..." + return self._operation(sql) def delete_table(self, mailbox_uuid): """ @@ -112,7 +117,7 @@ class MailboxIndexer(object): check_good_uuid(mailbox_uuid) sql = ("DROP TABLE if exists {preffix}{name}".format( preffix=self.table_preffix, name=sanitize(mailbox_uuid))) - return self._query(sql) + return self._operation(sql) def insert_doc(self, mailbox_uuid, doc_id): """ @@ -149,7 +154,7 @@ class MailboxIndexer(object): "LIMIT 1;").format( preffix=self.table_preffix, name=sanitize(mailbox_uuid)) - d = self._query(sql, values) + d = self._operation(sql, values) d.addCallback(lambda _: self._query(sql_last)) d.addCallback(get_rowid) d.addErrback(lambda f: f.printTraceback()) diff --git a/src/leap/mail/plugins/__init__.py b/src/leap/mail/plugins/__init__.py new file mode 100644 index 0000000..ddb8691 --- /dev/null +++ b/src/leap/mail/plugins/__init__.py @@ -0,0 +1,3 @@ +from twisted.plugin import pluginPackagePaths +__path__.extend(pluginPackagePaths(__name__)) +__all__ = [] diff --git a/src/leap/mail/plugins/soledad_sync_hooks.py b/src/leap/mail/plugins/soledad_sync_hooks.py new file mode 100644 index 0000000..9d48126 --- /dev/null +++ b/src/leap/mail/plugins/soledad_sync_hooks.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +# soledad_sync_hooks.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from leap.mail.sync_hooks import MailProcessingPostSyncHook +post_sync_uid_reindexer = MailProcessingPostSyncHook() diff --git a/src/leap/mail/sync_hooks.py b/src/leap/mail/sync_hooks.py new file mode 100644 index 0000000..a8a69c9 --- /dev/null +++ b/src/leap/mail/sync_hooks.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# sync_hooks.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Soledad PostSync Hooks. + +Process every new document of interest after every soledad synchronization, +using the hooks that soledad exposes via plugins. +""" +import logging + +from re import compile as regex_compile + +from zope.interface import implements +from twisted.internet import defer +from twisted.plugin import IPlugin +from twisted.python import log + +from leap.soledad.client.interfaces import ISoledadPostSyncPlugin +from leap.mail import constants + + +logger = logging.getLogger(__name__) + +_get_doc_type_preffix = lambda s: s[:2] + + +class MailProcessingPostSyncHook(object): + implements(IPlugin, ISoledadPostSyncPlugin) + + META_DOC_PREFFIX = _get_doc_type_preffix(constants.METAMSGID) + watched_doc_types = (META_DOC_PREFFIX, ) + + _account = None + _pending_docs = [] + _processing_deferreds = [] + + def process_received_docs(self, doc_id_list): + if self._has_configured_account(): + process_fun = self._make_uid_index + else: + self._processing_deferreds = [] + process_fun = self._queue_doc_id + + for doc_id in doc_id_list: + if _get_doc_type_preffix(doc_id) in self.watched_doc_types: + log.msg("Mail post-sync hook: processing %s" % doc_id) + process_fun(doc_id) + + return defer.gatherResults(self._processing_deferreds) + + def set_account(self, account): + self._account = account + if account: + self._process_queued_docs() + + def _has_configured_account(self): + return self._account is not None + + def _queue_doc_id(self, doc_id): + self._pending_docs.append(doc_id) + + def _make_uid_index(self, mdoc_id): + indexer = self._account.mbox_indexer + mbox_uuid = _get_mbox_uuid(mdoc_id) + if mbox_uuid: + chash = _get_chash_from_mdoc(mdoc_id) + logger.debug("Making index table for %s:%s" % (mbox_uuid, chash)) + index_docid = constants.METAMSGID.format( + mbox_uuid=mbox_uuid.replace('-', '_'), + chash=chash) + # XXX could avoid creating table if I track which ones I already + # have seen -- but make sure *it's already created* before + # inserting the index entry!. + d = indexer.create_table(mbox_uuid) + d.addCallback(lambda _: indexer.insert_doc(mbox_uuid, index_docid)) + self._processing_deferreds.append(d) + + def _process_queued_docs(self): + assert(self._has_configured_account()) + pending = self._pending_docs + log.msg("Mail post-sync hook: processing queued docs") + + def remove_pending_docs(res): + self._pending_docs = [] + return res + + d = self.process_received_docs(pending) + d.addCallback(remove_pending_docs) + return d + + +_mbox_uuid_regex = regex_compile(constants.METAMSGID_MBOX_RE) +_mdoc_chash_regex = regex_compile(constants.METAMSGID_CHASH_RE) + + +def _get_mbox_uuid(doc_id): + matches = _mbox_uuid_regex.findall(doc_id) + if matches: + return matches[0].replace('_', '-') + + +def _get_chash_from_mdoc(doc_id): + matches = _mdoc_chash_regex.findall(doc_id) + if matches: + return matches[0] |