From ece1f50d25dd9810fedd7498557dd2048fba2540 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 21 May 2015 12:47:27 -0400 Subject: [feature] post-sync mail processing hooks using the new soledad plugin capablity, mail hooks to the post-sync event by subscribing to the Meta-Doc type of documents. In this way, we can create the uid tables and the uid entries needed to keep local indexes for mail that has been processed in another instance. however, this won't prevent a conflict if a given mail is received and processed in two different instances. that is a problem that we still have to deal with. Resolves: #6996 Releases: 0.4.0 --- mail/src/leap/mail/imap/service/imap.py | 9 ++ mail/src/leap/mail/mail.py | 2 - mail/src/leap/mail/plugins/__init__.py | 3 + mail/src/leap/mail/plugins/soledad_sync_hooks.py | 19 ++++ mail/src/leap/mail/sync_hooks.py | 121 +++++++++++++++++++++++ 5 files changed, 152 insertions(+), 2 deletions(-) create mode 100644 mail/src/leap/mail/plugins/__init__.py create mode 100644 mail/src/leap/mail/plugins/soledad_sync_hooks.py create mode 100644 mail/src/leap/mail/sync_hooks.py (limited to 'mail/src') diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 370c513..e401283 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -35,6 +35,7 @@ from leap.common.events import emit, catalog from leap.common.check import leap_assert_type, leap_check from leap.mail.imap.account import IMAPAccount from leap.mail.imap.server import LEAPIMAPServer +from leap.mail.plugins import soledad_sync_hooks from leap.soledad.client import Soledad @@ -91,10 +92,17 @@ class LeapIMAPFactory(ServerFactory): theAccount = IMAPAccount(uuid, soledad) self.theAccount = theAccount + self._initialize_sync_hooks() self._connections = defaultdict() # XXX how to pass the store along? + def _initialize_sync_hooks(self): + soledad_sync_hooks.post_sync_uid_reindexer.set_account(self.theAccount) + + def _teardown_sync_hooks(self): + soledad_sync_hooks.post_sync_uid_reindexer.set_account(None) + def buildProtocol(self, addr): """ Return a protocol suitable for the job. @@ -128,6 +136,7 @@ class LeapIMAPFactory(ServerFactory): # mark account as unusable, so any imap command will fail # with unauth state. self.theAccount.end_session() + self._teardown_sync_hooks() # TODO should wait for all the pending deferreds, # the twisted way! diff --git a/mail/src/leap/mail/mail.py b/mail/src/leap/mail/mail.py index 1649d4a..bab73cb 100644 --- a/mail/src/leap/mail/mail.py +++ b/mail/src/leap/mail/mail.py @@ -42,8 +42,6 @@ logger = logging.getLogger(name=__name__) # TODO LIST # [ ] Probably change the name of this module to "api" or "account", mail is # too generic (there's also IncomingMail, and OutgoingMail -# [ ] Change the doc_ids scheme for part-docs: use mailbox UID validity -# identifier, instead of name! (renames are broken!) # [ ] Profile add_msg. def _get_mdoc_id(mbox, chash): diff --git a/mail/src/leap/mail/plugins/__init__.py b/mail/src/leap/mail/plugins/__init__.py new file mode 100644 index 0000000..ddb8691 --- /dev/null +++ b/mail/src/leap/mail/plugins/__init__.py @@ -0,0 +1,3 @@ +from twisted.plugin import pluginPackagePaths +__path__.extend(pluginPackagePaths(__name__)) +__all__ = [] diff --git a/mail/src/leap/mail/plugins/soledad_sync_hooks.py b/mail/src/leap/mail/plugins/soledad_sync_hooks.py new file mode 100644 index 0000000..9d48126 --- /dev/null +++ b/mail/src/leap/mail/plugins/soledad_sync_hooks.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +# soledad_sync_hooks.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from leap.mail.sync_hooks import MailProcessingPostSyncHook +post_sync_uid_reindexer = MailProcessingPostSyncHook() diff --git a/mail/src/leap/mail/sync_hooks.py b/mail/src/leap/mail/sync_hooks.py new file mode 100644 index 0000000..b5bded5 --- /dev/null +++ b/mail/src/leap/mail/sync_hooks.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# sync_hooks.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Soledad PostSync Hooks. + +Process every new document of interest after every soledad synchronization, +using the hooks that soledad exposes via plugins. +""" +import logging + +from re import compile as regex_compile + +from zope.interface import implements +from twisted.internet import defer +from twisted.plugin import IPlugin +from twisted.python import log + +from leap.soledad.client.interfaces import ISoledadPostSyncPlugin +from leap.mail import constants + + +logger = logging.getLogger(__name__) + +_get_doc_type_preffix = lambda s: s[:2] + + +class MailProcessingPostSyncHook(object): + implements(IPlugin, ISoledadPostSyncPlugin) + + META_DOC_PREFFIX = _get_doc_type_preffix(constants.METAMSGID) + watched_doc_types = (META_DOC_PREFFIX, ) + + _account = None + _pending_docs = [] + _processing_deferreds = [] + + def process_received_docs(self, doc_id_list): + if self._has_configured_account(): + process_fun = self._make_uid_index + else: + self._processing_deferreds = [] + process_fun = self._queue_doc_id + + for doc_id in doc_id_list: + if _get_doc_type_preffix(doc_id) in self.watched_doc_types: + log.msg("Mail post-sync hook: processing %s" % doc_id) + process_fun(doc_id) + + if self._processing_deferreds: + return defer.gatherResults(self._processing_deferreds) + + def set_account(self, account): + self._account = account + if account: + self._process_queued_docs() + + def _has_configured_account(self): + return self._account is not None + + def _queue_doc_id(self, doc_id): + self._pending_docs.append(doc_id) + + def _make_uid_index(self, mdoc_id): + indexer = self._account.account.mbox_indexer + mbox_uuid = _get_mbox_uuid(mdoc_id) + if mbox_uuid: + chash = _get_chash_from_mdoc(mdoc_id) + logger.debug("Making index table for %s:%s" % (mbox_uuid, chash)) + index_docid = constants.METAMSGID.format( + mbox_uuid=mbox_uuid.replace('-', '_'), + chash=chash) + # XXX could avoid creating table if I track which ones I already + # have seen -- but make sure *it's already created* before + # inserting the index entry!. + d = indexer.create_table(mbox_uuid) + d.addCallback(lambda _: indexer.insert_doc(mbox_uuid, index_docid)) + self._processing_deferreds.append(d) + + def _process_queued_docs(self): + assert(self._has_configured_account()) + pending = self._pending_docs + log.msg("Mail post-sync hook: processing queued docs") + + def remove_pending_docs(res): + self._pending_docs = [] + return res + + d = self.process_received_docs(pending) + if d: + d.addCallback(remove_pending_docs) + return d + + +_mbox_uuid_regex = regex_compile(constants.METAMSGID_MBOX_RE) +_mdoc_chash_regex = regex_compile(constants.METAMSGID_CHASH_RE) + + +def _get_mbox_uuid(doc_id): + matches = _mbox_uuid_regex.findall(doc_id) + if matches: + return matches[0].replace('_', '-') + + +def _get_chash_from_mdoc(doc_id): + matches = _mdoc_chash_regex.findall(doc_id) + if matches: + return matches[0] -- cgit v1.2.3 From 16dedad1a467693aebf3c8f2a100bf8c8bef9ae3 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 28 May 2015 19:03:24 -0400 Subject: [refactor] move hooks to account --- mail/src/leap/mail/imap/account.py | 7 +++++-- mail/src/leap/mail/imap/service/imap.py | 13 +------------ mail/src/leap/mail/incoming/service.py | 1 + mail/src/leap/mail/mail.py | 22 ++++++++++++++++++++++ mail/src/leap/mail/sync_hooks.py | 2 +- 5 files changed, 30 insertions(+), 15 deletions(-) (limited to 'mail/src') diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py index ccb4b75..cc56fff 100644 --- a/mail/src/leap/mail/imap/account.py +++ b/mail/src/leap/mail/imap/account.py @@ -58,7 +58,6 @@ class IMAPAccount(object): implements(imap4.IAccount, imap4.INamespacePresenter) selected = None - session_ended = False def __init__(self, user_id, store, d=defer.Deferred()): """ @@ -98,7 +97,11 @@ class IMAPAccount(object): Right now it's called from the client backend. """ # TODO move its use to the service shutdown in leap.mail - self.session_ended = True + self.account.end_session() + + @property + def session_ended(self): + return self.account.session_ended def callWhenReady(self, cb, *args, **kw): """ diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index e401283..92d05cc 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -32,11 +32,9 @@ from twisted.python import log logger = logging.getLogger(__name__) from leap.common.events import emit, catalog -from leap.common.check import leap_assert_type, leap_check +from leap.common.check import leap_check from leap.mail.imap.account import IMAPAccount from leap.mail.imap.server import LEAPIMAPServer -from leap.mail.plugins import soledad_sync_hooks -from leap.soledad.client import Soledad DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None) @@ -92,17 +90,9 @@ class LeapIMAPFactory(ServerFactory): theAccount = IMAPAccount(uuid, soledad) self.theAccount = theAccount - self._initialize_sync_hooks() - self._connections = defaultdict() # XXX how to pass the store along? - def _initialize_sync_hooks(self): - soledad_sync_hooks.post_sync_uid_reindexer.set_account(self.theAccount) - - def _teardown_sync_hooks(self): - soledad_sync_hooks.post_sync_uid_reindexer.set_account(None) - def buildProtocol(self, addr): """ Return a protocol suitable for the job. @@ -136,7 +126,6 @@ class LeapIMAPFactory(ServerFactory): # mark account as unusable, so any imap command will fail # with unauth state. self.theAccount.end_session() - self._teardown_sync_hooks() # TODO should wait for all the pending deferreds, # the twisted way! diff --git a/mail/src/leap/mail/incoming/service.py b/mail/src/leap/mail/incoming/service.py index be37396..23aff3d 100644 --- a/mail/src/leap/mail/incoming/service.py +++ b/mail/src/leap/mail/incoming/service.py @@ -246,6 +246,7 @@ class IncomingMail(Service): :returns: doclist :rtype: iterable """ + # FIXME WTF len(doclist) is 69? doclist = first(doclist) # gatherResults pass us a list if doclist: fetched_ts = time.mktime(time.gmtime()) diff --git a/mail/src/leap/mail/mail.py b/mail/src/leap/mail/mail.py index bab73cb..fe8226e 100644 --- a/mail/src/leap/mail/mail.py +++ b/mail/src/leap/mail/mail.py @@ -34,6 +34,7 @@ from leap.mail.adaptors.soledad import SoledadMailAdaptor from leap.mail.constants import INBOX_NAME from leap.mail.constants import MessageFlags from leap.mail.mailbox_indexer import MailboxIndexer +from leap.mail.plugins import soledad_sync_hooks from leap.mail.utils import find_charset, CaseInsensitiveDict logger = logging.getLogger(name=__name__) @@ -802,10 +803,17 @@ class Account(object): self.adaptor = self.adaptor_class() self.mbox_indexer = MailboxIndexer(self.store) + # This flag is only used from the imap service for the moment. + # In the future, we should prevent any public method to continue if + # this is set to True. Also, it would be good to plug to the + # authentication layer. + self.session_ended = False + self.deferred_initialization = defer.Deferred() self._ready_cb = ready_cb self._init_d = self._initialize_storage() + self._initialize_sync_hooks() def _initialize_storage(self): @@ -834,6 +842,14 @@ class Account(object): self.deferred_initialization.addCallback(cb, *args, **kw) return self.deferred_initialization + # Sync hooks + + def _initialize_sync_hooks(self): + soledad_sync_hooks.post_sync_uid_reindexer.set_account(self) + + def _teardown_sync_hooks(self): + soledad_sync_hooks.post_sync_uid_reindexer.set_account(None) + # # Public API Starts # @@ -946,3 +962,9 @@ class Account(object): :rtype: MessageCollection """ raise NotImplementedError() + + # Session handling + + def end_session(self): + self._teardown_sync_hooks() + self.session_ended = True diff --git a/mail/src/leap/mail/sync_hooks.py b/mail/src/leap/mail/sync_hooks.py index b5bded5..3cf858b 100644 --- a/mail/src/leap/mail/sync_hooks.py +++ b/mail/src/leap/mail/sync_hooks.py @@ -75,7 +75,7 @@ class MailProcessingPostSyncHook(object): self._pending_docs.append(doc_id) def _make_uid_index(self, mdoc_id): - indexer = self._account.account.mbox_indexer + indexer = self._account.mbox_indexer mbox_uuid = _get_mbox_uuid(mdoc_id) if mbox_uuid: chash = _get_chash_from_mdoc(mdoc_id) -- cgit v1.2.3 From ddd67ecc07c10f0b48e9f14839c1a8a172d87f1c Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 4 Jun 2015 10:22:34 -0400 Subject: [bug] prevent missing uid table exception --- mail/src/leap/mail/mail.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'mail/src') diff --git a/mail/src/leap/mail/mail.py b/mail/src/leap/mail/mail.py index fe8226e..8cb0b4a 100644 --- a/mail/src/leap/mail/mail.py +++ b/mail/src/leap/mail/mail.py @@ -615,8 +615,13 @@ class MessageCollection(object): return defer.succeed("mdoc_id not inserted") # XXX BUG ----------------------------------------- - return self.mbox_indexer.insert_doc( - self.mbox_uuid, doc_id) + # XXX BUG sometimes the table is not yet created, + # so workaround is to make sure we always check for it before + # inserting the doc. I should debug into the real cause. + d = self.mbox_indexer.create_table(self.mbox_uuid) + d.addCallback(lambda _: self.mbox_indexer.insert_doc( + self.mbox_uuid, doc_id)) + return d d = wrapper.create( self.store, -- cgit v1.2.3 From 0e868109cf842dc60466a0b6b5b86311246a013e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 4 Jun 2015 10:23:07 -0400 Subject: [feature] use operation, doesn't return result --- mail/src/leap/mail/mailbox_indexer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'mail/src') diff --git a/mail/src/leap/mail/mailbox_indexer.py b/mail/src/leap/mail/mailbox_indexer.py index 664d580..ab0967d 100644 --- a/mail/src/leap/mail/mailbox_indexer.py +++ b/mail/src/leap/mail/mailbox_indexer.py @@ -88,6 +88,10 @@ class MailboxIndexer(object): assert self.store is not None return self.store.raw_sqlcipher_query(*args, **kw) + def _operation(self, *args, **kw): + assert self.store is not None + return self.store.raw_sqlcipher_operation(*args, **kw) + def create_table(self, mailbox_uuid): """ Create the UID table for a given mailbox. @@ -100,7 +104,8 @@ class MailboxIndexer(object): "uid INTEGER PRIMARY KEY AUTOINCREMENT, " "hash TEXT UNIQUE NOT NULL)".format( preffix=self.table_preffix, name=sanitize(mailbox_uuid))) - return self._query(sql) + print "CREATING TABLE..." + return self._operation(sql) def delete_table(self, mailbox_uuid): """ @@ -112,7 +117,7 @@ class MailboxIndexer(object): check_good_uuid(mailbox_uuid) sql = ("DROP TABLE if exists {preffix}{name}".format( preffix=self.table_preffix, name=sanitize(mailbox_uuid))) - return self._query(sql) + return self._operation(sql) def insert_doc(self, mailbox_uuid, doc_id): """ @@ -149,7 +154,7 @@ class MailboxIndexer(object): "LIMIT 1;").format( preffix=self.table_preffix, name=sanitize(mailbox_uuid)) - d = self._query(sql, values) + d = self._operation(sql, values) d.addCallback(lambda _: self._query(sql_last)) d.addCallback(get_rowid) d.addErrback(lambda f: f.printTraceback()) -- cgit v1.2.3 From 037bb4569d4014c85e91d1ecd23b3232c2157575 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 4 Jun 2015 20:08:46 -0400 Subject: [refactor] deprecate old incoming index --- mail/src/leap/mail/adaptors/soledad_indexes.py | 5 +---- mail/src/leap/mail/incoming/service.py | 16 +++++----------- 2 files changed, 6 insertions(+), 15 deletions(-) (limited to 'mail/src') diff --git a/mail/src/leap/mail/adaptors/soledad_indexes.py b/mail/src/leap/mail/adaptors/soledad_indexes.py index d2f8b71..eec7d28 100644 --- a/mail/src/leap/mail/adaptors/soledad_indexes.py +++ b/mail/src/leap/mail/adaptors/soledad_indexes.py @@ -101,9 +101,6 @@ MAIL_INDEXES = { TYPE_MBOX_DEL_IDX: [TYPE, MBOX_UUID, 'bool(deleted)'], # incoming queue - JUST_MAIL_IDX: [INCOMING_KEY, + JUST_MAIL_IDX: ["bool(%s)" % (INCOMING_KEY,), "bool(%s)" % (ERROR_DECRYPTING_KEY,)], - - # the backward-compatible index, will be deprecated at 0.7 - JUST_MAIL_COMPAT_IDX: [INCOMING_KEY], } diff --git a/mail/src/leap/mail/incoming/service.py b/mail/src/leap/mail/incoming/service.py index 23aff3d..71edf08 100644 --- a/mail/src/leap/mail/incoming/service.py +++ b/mail/src/leap/mail/incoming/service.py @@ -161,21 +161,15 @@ class IncomingMail(Service): Calls a deferred that will execute the fetch callback in a separate thread """ - def mail_compat(failure): - if failure.check(u1db_errors.InvalidGlobbing): - # It looks like we are a dealing with an outdated - # mx. Fallback to the version of the index - warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", - DeprecationWarning) - return self._soledad.get_from_index( - fields.JUST_MAIL_COMPAT_IDX, "*") - return failure + def _sync_errback(failure): + failure.printTraceback() def syncSoledadCallback(_): + # XXX this should be moved to adaptors d = self._soledad.get_from_index( - fields.JUST_MAIL_IDX, "*", "0") - d.addErrback(mail_compat) + fields.JUST_MAIL_IDX, "1", "0") d.addCallback(self._process_doclist) + d.addErrback(_sync_errback) return d logger.debug("fetching mail for: %s %s" % ( -- cgit v1.2.3 From fc0e04a5abe379a82ad54796d630e237b4800dbe Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 9 Jun 2015 11:32:33 -0400 Subject: [refactor] log failure properly --- mail/src/leap/mail/incoming/service.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'mail/src') diff --git a/mail/src/leap/mail/incoming/service.py b/mail/src/leap/mail/incoming/service.py index 71edf08..4738dd4 100644 --- a/mail/src/leap/mail/incoming/service.py +++ b/mail/src/leap/mail/incoming/service.py @@ -162,7 +162,7 @@ class IncomingMail(Service): in a separate thread """ def _sync_errback(failure): - failure.printTraceback() + log.err(failure) def syncSoledadCallback(_): # XXX this should be moved to adaptors @@ -206,8 +206,7 @@ class IncomingMail(Service): # synchronize incoming mail def _errback(self, failure): - logger.exception(failure.value) - traceback.print_exc() + log.err(failure) def _sync_soledad(self): """ -- cgit v1.2.3 From 6f7a23aed1b813aef17504ae7f729453ac24b95e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 9 Jun 2015 11:32:41 -0400 Subject: [bug] pass the doclist to the ui signal before, we were taking the length of a string, signalling an incorrect number to the ui. currently this event is not being used, just only logged. in the future the ui could probably might want to make use of this info to keep record of a separate counter (how many mails received in the last sync). --- mail/src/leap/mail/incoming/service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'mail/src') diff --git a/mail/src/leap/mail/incoming/service.py b/mail/src/leap/mail/incoming/service.py index 4738dd4..3daf86b 100644 --- a/mail/src/leap/mail/incoming/service.py +++ b/mail/src/leap/mail/incoming/service.py @@ -239,8 +239,6 @@ class IncomingMail(Service): :returns: doclist :rtype: iterable """ - # FIXME WTF len(doclist) is 69? - doclist = first(doclist) # gatherResults pass us a list if doclist: fetched_ts = time.mktime(time.gmtime()) num_mails = len(doclist) if doclist is not None else 0 @@ -299,7 +297,9 @@ class IncomingMail(Service): d.addCallback(self._extract_keys) d.addCallbacks(self._add_message_locally, self._errback) deferreds.append(d) - return defer.gatherResults(deferreds, consumeErrors=True) + d = defer.gatherResults(deferreds, consumeErrors=True) + d.addCallback(lambda _: doclist) + return d # # operations on individual messages -- cgit v1.2.3 From cfd645a7f8478ba958aea5a56a04721e3df5aede Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 10 Jun 2015 10:49:34 -0400 Subject: [refactor] remove unneeded conditional --- mail/src/leap/mail/sync_hooks.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'mail/src') diff --git a/mail/src/leap/mail/sync_hooks.py b/mail/src/leap/mail/sync_hooks.py index 3cf858b..a8a69c9 100644 --- a/mail/src/leap/mail/sync_hooks.py +++ b/mail/src/leap/mail/sync_hooks.py @@ -60,8 +60,7 @@ class MailProcessingPostSyncHook(object): log.msg("Mail post-sync hook: processing %s" % doc_id) process_fun(doc_id) - if self._processing_deferreds: - return defer.gatherResults(self._processing_deferreds) + return defer.gatherResults(self._processing_deferreds) def set_account(self, account): self._account = account @@ -100,9 +99,8 @@ class MailProcessingPostSyncHook(object): return res d = self.process_received_docs(pending) - if d: - d.addCallback(remove_pending_docs) - return d + d.addCallback(remove_pending_docs) + return d _mbox_uuid_regex = regex_compile(constants.METAMSGID_MBOX_RE) -- cgit v1.2.3