From b520a60d0e48f36dcebe03d19b65839afc460fe9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 12 Feb 2014 12:39:33 -0400 Subject: move mbox-doc handling to soledadstore, and lock it --- src/leap/mail/imap/mailbox.py | 22 ++----- src/leap/mail/imap/memorystore.py | 36 ++++++++++++ src/leap/mail/imap/soledadstore.py | 115 ++++++++++++++++++++++++++----------- 3 files changed, 120 insertions(+), 53 deletions(-) (limited to 'src') diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index 087780f..d18bc9a 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -200,7 +200,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ self.listeners.remove(listener) - # TODO move completely to soledadstore, under memstore reponsibility. def _get_mbox_doc(self): """ Return mailbox document. @@ -209,17 +208,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): the query failed. :rtype: SoledadDocument or None. """ - try: - query = self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_MBOX_VAL, self.mbox) - if query: - return query.pop() - else: - logger.error("Could not find mbox document for %r" % - (self.mbox,)) - except Exception as exc: - logger.exception("Unhandled error %r" % exc) + return self._memstore.get_mbox_doc(self.mbox) def getFlags(self): """ @@ -234,6 +223,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): flags = mbox.content.get(self.FLAGS_KEY, []) return map(str, flags) + # XXX move to memstore->soledadstore def setFlags(self, flags): """ Sets flags for this mailbox. @@ -258,8 +248,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: True if the mailbox is closed :rtype: bool """ - mbox = self._get_mbox_doc() - return mbox.content.get(self.CLOSED_KEY, False) + return self._memstore.get_mbox_closed(self.mbox) def _set_closed(self, closed): """ @@ -268,10 +257,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param closed: the state to be set :type closed: bool """ - leap_assert(isinstance(closed, bool), "closed needs to be boolean") - mbox = self._get_mbox_doc() - mbox.content[self.CLOSED_KEY] = closed - self._soledad.put_doc(mbox) + self._memstore.set_mbox_closed(self.mbox, closed) closed = property( _get_closed, _set_closed, doc="Closed attribute.") diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 4aaee75..ba444b0 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -293,6 +293,7 @@ class MemoryStore(object): # a defer that will inmediately have its callback triggered. self.reactor.callFromThread(observer.callback, uid) + def put_message(self, mbox, uid, message, notify_on_disk=True): """ Put an existing message. @@ -1176,8 +1177,43 @@ class MemoryStore(object): logger.exception(exc) finally: self._start_write_loop() + observer.callback(all_deleted) + # Mailbox documents and attributes + + # This could be also be cached in memstore, but proxying directly + # to soledad since it's not too performance-critical. + + def get_mbox_doc(self, mbox): + """ + Return the soledad document for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: SoledadDocument or None. + """ + return self.permanent_store.get_mbox_document(mbox) + + def get_mbox_closed(self, mbox): + """ + Return the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: bool + """ + return self.permanent_store.get_mbox_closed(mbox) + + def set_mbox_closed(self, mbox, closed): + """ + Set the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + """ + self.permanent_store.set_mbox_closed(mbox, closed) + # Dump-to-disk controls. @property diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 3415fa8..f415894 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -27,7 +27,7 @@ from u1db import errors as u1db_errors from twisted.python import log from zope.interface import implements -from leap.common.check import leap_assert_type +from leap.common.check import leap_assert_type, leap_assert from leap.mail.decorators import deferred_to_thread from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper @@ -141,9 +141,9 @@ class SoledadStore(ContentDedup): """ This will create docs in the local Soledad database. """ - _last_uid_lock = threading.Lock() _soledad_rw_lock = threading.Lock() _remove_lock = threading.Lock() + _mbox_doc_locks = defaultdict(lambda: threading.Lock()) implements(IMessageConsumer, IMessageStore) @@ -438,7 +438,9 @@ class SoledadStore(ContentDedup): logger.debug("Saving RFLAGS to Soledad...") yield payload, call - def _get_mbox_document(self, mbox): + # Mbox documents and attributes + + def get_mbox_document(self, mbox): """ Return mailbox document. @@ -448,15 +450,83 @@ class SoledadStore(ContentDedup): the query failed. :rtype: SoledadDocument or None. """ + with self._mbox_doc_locks[mbox]: + return self._get_mbox_document(mbox) + + def _get_mbox_document(self, mbox): + """ + Helper for returning the mailbox document. + """ try: query = self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_MBOX_VAL, mbox) if query: return query.pop() + else: + logger.error("Could not find mbox document for %r" % + (self.mbox,)) except Exception as exc: logger.exception("Unhandled error %r" % exc) + def get_mbox_closed(self, mbox): + """ + Return the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: bool + """ + mbox_doc = self.get_mbox_document() + return mbox_doc.content.get(fields.CLOSED_KEY, False) + + def set_mbox_closed(self, mbox, closed): + """ + Set the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param closed: the value to be set + :type closed: bool + """ + leap_assert(isinstance(closed, bool), "closed needs to be boolean") + with self._mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + if mbox_doc is None: + logger.error( + "Could not find mbox document for %r" % (mbox,)) + return + mbox_doc.content[fields.CLOSED_KEY] = closed + self._soledad.put_doc(mbox_doc) + + def write_last_uid(self, mbox, value): + """ + Write the `last_uid` integer to the proper mailbox document + in Soledad. + This is called from the deferred triggered by + memorystore.increment_last_soledad_uid, which is expected to + run in a separate thread. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + key = fields.LAST_UID_KEY + + # XXX change for a lock related to the mbox document + # itself. + with self._mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + old_val = mbox_doc.content[key] + if value > old_val: + mbox_doc.content[key] = value + self._soledad.put_doc(mbox_doc) + else: + logger.error("%r:%s Tried to write a UID lesser than what's " + "stored!" % (mbox, value)) + def get_flags_doc(self, mbox, uid): """ Return the SoledadDocument for the given mbox and uid. @@ -497,32 +567,6 @@ class SoledadStore(ContentDedup): fields.TYPE_HEADERS_VAL, str(chash)) return first(head_docs) - def write_last_uid(self, mbox, value): - """ - Write the `last_uid` integer to the proper mailbox document - in Soledad. - This is called from the deferred triggered by - memorystore.increment_last_soledad_uid, which is expected to - run in a separate thread. - - :param mbox: the mailbox - :type mbox: str or unicode - :param value: the value to set - :type value: int - """ - leap_assert_type(value, int) - key = fields.LAST_UID_KEY - - with self._last_uid_lock: - mbox_doc = self._get_mbox_document(mbox) - old_val = mbox_doc.content[key] - if value > old_val: - mbox_doc.content[key] = value - self._soledad.put_doc(mbox_doc) - else: - logger.error("%r:%s Tried to write a UID lesser than what's " - "stored!" % (mbox, value)) - # deleted messages def deleted_iter(self, mbox): @@ -551,10 +595,11 @@ class SoledadStore(ContentDedup): for doc_id in self.deleted_iter(mbox): with self._remove_lock: doc = self._soledad.get_doc(doc_id) - self._soledad.delete_doc(doc) - try: - deleted.append(doc.content[fields.UID_KEY]) - except TypeError: - # empty content - pass + if doc is not None: + self._soledad.delete_doc(doc) + try: + deleted.append(doc.content[fields.UID_KEY]) + except TypeError: + # empty content + pass return deleted -- cgit v1.2.3