summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-24 05:39:13 -0400
committerKali Kaneko <kali@leap.se>2014-01-28 19:38:45 -0400
commite02db78b1b6d8fe021efd4adb250c64a1dd4bac4 (patch)
tree8837eb90579898f2488dfe7fd581c87dd3a43def
parentff28e22977db802c87f0b7be99e37c6de29183e9 (diff)
flags use the memstore
* add new/dirty deferred dict to notify when written to disk * fix eventual duplication after copy * fix flag flickering on first retrieval.
-rw-r--r--src/leap/mail/imap/mailbox.py70
-rw-r--r--src/leap/mail/imap/memorystore.py265
-rw-r--r--src/leap/mail/imap/messageparts.py72
-rw-r--r--src/leap/mail/imap/messages.py162
-rw-r--r--src/leap/mail/imap/soledadstore.py35
-rw-r--r--src/leap/mail/messageflow.py8
-rw-r--r--src/leap/mail/utils.py9
7 files changed, 479 insertions, 142 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 5e16b4b..108d0da 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -36,6 +36,7 @@ from leap.common import events as leap_events
from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
from leap.mail.decorators import deferred
+from leap.mail.utils import empty
from leap.mail.imap.fields import WithMsgFields, fields
from leap.mail.imap.messages import MessageCollection
from leap.mail.imap.messageparts import MessageWrapper
@@ -475,8 +476,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
Remove all messages flagged \\Deleted
"""
+ print "EXPUNGE!"
if not self.isWriteable():
raise imap4.ReadOnlyMailbox
+ mstore = self._memstore
+ if mstore is not None:
+ deleted = mstore.all_deleted_uid_iter(self.mbox)
+ print "deleted ", list(deleted)
+ for uid in deleted:
+ mstore.remove_message(self.mbox, uid)
+
+ print "now deleting from soledad"
d = self.messages.remove_all_deleted()
d.addCallback(self._expunge_cb)
d.addCallback(self.messages.reset_last_uid)
@@ -709,21 +719,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
msg = self.messages.get_msg_by_uid(msg_id)
if not msg:
continue
+ # We duplicate the set operations here
+ # to return the result because it's less costly than
+ # retrieving the flags again.
+ newflags = set(msg.getFlags())
+
if mode == 1:
msg.addFlags(flags)
+ newflags = newflags.union(set(flags))
elif mode == -1:
msg.removeFlags(flags)
+ newflags.difference_update(flags)
elif mode == 0:
msg.setFlags(flags)
- result[msg_id] = msg.getFlags()
-
- # After changing flags, we want to signal again to the
- # UI because the number of unread might have changed.
- # Hoever, we should probably limit this to INBOX only?
- # this should really be called as a final callback of
- # the do_STORE method...
- from twisted.internet import reactor
- deferLater(reactor, 1, self.signal_unread_to_ui)
+ newflags = set(flags)
+ result[msg_id] = newflags
return result
# ISearchableMailbox
@@ -780,6 +790,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
from twisted.internet import reactor
uid_next = self.getUIDNext()
msg = messageObject
+ memstore = self._memstore
# XXX should use a public api instead
fdoc = msg._fdoc
@@ -787,20 +798,35 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
if not fdoc:
logger.debug("Tried to copy a MSG with no fdoc")
return
-
new_fdoc = copy.deepcopy(fdoc.content)
- new_fdoc[self.UID_KEY] = uid_next
- new_fdoc[self.MBOX_KEY] = self.mbox
- self._memstore.create_message(
- self.mbox, uid_next,
- MessageWrapper(
- new_fdoc, hdoc.content))
-
- # XXX use memory store !!!
- if hasattr(hdoc, 'doc_id'):
- self.messages.add_hdocset_docid(hdoc.doc_id)
-
- deferLater(reactor, 1, self.notify_new)
+
+ fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
+ dest_fdoc = memstore.get_fdoc_from_chash(
+ fdoc_chash, self.mbox)
+ exist = dest_fdoc and not empty(dest_fdoc.content)
+
+ if exist:
+ print "Destination message already exists!"
+
+ else:
+ print "DO COPY MESSAGE!"
+ new_fdoc[self.UID_KEY] = uid_next
+ new_fdoc[self.MBOX_KEY] = self.mbox
+
+ # XXX set recent!
+
+ print "****************************"
+ print "copy message..."
+ print "new fdoc ", new_fdoc
+ print "hdoc: ", hdoc
+ print "****************************"
+
+ self._memstore.create_message(
+ self.mbox, uid_next,
+ MessageWrapper(
+ new_fdoc, hdoc.content))
+
+ deferLater(reactor, 1, self.notify_new)
# convenience fun
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index f0bdab5..f0c0d4b 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -21,10 +21,13 @@ import contextlib
import logging
import weakref
+from twisted.internet import defer
from twisted.internet.task import LoopingCall
+from twisted.python import log
from zope.interface import implements
from leap.mail import size
+from leap.mail.utils import empty
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
from leap.mail.imap.fields import fields
@@ -34,6 +37,8 @@ from leap.mail.imap.messageparts import ReferenciableDict
logger = logging.getLogger(__name__)
+SOLEDAD_WRITE_PERIOD = 20
+
@contextlib.contextmanager
def set_bool_flag(obj, att):
@@ -79,7 +84,8 @@ class MemoryStore(object):
WRITING_FLAG = "_writing"
- def __init__(self, permanent_store=None, write_period=60):
+ def __init__(self, permanent_store=None,
+ write_period=SOLEDAD_WRITE_PERIOD):
"""
Initialize a MemoryStore.
@@ -92,10 +98,23 @@ class MemoryStore(object):
self._permanent_store = permanent_store
self._write_period = write_period
- # Internal Storage
+ # Internal Storage: messages
self._msg_store = {}
+
+ # Internal Storage: payload-hash
+ """
+ {'phash': weakreaf.proxy(dict)}
+ """
self._phash_store = {}
+ # Internal Storage: content-hash:fdoc
+ """
+ {'chash': {'mbox-a': weakref.proxy(dict),
+ 'mbox-b': weakref.proxy(dict)}
+ }
+ """
+ self._chash_fdoc_store = {}
+
# TODO ----------------- implement mailbox-level flags store too! ----
self._rflags_store = {}
self._hdocset_store = {}
@@ -103,7 +122,9 @@ class MemoryStore(object):
# New and dirty flags, to set MessageWrapper State.
self._new = set([])
+ self._new_deferreds = {}
self._dirty = set([])
+ self._dirty_deferreds = {}
# Flag for signaling we're busy writing to the disk storage.
setattr(self, self.WRITING_FLAG, False)
@@ -141,48 +162,141 @@ class MemoryStore(object):
# We would have to add a put_flags operation to modify only
# the flags doc (and set the dirty flag accordingly)
- def create_message(self, mbox, uid, message):
+ def create_message(self, mbox, uid, message, notify_on_disk=True):
"""
Create the passed message into this MemoryStore.
By default we consider that any message is a new message.
+
+ :param mbox: the mailbox
+ :type mbox: basestring
+ :param uid: the UID for the message
+ :type uid: int
+ :param message: a to be added
+ :type message: MessageWrapper
+ :param notify_on_disk:
+ :type notify_on_disk: bool
+
+ :return: a Deferred. if notify_on_disk is True, will be fired
+ when written to the db on disk.
+ Otherwise will fire inmediately
+ :rtype: Deferred
"""
print "adding new doc to memstore %s (%s)" % (mbox, uid)
key = mbox, uid
+
+ d = defer.Deferred()
+ d.addCallback(lambda result: log.msg("message save: %s" % result))
+
self._new.add(key)
+ self._new_deferreds[key] = d
+ self._add_message(mbox, uid, message, notify_on_disk)
+ print "create message: ", d
+ return d
- msg_dict = message.as_dict()
- self._msg_store[key] = msg_dict
+ def put_message(self, mbox, uid, message, notify_on_disk=True):
+ """
+ Put an existing message.
- cdocs = message.cdocs
+ :param mbox: the mailbox
+ :type mbox: basestring
+ :param uid: the UID for the message
+ :type uid: int
+ :param message: a to be added
+ :type message: MessageWrapper
+ :param notify_on_disk:
+ :type notify_on_disk: bool
- dirty = key in self._dirty
- new = key in self._new
+ :return: a Deferred. if notify_on_disk is True, will be fired
+ when written to the db on disk.
+ Otherwise will fire inmediately
+ :rtype: Deferred
+ """
+ key = mbox, uid
+
+ d = defer.Deferred()
+ d.addCallback(lambda result: log.msg("message save: %s" % result))
+
+ self._dirty.add(key)
+ self._dirty_deferreds[key] = d
+ self._add_message(mbox, uid, message, notify_on_disk)
+ return d
- # XXX should capture this in log...
+ def _add_message(self, mbox, uid, message, notify_on_disk=True):
+ # XXX have to differentiate between notify_new and notify_dirty
+
+ key = mbox, uid
+ msg_dict = message.as_dict()
+ print "ADDING MESSAGE..."
+ import pprint; pprint.pprint(msg_dict)
+
+ # XXX use the enum as keys
+
+ try:
+ store = self._msg_store[key]
+ except KeyError:
+ self._msg_store[key] = {'fdoc': {},
+ 'hdoc': {},
+ 'cdocs': {},
+ 'docs_id': {}}
+ store = self._msg_store[key]
+
+ print "In store (before):"
+ import pprint; pprint.pprint(store)
+
+ #self._msg_store[key] = msg_dict
+ fdoc = msg_dict.get('fdoc', None)
+ if fdoc:
+ if not store.get('fdoc', None):
+ store['fdoc'] = ReferenciableDict({})
+ store['fdoc'].update(fdoc)
+
+ # content-hash indexing
+ chash = fdoc.get(fields.CONTENT_HASH_KEY)
+ chash_fdoc_store = self._chash_fdoc_store
+ if not chash in chash_fdoc_store:
+ chash_fdoc_store[chash] = {}
+
+ chash_fdoc_store[chash][mbox] = weakref.proxy(
+ store['fdoc'])
+
+ hdoc = msg_dict.get('hdoc', None)
+ if hdoc:
+ if not store.get('hdoc', None):
+ store['hdoc'] = ReferenciableDict({})
+ store['hdoc'].update(hdoc)
+
+ docs_id = msg_dict.get('docs_id', None)
+ if docs_id:
+ if not store.get('docs_id', None):
+ store['docs_id'] = {}
+ store['docs_id'].update(docs_id)
+ cdocs = message.cdocs
for cdoc_key in cdocs.keys():
- print "saving cdoc"
- cdoc = self._msg_store[key]['cdocs'][cdoc_key]
+ if not store.get('cdocs', None):
+ store['cdocs'] = {}
- # FIXME this should be done in the MessageWrapper constructor
- # instead...
+ cdoc = cdocs[cdoc_key]
# first we make it weak-referenciable
referenciable_cdoc = ReferenciableDict(cdoc)
- self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc(
- new=new, dirty=dirty, store="mem",
- part=MessagePartType.cdoc,
- content=referenciable_cdoc)
+ store['cdocs'][cdoc_key] = referenciable_cdoc
phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)
if not phash:
continue
self._phash_store[phash] = weakref.proxy(referenciable_cdoc)
- def put_message(self, mbox, uid, msg):
- """
- Put an existing message.
- """
- return NotImplementedError()
+ def prune(seq, store):
+ for key in seq:
+ if key in store and empty(store.get(key)):
+ store.pop(key)
+
+ prune(('fdoc', 'hdoc', 'cdocs', 'docs_id'), store)
+ #import ipdb; ipdb.set_trace()
+
+
+ print "after appending to store: ", key
+ import pprint; pprint.pprint(self._msg_store[key])
def get_message(self, mbox, uid):
"""
@@ -203,7 +317,13 @@ class MemoryStore(object):
"""
Remove a Message from this MemoryStore.
"""
- raise NotImplementedError()
+ try:
+ key = mbox, uid
+ self._new.discard(key)
+ self._dirty.discard(key)
+ self._msg_store.pop(key, None)
+ except Exception as exc:
+ logger.exception(exc)
# IMessageStoreWriter
@@ -211,12 +331,15 @@ class MemoryStore(object):
"""
Write the message documents in this MemoryStore to a different store.
"""
- # XXX pass if it's writing (ie, the queue is not empty...)
- # See how to make the writing_flag aware of the queue state...
- print "writing messages to producer..."
+ # For now, we pass if the queue is not empty, to avoid duplication.
+ # We would better use a flag to know when we've already enqueued an
+ # item.
+ if not self.producer.is_queue_empty():
+ return
+ print "Writing messages to Soledad..."
with set_bool_flag(self, self.WRITING_FLAG):
- for msg_wrapper in self.all_msg_iter():
+ for msg_wrapper in self.all_new_dirty_msg_iter():
self.producer.push(msg_wrapper)
# MemoryStore specific methods.
@@ -247,12 +370,14 @@ class MemoryStore(object):
"""
return len(self._new)
- def get_by_phash(self, phash):
+ def get_cdoc_from_phash(self, phash):
"""
Return a content-document by its payload-hash.
"""
doc = self._phash_store.get(phash, None)
+ # XXX return None for consistency?
+
# XXX have to keep a mapping between phash and its linkage
# info, to know if this payload is been already saved or not.
# We will be able to get this from the linkage-docs,
@@ -262,7 +387,40 @@ class MemoryStore(object):
return MessagePartDoc(
new=new, dirty=dirty, store="mem",
part=MessagePartType.cdoc,
- content=doc)
+ content=doc,
+ doc_id=None)
+
+ def get_fdoc_from_chash(self, chash, mbox):
+ """
+ Return a flags-document by its content-hash and a given mailbox.
+
+ :return: MessagePartDoc, or None.
+ """
+ docs_dict = self._chash_fdoc_store.get(chash, None)
+ fdoc = docs_dict.get(mbox, None) if docs_dict else None
+
+ print "GETTING FDOC BY CHASH:", fdoc
+
+ # a couple of special cases.
+ # 1. We might have a doc with empty content...
+ if empty(fdoc):
+ return None
+
+ # ...Or the message could exist, but being flagged for deletion.
+ # We want to create a new one in this case.
+ # Hmmm what if the deletion is un-done?? We would end with a
+ # duplicate...
+ if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]:
+ return None
+
+ # XXX get flags
+ new = True
+ dirty = False
+ return MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.fdoc,
+ content=fdoc,
+ doc_id=None)
def all_msg_iter(self):
"""
@@ -271,6 +429,25 @@ class MemoryStore(object):
return (self.get_message(*key)
for key in sorted(self._msg_store.keys()))
+ def all_new_dirty_msg_iter(self):
+ """
+ Return geneator that iterates through all new and dirty messages.
+ """
+ return (self.get_message(*key)
+ for key in sorted(self._msg_store.keys())
+ if key in self._new or key in self._dirty)
+
+ def all_deleted_uid_iter(self, mbox):
+ """
+ Return generator that iterates through the UIDs for all messags
+ with deleted flag in a given mailbox.
+ """
+ all_deleted = (
+ msg['fdoc']['uid'] for msg in self._msg_store.values()
+ if msg.get('fdoc', None)
+ and fields.DELETED_FLAG in msg['fdoc']['flags'])
+ return all_deleted
+
# new, dirty flags
def _get_new_dirty_state(self, key):
@@ -289,9 +466,35 @@ class MemoryStore(object):
"""
Remove the key value from the `new` set.
"""
- print "******************"
- print "UNSETTING NEW FOR: %s" % str(key)
+ print "Unsetting NEW for: %s" % str(key)
self._new.discard(key)
+ deferreds = self._new_deferreds
+ d = deferreds.get(key, None)
+ if d:
+ # XXX use a namedtuple for passing the result
+ # when we check it in the other side.
+ d.callback('%s, ok' % str(key))
+ deferreds.pop(key)
+
+ def set_dirty(self, key):
+ """
+ Add the key value to the `dirty` set.
+ """
+ self._dirty.add(key)
+
+ def unset_dirty(self, key):
+ """
+ Remove the key value from the `dirty` set.
+ """
+ print "Unsetting DIRTY for: %s" % str(key)
+ self._dirty.discard(key)
+ deferreds = self._dirty_deferreds
+ d = deferreds.get(key, None)
+ if d:
+ # XXX use a namedtuple for passing the result
+ # when we check it in the other side.
+ d.callback('%s, ok' % str(key))
+ deferreds.pop(key)
@property
def is_writing(self):
diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py
index 42eef02..b43bc37 100644
--- a/src/leap/mail/imap/messageparts.py
+++ b/src/leap/mail/imap/messageparts.py
@@ -65,15 +65,13 @@ and sometimes to a part in particular only.
we have modified its state in memory, so we need to put_doc instead while
dumping the MemoryStore contents.
`dirty` attribute would only apply to flags-docs and linkage-docs.
-
-
- XXX this is still not implemented!
+* `doc_id` is the identifier for the document in the u1db database, if any.
"""
MessagePartDoc = namedtuple(
'MessagePartDoc',
- ['new', 'dirty', 'part', 'store', 'content'])
+ ['new', 'dirty', 'part', 'store', 'content', 'doc_id'])
class ReferenciableDict(dict):
@@ -96,6 +94,7 @@ class MessageWrapper(object):
FDOC = "fdoc"
HDOC = "hdoc"
CDOCS = "cdocs"
+ DOCS_ID = "docs_id"
# XXX can use this to limit the memory footprint,
# or is it too premature to optimize?
@@ -105,12 +104,17 @@ class MessageWrapper(object):
def __init__(self, fdoc=None, hdoc=None, cdocs=None,
from_dict=None, memstore=None,
- new=True, dirty=False):
+ new=True, dirty=False, docs_id={}):
+ """
+ Initialize a MessageWrapper.
+ """
+ # TODO add optional reference to original message in the incoming
self._dict = {}
self.memstore = memstore
self._new = new
self._dirty = dirty
+
self._storetype = "mem"
if from_dict is not None:
@@ -122,6 +126,7 @@ class MessageWrapper(object):
self._dict[self.HDOC] = ReferenciableDict(hdoc)
if cdocs is not None:
self._dict[self.CDOCS] = ReferenciableDict(cdocs)
+ self._dict[self.DOCS_ID] = docs_id
# properties
@@ -153,10 +158,28 @@ class MessageWrapper(object):
doc="The `new` flag for this MessageWrapper")
def _get_dirty(self):
+ """
+ Get the value for the `dirty` flag.
+ """
return self._dirty
def _set_dirty(self, value=True):
+ """
+ Set the value for the `dirty` flag, and propagate it
+ to the memory store if any.
+ """
self._dirty = value
+ if self.memstore:
+ mbox = self.fdoc.content['mbox']
+ uid = self.fdoc.content['uid']
+ key = mbox, uid
+ fun = [self.memstore.unset_dirty,
+ self.memstore.set_dirty][int(value)]
+ fun(key)
+ else:
+ logger.warning("Could not find a memstore referenced from this "
+ "MessageWrapper. The value for new will not be "
+ "propagated")
dirty = property(_get_dirty, _set_dirty)
@@ -173,7 +196,9 @@ class MessageWrapper(object):
return MessagePartDoc(new=self.new, dirty=self.dirty,
store=self._storetype,
part=MessagePartType.fdoc,
- content=content_ref)
+ content=content_ref,
+ doc_id=self._dict[self.DOCS_ID].get(
+ self.FDOC, None))
@property
def hdoc(self):
@@ -186,7 +211,9 @@ class MessageWrapper(object):
return MessagePartDoc(new=self.new, dirty=self.dirty,
store=self._storetype,
part=MessagePartType.hdoc,
- content=content_ref)
+ content=content_ref,
+ doc_id=self._dict[self.DOCS_ID].get(
+ self.HDOC, None))
@property
def cdocs(self):
@@ -201,21 +228,18 @@ class MessageWrapper(object):
Generator that iterates through all the parts, returning
MessagePartDoc.
"""
- yield self.fdoc
- yield self.hdoc
+ if self.fdoc is not None:
+ yield self.fdoc
+ if self.hdoc is not None:
+ yield self.hdoc
for cdoc in self.cdocs.values():
- # XXX this will break ----
- #content_ref = weakref.proxy(cdoc)
- #yield MessagePartDoc(new=self.new, dirty=self.dirty,
- #store=self._storetype,
- #part=MessagePartType.cdoc,
- #content=content_ref)
-
- # the put is handling this for us, so
- # we already have stored a MessagePartDoc
- # but we should really do it while adding in the
- # constructor or the from_dict method
- yield cdoc
+ if cdoc is not None:
+ content_ref = weakref.proxy(cdoc)
+ yield MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.cdoc,
+ content=content_ref,
+ doc_id=None)
# i/o
@@ -234,9 +258,9 @@ class MessageWrapper(object):
fdoc, hdoc, cdocs = map(
lambda part: msg_dict.get(part, None),
[self.FDOC, self.HDOC, self.CDOCS])
- self._dict[self.FDOC] = fdoc
- self._dict[self.HDOC] = hdoc
- self._dict[self.CDOCS] = cdocs
+ for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc),
+ (self.CDOCS, cdocs)):
+ self._dict[t] = ReferenciableDict(doc) if doc else None
class MessagePart(object):
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 94bd714..c212472 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -37,7 +37,7 @@ from leap.common.check import leap_assert, leap_assert_type
from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
from leap.mail import walk
-from leap.mail.utils import first, find_charset, lowerdict
+from leap.mail.utils import first, find_charset, lowerdict, empty
from leap.mail.decorators import deferred
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
@@ -130,6 +130,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
self.__chash = None
self.__bdoc = None
+ # XXX make these properties public
+
@property
def _fdoc(self):
"""
@@ -154,8 +156,9 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
if self._container is not None:
hdoc = self._container.hdoc
- if hdoc:
+ if hdoc and not empty(hdoc.content):
return hdoc
+ # XXX cache this into the memory store !!!
return self._get_headers_doc()
@property
@@ -248,7 +251,13 @@ class LeapMessage(fields, MailParser, MBoxParser):
doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
- if getattr(doc, 'store', None) != "mem":
+ if self._collection.memstore is not None:
+ self._collection.memstore.put_message(
+ self._mbox, self._uid,
+ MessageWrapper(fdoc=doc.content, new=False, dirty=True,
+ docs_id={'fdoc': doc.doc_id}))
+ else:
+ # fallback for non-memstore initializations.
self._soledad.put_doc(doc)
def addFlags(self, flags):
@@ -547,20 +556,18 @@ class LeapMessage(fields, MailParser, MBoxParser):
# phash doc...
if self._container is not None:
- bdoc = self._container.memstore.get_by_phash(body_phash)
+ bdoc = self._container.memstore.get_cdoc_from_phash(body_phash)
print "bdoc from container -->", bdoc
if bdoc and bdoc.content is not None:
return bdoc
else:
print "no doc or not bdoc content for that phash found!"
- print "nuthing. soledad?"
# no memstore or no doc found there
if self._soledad:
body_docs = self._soledad.get_from_index(
fields.TYPE_P_HASH_IDX,
fields.TYPE_CONTENT_VAL, str(body_phash))
- print "returning body docs...", body_docs
return first(body_docs)
else:
logger.error("No phash in container, and no soledad found!")
@@ -581,32 +588,32 @@ class LeapMessage(fields, MailParser, MBoxParser):
# setters
# XXX to be used in the messagecopier interface?!
-
- def set_uid(self, uid):
- """
- Set new uid for this message.
-
- :param uid: the new uid
- :type uid: basestring
- """
+#
+ #def set_uid(self, uid):
+ #"""
+ #Set new uid for this message.
+#
+ #:param uid: the new uid
+ #:type uid: basestring
+ #"""
# XXX dangerous! lock?
- self._uid = uid
- d = self._fdoc
- d.content[self.UID_KEY] = uid
- self._soledad.put_doc(d)
-
- def set_mbox(self, mbox):
- """
- Set new mbox for this message.
-
- :param mbox: the new mbox
- :type mbox: basestring
- """
+ #self._uid = uid
+ #d = self._fdoc
+ #d.content[self.UID_KEY] = uid
+ #self._soledad.put_doc(d)
+#
+ #def set_mbox(self, mbox):
+ #"""
+ #Set new mbox for this message.
+#
+ #:param mbox: the new mbox
+ #:type mbox: basestring
+ #"""
# XXX dangerous! lock?
- self._mbox = mbox
- d = self._fdoc
- d.content[self.MBOX_KEY] = mbox
- self._soledad.put_doc(d)
+ #self._mbox = mbox
+ #d = self._fdoc
+ #d.content[self.MBOX_KEY] = mbox
+ #self._soledad.put_doc(d)
# destructor
@@ -614,14 +621,13 @@ class LeapMessage(fields, MailParser, MBoxParser):
def remove(self):
"""
Remove all docs associated with this message.
+ Currently it removes only the flags doc.
"""
# XXX For the moment we are only removing the flags and headers
# docs. The rest we leave there polluting your hard disk,
# until we think about a good way of deorphaning.
# Maybe a crawler of unreferenced docs.
- # XXX remove from memory store!!!
-
# XXX implement elijah's idea of using a PUT document as a
# token to ensure consistency in the removal.
@@ -632,13 +638,35 @@ class LeapMessage(fields, MailParser, MBoxParser):
#bd = self._get_body_doc()
#docs = [fd, hd, bd]
- docs = [fd]
+ try:
+ memstore = self._collection.memstore
+ except AttributeError:
+ memstore = False
+
+ if memstore and hasattr(fd, "store", None) == "mem":
+ key = self._mbox, self._uid
+ if fd.new:
+ # it's a new document, so we can remove it and it will not
+ # be writen. Watch out! We need to be sure it has not been
+ # just queued to write!
+ memstore.remove_message(*key)
+
+ if fd.dirty:
+ doc_id = fd.doc_id
+ doc = self._soledad.get_doc(doc_id)
+ try:
+ self._soledad.delete_doc(doc)
+ except Exception as exc:
+ logger.exception(exc)
- for d in filter(None, docs):
+ else:
+ # we just got a soledad_doc
try:
- self._soledad.delete_doc(d)
+ doc_id = fd.doc_id
+ latest_doc = self._soledad.get_doc(doc_id)
+ self._soledad.delete_doc(latest_doc)
except Exception as exc:
- logger.error(exc)
+ logger.exception(exc)
return uid
def does_exist(self):
@@ -786,8 +814,10 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# okay, all in order, keep going...
self.mbox = self._parse_mailbox_name(mbox)
+
+ # XXX get a SoledadStore passed instead
self._soledad = soledad
- self._memstore = memstore
+ self.memstore = memstore
self.__rflags = None
self.__hdocset = None
@@ -913,13 +943,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:type chash: basestring
:return: False, if it does not exist, or UID.
"""
- exist = self._get_fdoc_from_chash(chash)
+ exist = False
+ if self.memstore is not None:
+ exist = self.memstore.get_fdoc_from_chash(chash, self.mbox)
+
+ if not exist:
+ exist = self._get_fdoc_from_chash(chash)
+
+ print "FDOC EXIST?", exist
if exist:
return exist.content.get(fields.UID_KEY, "unknown-uid")
else:
return False
- @deferred
+ # not deferring to thread cause this now uses deferred asa retval
+ #@deferred
def add_msg(self, raw, subject=None, flags=None, date=None, uid=1):
"""
Creates a new message document.
@@ -945,6 +983,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# TODO add the linked-from info !
# TODO add reference to the original message
+ print "ADDING MESSAGE..."
logger.debug('adding message')
if flags is None:
@@ -956,11 +995,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# check for uniqueness.
if self._fdoc_already_exists(chash):
+ print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
+ print
+ print
logger.warning("We already have that message in this mailbox.")
# note that this operation will leave holes in the UID sequence,
# but we're gonna change that all the same for a local-only table.
# so not touch it by the moment.
- return False
+ return defer.succeed('already_exists')
fd = self._populate_flags(flags, uid, chash, size, multi)
hd = self._populate_headr(msg, chash, subject, date)
@@ -999,7 +1041,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# TODO ---- add reference to original doc, to be deleted
# after writes are done.
msg_container = MessageWrapper(fd, hd, cdocs)
- self._memstore.create_message(self.mbox, uid, msg_container)
+
+ # XXX Should allow also to dump to disk directly,
+ # for no-memstore cases.
+
+ # we return a deferred that, by default, will be triggered when
+ # saved to disk
+ d = self.memstore.create_message(self.mbox, uid, msg_container)
+ print "defered-add", d
+ print "adding message", d
+ return d
def _remove_cb(self, result):
return result
@@ -1247,17 +1298,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
or None if not found.
:rtype: LeapMessage
"""
- print "getting msg by id!"
- msg_container = self._memstore.get_message(self.mbox, uid)
- print "msg container", msg_container
+ msg_container = self.memstore.get_message(self.mbox, uid)
if msg_container is not None:
- print "getting LeapMessage (from memstore)"
# We pass a reference to soledad just to be able to retrieve
# missing parts that cannot be found in the container, like
# the content docs after a copy.
msg = LeapMessage(self._soledad, uid, self.mbox, collection=self,
container=msg_container)
- print "got msg:", msg
else:
msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
if not msg.does_exist():
@@ -1303,8 +1350,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)])
- if self._memstore is not None:
- mem_uids = self._memstore.get_uids(self.mbox)
+ if self.memstore is not None:
+ mem_uids = self.memstore.get_uids(self.mbox)
uids = db_uids.union(set(mem_uids))
else:
uids = db_uids
@@ -1328,19 +1375,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
Return a dict with all flags documents for this mailbox.
"""
# XXX get all from memstore and cache it there
+ # FIXME should get all uids, get them fro memstore,
+ # and get only the missing ones from disk.
+
all_flags = dict(((
doc.content[self.UID_KEY],
doc.content[self.FLAGS_KEY]) for doc in
self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)))
- if self._memstore is not None:
+ if self.memstore is not None:
# XXX
- uids = self._memstore.get_uids(self.mbox)
- fdocs = [(uid, self._memstore.get_message(self.mbox, uid).fdoc)
- for uid in uids]
- for uid, doc in fdocs:
- all_flags[uid] = doc.content[self.FLAGS_KEY]
+ uids = self.memstore.get_uids(self.mbox)
+ docs = ((uid, self.memstore.get_message(self.mbox, uid))
+ for uid in uids)
+ for uid, doc in docs:
+ all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY]
return all_flags
@@ -1378,8 +1428,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
count = self._soledad.get_count_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)
- if self._memstore is not None:
- count += self._memstore.count_new()
+ if self.memstore is not None:
+ count += self.memstore.count_new()
return count
# unseen messages
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index d36acae..b321da8 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -81,7 +81,8 @@ class ContentDedup(object):
if len(header_docs) != 1:
logger.warning("Found more than one copy of chash %s!"
% (chash,))
- logger.debug("Found header doc with that hash! Skipping save!")
+ # XXX re-enable
+ #logger.debug("Found header doc with that hash! Skipping save!")
return True
def _content_does_exist(self, doc):
@@ -105,7 +106,8 @@ class ContentDedup(object):
if len(attach_docs) != 1:
logger.warning("Found more than one copy of phash %s!"
% (phash,))
- logger.debug("Found attachment doc with that hash! Skipping save!")
+ # XXX re-enable
+ #logger.debug("Found attachment doc with that hash! Skipping save!")
return True
@@ -215,6 +217,7 @@ class SoledadStore(ContentDedup):
# If everything went well, we can unset the new flag
# in the source store (memory store)
msg_wrapper.new = False
+ msg_wrapper.dirty = False
empty = queue.empty()
#
@@ -261,6 +264,9 @@ class SoledadStore(ContentDedup):
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
if item.part == MessagePartType.fdoc:
+
+ # FIXME add content duplication for HEADERS too!
+ # (only 1 chash per mailbox!)
yield dict(item.content), call
elif item.part == MessagePartType.hdoc:
@@ -276,18 +282,31 @@ class SoledadStore(ContentDedup):
yield dict(item.content), call
+ # For now, the only thing that will be dirty is
+ # the flags doc.
+
+ elif msg_wrapper.dirty is True:
+ print "DIRTY DOC! ----------------------"
+ call = self._soledad.put_doc
+
+ # item is expected to be a MessagePartDoc
+ for item in msg_wrapper.walk():
+ doc_id = item.doc_id # defend!
+ doc = self._soledad.get_doc(doc_id)
+ doc.content = item.content
+
+ if item.part == MessagePartType.fdoc:
+ print "Will PUT the doc: ", doc
+ yield dict(doc), call
+
+ # XXX also for linkage-doc
+
# TODO should write back to the queue
# with the results of the operation.
# We can write there:
# (*) MsgWriteACK --> Should remove from incoming queue.
# (We should do this here).
-
# Implement using callbacks for each operation.
- # TODO should check for elements with the dirty state
- # TODO if new == False and dirty == True, put_doc
- # XXX for puts, we will have to retrieve
- # the document, change the content, and
- # pass the whole document under "content"
else:
logger.error("Cannot put/delete documents yet!")
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index ed6abcd..b7fc030 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -126,9 +126,15 @@ class MessageProducer(object):
again after the addition of new items.
"""
self._consumer.consume(self._queue)
- if self._queue.empty():
+ if self.is_queue_empty():
self.stop()
+ def is_queue_empty(self):
+ """
+ Return True if queue is empty, False otherwise.
+ """
+ return self._queue.empty()
+
# public methods: IMessageProducer
def push(self, item):
diff --git a/src/leap/mail/utils.py b/src/leap/mail/utils.py
index 64af04f..bae2898 100644
--- a/src/leap/mail/utils.py
+++ b/src/leap/mail/utils.py
@@ -36,6 +36,15 @@ def first(things):
return None
+def empty(thing):
+ """
+ Return True if a thing is None or its length is zero.
+ """
+ if thing is None:
+ return True
+ return len(thing) == 0
+
+
def maybe_call(thing):
"""
Return the same thing, or the result of its invocation if it is a