summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-11 01:52:21 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:39:49 -0400
commit04dfa3afdbb2080c717bfd32d6e47641615967fc (patch)
treeba3b3f1fbfa6394770909ed91cfb363f5ceead74
parentdf9cd2e0d59600840acb6aa00f36b7eb43e48297 (diff)
improve flag-docs relative internal storage
-rw-r--r--src/leap/mail/imap/memorystore.py58
1 files changed, 44 insertions, 14 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index a053f3f..2835826 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -92,6 +92,7 @@ class MemoryStore(object):
WRITING_FLAG = "_writing"
_last_uid_lock = threading.Lock()
+ _fdoc_docid_lock = threading.Lock()
def __init__(self, permanent_store=None,
write_period=SOLEDAD_WRITE_PERIOD):
@@ -158,7 +159,7 @@ class MemoryStore(object):
'mbox-b': weakref.proxy(dict)}
}
"""
- self._chash_fdoc_store = {}
+ self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None))
# Internal Storage: recent-flags store
"""
@@ -275,7 +276,7 @@ class MemoryStore(object):
"""
from twisted.internet import reactor
- log.msg("adding new doc to memstore %r (%r)" % (mbox, uid))
+ log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid))
key = mbox, uid
self._add_message(mbox, uid, message, notify_on_disk)
@@ -340,15 +341,12 @@ class MemoryStore(object):
if fdoc is not None:
fdoc_store = self._fdoc_store[mbox][uid]
fdoc_store.update(fdoc)
+ chash_fdoc_store = self._chash_fdoc_store
# content-hash indexing
chash = fdoc.get(fields.CONTENT_HASH_KEY)
- chash_fdoc_store = self._chash_fdoc_store
- if not chash in chash_fdoc_store:
- chash_fdoc_store[chash] = {}
-
chash_fdoc_store[chash][mbox] = weakref.proxy(
- fdoc_store)
+ self._fdoc_store[mbox][uid])
hdoc = msg_dict.get(HDOC, None)
if hdoc is not None:
@@ -381,7 +379,8 @@ class MemoryStore(object):
:type uid: int
:rtype: unicode or None
"""
- doc_id = self._fdoc_id_store[mbox][uid]
+ with self._fdoc_docid_lock:
+ doc_id = self._fdoc_id_store[mbox][uid]
if empty(doc_id):
fdoc = self._permanent_store.get_flags_doc(mbox, uid)
@@ -475,6 +474,8 @@ class MemoryStore(object):
if key in self._sizes:
del self._sizes[key]
self._fdoc_store[mbox].pop(uid, None)
+ with self._fdoc_docid_lock:
+ self._fdoc_id_store[mbox].pop(uid, None)
except Exception as exc:
logger.exception(exc)
@@ -571,7 +572,8 @@ class MemoryStore(object):
:param value: the value to set
:type value: int
"""
- leap_assert_type(value, int)
+ # can be long???
+ #leap_assert_type(value, int)
logger.info("setting last soledad uid for %s to %s" %
(mbox, value))
# if we already have a value here, don't do anything
@@ -603,10 +605,9 @@ class MemoryStore(object):
with self._last_uid_lock:
self._last_uid[mbox] += 1
value = self._last_uid[mbox]
- self.write_last_uid(mbox, value)
+ self.reactor.callInThread(self.write_last_uid, mbox, value)
return value
- @deferred_to_thread
def write_last_uid(self, mbox, value):
"""
Increment the soledad integer cache for the highest uid value.
@@ -633,10 +634,36 @@ class MemoryStore(object):
"""
# We can do direct assignments cause we know this will only
# be called during initialization of the mailbox.
+ # TODO could hook here a sanity-check
+ # for duplicates
fdoc_store = self._fdoc_store[mbox]
+ chash_fdoc_store = self._chash_fdoc_store
for uid in flag_docs:
- fdoc_store[uid] = ReferenciableDict(flag_docs[uid])
+ rdict = ReferenciableDict(flag_docs[uid])
+ fdoc_store[uid] = rdict
+ # populate chash dict too, to avoid fdoc duplication
+ chash = flag_docs[uid]["chash"]
+ chash_fdoc_store[chash][mbox] = weakref.proxy(
+ self._fdoc_store[mbox][uid])
+
+ def update_flags(self, mbox, uid, fdoc):
+ """
+ Update the flag document for a given mbox and uid combination,
+ and set the dirty flag.
+ We could use put_message, but this is faster.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the uid of the message
+ :type uid: int
+
+ :param fdoc: a dict with the content for the flag docs
+ :type fdoc: dict
+ """
+ key = mbox, uid
+ self._fdoc_store[mbox][uid].update(fdoc)
+ self._dirty.add(key)
def load_header_docs(self, header_docs):
"""
@@ -759,8 +786,7 @@ class MemoryStore(object):
:return: MessagePartDoc. It will return None if the flags document
has empty content or it is flagged as \\Deleted.
"""
- docs_dict = self._chash_fdoc_store.get(chash, None)
- fdoc = docs_dict.get(mbox, None) if docs_dict else None
+ fdoc = self._chash_fdoc_store[chash][mbox]
# a couple of special cases.
# 1. We might have a doc with empty content...
@@ -778,6 +804,7 @@ class MemoryStore(object):
key = mbox, uid
new = key in self._new
dirty = key in self._dirty
+
return MessagePartDoc(
new=new, dirty=dirty, store="mem",
part=MessagePartType.fdoc,
@@ -1027,6 +1054,8 @@ class MemoryStore(object):
"""
self._stop_write_loop()
if self._permanent_store is not None:
+ # XXX we should check if we did get a True value on this
+ # operation. If we got False we should retry! (queue was not empty)
self.write_messages(self._permanent_store)
self.producer.flush()
@@ -1090,6 +1119,7 @@ class MemoryStore(object):
try:
# 1. Delete all messages marked as deleted in soledad.
+ logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,))
sol_deleted = soledad_store.remove_all_deleted(mbox)
try: