summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-24 23:14:38 -0400
committerKali Kaneko <kali@leap.se>2014-01-28 19:38:45 -0400
commitb9042503becebfe07b3a4586bd56126b334e0182 (patch)
treee1d3eda4a23812b828d7061e11c70bfa7ab42962
parent77f836cb1e698792cd28bca1d44ece6174b5f04d (diff)
recent-flags use the memory store
-rw-r--r--mail/src/leap/mail/imap/memorystore.py112
-rw-r--r--mail/src/leap/mail/imap/messageparts.py8
-rw-r--r--mail/src/leap/mail/imap/messages.py60
-rw-r--r--mail/src/leap/mail/imap/soledadstore.py59
4 files changed, 205 insertions, 34 deletions
diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py
index dcae6b03..232a2fb2 100644
--- a/mail/src/leap/mail/imap/memorystore.py
+++ b/mail/src/leap/mail/imap/memorystore.py
@@ -21,6 +21,8 @@ import contextlib
import logging
import weakref
+from collections import defaultdict
+
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from twisted.python import log
@@ -32,6 +34,7 @@ from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
from leap.mail.imap.fields import fields
from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc
+from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import ReferenciableDict
@@ -109,16 +112,38 @@ class MemoryStore(object):
# Internal Storage: content-hash:fdoc
"""
+ chash-fdoc-store keeps references to
+ the flag-documents indexed by content-hash.
+
{'chash': {'mbox-a': weakref.proxy(dict),
'mbox-b': weakref.proxy(dict)}
}
"""
self._chash_fdoc_store = {}
- # TODO ----------------- implement mailbox-level flags store too! ----
- self._rflags_store = {}
+ # Internal Storage: recent-flags store
+ """
+ recent-flags store keeps one dict per mailbox,
+ with the document-id of the u1db document
+ and the set of the UIDs that have the recent flag.
+
+ {'mbox-a': {'doc_id': 'deadbeef',
+ 'set': {1,2,3,4}
+ }
+ }
+ """
+ # TODO this will have to transition to content-hash
+ # indexes after we move to local-only UIDs.
+
+ self._rflags_store = defaultdict(
+ lambda: {'doc_id': None, 'set': set([])})
+
+ # TODO ----------------- implement mailbox-level flags store too?
+ # XXX maybe we don't need this anymore...
+ # let's see how good does it prefetch the headers if
+ # we cache them in the store.
self._hdocset_store = {}
- # TODO ----------------- implement mailbox-level flags store too! ----
+ # --------------------------------------------------------------
# New and dirty flags, to set MessageWrapper State.
self._new = set([])
@@ -224,6 +249,8 @@ class MemoryStore(object):
def _add_message(self, mbox, uid, message, notify_on_disk=True):
# XXX have to differentiate between notify_new and notify_dirty
+ # TODO defaultdict the hell outa here...
+
key = mbox, uid
msg_dict = message.as_dict()
@@ -331,6 +358,8 @@ class MemoryStore(object):
with set_bool_flag(self, self.WRITING_FLAG):
for msg_wrapper in self.all_new_dirty_msg_iter():
self.producer.push(msg_wrapper)
+ for rflags_doc_wrapper in self.all_rdocs_iter():
+ self.producer.push(rflags_doc_wrapper)
# MemoryStore specific methods.
@@ -486,6 +515,79 @@ class MemoryStore(object):
d.callback('%s, ok' % str(key))
deferreds.pop(key)
+ # Recent Flags
+
+ # TODO --- nice but unused
+ def set_recent_flag(self, mbox, uid):
+ """
+ Set the `Recent` flag for a given mailbox and UID.
+ """
+ self._rflags_store[mbox]['set'].add(uid)
+
+ # TODO --- nice but unused
+ def unset_recent_flag(self, mbox, uid):
+ """
+ Unset the `Recent` flag for a given mailbox and UID.
+ """
+ self._rflags_store[mbox]['set'].discard(uid)
+
+ def set_recent_flags(self, mbox, value):
+ """
+ Set the value for the set of the recent flags.
+ Used from the property in the MessageCollection.
+ """
+ self._rflags_store[mbox]['set'] = set(value)
+
+ def load_recent_flags(self, mbox, flags_doc):
+ """
+ Load the passed flags document in the recent flags store, for a given
+ mailbox.
+
+ :param flags_doc: A dictionary containing the `doc_id` of the Soledad
+ flags-document for this mailbox, and the `set`
+ of uids marked with that flag.
+ """
+ self._rflags_store[mbox] = flags_doc
+
+ def get_recent_flags(self, mbox):
+ """
+ Get the set of UIDs with the `Recent` flag for this mailbox.
+
+ :return: set, or None
+ """
+ rflag_for_mbox = self._rflags_store.get(mbox, None)
+ if not rflag_for_mbox:
+ return None
+ return self._rflags_store[mbox]['set']
+
+ def all_rdocs_iter(self):
+ """
+ Return an iterator through all in-memory recent flag dicts, wrapped
+ under a RecentFlagsDoc namedtuple.
+ Used for saving to disk.
+
+ :rtype: generator
+ """
+ rflags_store = self._rflags_store
+
+ # XXX use enums
+ DOC_ID = "doc_id"
+ SET = "set"
+
+ print "LEN RFLAGS_STORE ------->", len(rflags_store)
+ return (
+ RecentFlagsDoc(
+ doc_id=rflags_store[mbox][DOC_ID],
+ content={
+ fields.TYPE_KEY: fields.TYPE_RECENT_VAL,
+ fields.MBOX_KEY: mbox,
+ fields.RECENTFLAGS_KEY: list(
+ rflags_store[mbox][SET])
+ })
+ for mbox in rflags_store)
+
+ # Dump-to-disk controls.
+
@property
def is_writing(self):
"""
@@ -498,7 +600,9 @@ class MemoryStore(object):
:rtype: bool
"""
- # XXX this should probably return a deferred !!!
+ # FIXME this should return a deferred !!!
+ # XXX ----- can fire when all new + dirty deferreds
+ # are done (gatherResults)
return getattr(self, self.WRITING_FLAG)
def put_part(self, part_type, value):
diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py
index 055e6a5a..257d3f0e 100644
--- a/mail/src/leap/mail/imap/messageparts.py
+++ b/mail/src/leap/mail/imap/messageparts.py
@@ -73,6 +73,14 @@ MessagePartDoc = namedtuple(
'MessagePartDoc',
['new', 'dirty', 'part', 'store', 'content', 'doc_id'])
+"""
+A RecentFlagsDoc is used to send the recent-flags document payload to the
+SoledadWriter during dumps.
+"""
+RecentFlagsDoc = namedtuple(
+ 'RecentFlagsDoc',
+ ['content', 'doc_id'])
+
class ReferenciableDict(dict):
"""
diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py
index c212472b..5de638bc 100644
--- a/mail/src/leap/mail/imap/messages.py
+++ b/mail/src/leap/mail/imap/messages.py
@@ -813,6 +813,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
leap_assert(soledad, "Need a soledad instance to initialize")
# okay, all in order, keep going...
+
self.mbox = self._parse_mailbox_name(mbox)
# XXX get a SoledadStore passed instead
@@ -996,8 +997,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# check for uniqueness.
if self._fdoc_already_exists(chash):
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
- print
- print
logger.warning("We already have that message in this mailbox.")
# note that this operation will leave holes in the UID sequence,
# but we're gonna change that all the same for a local-only table.
@@ -1023,21 +1022,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# XXX review-me
cdocs = dict((index, doc) for index, doc in
enumerate(walk.get_raw_docs(msg, parts)))
- print "cdocs is", cdocs
- # Saving ----------------------------------------
- # XXX should check for content duplication on headers too
- # but with chash. !!!
+ self.set_recent_flag(uid)
+ # Saving ----------------------------------------
# XXX adapt hdocset to use memstore
#hdoc = self._soledad.create_doc(hd)
# We add the newly created hdoc to the fast-access set of
# headers documents associated with the mailbox.
#self.add_hdocset_docid(hdoc.doc_id)
- # XXX move to memory store too
- # self.set_recent_flag(uid)
-
# TODO ---- add reference to original doc, to be deleted
# after writes are done.
msg_container = MessageWrapper(fd, hd, cdocs)
@@ -1088,24 +1082,48 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
"""
An accessor for the recent-flags set for this mailbox.
"""
- if not self.__rflags:
+ if self.__rflags is not None:
+ return self.__rflags
+
+ if self.memstore:
+ with self._rdoc_lock:
+ rflags = self.memstore.get_recent_flags(self.mbox)
+ if not rflags:
+ # not loaded in the memory store yet.
+ # let's fetch them from soledad...
+ rdoc = self._get_recent_doc()
+ rflags = set(rdoc.content.get(
+ fields.RECENTFLAGS_KEY, []))
+ # ...and cache them now.
+ self.memstore.load_recent_flags(
+ self.mbox,
+ {'doc_id': rdoc.doc_id, 'set': rflags})
+ return rflags
+
+ else:
+ # fallback for cases without memory store
with self._rdoc_lock:
rdoc = self._get_recent_doc()
self.__rflags = set(rdoc.content.get(
fields.RECENTFLAGS_KEY, []))
- return self.__rflags
+ return self.__rflags
def _set_recent_flags(self, value):
"""
Setter for the recent-flags set for this mailbox.
"""
- with self._rdoc_lock:
- rdoc = self._get_recent_doc()
- newv = set(value)
- self.__rflags = newv
- rdoc.content[fields.RECENTFLAGS_KEY] = list(newv)
- # XXX should deferLater 0 it?
- self._soledad.put_doc(rdoc)
+ if self.memstore:
+ self.memstore.set_recent_flags(self.mbox, value)
+
+ else:
+ # fallback for cases without memory store
+ with self._rdoc_lock:
+ rdoc = self._get_recent_doc()
+ newv = set(value)
+ self.__rflags = newv
+ rdoc.content[fields.RECENTFLAGS_KEY] = list(newv)
+ # XXX should deferLater 0 it?
+ self._soledad.put_doc(rdoc)
recent_flags = property(
_get_recent_flags, _set_recent_flags,
@@ -1131,15 +1149,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
Unset Recent flag for a sequence of uids.
"""
with self._rdoc_property_lock:
- self.recent_flags = self.recent_flags.difference(
+ self.recent_flags.difference_update(
set(uids))
+ # Individual flags operations
+
def unset_recent_flag(self, uid):
"""
Unset Recent flag for a given uid.
"""
with self._rdoc_property_lock:
- self.recent_flags = self.recent_flags.difference(
+ self.recent_flags.difference_update(
set([uid]))
def set_recent_flag(self, uid):
diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py
index b321da8a..ea5b36e0 100644
--- a/mail/src/leap/mail/imap/soledadstore.py
+++ b/mail/src/leap/mail/imap/soledadstore.py
@@ -25,6 +25,8 @@ from u1db import errors as u1db_errors
from zope.interface import implements
from leap.mail.imap.messageparts import MessagePartType
+from leap.mail.imap.messageparts import MessageWrapper
+from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.fields import fields
from leap.mail.imap.interfaces import IMessageStore
from leap.mail.messageflow import IMessageConsumer
@@ -193,9 +195,10 @@ class SoledadStore(ContentDedup):
empty = queue.empty()
while not empty:
items = self._process(queue)
+
# we prime the generator, that should return the
- # item in the first place.
- msg_wrapper = items.next()
+ # message or flags wrapper item in the first place.
+ doc_wrapper = items.next()
# From here, we unpack the subpart items and
# the right soledad call.
@@ -214,10 +217,11 @@ class SoledadStore(ContentDedup):
logger.error("Error while processing item.")
pass
else:
- # If everything went well, we can unset the new flag
- # in the source store (memory store)
- msg_wrapper.new = False
- msg_wrapper.dirty = False
+ if isinstance(doc_wrapper, MessageWrapper):
+ # If everything went well, we can unset the new flag
+ # in the source store (memory store)
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
empty = queue.empty()
#
@@ -233,9 +237,20 @@ class SoledadStore(ContentDedup):
:param queue: the queue from where we'll pick item.
:type queue: Queue
"""
- msg_wrapper = queue.get()
- return chain((msg_wrapper,),
- self._get_calls_for_msg_parts(msg_wrapper))
+ doc_wrapper = queue.get()
+
+ if isinstance(doc_wrapper, MessageWrapper):
+ return chain((doc_wrapper,),
+ self._get_calls_for_msg_parts(doc_wrapper))
+ elif isinstance(doc_wrapper, RecentFlagsDoc):
+ print "getting calls for rflags"
+ return chain((doc_wrapper,),
+ self._get_calls_for_rflags_doc(doc_wrapper))
+ else:
+ print "********************"
+ print "CANNOT PROCESS ITEM!"
+ print "item --------------------->", doc_wrapper
+ return (i for i in [])
def _try_call(self, call, item):
"""
@@ -309,4 +324,28 @@ class SoledadStore(ContentDedup):
# Implement using callbacks for each operation.
else:
- logger.error("Cannot put/delete documents yet!")
+ logger.error("Cannot delete documents yet!")
+
+ def _get_calls_for_rflags_doc(self, rflags_wrapper):
+ """
+ We always put these documents.
+ """
+ call = self._soledad.put_doc
+ rdoc = self._soledad.get_doc(rflags_wrapper.doc_id)
+
+ payload = rflags_wrapper.content
+ print "rdoc", rdoc
+ print "SAVING RFLAGS TO SOLEDAD..."
+ import pprint; pprint.pprint(payload)
+
+ if payload:
+ rdoc.content = payload
+ print
+ print "YIELDING -----", rdoc
+ print "AND ----------", call
+ yield rdoc, call
+ else:
+ print ">>>>>>>>>>>>>>>>>"
+ print ">>>>>>>>>>>>>>>>>"
+ print ">>>>>>>>>>>>>>>>>"
+ print "No payload"