diff options
Diffstat (limited to 'service/pixelated/maintenance.py')
-rw-r--r-- | service/pixelated/maintenance.py | 143 |
1 files changed, 84 insertions, 59 deletions
diff --git a/service/pixelated/maintenance.py b/service/pixelated/maintenance.py index 7170055c..f011658d 100644 --- a/service/pixelated/maintenance.py +++ b/service/pixelated/maintenance.py @@ -14,64 +14,81 @@ # You should have received a copy of the GNU Affero General Public License # along with Pixelated. If not, see <http://www.gnu.org/licenses/>. +import logging from mailbox import Maildir from twisted.internet import reactor, defer from twisted.internet.threads import deferToThread +from pixelated.adapter.mailstore.maintenance import SoledadMaintenance from pixelated.config.leap import initialize_leap from pixelated.config import logger, arguments -from leap.mail.imap.fields import WithMsgFields -import time +from leap.mail.constants import MessageFlags + + +REPAIR_COMMAND = 'repair' def initialize(): - import time args = arguments.parse_maintenance_args() logger.init(debug=args.debug) - leap_session = initialize_leap( - args.leap_provider_cert, - args.leap_provider_cert_fingerprint, - args.credentials_file, - organization_mode=False, - leap_home=args.leap_home) + @defer.inlineCallbacks + def _run(): + leap_session = yield initialize_leap( + args.leap_provider_cert, + args.leap_provider_cert_fingerprint, + args.credentials_file, + organization_mode=False, + leap_home=args.leap_home, + initial_sync=_do_initial_sync(args)) - execute_command = create_execute_command(args, leap_session) + execute_command(args, leap_session) - reactor.callWhenRunning(execute_command) + reactor.callWhenRunning(_run) reactor.run() -def create_execute_command(args, leap_session): - def execute_command(): +def _do_initial_sync(args): + return not _is_repair_command(args) + + +def _is_repair_command(args): + return args.command == REPAIR_COMMAND + + +def execute_command(args, leap_session): - def init_soledad(): - return leap_session + def init_soledad(): + return leap_session - def get_soledad_handle(leap_session): - soledad = leap_session.soledad_session.soledad + def get_soledad_handle(leap_session): + soledad = leap_session.soledad_session.soledad - return leap_session, soledad + return leap_session, soledad - def soledad_sync(args): - leap_session, soledad = args + @defer.inlineCallbacks + def soledad_sync(args): + leap_session, soledad = args + log = logging.getLogger('some logger') - soledad.sync() + log.warn('Before sync') - return args + yield soledad.sync() - tearDown = defer.Deferred() + log.warn('after sync') - prepare = deferToThread(init_soledad) - prepare.addCallback(get_soledad_handle) - prepare.addCallback(soledad_sync) - add_command_callback(args, prepare, tearDown) - tearDown.addCallback(soledad_sync) - tearDown.addCallback(shutdown) - tearDown.addErrback(shutdown_on_error) + defer.returnValue(args) - return execute_command + tearDown = defer.Deferred() + + prepare = deferToThread(init_soledad) + prepare.addCallback(get_soledad_handle) + prepare.addCallback(soledad_sync) + add_command_callback(args, prepare, tearDown) + tearDown.addCallback(soledad_sync) + tearDown.addCallback(shutdown) + tearDown.addErrback(shutdown_on_error) def add_command_callback(args, prepareDeferred, finalizeDeferred): @@ -87,6 +104,9 @@ def add_command_callback(args, prepareDeferred, finalizeDeferred): elif args.command == 'sync': # nothing to do here, sync is already part of the chain prepareDeferred.chainDeferred(finalizeDeferred) + elif args.command == REPAIR_COMMAND: + prepareDeferred.addCallback(repair) + prepareDeferred.chainDeferred(finalizeDeferred) else: print 'Unsupported command: %s' % args.command prepareDeferred.chainDeferred(finalizeDeferred) @@ -94,90 +114,95 @@ def add_command_callback(args, prepareDeferred, finalizeDeferred): return finalizeDeferred +@defer.inlineCallbacks def delete_all_mails(args): leap_session, soledad = args - generation, docs = soledad.get_all_docs() + generation, docs = yield soledad.get_all_docs() for doc in docs: if doc.content.get('type', None) in ['head', 'cnt', 'flags']: soledad.delete_doc(doc) - return args + defer.returnValue(args) def is_keep_file(mail): return mail['subject'] is None -def add_mail_folder(account, maildir, folder_name, deferreds): - if folder_name not in account.mailboxes: - account.addMailbox(folder_name) +@defer.inlineCallbacks +def add_mail_folder(store, maildir, folder_name, deferreds): + yield store.add_mailbox(folder_name) - mbx = account.getMailbox(folder_name) for mail in maildir: if is_keep_file(mail): continue - flags = (WithMsgFields.RECENT_FLAG,) if mail.get_subdir() == 'new' else () + flags = (MessageFlags.RECENT_FLAG,) if mail.get_subdir() == 'new' else () if 'S' in mail.get_flags(): - flags = (WithMsgFields.SEEN_FLAG,) + flags + flags = (MessageFlags.SEEN_FLAG,) + flags if 'R' in mail.get_flags(): - flags = (WithMsgFields.ANSWERED_FLAG,) + flags + flags = (MessageFlags.ANSWERED_FLAG,) + flags - deferreds.append(mbx.addMessage(mail.as_string(), flags=flags, notify_on_disk=False)) + deferreds.append(store.add_mail(folder_name, mail.as_string())) + # FIXME support flags @defer.inlineCallbacks def load_mails(args, mail_paths): leap_session, soledad = args - account = leap_session.account + store = leap_session.mail_store deferreds = [] for path in mail_paths: maildir = Maildir(path, factory=None) - add_mail_folder(account, maildir, 'INBOX', deferreds) + yield add_mail_folder(store, maildir, 'INBOX', deferreds) for mail_folder_name in maildir.list_folders(): mail_folder = maildir.get_folder(mail_folder_name) - add_mail_folder(account, mail_folder, mail_folder_name, deferreds) + yield add_mail_folder(store, mail_folder, mail_folder_name, deferreds) + + yield defer.gatherResults(deferreds, consumeErrors=True) - yield defer.DeferredList(deferreds) defer.returnValue(args) def flush_to_soledad(args, finalize): leap_session, soledad = args - account = leap_session.account - memstore = account._memstore - permanent_store = memstore._permanent_store - - d = memstore.write_messages(permanent_store) - def check_flushed(args): - if memstore.is_writing: - reactor.callLater(1, check_flushed, args) - else: - finalize.callback((leap_session, soledad)) + def after_sync(_): + finalize.callback((leap_session, soledad)) - d.addCallback(check_flushed) + d = soledad.sync() + d.addCallback(after_sync) return args +@defer.inlineCallbacks def dump_soledad(args): leap_session, soledad = args - generation, docs = soledad.get_all_docs() + generation, docs = yield soledad.get_all_docs() for doc in docs: print doc print '\n' - return args + defer.returnValue(args) + + +@defer.inlineCallbacks +def repair(args): + leap_session, soledad = args + + yield SoledadMaintenance(soledad).repair() + + defer.returnValue(args) def shutdown(args): - time.sleep(30) + # time.sleep(30) reactor.stop() |