From e2218eec4fd91e4648160a05e3debc05efa0d0d9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 23 Jan 2014 02:36:38 -0400 Subject: add soledadstore class move parts-related bits to messageparts pass soledad in initialization for memory messages --- src/leap/mail/imap/soledadstore.py | 237 +++++++++++++++++++++++++++++++++++++ 1 file changed, 237 insertions(+) create mode 100644 src/leap/mail/imap/soledadstore.py (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py new file mode 100644 index 0000000..62a3c53 --- /dev/null +++ b/src/leap/mail/imap/soledadstore.py @@ -0,0 +1,237 @@ +# -*- coding: utf-8 -*- +# soledadstore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +A MessageStore that writes to Soledad. +""" +import logging + +from u1db import errors as u1db_errors +from zope.interface import implements + +from leap.mail.imap.messageparts import MessagePartType +from leap.mail.imap.fields import fields +from leap.mail.imap.interfaces import IMessageStore +from leap.mail.messageflow import IMessageConsumer + +logger = logging.getLogger(__name__) + + +class ContentDedup(object): + """ + Message deduplication. + + We do a query for the content hashes before writing to our beloved + sqlcipher backend of Soledad. This means, by now, that: + + 1. We will not store the same attachment twice, only the hash of it. + 2. We will not store the same message body twice, only the hash of it. + + The first case is useful if you are always receiving the same old memes + from unwary friends that still have not discovered that 4chan is the + generator of the internet. The second will save your day if you have + initiated session with the same account in two different machines. I also + wonder why would you do that, but let's respect each other choices, like + with the religious celebrations, and assume that one day we'll be able + to run Bitmask in completely free phones. Yes, I mean that, the whole GSM + Stack. + """ + + def _header_does_exist(self, doc): + """ + Check whether we already have a header document for this + content hash in our database. + + :param doc: tentative header document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + chash = doc[fields.CONTENT_HASH_KEY] + header_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + if not header_docs: + return False + + if len(header_docs) != 1: + logger.warning("Found more than one copy of chash %s!" + % (chash,)) + logger.debug("Found header doc with that hash! Skipping save!") + return True + + def _content_does_exist(self, doc): + """ + Check whether we already have a content document for a payload + with this hash in our database. + + :param doc: tentative content document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + phash = doc[fields.PAYLOAD_HASH_KEY] + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + if not attach_docs: + return False + + if len(attach_docs) != 1: + logger.warning("Found more than one copy of phash %s!" + % (phash,)) + logger.debug("Found attachment doc with that hash! Skipping save!") + return True + + +class SoledadStore(ContentDedup): + """ + This will create docs in the local Soledad database. + """ + + implements(IMessageConsumer, IMessageStore) + + def __init__(self, soledad): + """ + Initialize the writer. + + :param soledad: the soledad instance + :type soledad: Soledad + """ + self._soledad = soledad + + # IMessageStore + + # ------------------------------------------------------------------- + # We are not yet using this interface, but it would make sense + # to implement it. + + def create_message(self, mbox, uid, message): + """ + Create the passed message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :param message: a IMessageContainer implementor. + """ + + def put_message(self, mbox, uid, message): + """ + Put the passed existing message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :param message: a IMessageContainer implementor. + """ + + def remove_message(self, mbox, uid): + """ + Remove the given message from this SoledadStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + """ + + def get_message(self, mbox, uid): + """ + Get a IMessageContainer for the given mbox and uid combination. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + """ + + # IMessageConsumer + + def consume(self, queue): + """ + Creates a new document in soledad db. + + :param queue: queue to get item from, with content of the document + to be inserted. + :type queue: Queue + """ + # TODO should delete the original message from incoming after + # the writes are done. + # TODO should handle the delete case + # TODO should handle errors + + empty = queue.empty() + while not empty: + for item, call in self._process(queue): + self._try_call(call, item) + empty = queue.empty() + + # + # SoledadStore specific methods. + # + + def _process(self, queue): + """ + Return the item and the proper call type for the next + item in the queue if any. + + :param queue: the queue from where we'll pick item. + :type queue: Queue + """ + msg_wrapper = queue.get() + return self._get_calls_for_msg_parts(msg_wrapper) + + def _try_call(self, call, item): + """ + Try to invoke a given call with item as a parameter. + """ + if not call: + return + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.error("Error: %r" % (exc,)) + raise exc + + def _get_calls_for_msg_parts(self, msg_wrapper): + """ + Return the proper call type for a given item. + + :param msg_wrapper: A MessageWrapper + :type msg_wrapper: IMessageContainer + """ + call = None + + if msg_wrapper.new is True: + call = self._soledad.create_doc + + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + if item.part == MessagePartType.fdoc: + yield dict(item.content), call + + if item.part == MessagePartType.hdoc: + if not self._header_does_exist(item.content): + yield dict(item.content), call + + if item.part == MessagePartType.cdoc: + if self._content_does_exist(item.content): + yield dict(item.content), call + + # TODO should check for elements with the dirty state + # TODO if new == False and dirty == True, put_doc + # XXX for puts, we will have to retrieve + # the document, change the content, and + # pass the whole document under "content" + else: + logger.error("Cannot put documents yet!") -- cgit v1.2.3 From ff28e22977db802c87f0b7be99e37c6de29183e9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 23 Jan 2014 13:32:01 -0400 Subject: Unset new flag after successful write --- src/leap/mail/imap/soledadstore.py | 84 +++++++++++++++++++++++++++++++------- 1 file changed, 70 insertions(+), 14 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 62a3c53..d36acae 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -19,6 +19,8 @@ A MessageStore that writes to Soledad. """ import logging +from itertools import chain + from u1db import errors as u1db_errors from zope.interface import implements @@ -30,6 +32,13 @@ from leap.mail.messageflow import IMessageConsumer logger = logging.getLogger(__name__) +# TODO +# [ ] Delete original message from the incoming queue after all successful +# writes. +# [ ] Implement a retry queue. +# [ ] Consider journaling of operations. + + class ContentDedup(object): """ Message deduplication. @@ -37,8 +46,8 @@ class ContentDedup(object): We do a query for the content hashes before writing to our beloved sqlcipher backend of Soledad. This means, by now, that: - 1. We will not store the same attachment twice, only the hash of it. - 2. We will not store the same message body twice, only the hash of it. + 1. We will not store the same body/attachment twice, only the hash of it. + 2. We will not store the same message header twice, only the hash of it. The first case is useful if you are always receiving the same old memes from unwary friends that still have not discovered that 4chan is the @@ -49,6 +58,7 @@ class ContentDedup(object): to run Bitmask in completely free phones. Yes, I mean that, the whole GSM Stack. """ + # TODO refactor using unique_query def _header_does_exist(self, doc): """ @@ -99,6 +109,12 @@ class ContentDedup(object): return True +class MsgWriteError(Exception): + """ + Raised if any exception is found while saving message parts. + """ + + class SoledadStore(ContentDedup): """ This will create docs in the local Soledad database. @@ -108,7 +124,7 @@ class SoledadStore(ContentDedup): def __init__(self, soledad): """ - Initialize the writer. + Initialize the permanent store that writes to Soledad database. :param soledad: the soledad instance :type soledad: Soledad @@ -165,15 +181,40 @@ class SoledadStore(ContentDedup): to be inserted. :type queue: Queue """ - # TODO should delete the original message from incoming after + # TODO should delete the original message from incoming only after # the writes are done. # TODO should handle the delete case # TODO should handle errors + # TODO could generalize this method into a generic consumer + # and only implement `process` here empty = queue.empty() while not empty: - for item, call in self._process(queue): - self._try_call(call, item) + items = self._process(queue) + # we prime the generator, that should return the + # item in the first place. + msg_wrapper = items.next() + + # From here, we unpack the subpart items and + # the right soledad call. + try: + failed = False + for item, call in items: + try: + self._try_call(call, item) + except Exception: + failed = True + continue + if failed: + raise MsgWriteError + + except MsgWriteError: + logger.error("Error while processing item.") + pass + else: + # If everything went well, we can unset the new flag + # in the source store (memory store) + msg_wrapper.new = False empty = queue.empty() # @@ -182,14 +223,16 @@ class SoledadStore(ContentDedup): def _process(self, queue): """ - Return the item and the proper call type for the next - item in the queue if any. + Return an iterator that will yield the msg_wrapper in the first place, + followed by the subparts item and the proper call type for every + item in the queue, if any. :param queue: the queue from where we'll pick item. :type queue: Queue """ msg_wrapper = queue.get() - return self._get_calls_for_msg_parts(msg_wrapper) + return chain((msg_wrapper,), + self._get_calls_for_msg_parts(msg_wrapper)) def _try_call(self, call, item): """ @@ -205,7 +248,7 @@ class SoledadStore(ContentDedup): def _get_calls_for_msg_parts(self, msg_wrapper): """ - Return the proper call type for a given item. + Generator that return the proper call type for a given item. :param msg_wrapper: A MessageWrapper :type msg_wrapper: IMessageContainer @@ -220,18 +263,31 @@ class SoledadStore(ContentDedup): if item.part == MessagePartType.fdoc: yield dict(item.content), call - if item.part == MessagePartType.hdoc: + elif item.part == MessagePartType.hdoc: if not self._header_does_exist(item.content): yield dict(item.content), call - if item.part == MessagePartType.cdoc: - if self._content_does_exist(item.content): + elif item.part == MessagePartType.cdoc: + if not self._content_does_exist(item.content): + + # XXX DEBUG ------------------- + print "about to write content-doc ", + #import pprint; pprint.pprint(item.content) + yield dict(item.content), call + # TODO should write back to the queue + # with the results of the operation. + # We can write there: + # (*) MsgWriteACK --> Should remove from incoming queue. + # (We should do this here). + + # Implement using callbacks for each operation. + # TODO should check for elements with the dirty state # TODO if new == False and dirty == True, put_doc # XXX for puts, we will have to retrieve # the document, change the content, and # pass the whole document under "content" else: - logger.error("Cannot put documents yet!") + logger.error("Cannot put/delete documents yet!") -- cgit v1.2.3 From e02db78b1b6d8fe021efd4adb250c64a1dd4bac4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 24 Jan 2014 05:39:13 -0400 Subject: flags use the memstore * add new/dirty deferred dict to notify when written to disk * fix eventual duplication after copy * fix flag flickering on first retrieval. --- src/leap/mail/imap/soledadstore.py | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index d36acae..b321da8 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -81,7 +81,8 @@ class ContentDedup(object): if len(header_docs) != 1: logger.warning("Found more than one copy of chash %s!" % (chash,)) - logger.debug("Found header doc with that hash! Skipping save!") + # XXX re-enable + #logger.debug("Found header doc with that hash! Skipping save!") return True def _content_does_exist(self, doc): @@ -105,7 +106,8 @@ class ContentDedup(object): if len(attach_docs) != 1: logger.warning("Found more than one copy of phash %s!" % (phash,)) - logger.debug("Found attachment doc with that hash! Skipping save!") + # XXX re-enable + #logger.debug("Found attachment doc with that hash! Skipping save!") return True @@ -215,6 +217,7 @@ class SoledadStore(ContentDedup): # If everything went well, we can unset the new flag # in the source store (memory store) msg_wrapper.new = False + msg_wrapper.dirty = False empty = queue.empty() # @@ -261,6 +264,9 @@ class SoledadStore(ContentDedup): # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): if item.part == MessagePartType.fdoc: + + # FIXME add content duplication for HEADERS too! + # (only 1 chash per mailbox!) yield dict(item.content), call elif item.part == MessagePartType.hdoc: @@ -276,18 +282,31 @@ class SoledadStore(ContentDedup): yield dict(item.content), call + # For now, the only thing that will be dirty is + # the flags doc. + + elif msg_wrapper.dirty is True: + print "DIRTY DOC! ----------------------" + call = self._soledad.put_doc + + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + doc_id = item.doc_id # defend! + doc = self._soledad.get_doc(doc_id) + doc.content = item.content + + if item.part == MessagePartType.fdoc: + print "Will PUT the doc: ", doc + yield dict(doc), call + + # XXX also for linkage-doc + # TODO should write back to the queue # with the results of the operation. # We can write there: # (*) MsgWriteACK --> Should remove from incoming queue. # (We should do this here). - # Implement using callbacks for each operation. - # TODO should check for elements with the dirty state - # TODO if new == False and dirty == True, put_doc - # XXX for puts, we will have to retrieve - # the document, change the content, and - # pass the whole document under "content" else: logger.error("Cannot put/delete documents yet!") -- cgit v1.2.3 From a5508429b90e2e9b58c5d073610ee5a10274663f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 24 Jan 2014 23:14:38 -0400 Subject: recent-flags use the memory store --- src/leap/mail/imap/soledadstore.py | 59 +++++++++++++++++++++++++++++++------- 1 file changed, 49 insertions(+), 10 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index b321da8..ea5b36e 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -25,6 +25,8 @@ from u1db import errors as u1db_errors from zope.interface import implements from leap.mail.imap.messageparts import MessagePartType +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.fields import fields from leap.mail.imap.interfaces import IMessageStore from leap.mail.messageflow import IMessageConsumer @@ -193,9 +195,10 @@ class SoledadStore(ContentDedup): empty = queue.empty() while not empty: items = self._process(queue) + # we prime the generator, that should return the - # item in the first place. - msg_wrapper = items.next() + # message or flags wrapper item in the first place. + doc_wrapper = items.next() # From here, we unpack the subpart items and # the right soledad call. @@ -214,10 +217,11 @@ class SoledadStore(ContentDedup): logger.error("Error while processing item.") pass else: - # If everything went well, we can unset the new flag - # in the source store (memory store) - msg_wrapper.new = False - msg_wrapper.dirty = False + if isinstance(doc_wrapper, MessageWrapper): + # If everything went well, we can unset the new flag + # in the source store (memory store) + doc_wrapper.new = False + doc_wrapper.dirty = False empty = queue.empty() # @@ -233,9 +237,20 @@ class SoledadStore(ContentDedup): :param queue: the queue from where we'll pick item. :type queue: Queue """ - msg_wrapper = queue.get() - return chain((msg_wrapper,), - self._get_calls_for_msg_parts(msg_wrapper)) + doc_wrapper = queue.get() + + if isinstance(doc_wrapper, MessageWrapper): + return chain((doc_wrapper,), + self._get_calls_for_msg_parts(doc_wrapper)) + elif isinstance(doc_wrapper, RecentFlagsDoc): + print "getting calls for rflags" + return chain((doc_wrapper,), + self._get_calls_for_rflags_doc(doc_wrapper)) + else: + print "********************" + print "CANNOT PROCESS ITEM!" + print "item --------------------->", doc_wrapper + return (i for i in []) def _try_call(self, call, item): """ @@ -309,4 +324,28 @@ class SoledadStore(ContentDedup): # Implement using callbacks for each operation. else: - logger.error("Cannot put/delete documents yet!") + logger.error("Cannot delete documents yet!") + + def _get_calls_for_rflags_doc(self, rflags_wrapper): + """ + We always put these documents. + """ + call = self._soledad.put_doc + rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) + + payload = rflags_wrapper.content + print "rdoc", rdoc + print "SAVING RFLAGS TO SOLEDAD..." + import pprint; pprint.pprint(payload) + + if payload: + rdoc.content = payload + print + print "YIELDING -----", rdoc + print "AND ----------", call + yield rdoc, call + else: + print ">>>>>>>>>>>>>>>>>" + print ">>>>>>>>>>>>>>>>>" + print ">>>>>>>>>>>>>>>>>" + print "No payload" -- cgit v1.2.3 From f5365ae0c2edb8b3e879f876f2f7e42b25f4616a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 27 Jan 2014 16:11:53 -0400 Subject: handle last_uid property in memory store --- src/leap/mail/imap/soledadstore.py | 129 ++++++++++++++++++++++++++++--------- 1 file changed, 100 insertions(+), 29 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ea5b36e..60576a3 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -18,18 +18,22 @@ A MessageStore that writes to Soledad. """ import logging +import threading from itertools import chain +#from twisted.internet import defer from u1db import errors as u1db_errors from zope.interface import implements +from leap.common.check import leap_assert_type from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.fields import fields from leap.mail.imap.interfaces import IMessageStore from leap.mail.messageflow import IMessageConsumer +from leap.mail.utils import first logger = logging.getLogger(__name__) @@ -123,6 +127,7 @@ class SoledadStore(ContentDedup): """ This will create docs in the local Soledad database. """ + _last_uid_lock = threading.Lock() implements(IMessageConsumer, IMessageStore) @@ -177,6 +182,7 @@ class SoledadStore(ContentDedup): # IMessageConsumer + #@profile def consume(self, queue): """ Creates a new document in soledad db. @@ -220,6 +226,7 @@ class SoledadStore(ContentDedup): if isinstance(doc_wrapper, MessageWrapper): # If everything went well, we can unset the new flag # in the source store (memory store) + print "unsetting new flag!" doc_wrapper.new = False doc_wrapper.dirty = False empty = queue.empty() @@ -243,13 +250,11 @@ class SoledadStore(ContentDedup): return chain((doc_wrapper,), self._get_calls_for_msg_parts(doc_wrapper)) elif isinstance(doc_wrapper, RecentFlagsDoc): - print "getting calls for rflags" return chain((doc_wrapper,), self._get_calls_for_rflags_doc(doc_wrapper)) else: print "********************" print "CANNOT PROCESS ITEM!" - print "item --------------------->", doc_wrapper return (i for i in []) def _try_call(self, call, item): @@ -275,6 +280,7 @@ class SoledadStore(ContentDedup): if msg_wrapper.new is True: call = self._soledad.create_doc + print "NEW DOC ----------------------" # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): @@ -301,30 +307,22 @@ class SoledadStore(ContentDedup): # the flags doc. elif msg_wrapper.dirty is True: - print "DIRTY DOC! ----------------------" call = self._soledad.put_doc - # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): + # XXX FIXME Give error if dirty and not doc_id !!! doc_id = item.doc_id # defend! + if not doc_id: + continue doc = self._soledad.get_doc(doc_id) - doc.content = item.content - + doc.content = dict(item.content) if item.part == MessagePartType.fdoc: - print "Will PUT the doc: ", doc - yield dict(doc), call - - # XXX also for linkage-doc - - # TODO should write back to the queue - # with the results of the operation. - # We can write there: - # (*) MsgWriteACK --> Should remove from incoming queue. - # (We should do this here). - # Implement using callbacks for each operation. + logger.debug("PUT dirty fdoc") + yield doc, call + # XXX also for linkage-doc !!! else: - logger.error("Cannot delete documents yet!") + logger.error("Cannot delete documents yet from the queue...!") def _get_calls_for_rflags_doc(self, rflags_wrapper): """ @@ -334,18 +332,91 @@ class SoledadStore(ContentDedup): rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) payload = rflags_wrapper.content - print "rdoc", rdoc - print "SAVING RFLAGS TO SOLEDAD..." - import pprint; pprint.pprint(payload) + logger.debug("Saving RFLAGS to Soledad...") if payload: rdoc.content = payload - print - print "YIELDING -----", rdoc - print "AND ----------", call yield rdoc, call - else: - print ">>>>>>>>>>>>>>>>>" - print ">>>>>>>>>>>>>>>>>" - print ">>>>>>>>>>>>>>>>>" - print "No payload" + + def _get_mbox_document(self, mbox): + """ + Return mailbox document. + + :return: A SoledadDocument containing this mailbox, or None if + the query failed. + :rtype: SoledadDocument or None. + """ + try: + query = self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_MBOX_VAL, mbox) + if query: + return query.pop() + except Exception as exc: + logger.exception("Unhandled error %r" % exc) + + def get_flags_doc(self, mbox, uid): + """ + Return the SoledadDocument for the given mbox and uid. + """ + try: + flag_docs = self._soledad.get_from_index( + fields.TYPE_MBOX_UID_IDX, + fields.TYPE_FLAGS_VAL, mbox, str(uid)) + result = first(flag_docs) + except Exception as exc: + # ugh! Something's broken down there! + logger.warning("ERROR while getting flags for UID: %s" % uid) + logger.exception(exc) + finally: + return result + + def write_last_uid(self, mbox, value): + """ + Write the `last_uid` integer to the proper mailbox document + in Soledad. + This is called from the deferred triggered by + memorystore.increment_last_soledad_uid, which is expected to + run in a separate thread. + """ + leap_assert_type(value, int) + key = fields.LAST_UID_KEY + + with self._last_uid_lock: + mbox_doc = self._get_mbox_document(mbox) + old_val = mbox_doc.content[key] + if value < old_val: + logger.error("%s:%s Tried to write a UID lesser than what's " + "stored!" % (mbox, value)) + mbox_doc.content[key] = value + self._soledad.put_doc(mbox_doc) + + # deleted messages + + def deleted_iter(self, mbox): + """ + Get an iterator for the SoledadDocuments for messages + with \\Deleted flag for a given mailbox. + + :return: iterator through deleted message docs + :rtype: iterable + """ + return (doc for doc in self._soledad.get_from_index( + fields.TYPE_MBOX_DEL_IDX, + fields.TYPE_FLAGS_VAL, mbox, '1')) + + # TODO can deferToThread this? + def remove_all_deleted(self, mbox): + """ + Remove from Soledad all messages flagged as deleted for a given + mailbox. + """ + print "DELETING ALL DOCS FOR -------", mbox + deleted = [] + for doc in self.deleted_iter(mbox): + deleted.append(doc.content[fields.UID_KEY]) + print + print ">>>>>>>>>>>>>>>>>>>>" + print "deleting doc: ", doc.doc_id, doc.content + self._soledad.delete_doc(doc) + return deleted -- cgit v1.2.3 From a7e0054b595822325f749b0b1df7d25cab4e6486 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 28 Jan 2014 18:39:59 -0400 Subject: docstring fixes Also some fixes for None comparisons. --- src/leap/mail/imap/soledadstore.py | 87 ++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 31 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 60576a3..f64ed23 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -22,7 +22,6 @@ import threading from itertools import chain -#from twisted.internet import defer from u1db import errors as u1db_errors from zope.interface import implements @@ -71,7 +70,7 @@ class ContentDedup(object): Check whether we already have a header document for this content hash in our database. - :param doc: tentative header document + :param doc: tentative header for document :type doc: dict :returns: True if it exists, False otherwise. """ @@ -87,8 +86,7 @@ class ContentDedup(object): if len(header_docs) != 1: logger.warning("Found more than one copy of chash %s!" % (chash,)) - # XXX re-enable - #logger.debug("Found header doc with that hash! Skipping save!") + logger.debug("Found header doc with that hash! Skipping save!") return True def _content_does_exist(self, doc): @@ -96,7 +94,7 @@ class ContentDedup(object): Check whether we already have a content document for a payload with this hash in our database. - :param doc: tentative content document + :param doc: tentative content for document :type doc: dict :returns: True if it exists, False otherwise. """ @@ -112,8 +110,7 @@ class ContentDedup(object): if len(attach_docs) != 1: logger.warning("Found more than one copy of phash %s!" % (phash,)) - # XXX re-enable - #logger.debug("Found attachment doc with that hash! Skipping save!") + logger.debug("Found attachment doc with that hash! Skipping save!") return True @@ -151,38 +148,49 @@ class SoledadStore(ContentDedup): Create the passed message into this SoledadStore. :param mbox: the mbox this message belongs. + :type mbox: str or unicode :param uid: the UID that identifies this message in this mailbox. + :type uid: int :param message: a IMessageContainer implementor. """ + raise NotImplementedError() def put_message(self, mbox, uid, message): """ Put the passed existing message into this SoledadStore. :param mbox: the mbox this message belongs. + :type mbox: str or unicode :param uid: the UID that identifies this message in this mailbox. + :type uid: int :param message: a IMessageContainer implementor. """ + raise NotImplementedError() def remove_message(self, mbox, uid): """ Remove the given message from this SoledadStore. :param mbox: the mbox this message belongs. + :type mbox: str or unicode :param uid: the UID that identifies this message in this mailbox. + :type uid: int """ + raise NotImplementedError() def get_message(self, mbox, uid): """ Get a IMessageContainer for the given mbox and uid combination. :param mbox: the mbox this message belongs. + :type mbox: str or unicode :param uid: the UID that identifies this message in this mailbox. + :type uid: int """ + raise NotImplementedError() # IMessageConsumer - #@profile def consume(self, queue): """ Creates a new document in soledad db. @@ -198,8 +206,7 @@ class SoledadStore(ContentDedup): # TODO could generalize this method into a generic consumer # and only implement `process` here - empty = queue.empty() - while not empty: + while not queue.empty(): items = self._process(queue) # we prime the generator, that should return the @@ -213,23 +220,22 @@ class SoledadStore(ContentDedup): for item, call in items: try: self._try_call(call, item) - except Exception: - failed = True + except Exception as exc: + failed = exc continue if failed: raise MsgWriteError except MsgWriteError: logger.error("Error while processing item.") - pass + logger.exception(failed) else: if isinstance(doc_wrapper, MessageWrapper): # If everything went well, we can unset the new flag # in the source store (memory store) - print "unsetting new flag!" + logger.info("unsetting new flag!") doc_wrapper.new = False doc_wrapper.dirty = False - empty = queue.empty() # # SoledadStore specific methods. @@ -253,20 +259,24 @@ class SoledadStore(ContentDedup): return chain((doc_wrapper,), self._get_calls_for_rflags_doc(doc_wrapper)) else: - print "********************" - print "CANNOT PROCESS ITEM!" + logger.warning("CANNOT PROCESS ITEM!") return (i for i in []) def _try_call(self, call, item): """ Try to invoke a given call with item as a parameter. + + :param call: the function to call + :type call: callable + :param item: the payload to pass to the call as argument + :type item: object """ - if not call: + if call is None: return try: call(item) except u1db_errors.RevisionConflict as exc: - logger.error("Error: %r" % (exc,)) + logger.exception("Error: %r" % (exc,)) raise exc def _get_calls_for_msg_parts(self, msg_wrapper): @@ -275,12 +285,14 @@ class SoledadStore(ContentDedup): :param msg_wrapper: A MessageWrapper :type msg_wrapper: IMessageContainer + :return: a generator of tuples with recent-flags doc payload + and callable + :rtype: generator """ call = None - if msg_wrapper.new is True: + if msg_wrapper.new: call = self._soledad.create_doc - print "NEW DOC ----------------------" # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): @@ -296,17 +308,12 @@ class SoledadStore(ContentDedup): elif item.part == MessagePartType.cdoc: if not self._content_does_exist(item.content): - - # XXX DEBUG ------------------- - print "about to write content-doc ", - #import pprint; pprint.pprint(item.content) - yield dict(item.content), call # For now, the only thing that will be dirty is # the flags doc. - elif msg_wrapper.dirty is True: + elif msg_wrapper.dirty: call = self._soledad.put_doc # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): @@ -327,6 +334,11 @@ class SoledadStore(ContentDedup): def _get_calls_for_rflags_doc(self, rflags_wrapper): """ We always put these documents. + + :param rflags_wrapper: A wrapper around recent flags doc. + :type rflags_wrapper: RecentFlagsWrapper + :return: a tuple with recent-flags doc payload and callable + :rtype: tuple """ call = self._soledad.put_doc rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) @@ -342,6 +354,8 @@ class SoledadStore(ContentDedup): """ Return mailbox document. + :param mbox: the mailbox + :type mbox: str or unicode :return: A SoledadDocument containing this mailbox, or None if the query failed. :rtype: SoledadDocument or None. @@ -358,6 +372,11 @@ class SoledadStore(ContentDedup): def get_flags_doc(self, mbox, uid): """ Return the SoledadDocument for the given mbox and uid. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the UID for the message + :type uid: int """ try: flag_docs = self._soledad.get_from_index( @@ -378,6 +397,11 @@ class SoledadStore(ContentDedup): This is called from the deferred triggered by memorystore.increment_last_soledad_uid, which is expected to run in a separate thread. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int """ leap_assert_type(value, int) key = fields.LAST_UID_KEY @@ -398,6 +422,8 @@ class SoledadStore(ContentDedup): Get an iterator for the SoledadDocuments for messages with \\Deleted flag for a given mailbox. + :param mbox: the mailbox + :type mbox: str or unicode :return: iterator through deleted message docs :rtype: iterable """ @@ -410,13 +436,12 @@ class SoledadStore(ContentDedup): """ Remove from Soledad all messages flagged as deleted for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode """ - print "DELETING ALL DOCS FOR -------", mbox deleted = [] for doc in self.deleted_iter(mbox): deleted.append(doc.content[fields.UID_KEY]) - print - print ">>>>>>>>>>>>>>>>>>>>" - print "deleting doc: ", doc.doc_id, doc.content self._soledad.delete_doc(doc) return deleted -- cgit v1.2.3 From 1b71ba510a2e6680f1ecc84eacfc492b0bbe24fc Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 29 Jan 2014 00:54:20 -0400 Subject: Fix copy and deletion problems * reorganize and simplify STORE command processing * add the notification after the processing of the whole sequence --- src/leap/mail/imap/soledadstore.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index f64ed23..ae5c583 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -26,6 +26,7 @@ from u1db import errors as u1db_errors from zope.interface import implements from leap.common.check import leap_assert_type +from leap.mail.decorators import deferred from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import RecentFlagsDoc @@ -191,6 +192,7 @@ class SoledadStore(ContentDedup): # IMessageConsumer + @deferred def consume(self, queue): """ Creates a new document in soledad db. @@ -297,9 +299,6 @@ class SoledadStore(ContentDedup): # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): if item.part == MessagePartType.fdoc: - - # FIXME add content duplication for HEADERS too! - # (only 1 chash per mailbox!) yield dict(item.content), call elif item.part == MessagePartType.hdoc: -- cgit v1.2.3 From ff7de0c9bc760e097c0286d2d62a19095be3f35e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 30 Jan 2014 18:35:03 -0400 Subject: prime-uids We pre-fetch the uids from soledad on mailbox initialization --- src/leap/mail/imap/soledadstore.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ae5c583..ff5e03b 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -192,7 +192,8 @@ class SoledadStore(ContentDedup): # IMessageConsumer - @deferred + # It's not thread-safe to defer this to a different thread + def consume(self, queue): """ Creates a new document in soledad db. -- cgit v1.2.3 From 0f6a8e1c83995cffec51e81f626d4bb29d4f7345 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 31 Jan 2014 03:34:03 -0400 Subject: properly implement deferreds in several commands Passing along a deferred as an observer whose callback will be called with the proper result. Returning to thread in the appropiate points. just let's remember that twisted APIs are not thread safe! SoledadStore process_item also properly returned to thread. Changed @deferred to @deferred_to_thread so it results less confusing to read. "know the territory". aha! --- src/leap/mail/imap/soledadstore.py | 99 ++++++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 31 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ff5e03b..82f27e7 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -23,10 +23,12 @@ import threading from itertools import chain from u1db import errors as u1db_errors +from twisted.internet import defer +from twisted.python import log from zope.interface import implements from leap.common.check import leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import RecentFlagsDoc @@ -209,52 +211,87 @@ class SoledadStore(ContentDedup): # TODO could generalize this method into a generic consumer # and only implement `process` here + def docWriteCallBack(doc_wrapper): + """ + Callback for a successful write of a document wrapper. + """ + if isinstance(doc_wrapper, MessageWrapper): + # If everything went well, we can unset the new flag + # in the source store (memory store) + self._unset_new_dirty(doc_wrapper) + + def docWriteErrorBack(failure): + """ + Errorback for write operations. + """ + log.error("Error while processing item.") + log.msg(failure.getTraceBack()) + while not queue.empty(): - items = self._process(queue) + doc_wrapper = queue.get() + d = defer.Deferred() + d.addCallbacks(docWriteCallBack, docWriteErrorBack) + + self._consume_doc(doc_wrapper, d) + + @deferred_to_thread + def _unset_new_dirty(self, doc_wrapper): + """ + Unset the `new` and `dirty` flags for this document wrapper in the + memory store. + + :param doc_wrapper: a MessageWrapper instance + :type doc_wrapper: MessageWrapper + """ + # XXX debug msg id/mbox? + logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - doc_wrapper = items.next() + @deferred_to_thread + def _consume_doc(self, doc_wrapper, deferred): + """ + Consume each document wrapper in a separate thread. + + :param doc_wrapper: + :type doc_wrapper: + :param deferred: + :type deferred: Deferred + """ + items = self._process(doc_wrapper) - # From here, we unpack the subpart items and - # the right soledad call. + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + + # From here, we unpack the subpart items and + # the right soledad call. + failed = False + for item, call in items: try: - failed = False - for item, call in items: - try: - self._try_call(call, item) - except Exception as exc: - failed = exc - continue - if failed: - raise MsgWriteError - - except MsgWriteError: - logger.error("Error while processing item.") - logger.exception(failed) - else: - if isinstance(doc_wrapper, MessageWrapper): - # If everything went well, we can unset the new flag - # in the source store (memory store) - logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False + self._try_call(call, item) + except Exception as exc: + failed = exc + continue + if failed: + deferred.errback(MsgWriteError( + "There was an error writing the mesage")) + else: + deferred.callback(doc_wrapper) # # SoledadStore specific methods. # - def _process(self, queue): + def _process(self, doc_wrapper): """ - Return an iterator that will yield the msg_wrapper in the first place, + Return an iterator that will yield the doc_wrapper in the first place, followed by the subparts item and the proper call type for every item in the queue, if any. :param queue: the queue from where we'll pick item. :type queue: Queue """ - doc_wrapper = queue.get() - if isinstance(doc_wrapper, MessageWrapper): return chain((doc_wrapper,), self._get_calls_for_msg_parts(doc_wrapper)) -- cgit v1.2.3 From 23e28bae2c3cb74e00e29ee8add0b73adeb65c2b Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 4 Feb 2014 10:57:49 -0400 Subject: fixes after review * Some more docstring completion/fixes. * Removed unneeded str coertion. * Handle mailbox name in logs. * Separate manhole boilerplate into its own file. --- src/leap/mail/imap/soledadstore.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 82f27e7..8e22f26 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -253,9 +253,11 @@ class SoledadStore(ContentDedup): """ Consume each document wrapper in a separate thread. - :param doc_wrapper: - :type doc_wrapper: - :param deferred: + :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance + :type doc_wrapper: MessageWrapper or RecentFlagsDoc + :param deferred: a deferred that will be fired when the write operation + has finished, either calling its callback or its + errback depending on whether it succeed. :type deferred: Deferred """ items = self._process(doc_wrapper) @@ -415,6 +417,7 @@ class SoledadStore(ContentDedup): :param uid: the UID for the message :type uid: int """ + result = None try: flag_docs = self._soledad.get_from_index( fields.TYPE_MBOX_UID_IDX, @@ -447,7 +450,7 @@ class SoledadStore(ContentDedup): mbox_doc = self._get_mbox_document(mbox) old_val = mbox_doc.content[key] if value < old_val: - logger.error("%s:%s Tried to write a UID lesser than what's " + logger.error("%r:%s Tried to write a UID lesser than what's " "stored!" % (mbox, value)) mbox_doc.content[key] = value self._soledad.put_doc(mbox_doc) -- cgit v1.2.3 From c955c7015b5986af40b2253ac98846f4547e5e00 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 5 Feb 2014 21:40:20 -0400 Subject: lock document retrieval/put --- src/leap/mail/imap/soledadstore.py | 47 +++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 18 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 8e22f26..bfa53b6 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -128,6 +128,7 @@ class SoledadStore(ContentDedup): This will create docs in the local Soledad database. """ _last_uid_lock = threading.Lock() + _soledad_rw_lock = threading.Lock() implements(IMessageConsumer, IMessageStore) @@ -140,6 +141,10 @@ class SoledadStore(ContentDedup): """ self._soledad = soledad + self._CREATE_DOC_FUN = self._soledad.create_doc + self._PUT_DOC_FUN = self._soledad.put_doc + self._GET_DOC_FUN = self._soledad.get_doc + # IMessageStore # ------------------------------------------------------------------- @@ -224,7 +229,7 @@ class SoledadStore(ContentDedup): """ Errorback for write operations. """ - log.error("Error while processing item.") + log.msg("ERROR: Error while processing item.") log.msg(failure.getTraceBack()) while not queue.empty(): @@ -234,6 +239,7 @@ class SoledadStore(ContentDedup): self._consume_doc(doc_wrapper, d) + # FIXME this should not run the callback in the deferred thred @deferred_to_thread def _unset_new_dirty(self, doc_wrapper): """ @@ -248,7 +254,8 @@ class SoledadStore(ContentDedup): doc_wrapper.new = False doc_wrapper.dirty = False - @deferred_to_thread + # FIXME this should not run the callback in the deferred thred + #@deferred_to_thread def _consume_doc(self, doc_wrapper, deferred): """ Consume each document wrapper in a separate thread. @@ -273,6 +280,7 @@ class SoledadStore(ContentDedup): try: self._try_call(call, item) except Exception as exc: + logger.exception(exc) failed = exc continue if failed: @@ -315,11 +323,18 @@ class SoledadStore(ContentDedup): """ if call is None: return - try: - call(item) - except u1db_errors.RevisionConflict as exc: - logger.exception("Error: %r" % (exc,)) - raise exc + + with self._soledad_rw_lock: + if call == self._PUT_DOC_FUN: + doc_id = item.doc_id + doc = self._GET_DOC_FUN(doc_id) + doc.content = dict(item.content) + item = doc + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc def _get_calls_for_msg_parts(self, msg_wrapper): """ @@ -334,7 +349,7 @@ class SoledadStore(ContentDedup): call = None if msg_wrapper.new: - call = self._soledad.create_doc + call = self._CREATE_DOC_FUN # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): @@ -353,18 +368,17 @@ class SoledadStore(ContentDedup): # the flags doc. elif msg_wrapper.dirty: - call = self._soledad.put_doc + call = self._PUT_DOC_FUN # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): # XXX FIXME Give error if dirty and not doc_id !!! doc_id = item.doc_id # defend! if not doc_id: continue - doc = self._soledad.get_doc(doc_id) - doc.content = dict(item.content) + if item.part == MessagePartType.fdoc: logger.debug("PUT dirty fdoc") - yield doc, call + yield item, call # XXX also for linkage-doc !!! else: @@ -379,15 +393,12 @@ class SoledadStore(ContentDedup): :return: a tuple with recent-flags doc payload and callable :rtype: tuple """ - call = self._soledad.put_doc - rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) + call = self._CREATE_DOC_FUN payload = rflags_wrapper.content - logger.debug("Saving RFLAGS to Soledad...") - if payload: - rdoc.content = payload - yield rdoc, call + logger.debug("Saving RFLAGS to Soledad...") + yield payload, call def _get_mbox_document(self, mbox): """ -- cgit v1.2.3 From 860e407ba0a86be30865a77ec29c6ecacf7899a4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 6 Feb 2014 01:39:47 -0400 Subject: defer copy and soledad writes --- src/leap/mail/imap/soledadstore.py | 61 +++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 21 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index bfa53b6..13f896f 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -216,6 +216,8 @@ class SoledadStore(ContentDedup): # TODO could generalize this method into a generic consumer # and only implement `process` here + from twisted.internet import reactor + def docWriteCallBack(doc_wrapper): """ Callback for a successful write of a document wrapper. @@ -234,10 +236,10 @@ class SoledadStore(ContentDedup): while not queue.empty(): doc_wrapper = queue.get() + d = defer.Deferred() d.addCallbacks(docWriteCallBack, docWriteErrorBack) - - self._consume_doc(doc_wrapper, d) + reactor.callLater(0, self._consume_doc, doc_wrapper, d) # FIXME this should not run the callback in the deferred thred @deferred_to_thread @@ -254,8 +256,6 @@ class SoledadStore(ContentDedup): doc_wrapper.new = False doc_wrapper.dirty = False - # FIXME this should not run the callback in the deferred thred - #@deferred_to_thread def _consume_doc(self, doc_wrapper, deferred): """ Consume each document wrapper in a separate thread. @@ -267,33 +267,52 @@ class SoledadStore(ContentDedup): errback depending on whether it succeed. :type deferred: Deferred """ - items = self._process(doc_wrapper) + def notifyBack(failed, observer, doc_wrapper): + if failed: + observer.errback(MsgWriteError( + "There was an error writing the mesage")) + else: + observer.callback(doc_wrapper) + + def doSoledadCalls(items, observer): + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + d_sol = self._soledad_write_document_parts(items) + d_sol.addCallback(notifyBack, observer, doc_wrapper) + d_sol.addErrback(ebSoledadCalls) - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - doc_wrapper = items.next() + def ebSoledadCalls(failure): + log.msg(failure.getTraceback()) + + d = self._iter_wrapper_subparts(doc_wrapper) + d.addCallback(doSoledadCalls, deferred) + d.addErrback(ebSoledadCalls) + + # + # SoledadStore specific methods. + # - # From here, we unpack the subpart items and - # the right soledad call. + @deferred_to_thread + def _soledad_write_document_parts(self, items): + """ + Write the document parts to soledad in a separate thread. + :param items: the iterator through the different document wrappers + payloads. + :type items: iterator + """ failed = False for item, call in items: try: self._try_call(call, item) except Exception as exc: logger.exception(exc) - failed = exc + failed = True continue - if failed: - deferred.errback(MsgWriteError( - "There was an error writing the mesage")) - else: - deferred.callback(doc_wrapper) + return failed - # - # SoledadStore specific methods. - # - - def _process(self, doc_wrapper): + @deferred_to_thread + def _iter_wrapper_subparts(self, doc_wrapper): """ Return an iterator that will yield the doc_wrapper in the first place, followed by the subparts item and the proper call type for every -- cgit v1.2.3 From b7d28d1ee8208e1361caa73740d826af3b4c572e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 6 Feb 2014 10:29:36 -0400 Subject: defend against empty items --- src/leap/mail/imap/soledadstore.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 13f896f..3c0b6f9 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -35,7 +35,7 @@ from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.fields import fields from leap.mail.imap.interfaces import IMessageStore from leap.mail.messageflow import IMessageConsumer -from leap.mail.utils import first +from leap.mail.utils import first, empty logger = logging.getLogger(__name__) @@ -303,6 +303,8 @@ class SoledadStore(ContentDedup): """ failed = False for item, call in items: + if empty(item): + continue try: self._try_call(call, item) except Exception as exc: -- cgit v1.2.3 From ff3a6a640fdb345449a5f9cd3379bbaefa36111e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 6 Feb 2014 15:46:17 -0400 Subject: take recent count from memstore --- src/leap/mail/imap/soledadstore.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 3c0b6f9..a74b49c 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -86,10 +86,12 @@ class ContentDedup(object): if not header_docs: return False - if len(header_docs) != 1: - logger.warning("Found more than one copy of chash %s!" - % (chash,)) - logger.debug("Found header doc with that hash! Skipping save!") + # FIXME enable only to debug this problem. + #if len(header_docs) != 1: + #logger.warning("Found more than one copy of chash %s!" + #% (chash,)) + + #logger.debug("Found header doc with that hash! Skipping save!") return True def _content_does_exist(self, doc): @@ -110,10 +112,11 @@ class ContentDedup(object): if not attach_docs: return False - if len(attach_docs) != 1: - logger.warning("Found more than one copy of phash %s!" - % (phash,)) - logger.debug("Found attachment doc with that hash! Skipping save!") + # FIXME enable only to debug this problem + #if len(attach_docs) != 1: + #logger.warning("Found more than one copy of phash %s!" + #% (phash,)) + #logger.debug("Found attachment doc with that hash! Skipping save!") return True -- cgit v1.2.3 From b92e63c316c1cf9f8b6481dbfa70737acfb3eee9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 7 Feb 2014 05:50:55 -0400 Subject: separate better dirty/new flags; add cdocs --- src/leap/mail/imap/soledadstore.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index a74b49c..6cd3749 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -212,10 +212,8 @@ class SoledadStore(ContentDedup): to be inserted. :type queue: Queue """ - # TODO should delete the original message from incoming only after - # the writes are done. # TODO should handle the delete case - # TODO should handle errors + # TODO should handle errors better # TODO could generalize this method into a generic consumer # and only implement `process` here @@ -235,7 +233,7 @@ class SoledadStore(ContentDedup): Errorback for write operations. """ log.msg("ERROR: Error while processing item.") - log.msg(failure.getTraceBack()) + log.msg(failure.getTraceback()) while not queue.empty(): doc_wrapper = queue.get() @@ -354,6 +352,7 @@ class SoledadStore(ContentDedup): doc = self._GET_DOC_FUN(doc_id) doc.content = dict(item.content) item = doc + try: call(item) except u1db_errors.RevisionConflict as exc: @@ -451,6 +450,7 @@ class SoledadStore(ContentDedup): :type mbox: str or unicode :param uid: the UID for the message :type uid: int + :rtype: SoledadDocument or None """ result = None try: @@ -465,6 +465,20 @@ class SoledadStore(ContentDedup): finally: return result + def get_headers_doc(self, chash): + """ + Return the document that keeps the headers for a message + indexed by its content-hash. + + :param chash: the content-hash to retrieve the document from. + :type chash: str or unicode + :rtype: SoledadDocument or None + """ + head_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + return first(head_docs) + def write_last_uid(self, mbox, value): """ Write the `last_uid` integer to the proper mailbox document -- cgit v1.2.3 From 9ffcaa09c2d6a57f3f34350298eff8412b540bc9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 7 Feb 2014 07:27:38 -0400 Subject: defer_to_thread the bulk of write operations and batch the notifications back to the memorystore, within the reactor thread. --- src/leap/mail/imap/soledadstore.py | 88 +++++++++++++++----------------------- 1 file changed, 34 insertions(+), 54 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 6cd3749..e7c6b29 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -23,7 +23,6 @@ import threading from itertools import chain from u1db import errors as u1db_errors -from twisted.internet import defer from twisted.python import log from zope.interface import implements @@ -35,7 +34,7 @@ from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.fields import fields from leap.mail.imap.interfaces import IMessageStore from leap.mail.messageflow import IMessageConsumer -from leap.mail.utils import first, empty +from leap.mail.utils import first, empty, accumulator_queue logger = logging.getLogger(__name__) @@ -142,12 +141,18 @@ class SoledadStore(ContentDedup): :param soledad: the soledad instance :type soledad: Soledad """ + from twisted.internet import reactor self._soledad = soledad self._CREATE_DOC_FUN = self._soledad.create_doc self._PUT_DOC_FUN = self._soledad.put_doc self._GET_DOC_FUN = self._soledad.get_doc + # we instantiate an accumulator to batch the notifications + self.docs_notify_queue = accumulator_queue( + lambda item: reactor.callFromThread(self._unset_new_dirty, item), + 20) + # IMessageStore # ------------------------------------------------------------------- @@ -202,7 +207,10 @@ class SoledadStore(ContentDedup): # IMessageConsumer - # It's not thread-safe to defer this to a different thread + # TODO should handle the delete case + # TODO should handle errors better + # TODO could generalize this method into a generic consumer + # and only implement `process` here def consume(self, queue): """ @@ -212,38 +220,16 @@ class SoledadStore(ContentDedup): to be inserted. :type queue: Queue """ - # TODO should handle the delete case - # TODO should handle errors better - # TODO could generalize this method into a generic consumer - # and only implement `process` here - from twisted.internet import reactor - def docWriteCallBack(doc_wrapper): - """ - Callback for a successful write of a document wrapper. - """ - if isinstance(doc_wrapper, MessageWrapper): - # If everything went well, we can unset the new flag - # in the source store (memory store) - self._unset_new_dirty(doc_wrapper) - - def docWriteErrorBack(failure): - """ - Errorback for write operations. - """ - log.msg("ERROR: Error while processing item.") - log.msg(failure.getTraceback()) - while not queue.empty(): doc_wrapper = queue.get() + reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) - d = defer.Deferred() - d.addCallbacks(docWriteCallBack, docWriteErrorBack) - reactor.callLater(0, self._consume_doc, doc_wrapper, d) + # Queue empty, flush the notifications queue. + self.docs_notify_queue(None, flush=True) - # FIXME this should not run the callback in the deferred thred - @deferred_to_thread def _unset_new_dirty(self, doc_wrapper): """ Unset the `new` and `dirty` flags for this document wrapper in the @@ -252,49 +238,38 @@ class SoledadStore(ContentDedup): :param doc_wrapper: a MessageWrapper instance :type doc_wrapper: MessageWrapper """ - # XXX debug msg id/mbox? - logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False + if isinstance(doc_wrapper, MessageWrapper): + logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False - def _consume_doc(self, doc_wrapper, deferred): + @deferred_to_thread + def _consume_doc(self, doc_wrapper, notify_queue): """ Consume each document wrapper in a separate thread. :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance :type doc_wrapper: MessageWrapper or RecentFlagsDoc - :param deferred: a deferred that will be fired when the write operation - has finished, either calling its callback or its - errback depending on whether it succeed. - :type deferred: Deferred """ - def notifyBack(failed, observer, doc_wrapper): + def queueNotifyBack(failed, doc_wrapper): if failed: - observer.errback(MsgWriteError( - "There was an error writing the mesage")) + log.msg("There was an error writing the mesage...") else: - observer.callback(doc_wrapper) + notify_queue(doc_wrapper) - def doSoledadCalls(items, observer): + def doSoledadCalls(items): # we prime the generator, that should return the # message or flags wrapper item in the first place. doc_wrapper = items.next() - d_sol = self._soledad_write_document_parts(items) - d_sol.addCallback(notifyBack, observer, doc_wrapper) - d_sol.addErrback(ebSoledadCalls) - - def ebSoledadCalls(failure): - log.msg(failure.getTraceback()) + failed = self._soledad_write_document_parts(items) + queueNotifyBack(failed, doc_wrapper) - d = self._iter_wrapper_subparts(doc_wrapper) - d.addCallback(doSoledadCalls, deferred) - d.addErrback(ebSoledadCalls) + doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) # # SoledadStore specific methods. # - @deferred_to_thread def _soledad_write_document_parts(self, items): """ Write the document parts to soledad in a separate thread. @@ -314,7 +289,6 @@ class SoledadStore(ContentDedup): continue return failed - @deferred_to_thread def _iter_wrapper_subparts(self, doc_wrapper): """ Return an iterator that will yield the doc_wrapper in the first place, @@ -350,6 +324,12 @@ class SoledadStore(ContentDedup): if call == self._PUT_DOC_FUN: doc_id = item.doc_id doc = self._GET_DOC_FUN(doc_id) + + if doc is None: + logger.warning("BUG! Dirty doc but could not " + "find document %s" % (doc_id,)) + return + doc.content = dict(item.content) item = doc -- cgit v1.2.3 From 4338368aa2ba0efaee742e9000e21b81af34d3db Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 01:43:14 -0400 Subject: separate new and dirty queues --- src/leap/mail/imap/soledadstore.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index e7c6b29..667e64d 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -220,12 +220,15 @@ class SoledadStore(ContentDedup): to be inserted. :type queue: Queue """ - from twisted.internet import reactor - - while not queue.empty(): - doc_wrapper = queue.get() - reactor.callInThread(self._consume_doc, doc_wrapper, - self.docs_notify_queue) + new, dirty = queue + while not new.empty(): + doc_wrapper = new.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) + while not dirty.empty(): + doc_wrapper = dirty.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) # Queue empty, flush the notifications queue. self.docs_notify_queue(None, flush=True) @@ -239,7 +242,8 @@ class SoledadStore(ContentDedup): :type doc_wrapper: MessageWrapper """ if isinstance(doc_wrapper, MessageWrapper): - logger.info("unsetting new flag!") + # XXX still needed for debug quite often + #logger.info("unsetting new flag!") doc_wrapper.new = False doc_wrapper.dirty = False @@ -284,6 +288,8 @@ class SoledadStore(ContentDedup): try: self._try_call(call, item) except Exception as exc: + logger.debug("ITEM WAS: %s" % str(item)) + logger.debug("ITEM CONTENT WAS: %s" % str(item.content)) logger.exception(exc) failed = True continue -- cgit v1.2.3 From f869b7eecab67d07a23dfb8b2931b3844f7523e3 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 01:45:20 -0400 Subject: fine grained locks for puts --- src/leap/mail/imap/soledadstore.py | 40 +++++++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 5 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 667e64d..9d19857 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -20,6 +20,7 @@ A MessageStore that writes to Soledad. import logging import threading +from collections import defaultdict from itertools import chain from u1db import errors as u1db_errors @@ -123,6 +124,17 @@ class MsgWriteError(Exception): """ Raised if any exception is found while saving message parts. """ + pass + + +""" +A lock per document. +""" +# TODO should bound the space of this!!! +# http://stackoverflow.com/a/2437645/1157664 +# Setting this to twice the number of threads in the threadpool +# should be safe. +put_locks = defaultdict(lambda: threading.Lock()) class SoledadStore(ContentDedup): @@ -142,6 +154,8 @@ class SoledadStore(ContentDedup): :type soledad: Soledad """ from twisted.internet import reactor + self.reactor = reactor + self._soledad = soledad self._CREATE_DOC_FUN = self._soledad.create_doc @@ -326,9 +340,9 @@ class SoledadStore(ContentDedup): if call is None: return - with self._soledad_rw_lock: - if call == self._PUT_DOC_FUN: - doc_id = item.doc_id + if call == self._PUT_DOC_FUN: + doc_id = item.doc_id + with put_locks[doc_id]: doc = self._GET_DOC_FUN(doc_id) if doc is None: @@ -337,13 +351,26 @@ class SoledadStore(ContentDedup): return doc.content = dict(item.content) + item = doc + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + else: try: call(item) except u1db_errors.RevisionConflict as exc: logger.exception("Error: %r" % (exc,)) raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc def _get_calls_for_msg_parts(self, msg_wrapper): """ @@ -383,10 +410,11 @@ class SoledadStore(ContentDedup): # XXX FIXME Give error if dirty and not doc_id !!! doc_id = item.doc_id # defend! if not doc_id: + logger.warning("Dirty item but no doc_id!") continue if item.part == MessagePartType.fdoc: - logger.debug("PUT dirty fdoc") + #logger.debug("PUT dirty fdoc") yield item, call # XXX also for linkage-doc !!! @@ -443,6 +471,9 @@ class SoledadStore(ContentDedup): flag_docs = self._soledad.get_from_index( fields.TYPE_MBOX_UID_IDX, fields.TYPE_FLAGS_VAL, mbox, str(uid)) + if len(flag_docs) != 1: + logger.warning("More than one flag doc for %r:%s" % + (mbox, uid)) result = first(flag_docs) except Exception as exc: # ugh! Something's broken down there! @@ -506,7 +537,6 @@ class SoledadStore(ContentDedup): fields.TYPE_MBOX_DEL_IDX, fields.TYPE_FLAGS_VAL, mbox, '1')) - # TODO can deferToThread this? def remove_all_deleted(self, mbox): """ Remove from Soledad all messages flagged as deleted for a given -- cgit v1.2.3 From b83b94bf84ffb6d3bcabb192d64cf91fcfef96b4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 01:46:00 -0400 Subject: fix last_uid write to avoid updates to lesser values --- src/leap/mail/imap/soledadstore.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 9d19857..657f21f 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -515,11 +515,12 @@ class SoledadStore(ContentDedup): with self._last_uid_lock: mbox_doc = self._get_mbox_document(mbox) old_val = mbox_doc.content[key] - if value < old_val: + if value > old_val: + mbox_doc.content[key] = value + self._soledad.put_doc(mbox_doc) + else: logger.error("%r:%s Tried to write a UID lesser than what's " "stored!" % (mbox, value)) - mbox_doc.content[key] = value - self._soledad.put_doc(mbox_doc) # deleted messages -- cgit v1.2.3 From f5fe0fdefa61f4989735800980dc2a3241c2fdf3 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 12 Feb 2014 01:35:48 -0400 Subject: avoid revision conflict during deletion --- src/leap/mail/imap/soledadstore.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 657f21f..3415fa8 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -143,6 +143,7 @@ class SoledadStore(ContentDedup): """ _last_uid_lock = threading.Lock() _soledad_rw_lock = threading.Lock() + _remove_lock = threading.Lock() implements(IMessageConsumer, IMessageStore) @@ -526,7 +527,7 @@ class SoledadStore(ContentDedup): def deleted_iter(self, mbox): """ - Get an iterator for the SoledadDocuments for messages + Get an iterator for the the doc_id for SoledadDocuments for messages with \\Deleted flag for a given mailbox. :param mbox: the mailbox @@ -534,9 +535,9 @@ class SoledadStore(ContentDedup): :return: iterator through deleted message docs :rtype: iterable """ - return (doc for doc in self._soledad.get_from_index( + return [doc.doc_id for doc in self._soledad.get_from_index( fields.TYPE_MBOX_DEL_IDX, - fields.TYPE_FLAGS_VAL, mbox, '1')) + fields.TYPE_FLAGS_VAL, mbox, '1')] def remove_all_deleted(self, mbox): """ @@ -547,7 +548,13 @@ class SoledadStore(ContentDedup): :type mbox: str or unicode """ deleted = [] - for doc in self.deleted_iter(mbox): - deleted.append(doc.content[fields.UID_KEY]) - self._soledad.delete_doc(doc) + for doc_id in self.deleted_iter(mbox): + with self._remove_lock: + doc = self._soledad.get_doc(doc_id) + self._soledad.delete_doc(doc) + try: + deleted.append(doc.content[fields.UID_KEY]) + except TypeError: + # empty content + pass return deleted -- cgit v1.2.3 From b520a60d0e48f36dcebe03d19b65839afc460fe9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 12 Feb 2014 12:39:33 -0400 Subject: move mbox-doc handling to soledadstore, and lock it --- src/leap/mail/imap/soledadstore.py | 115 ++++++++++++++++++++++++++----------- 1 file changed, 80 insertions(+), 35 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 3415fa8..f415894 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -27,7 +27,7 @@ from u1db import errors as u1db_errors from twisted.python import log from zope.interface import implements -from leap.common.check import leap_assert_type +from leap.common.check import leap_assert_type, leap_assert from leap.mail.decorators import deferred_to_thread from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper @@ -141,9 +141,9 @@ class SoledadStore(ContentDedup): """ This will create docs in the local Soledad database. """ - _last_uid_lock = threading.Lock() _soledad_rw_lock = threading.Lock() _remove_lock = threading.Lock() + _mbox_doc_locks = defaultdict(lambda: threading.Lock()) implements(IMessageConsumer, IMessageStore) @@ -438,7 +438,9 @@ class SoledadStore(ContentDedup): logger.debug("Saving RFLAGS to Soledad...") yield payload, call - def _get_mbox_document(self, mbox): + # Mbox documents and attributes + + def get_mbox_document(self, mbox): """ Return mailbox document. @@ -448,15 +450,83 @@ class SoledadStore(ContentDedup): the query failed. :rtype: SoledadDocument or None. """ + with self._mbox_doc_locks[mbox]: + return self._get_mbox_document(mbox) + + def _get_mbox_document(self, mbox): + """ + Helper for returning the mailbox document. + """ try: query = self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_MBOX_VAL, mbox) if query: return query.pop() + else: + logger.error("Could not find mbox document for %r" % + (self.mbox,)) except Exception as exc: logger.exception("Unhandled error %r" % exc) + def get_mbox_closed(self, mbox): + """ + Return the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: bool + """ + mbox_doc = self.get_mbox_document() + return mbox_doc.content.get(fields.CLOSED_KEY, False) + + def set_mbox_closed(self, mbox, closed): + """ + Set the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param closed: the value to be set + :type closed: bool + """ + leap_assert(isinstance(closed, bool), "closed needs to be boolean") + with self._mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + if mbox_doc is None: + logger.error( + "Could not find mbox document for %r" % (mbox,)) + return + mbox_doc.content[fields.CLOSED_KEY] = closed + self._soledad.put_doc(mbox_doc) + + def write_last_uid(self, mbox, value): + """ + Write the `last_uid` integer to the proper mailbox document + in Soledad. + This is called from the deferred triggered by + memorystore.increment_last_soledad_uid, which is expected to + run in a separate thread. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + key = fields.LAST_UID_KEY + + # XXX change for a lock related to the mbox document + # itself. + with self._mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + old_val = mbox_doc.content[key] + if value > old_val: + mbox_doc.content[key] = value + self._soledad.put_doc(mbox_doc) + else: + logger.error("%r:%s Tried to write a UID lesser than what's " + "stored!" % (mbox, value)) + def get_flags_doc(self, mbox, uid): """ Return the SoledadDocument for the given mbox and uid. @@ -497,32 +567,6 @@ class SoledadStore(ContentDedup): fields.TYPE_HEADERS_VAL, str(chash)) return first(head_docs) - def write_last_uid(self, mbox, value): - """ - Write the `last_uid` integer to the proper mailbox document - in Soledad. - This is called from the deferred triggered by - memorystore.increment_last_soledad_uid, which is expected to - run in a separate thread. - - :param mbox: the mailbox - :type mbox: str or unicode - :param value: the value to set - :type value: int - """ - leap_assert_type(value, int) - key = fields.LAST_UID_KEY - - with self._last_uid_lock: - mbox_doc = self._get_mbox_document(mbox) - old_val = mbox_doc.content[key] - if value > old_val: - mbox_doc.content[key] = value - self._soledad.put_doc(mbox_doc) - else: - logger.error("%r:%s Tried to write a UID lesser than what's " - "stored!" % (mbox, value)) - # deleted messages def deleted_iter(self, mbox): @@ -551,10 +595,11 @@ class SoledadStore(ContentDedup): for doc_id in self.deleted_iter(mbox): with self._remove_lock: doc = self._soledad.get_doc(doc_id) - self._soledad.delete_doc(doc) - try: - deleted.append(doc.content[fields.UID_KEY]) - except TypeError: - # empty content - pass + if doc is not None: + self._soledad.delete_doc(doc) + try: + deleted.append(doc.content[fields.UID_KEY]) + except TypeError: + # empty content + pass return deleted -- cgit v1.2.3 From 1ff1663fb2968adff87208134c374c4084670ace Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 12 Feb 2014 13:05:08 -0400 Subject: docstring fixes --- src/leap/mail/imap/soledadstore.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index f415894..6d6d382 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -41,9 +41,7 @@ logger = logging.getLogger(__name__) # TODO -# [ ] Delete original message from the incoming queue after all successful -# writes. -# [ ] Implement a retry queue. +# [ ] Implement a retry queue? # [ ] Consider journaling of operations. @@ -231,9 +229,9 @@ class SoledadStore(ContentDedup): """ Creates a new document in soledad db. - :param queue: queue to get item from, with content of the document - to be inserted. - :type queue: Queue + :param queue: a tuple of queues to get item from, with content of the + document to be inserted. + :type queue: tuple of Queues """ new, dirty = queue while not new.empty(): @@ -266,9 +264,14 @@ class SoledadStore(ContentDedup): def _consume_doc(self, doc_wrapper, notify_queue): """ Consume each document wrapper in a separate thread. + We pass an instance of an accumulator that handles the notifications + to the memorystore when the write has been done. :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance :type doc_wrapper: MessageWrapper or RecentFlagsDoc + :param notify_queue: a callable that handles the writeback + notifications to the memstore. + :type notify_queue: callable """ def queueNotifyBack(failed, doc_wrapper): if failed: @@ -316,8 +319,8 @@ class SoledadStore(ContentDedup): followed by the subparts item and the proper call type for every item in the queue, if any. - :param queue: the queue from where we'll pick item. - :type queue: Queue + :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance + :type doc_wrapper: MessageWrapper or RecentFlagsDoc """ if isinstance(doc_wrapper, MessageWrapper): return chain((doc_wrapper,), -- cgit v1.2.3 From 45733a231128cc06e123f352b4eb9886d6820878 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 14 Feb 2014 12:41:58 -0400 Subject: docstring fixes --- src/leap/mail/imap/soledadstore.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 6d6d382..e1a278a 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -295,9 +295,12 @@ class SoledadStore(ContentDedup): def _soledad_write_document_parts(self, items): """ Write the document parts to soledad in a separate thread. + :param items: the iterator through the different document wrappers payloads. :type items: iterator + :return: whether the write was successful or not + :rtype: bool """ failed = False for item, call in items: -- cgit v1.2.3 From 2f114c5a85775f4044aa12cd421231ed92544549 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 17 Feb 2014 10:53:30 -0400 Subject: remove floody log --- src/leap/mail/imap/soledadstore.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index e1a278a..732fe03 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -529,9 +529,6 @@ class SoledadStore(ContentDedup): if value > old_val: mbox_doc.content[key] = value self._soledad.put_doc(mbox_doc) - else: - logger.error("%r:%s Tried to write a UID lesser than what's " - "stored!" % (mbox, value)) def get_flags_doc(self, mbox, uid): """ -- cgit v1.2.3 From d6ee618534596e76733d3c6b6cf0a75bdb4aa905 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 18 Feb 2014 09:58:54 -0400 Subject: catch soledad error while updating mbox doc --- src/leap/mail/imap/soledadstore.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 732fe03..919f834 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -133,15 +133,14 @@ A lock per document. # Setting this to twice the number of threads in the threadpool # should be safe. put_locks = defaultdict(lambda: threading.Lock()) +mbox_doc_locks = defaultdict(lambda: threading.Lock()) class SoledadStore(ContentDedup): """ This will create docs in the local Soledad database. """ - _soledad_rw_lock = threading.Lock() _remove_lock = threading.Lock() - _mbox_doc_locks = defaultdict(lambda: threading.Lock()) implements(IMessageConsumer, IMessageStore) @@ -456,7 +455,7 @@ class SoledadStore(ContentDedup): the query failed. :rtype: SoledadDocument or None. """ - with self._mbox_doc_locks[mbox]: + with mbox_doc_locks[mbox]: return self._get_mbox_document(mbox) def _get_mbox_document(self, mbox): @@ -471,7 +470,7 @@ class SoledadStore(ContentDedup): return query.pop() else: logger.error("Could not find mbox document for %r" % - (self.mbox,)) + (mbox,)) except Exception as exc: logger.exception("Unhandled error %r" % exc) @@ -496,7 +495,7 @@ class SoledadStore(ContentDedup): :type closed: bool """ leap_assert(isinstance(closed, bool), "closed needs to be boolean") - with self._mbox_doc_locks[mbox]: + with mbox_doc_locks[mbox]: mbox_doc = self._get_mbox_document(mbox) if mbox_doc is None: logger.error( @@ -521,14 +520,18 @@ class SoledadStore(ContentDedup): leap_assert_type(value, int) key = fields.LAST_UID_KEY - # XXX change for a lock related to the mbox document - # itself. - with self._mbox_doc_locks[mbox]: + # XXX use accumulator to reduce number of hits + with mbox_doc_locks[mbox]: mbox_doc = self._get_mbox_document(mbox) old_val = mbox_doc.content[key] if value > old_val: mbox_doc.content[key] = value - self._soledad.put_doc(mbox_doc) + try: + self._soledad.put_doc(mbox_doc) + except Exception as exc: + logger.error("Error while setting last_uid for %r" + % (mbox,)) + logger.exception(exc) def get_flags_doc(self, mbox, uid): """ -- cgit v1.2.3 From 95e00da239bd119ae161f249a74af229cb6c7759 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 20 Feb 2014 00:00:24 -0400 Subject: fix attribute error on debug line --- src/leap/mail/imap/soledadstore.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 919f834..ed5259a 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -308,8 +308,10 @@ class SoledadStore(ContentDedup): try: self._try_call(call, item) except Exception as exc: - logger.debug("ITEM WAS: %s" % str(item)) - logger.debug("ITEM CONTENT WAS: %s" % str(item.content)) + logger.debug("ITEM WAS: %s" % repr(item)) + if hasattr(item, 'content'): + logger.debug("ITEM CONTENT WAS: %s" % + repr(item.content)) logger.exception(exc) failed = True continue -- cgit v1.2.3 From 4bcb32639bff9a5aab076dba2bdc7667cea60c7f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 20 Feb 2014 01:11:26 -0400 Subject: fix rdoc duplication --- src/leap/mail/imap/soledadstore.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ed5259a..e047e2e 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -350,6 +350,9 @@ class SoledadStore(ContentDedup): if call == self._PUT_DOC_FUN: doc_id = item.doc_id + if doc_id is None: + logger.warning("BUG! Dirty doc but has no doc_id!") + return with put_locks[doc_id]: doc = self._GET_DOC_FUN(doc_id) @@ -438,12 +441,12 @@ class SoledadStore(ContentDedup): :return: a tuple with recent-flags doc payload and callable :rtype: tuple """ - call = self._CREATE_DOC_FUN + call = self._PUT_DOC_FUN payload = rflags_wrapper.content if payload: logger.debug("Saving RFLAGS to Soledad...") - yield payload, call + yield rflags_wrapper, call # Mbox documents and attributes -- cgit v1.2.3 From 52868a4c67170abbfa19deda9bd20931c21554b7 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 20 Feb 2014 01:21:46 -0400 Subject: catch stopiteration --- src/leap/mail/imap/soledadstore.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index e047e2e..25f00bb 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -281,9 +281,13 @@ class SoledadStore(ContentDedup): def doSoledadCalls(items): # we prime the generator, that should return the # message or flags wrapper item in the first place. - doc_wrapper = items.next() - failed = self._soledad_write_document_parts(items) - queueNotifyBack(failed, doc_wrapper) + try: + doc_wrapper = items.next() + except StopIteration: + pass + else: + failed = self._soledad_write_document_parts(items) + queueNotifyBack(failed, doc_wrapper) doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) -- cgit v1.2.3 From b2d97c9faef6037a065e2903afe5b0ab2624917e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 20 Feb 2014 02:52:17 -0400 Subject: mail parsing performance improvements Although the do_parse function is deferred to threads, we were actually waiting till its return to fire the callback of the deferred, and hence the "append ok" was being delayed. During massive appends, this was a tight loop contributing as much as 35 msec, of a total of 100 msec average. Several ineficiencies are addressed here: * use pycryptopp hash functions. * avoiding function calling overhead. * avoid duplicate call to message.as_string * make use of the string size caching capabilities. * avoiding the mail Parser initialization/method call completely, in favor of the module helper to get the object from string. Overall, these changes cut parsing to 50% of the initial timing by my measurements with line_profiler, YMMV. --- src/leap/mail/imap/soledadstore.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 25f00bb..f3de8eb 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -314,8 +314,8 @@ class SoledadStore(ContentDedup): except Exception as exc: logger.debug("ITEM WAS: %s" % repr(item)) if hasattr(item, 'content'): - logger.debug("ITEM CONTENT WAS: %s" % - repr(item.content)) + logger.debug("ITEM CONTENT WAS: %s" % + repr(item.content)) logger.exception(exc) failed = True continue -- cgit v1.2.3