From 04dfa3afdbb2080c717bfd32d6e47641615967fc Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 01:52:21 -0400 Subject: improve flag-docs relative internal storage --- src/leap/mail/imap/memorystore.py | 58 +++++++++++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 14 deletions(-) (limited to 'src/leap/mail') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index a053f3f..2835826 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -92,6 +92,7 @@ class MemoryStore(object): WRITING_FLAG = "_writing" _last_uid_lock = threading.Lock() + _fdoc_docid_lock = threading.Lock() def __init__(self, permanent_store=None, write_period=SOLEDAD_WRITE_PERIOD): @@ -158,7 +159,7 @@ class MemoryStore(object): 'mbox-b': weakref.proxy(dict)} } """ - self._chash_fdoc_store = {} + self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None)) # Internal Storage: recent-flags store """ @@ -275,7 +276,7 @@ class MemoryStore(object): """ from twisted.internet import reactor - log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) + log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid self._add_message(mbox, uid, message, notify_on_disk) @@ -340,15 +341,12 @@ class MemoryStore(object): if fdoc is not None: fdoc_store = self._fdoc_store[mbox][uid] fdoc_store.update(fdoc) + chash_fdoc_store = self._chash_fdoc_store # content-hash indexing chash = fdoc.get(fields.CONTENT_HASH_KEY) - chash_fdoc_store = self._chash_fdoc_store - if not chash in chash_fdoc_store: - chash_fdoc_store[chash] = {} - chash_fdoc_store[chash][mbox] = weakref.proxy( - fdoc_store) + self._fdoc_store[mbox][uid]) hdoc = msg_dict.get(HDOC, None) if hdoc is not None: @@ -381,7 +379,8 @@ class MemoryStore(object): :type uid: int :rtype: unicode or None """ - doc_id = self._fdoc_id_store[mbox][uid] + with self._fdoc_docid_lock: + doc_id = self._fdoc_id_store[mbox][uid] if empty(doc_id): fdoc = self._permanent_store.get_flags_doc(mbox, uid) @@ -475,6 +474,8 @@ class MemoryStore(object): if key in self._sizes: del self._sizes[key] self._fdoc_store[mbox].pop(uid, None) + with self._fdoc_docid_lock: + self._fdoc_id_store[mbox].pop(uid, None) except Exception as exc: logger.exception(exc) @@ -571,7 +572,8 @@ class MemoryStore(object): :param value: the value to set :type value: int """ - leap_assert_type(value, int) + # can be long??? + #leap_assert_type(value, int) logger.info("setting last soledad uid for %s to %s" % (mbox, value)) # if we already have a value here, don't do anything @@ -603,10 +605,9 @@ class MemoryStore(object): with self._last_uid_lock: self._last_uid[mbox] += 1 value = self._last_uid[mbox] - self.write_last_uid(mbox, value) + self.reactor.callInThread(self.write_last_uid, mbox, value) return value - @deferred_to_thread def write_last_uid(self, mbox, value): """ Increment the soledad integer cache for the highest uid value. @@ -633,10 +634,36 @@ class MemoryStore(object): """ # We can do direct assignments cause we know this will only # be called during initialization of the mailbox. + # TODO could hook here a sanity-check + # for duplicates fdoc_store = self._fdoc_store[mbox] + chash_fdoc_store = self._chash_fdoc_store for uid in flag_docs: - fdoc_store[uid] = ReferenciableDict(flag_docs[uid]) + rdict = ReferenciableDict(flag_docs[uid]) + fdoc_store[uid] = rdict + # populate chash dict too, to avoid fdoc duplication + chash = flag_docs[uid]["chash"] + chash_fdoc_store[chash][mbox] = weakref.proxy( + self._fdoc_store[mbox][uid]) + + def update_flags(self, mbox, uid, fdoc): + """ + Update the flag document for a given mbox and uid combination, + and set the dirty flag. + We could use put_message, but this is faster. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the uid of the message + :type uid: int + + :param fdoc: a dict with the content for the flag docs + :type fdoc: dict + """ + key = mbox, uid + self._fdoc_store[mbox][uid].update(fdoc) + self._dirty.add(key) def load_header_docs(self, header_docs): """ @@ -759,8 +786,7 @@ class MemoryStore(object): :return: MessagePartDoc. It will return None if the flags document has empty content or it is flagged as \\Deleted. """ - docs_dict = self._chash_fdoc_store.get(chash, None) - fdoc = docs_dict.get(mbox, None) if docs_dict else None + fdoc = self._chash_fdoc_store[chash][mbox] # a couple of special cases. # 1. We might have a doc with empty content... @@ -778,6 +804,7 @@ class MemoryStore(object): key = mbox, uid new = key in self._new dirty = key in self._dirty + return MessagePartDoc( new=new, dirty=dirty, store="mem", part=MessagePartType.fdoc, @@ -1027,6 +1054,8 @@ class MemoryStore(object): """ self._stop_write_loop() if self._permanent_store is not None: + # XXX we should check if we did get a True value on this + # operation. If we got False we should retry! (queue was not empty) self.write_messages(self._permanent_store) self.producer.flush() @@ -1090,6 +1119,7 @@ class MemoryStore(object): try: # 1. Delete all messages marked as deleted in soledad. + logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,)) sol_deleted = soledad_store.remove_all_deleted(mbox) try: -- cgit v1.2.3