summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-07 05:50:55 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:39:44 -0400
commitb92e63c316c1cf9f8b6481dbfa70737acfb3eee9 (patch)
treee60e9b6e5367ad185c9f387f2ff6af9fd6c726f9
parent813db4a356141592337f39f9c801203367c63193 (diff)
separate better dirty/new flags; add cdocs
-rw-r--r--src/leap/mail/imap/memorystore.py88
-rw-r--r--src/leap/mail/imap/messages.py21
-rw-r--r--src/leap/mail/imap/soledadstore.py22
-rw-r--r--src/leap/mail/utils.py19
4 files changed, 106 insertions, 44 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index 4156c0b..ee3ee92 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -24,6 +24,7 @@ import weakref
from collections import defaultdict
from copy import copy
+from itertools import chain
from twisted.internet import defer
from twisted.internet.task import LoopingCall
@@ -33,7 +34,7 @@ from zope.interface import implements
from leap.common.check import leap_assert_type
from leap.mail import size
from leap.mail.decorators import deferred_to_thread
-from leap.mail.utils import empty
+from leap.mail.utils import empty, phash_iter
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
from leap.mail.imap.fields import fields
@@ -110,13 +111,12 @@ class MemoryStore(object):
# Internal Storage: messages
"""
- Flags document store.
+ flags document store.
_fdoc_store[mbox][uid] = { 'content': 'aaa' }
"""
self._fdoc_store = defaultdict(lambda: defaultdict(
lambda: ReferenciableDict({})))
-<<<<<<< HEAD
# Sizes
"""
{'mbox, uid': <int>}
@@ -124,9 +124,14 @@ class MemoryStore(object):
self._sizes = {}
# Internal Storage: payload-hash
-=======
+ """
+ fdocs:doc-id store, stores document IDs for putting
+ the dirty flags-docs.
+ """
+ self._fdoc_id_store = defaultdict(lambda: defaultdict(
+ lambda: ''))
+
# Internal Storage: content-hash:hdoc
->>>>>>> change internal storage and keying scheme in memstore
"""
hdoc-store keeps references to
the header-documents indexed by content-hash.
@@ -360,14 +365,6 @@ class MemoryStore(object):
self._sizes[key] = size.get_size(self._fdoc_store[key])
# TODO add hdoc and cdocs sizes too
- # XXX what to do with this?
- #docs_id = msg_dict.get(DOCS_ID, None)
- #if docs_id is not None:
- #if not store.get(DOCS_ID, None):
- #store[DOCS_ID] = {}
- #store[DOCS_ID].update(docs_id)
-
-
def get_docid_for_fdoc(self, mbox, uid):
"""
Return Soledad document id for the flags-doc for a given mbox and uid,
@@ -379,13 +376,18 @@ class MemoryStore(object):
:type uid: int
:rtype: unicode or None
"""
- fdoc = self._permanent_store.get_flags_doc(mbox, uid)
- if empty(fdoc):
- return None
- doc_id = fdoc.doc_id
+ doc_id = self._fdoc_id_store[mbox][uid]
+
+ if empty(doc_id):
+ fdoc = self._permanent_store.get_flags_doc(mbox, uid)
+ if empty(fdoc.content):
+ return None
+ doc_id = fdoc.doc_id
+ self._fdoc_id_store[mbox][uid] = doc_id
+
return doc_id
- def get_message(self, mbox, uid, flags_only=False):
+ def get_message(self, mbox, uid, dirtystate="none", flags_only=False):
"""
Get a MessageWrapper for the given mbox and uid combination.
@@ -393,19 +395,32 @@ class MemoryStore(object):
:type mbox: str or unicode
:param uid: the message UID
:type uid: int
+ :param dirtystate: one of `dirty`, `new` or `none` (default)
+ :type dirtystate: str
:param flags_only: whether the message should carry only a reference
to the flags document.
:type flags_only: bool
+ :
:return: MessageWrapper or None
"""
+ if dirtystate == "dirty":
+ flags_only = True
+
key = mbox, uid
fdoc = self._fdoc_store[mbox][uid]
if empty(fdoc):
return None
- new, dirty = self._get_new_dirty_state(key)
+ new, dirty = False, False
+ if dirtystate == "none":
+ new, dirty = self._get_new_dirty_state(key)
+ if dirtystate == "dirty":
+ new, dirty = False, True
+ if dirtystate == "new":
+ new, dirty = True, False
+
if flags_only:
return MessageWrapper(fdoc=fdoc,
new=new, dirty=dirty,
@@ -413,7 +428,22 @@ class MemoryStore(object):
else:
chash = fdoc.get(fields.CONTENT_HASH_KEY)
hdoc = self._hdoc_store[chash]
- return MessageWrapper(fdoc=fdoc, hdoc=hdoc,
+ if empty(hdoc):
+ hdoc = self._permanent_store.get_headers_doc(chash)
+ if not empty(hdoc.content):
+ self._hdoc_store[chash] = hdoc.content
+ hdoc = hdoc.content
+ cdocs = None
+
+ pmap = hdoc.get(fields.PARTS_MAP_KEY, None)
+ if new and pmap is not None:
+ # take the different cdocs for write...
+ cdoc_store = self._cdoc_store
+ cdocs_list = phash_iter(hdoc)
+ cdocs = dict(enumerate(
+ [cdoc_store[phash] for phash in cdocs_list], 1))
+
+ return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs,
new=new, dirty=dirty,
memstore=weakref.proxy(self))
@@ -437,14 +467,9 @@ class MemoryStore(object):
key = mbox, uid
self._new.discard(key)
self._dirty.discard(key)
-<<<<<<< HEAD
- self._msg_store.pop(key, None)
if key in self._sizes:
del self._sizes[key]
-
-=======
self._fdoc_store[mbox].pop(uid, None)
->>>>>>> change internal storage and keying scheme in memstore
except Exception as exc:
logger.exception(exc)
@@ -464,7 +489,7 @@ class MemoryStore(object):
# XXX this could return the deferred for all the enqueued operations
if not self.producer.is_queue_empty():
- return
+ return False
if any(map(lambda i: not empty(i), (self._new, self._dirty))):
logger.info("Writing messages to Soledad...")
@@ -598,6 +623,7 @@ class MemoryStore(object):
"""
# We can do direct assignments cause we know this will only
# be called during initialization of the mailbox.
+
fdoc_store = self._fdoc_store[mbox]
for uid in flag_docs:
fdoc_store[uid] = ReferenciableDict(flag_docs[uid])
@@ -626,7 +652,8 @@ class MemoryStore(object):
"""
flags_dict = {}
uids = self.get_uids(mbox)
- fdoc_store = self._fdoc_store
+ fdoc_store = self._fdoc_store[mbox]
+
for uid in uids:
try:
flags = fdoc_store[uid][fields.FLAGS_KEY]
@@ -763,9 +790,10 @@ class MemoryStore(object):
:return: generator of MessageWrappers
:rtype: generator
"""
- return (self.get_message(*key)
- for key in sorted(self.iter_fdoc_keys())
- if key in self._new or key in self._dirty)
+ gm = self.get_message
+ new = (gm(*key) for key in self._new)
+ dirty = (gm(*key, flags_only=True) for key in self._dirty)
+ return chain(new, dirty)
def all_deleted_uid_iter(self, mbox):
"""
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 4b95689..8b6d3f3 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -264,17 +264,15 @@ class LeapMessage(fields, MailParser, MBoxParser):
# to put it under the lock...
doc.content[self.FLAGS_KEY] = newflags
doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
+
+ # XXX check if this is working ok.
doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
- if self._collection.memstore is not None:
- log.msg("putting message in collection")
- self._collection.memstore.put_message(
- self._mbox, self._uid,
- MessageWrapper(fdoc=doc.content, new=False, dirty=True,
- docs_id={'fdoc': doc.doc_id}))
- else:
- # fallback for non-memstore initializations.
- self._soledad.put_doc(doc)
+ log.msg("putting message in collection")
+ self._collection.memstore.put_message(
+ self._mbox, self._uid,
+ MessageWrapper(fdoc=doc.content, new=False, dirty=True,
+ docs_id={'fdoc': doc.doc_id}))
return map(str, newflags)
def getInternalDate(self):
@@ -524,6 +522,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
finally:
return result
+ # TODO move to soledadstore instead of accessing soledad directly
def _get_headers_doc(self):
"""
Return the document that keeps the headers for this
@@ -534,6 +533,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
fields.TYPE_HEADERS_VAL, str(self.chash))
return first(head_docs)
+ # TODO move to soledadstore instead of accessing soledad directly
def _get_body_doc(self):
"""
Return the document that keeps the body for this
@@ -1165,7 +1165,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
or None if not found.
:rtype: LeapMessage
"""
- msg_container = self.memstore.get_message(self.mbox, uid, flags_only)
+ msg_container = self.memstore.get_message(
+ self.mbox, uid, flags_only=flags_only)
if msg_container is not None:
if mem_only:
msg = LeapMessage(None, uid, self.mbox, collection=self,
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index a74b49c..6cd3749 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -212,10 +212,8 @@ class SoledadStore(ContentDedup):
to be inserted.
:type queue: Queue
"""
- # TODO should delete the original message from incoming only after
- # the writes are done.
# TODO should handle the delete case
- # TODO should handle errors
+ # TODO should handle errors better
# TODO could generalize this method into a generic consumer
# and only implement `process` here
@@ -235,7 +233,7 @@ class SoledadStore(ContentDedup):
Errorback for write operations.
"""
log.msg("ERROR: Error while processing item.")
- log.msg(failure.getTraceBack())
+ log.msg(failure.getTraceback())
while not queue.empty():
doc_wrapper = queue.get()
@@ -354,6 +352,7 @@ class SoledadStore(ContentDedup):
doc = self._GET_DOC_FUN(doc_id)
doc.content = dict(item.content)
item = doc
+
try:
call(item)
except u1db_errors.RevisionConflict as exc:
@@ -451,6 +450,7 @@ class SoledadStore(ContentDedup):
:type mbox: str or unicode
:param uid: the UID for the message
:type uid: int
+ :rtype: SoledadDocument or None
"""
result = None
try:
@@ -465,6 +465,20 @@ class SoledadStore(ContentDedup):
finally:
return result
+ def get_headers_doc(self, chash):
+ """
+ Return the document that keeps the headers for a message
+ indexed by its content-hash.
+
+ :param chash: the content-hash to retrieve the document from.
+ :type chash: str or unicode
+ :rtype: SoledadDocument or None
+ """
+ head_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(chash))
+ return first(head_docs)
+
def write_last_uid(self, mbox, value):
"""
Write the `last_uid` integer to the proper mailbox document
diff --git a/src/leap/mail/utils.py b/src/leap/mail/utils.py
index 942acfb..8b75cfc 100644
--- a/src/leap/mail/utils.py
+++ b/src/leap/mail/utils.py
@@ -94,6 +94,7 @@ def lowerdict(_dict):
PART_MAP = "part_map"
+PHASH = "phash"
def _str_dict(d, k):
@@ -130,6 +131,24 @@ def stringify_parts_map(d):
return d
+def phash_iter(d):
+ """
+ A recursive generator that extracts all the payload-hashes
+ from an arbitrary nested parts-map dictionary.
+
+ :param d: the dictionary to walk
+ :type d: dictionary
+ :return: a list of all the phashes found
+ :rtype: list
+ """
+ if PHASH in d:
+ yield d[PHASH]
+ if PART_MAP in d:
+ for key in d[PART_MAP]:
+ for phash in phash_iter(d[PART_MAP][key]):
+ yield phash
+
+
class CustomJsonScanner(object):
"""
This class is a context manager definition used to monkey patch the default