From c1903d399b724f5b911129eeb723be7c6bfca536 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 24 Jan 2014 05:39:13 -0400 Subject: flags use the memstore * add new/dirty deferred dict to notify when written to disk * fix eventual duplication after copy * fix flag flickering on first retrieval. --- mail/src/leap/mail/imap/mailbox.py | 70 ++++++--- mail/src/leap/mail/imap/memorystore.py | 265 ++++++++++++++++++++++++++++---- mail/src/leap/mail/imap/messageparts.py | 72 ++++++--- mail/src/leap/mail/imap/messages.py | 162 ++++++++++++------- mail/src/leap/mail/imap/soledadstore.py | 35 ++++- mail/src/leap/mail/messageflow.py | 8 +- mail/src/leap/mail/utils.py | 9 ++ 7 files changed, 479 insertions(+), 142 deletions(-) diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 5e16b4b..108d0da 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -36,6 +36,7 @@ from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type from leap.mail.decorators import deferred +from leap.mail.utils import empty from leap.mail.imap.fields import WithMsgFields, fields from leap.mail.imap.messages import MessageCollection from leap.mail.imap.messageparts import MessageWrapper @@ -475,8 +476,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ Remove all messages flagged \\Deleted """ + print "EXPUNGE!" if not self.isWriteable(): raise imap4.ReadOnlyMailbox + mstore = self._memstore + if mstore is not None: + deleted = mstore.all_deleted_uid_iter(self.mbox) + print "deleted ", list(deleted) + for uid in deleted: + mstore.remove_message(self.mbox, uid) + + print "now deleting from soledad" d = self.messages.remove_all_deleted() d.addCallback(self._expunge_cb) d.addCallback(self.messages.reset_last_uid) @@ -709,21 +719,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser): msg = self.messages.get_msg_by_uid(msg_id) if not msg: continue + # We duplicate the set operations here + # to return the result because it's less costly than + # retrieving the flags again. + newflags = set(msg.getFlags()) + if mode == 1: msg.addFlags(flags) + newflags = newflags.union(set(flags)) elif mode == -1: msg.removeFlags(flags) + newflags.difference_update(flags) elif mode == 0: msg.setFlags(flags) - result[msg_id] = msg.getFlags() - - # After changing flags, we want to signal again to the - # UI because the number of unread might have changed. - # Hoever, we should probably limit this to INBOX only? - # this should really be called as a final callback of - # the do_STORE method... - from twisted.internet import reactor - deferLater(reactor, 1, self.signal_unread_to_ui) + newflags = set(flags) + result[msg_id] = newflags return result # ISearchableMailbox @@ -780,6 +790,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): from twisted.internet import reactor uid_next = self.getUIDNext() msg = messageObject + memstore = self._memstore # XXX should use a public api instead fdoc = msg._fdoc @@ -787,20 +798,35 @@ class SoledadMailbox(WithMsgFields, MBoxParser): if not fdoc: logger.debug("Tried to copy a MSG with no fdoc") return - new_fdoc = copy.deepcopy(fdoc.content) - new_fdoc[self.UID_KEY] = uid_next - new_fdoc[self.MBOX_KEY] = self.mbox - self._memstore.create_message( - self.mbox, uid_next, - MessageWrapper( - new_fdoc, hdoc.content)) - - # XXX use memory store !!! - if hasattr(hdoc, 'doc_id'): - self.messages.add_hdocset_docid(hdoc.doc_id) - - deferLater(reactor, 1, self.notify_new) + + fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] + dest_fdoc = memstore.get_fdoc_from_chash( + fdoc_chash, self.mbox) + exist = dest_fdoc and not empty(dest_fdoc.content) + + if exist: + print "Destination message already exists!" + + else: + print "DO COPY MESSAGE!" + new_fdoc[self.UID_KEY] = uid_next + new_fdoc[self.MBOX_KEY] = self.mbox + + # XXX set recent! + + print "****************************" + print "copy message..." + print "new fdoc ", new_fdoc + print "hdoc: ", hdoc + print "****************************" + + self._memstore.create_message( + self.mbox, uid_next, + MessageWrapper( + new_fdoc, hdoc.content)) + + deferLater(reactor, 1, self.notify_new) # convenience fun diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index f0bdab5..f0c0d4b 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -21,10 +21,13 @@ import contextlib import logging import weakref +from twisted.internet import defer from twisted.internet.task import LoopingCall +from twisted.python import log from zope.interface import implements from leap.mail import size +from leap.mail.utils import empty from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces from leap.mail.imap.fields import fields @@ -34,6 +37,8 @@ from leap.mail.imap.messageparts import ReferenciableDict logger = logging.getLogger(__name__) +SOLEDAD_WRITE_PERIOD = 20 + @contextlib.contextmanager def set_bool_flag(obj, att): @@ -79,7 +84,8 @@ class MemoryStore(object): WRITING_FLAG = "_writing" - def __init__(self, permanent_store=None, write_period=60): + def __init__(self, permanent_store=None, + write_period=SOLEDAD_WRITE_PERIOD): """ Initialize a MemoryStore. @@ -92,10 +98,23 @@ class MemoryStore(object): self._permanent_store = permanent_store self._write_period = write_period - # Internal Storage + # Internal Storage: messages self._msg_store = {} + + # Internal Storage: payload-hash + """ + {'phash': weakreaf.proxy(dict)} + """ self._phash_store = {} + # Internal Storage: content-hash:fdoc + """ + {'chash': {'mbox-a': weakref.proxy(dict), + 'mbox-b': weakref.proxy(dict)} + } + """ + self._chash_fdoc_store = {} + # TODO ----------------- implement mailbox-level flags store too! ---- self._rflags_store = {} self._hdocset_store = {} @@ -103,7 +122,9 @@ class MemoryStore(object): # New and dirty flags, to set MessageWrapper State. self._new = set([]) + self._new_deferreds = {} self._dirty = set([]) + self._dirty_deferreds = {} # Flag for signaling we're busy writing to the disk storage. setattr(self, self.WRITING_FLAG, False) @@ -141,48 +162,141 @@ class MemoryStore(object): # We would have to add a put_flags operation to modify only # the flags doc (and set the dirty flag accordingly) - def create_message(self, mbox, uid, message): + def create_message(self, mbox, uid, message, notify_on_disk=True): """ Create the passed message into this MemoryStore. By default we consider that any message is a new message. + + :param mbox: the mailbox + :type mbox: basestring + :param uid: the UID for the message + :type uid: int + :param message: a to be added + :type message: MessageWrapper + :param notify_on_disk: + :type notify_on_disk: bool + + :return: a Deferred. if notify_on_disk is True, will be fired + when written to the db on disk. + Otherwise will fire inmediately + :rtype: Deferred """ print "adding new doc to memstore %s (%s)" % (mbox, uid) key = mbox, uid + + d = defer.Deferred() + d.addCallback(lambda result: log.msg("message save: %s" % result)) + self._new.add(key) + self._new_deferreds[key] = d + self._add_message(mbox, uid, message, notify_on_disk) + print "create message: ", d + return d - msg_dict = message.as_dict() - self._msg_store[key] = msg_dict + def put_message(self, mbox, uid, message, notify_on_disk=True): + """ + Put an existing message. - cdocs = message.cdocs + :param mbox: the mailbox + :type mbox: basestring + :param uid: the UID for the message + :type uid: int + :param message: a to be added + :type message: MessageWrapper + :param notify_on_disk: + :type notify_on_disk: bool - dirty = key in self._dirty - new = key in self._new + :return: a Deferred. if notify_on_disk is True, will be fired + when written to the db on disk. + Otherwise will fire inmediately + :rtype: Deferred + """ + key = mbox, uid + + d = defer.Deferred() + d.addCallback(lambda result: log.msg("message save: %s" % result)) + + self._dirty.add(key) + self._dirty_deferreds[key] = d + self._add_message(mbox, uid, message, notify_on_disk) + return d - # XXX should capture this in log... + def _add_message(self, mbox, uid, message, notify_on_disk=True): + # XXX have to differentiate between notify_new and notify_dirty + + key = mbox, uid + msg_dict = message.as_dict() + print "ADDING MESSAGE..." + import pprint; pprint.pprint(msg_dict) + + # XXX use the enum as keys + + try: + store = self._msg_store[key] + except KeyError: + self._msg_store[key] = {'fdoc': {}, + 'hdoc': {}, + 'cdocs': {}, + 'docs_id': {}} + store = self._msg_store[key] + + print "In store (before):" + import pprint; pprint.pprint(store) + + #self._msg_store[key] = msg_dict + fdoc = msg_dict.get('fdoc', None) + if fdoc: + if not store.get('fdoc', None): + store['fdoc'] = ReferenciableDict({}) + store['fdoc'].update(fdoc) + + # content-hash indexing + chash = fdoc.get(fields.CONTENT_HASH_KEY) + chash_fdoc_store = self._chash_fdoc_store + if not chash in chash_fdoc_store: + chash_fdoc_store[chash] = {} + + chash_fdoc_store[chash][mbox] = weakref.proxy( + store['fdoc']) + + hdoc = msg_dict.get('hdoc', None) + if hdoc: + if not store.get('hdoc', None): + store['hdoc'] = ReferenciableDict({}) + store['hdoc'].update(hdoc) + + docs_id = msg_dict.get('docs_id', None) + if docs_id: + if not store.get('docs_id', None): + store['docs_id'] = {} + store['docs_id'].update(docs_id) + cdocs = message.cdocs for cdoc_key in cdocs.keys(): - print "saving cdoc" - cdoc = self._msg_store[key]['cdocs'][cdoc_key] + if not store.get('cdocs', None): + store['cdocs'] = {} - # FIXME this should be done in the MessageWrapper constructor - # instead... + cdoc = cdocs[cdoc_key] # first we make it weak-referenciable referenciable_cdoc = ReferenciableDict(cdoc) - self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc( - new=new, dirty=dirty, store="mem", - part=MessagePartType.cdoc, - content=referenciable_cdoc) + store['cdocs'][cdoc_key] = referenciable_cdoc phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) if not phash: continue self._phash_store[phash] = weakref.proxy(referenciable_cdoc) - def put_message(self, mbox, uid, msg): - """ - Put an existing message. - """ - return NotImplementedError() + def prune(seq, store): + for key in seq: + if key in store and empty(store.get(key)): + store.pop(key) + + prune(('fdoc', 'hdoc', 'cdocs', 'docs_id'), store) + #import ipdb; ipdb.set_trace() + + + print "after appending to store: ", key + import pprint; pprint.pprint(self._msg_store[key]) def get_message(self, mbox, uid): """ @@ -203,7 +317,13 @@ class MemoryStore(object): """ Remove a Message from this MemoryStore. """ - raise NotImplementedError() + try: + key = mbox, uid + self._new.discard(key) + self._dirty.discard(key) + self._msg_store.pop(key, None) + except Exception as exc: + logger.exception(exc) # IMessageStoreWriter @@ -211,12 +331,15 @@ class MemoryStore(object): """ Write the message documents in this MemoryStore to a different store. """ - # XXX pass if it's writing (ie, the queue is not empty...) - # See how to make the writing_flag aware of the queue state... - print "writing messages to producer..." + # For now, we pass if the queue is not empty, to avoid duplication. + # We would better use a flag to know when we've already enqueued an + # item. + if not self.producer.is_queue_empty(): + return + print "Writing messages to Soledad..." with set_bool_flag(self, self.WRITING_FLAG): - for msg_wrapper in self.all_msg_iter(): + for msg_wrapper in self.all_new_dirty_msg_iter(): self.producer.push(msg_wrapper) # MemoryStore specific methods. @@ -247,12 +370,14 @@ class MemoryStore(object): """ return len(self._new) - def get_by_phash(self, phash): + def get_cdoc_from_phash(self, phash): """ Return a content-document by its payload-hash. """ doc = self._phash_store.get(phash, None) + # XXX return None for consistency? + # XXX have to keep a mapping between phash and its linkage # info, to know if this payload is been already saved or not. # We will be able to get this from the linkage-docs, @@ -262,7 +387,40 @@ class MemoryStore(object): return MessagePartDoc( new=new, dirty=dirty, store="mem", part=MessagePartType.cdoc, - content=doc) + content=doc, + doc_id=None) + + def get_fdoc_from_chash(self, chash, mbox): + """ + Return a flags-document by its content-hash and a given mailbox. + + :return: MessagePartDoc, or None. + """ + docs_dict = self._chash_fdoc_store.get(chash, None) + fdoc = docs_dict.get(mbox, None) if docs_dict else None + + print "GETTING FDOC BY CHASH:", fdoc + + # a couple of special cases. + # 1. We might have a doc with empty content... + if empty(fdoc): + return None + + # ...Or the message could exist, but being flagged for deletion. + # We want to create a new one in this case. + # Hmmm what if the deletion is un-done?? We would end with a + # duplicate... + if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: + return None + + # XXX get flags + new = True + dirty = False + return MessagePartDoc( + new=new, dirty=dirty, store="mem", + part=MessagePartType.fdoc, + content=fdoc, + doc_id=None) def all_msg_iter(self): """ @@ -271,6 +429,25 @@ class MemoryStore(object): return (self.get_message(*key) for key in sorted(self._msg_store.keys())) + def all_new_dirty_msg_iter(self): + """ + Return geneator that iterates through all new and dirty messages. + """ + return (self.get_message(*key) + for key in sorted(self._msg_store.keys()) + if key in self._new or key in self._dirty) + + def all_deleted_uid_iter(self, mbox): + """ + Return generator that iterates through the UIDs for all messags + with deleted flag in a given mailbox. + """ + all_deleted = ( + msg['fdoc']['uid'] for msg in self._msg_store.values() + if msg.get('fdoc', None) + and fields.DELETED_FLAG in msg['fdoc']['flags']) + return all_deleted + # new, dirty flags def _get_new_dirty_state(self, key): @@ -289,9 +466,35 @@ class MemoryStore(object): """ Remove the key value from the `new` set. """ - print "******************" - print "UNSETTING NEW FOR: %s" % str(key) + print "Unsetting NEW for: %s" % str(key) self._new.discard(key) + deferreds = self._new_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. + d.callback('%s, ok' % str(key)) + deferreds.pop(key) + + def set_dirty(self, key): + """ + Add the key value to the `dirty` set. + """ + self._dirty.add(key) + + def unset_dirty(self, key): + """ + Remove the key value from the `dirty` set. + """ + print "Unsetting DIRTY for: %s" % str(key) + self._dirty.discard(key) + deferreds = self._dirty_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. + d.callback('%s, ok' % str(key)) + deferreds.pop(key) @property def is_writing(self): diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index 42eef02..b43bc37 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -65,15 +65,13 @@ and sometimes to a part in particular only. we have modified its state in memory, so we need to put_doc instead while dumping the MemoryStore contents. `dirty` attribute would only apply to flags-docs and linkage-docs. - - - XXX this is still not implemented! +* `doc_id` is the identifier for the document in the u1db database, if any. """ MessagePartDoc = namedtuple( 'MessagePartDoc', - ['new', 'dirty', 'part', 'store', 'content']) + ['new', 'dirty', 'part', 'store', 'content', 'doc_id']) class ReferenciableDict(dict): @@ -96,6 +94,7 @@ class MessageWrapper(object): FDOC = "fdoc" HDOC = "hdoc" CDOCS = "cdocs" + DOCS_ID = "docs_id" # XXX can use this to limit the memory footprint, # or is it too premature to optimize? @@ -105,12 +104,17 @@ class MessageWrapper(object): def __init__(self, fdoc=None, hdoc=None, cdocs=None, from_dict=None, memstore=None, - new=True, dirty=False): + new=True, dirty=False, docs_id={}): + """ + Initialize a MessageWrapper. + """ + # TODO add optional reference to original message in the incoming self._dict = {} self.memstore = memstore self._new = new self._dirty = dirty + self._storetype = "mem" if from_dict is not None: @@ -122,6 +126,7 @@ class MessageWrapper(object): self._dict[self.HDOC] = ReferenciableDict(hdoc) if cdocs is not None: self._dict[self.CDOCS] = ReferenciableDict(cdocs) + self._dict[self.DOCS_ID] = docs_id # properties @@ -153,10 +158,28 @@ class MessageWrapper(object): doc="The `new` flag for this MessageWrapper") def _get_dirty(self): + """ + Get the value for the `dirty` flag. + """ return self._dirty def _set_dirty(self, value=True): + """ + Set the value for the `dirty` flag, and propagate it + to the memory store if any. + """ self._dirty = value + if self.memstore: + mbox = self.fdoc.content['mbox'] + uid = self.fdoc.content['uid'] + key = mbox, uid + fun = [self.memstore.unset_dirty, + self.memstore.set_dirty][int(value)] + fun(key) + else: + logger.warning("Could not find a memstore referenced from this " + "MessageWrapper. The value for new will not be " + "propagated") dirty = property(_get_dirty, _set_dirty) @@ -173,7 +196,9 @@ class MessageWrapper(object): return MessagePartDoc(new=self.new, dirty=self.dirty, store=self._storetype, part=MessagePartType.fdoc, - content=content_ref) + content=content_ref, + doc_id=self._dict[self.DOCS_ID].get( + self.FDOC, None)) @property def hdoc(self): @@ -186,7 +211,9 @@ class MessageWrapper(object): return MessagePartDoc(new=self.new, dirty=self.dirty, store=self._storetype, part=MessagePartType.hdoc, - content=content_ref) + content=content_ref, + doc_id=self._dict[self.DOCS_ID].get( + self.HDOC, None)) @property def cdocs(self): @@ -201,21 +228,18 @@ class MessageWrapper(object): Generator that iterates through all the parts, returning MessagePartDoc. """ - yield self.fdoc - yield self.hdoc + if self.fdoc is not None: + yield self.fdoc + if self.hdoc is not None: + yield self.hdoc for cdoc in self.cdocs.values(): - # XXX this will break ---- - #content_ref = weakref.proxy(cdoc) - #yield MessagePartDoc(new=self.new, dirty=self.dirty, - #store=self._storetype, - #part=MessagePartType.cdoc, - #content=content_ref) - - # the put is handling this for us, so - # we already have stored a MessagePartDoc - # but we should really do it while adding in the - # constructor or the from_dict method - yield cdoc + if cdoc is not None: + content_ref = weakref.proxy(cdoc) + yield MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.cdoc, + content=content_ref, + doc_id=None) # i/o @@ -234,9 +258,9 @@ class MessageWrapper(object): fdoc, hdoc, cdocs = map( lambda part: msg_dict.get(part, None), [self.FDOC, self.HDOC, self.CDOCS]) - self._dict[self.FDOC] = fdoc - self._dict[self.HDOC] = hdoc - self._dict[self.CDOCS] = cdocs + for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc), + (self.CDOCS, cdocs)): + self._dict[t] = ReferenciableDict(doc) if doc else None class MessagePart(object): diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 94bd714..c212472 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -37,7 +37,7 @@ from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail import walk -from leap.mail.utils import first, find_charset, lowerdict +from leap.mail.utils import first, find_charset, lowerdict, empty from leap.mail.decorators import deferred from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields @@ -130,6 +130,8 @@ class LeapMessage(fields, MailParser, MBoxParser): self.__chash = None self.__bdoc = None + # XXX make these properties public + @property def _fdoc(self): """ @@ -154,8 +156,9 @@ class LeapMessage(fields, MailParser, MBoxParser): """ if self._container is not None: hdoc = self._container.hdoc - if hdoc: + if hdoc and not empty(hdoc.content): return hdoc + # XXX cache this into the memory store !!! return self._get_headers_doc() @property @@ -248,7 +251,13 @@ class LeapMessage(fields, MailParser, MBoxParser): doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - if getattr(doc, 'store', None) != "mem": + if self._collection.memstore is not None: + self._collection.memstore.put_message( + self._mbox, self._uid, + MessageWrapper(fdoc=doc.content, new=False, dirty=True, + docs_id={'fdoc': doc.doc_id})) + else: + # fallback for non-memstore initializations. self._soledad.put_doc(doc) def addFlags(self, flags): @@ -547,20 +556,18 @@ class LeapMessage(fields, MailParser, MBoxParser): # phash doc... if self._container is not None: - bdoc = self._container.memstore.get_by_phash(body_phash) + bdoc = self._container.memstore.get_cdoc_from_phash(body_phash) print "bdoc from container -->", bdoc if bdoc and bdoc.content is not None: return bdoc else: print "no doc or not bdoc content for that phash found!" - print "nuthing. soledad?" # no memstore or no doc found there if self._soledad: body_docs = self._soledad.get_from_index( fields.TYPE_P_HASH_IDX, fields.TYPE_CONTENT_VAL, str(body_phash)) - print "returning body docs...", body_docs return first(body_docs) else: logger.error("No phash in container, and no soledad found!") @@ -581,32 +588,32 @@ class LeapMessage(fields, MailParser, MBoxParser): # setters # XXX to be used in the messagecopier interface?! - - def set_uid(self, uid): - """ - Set new uid for this message. - - :param uid: the new uid - :type uid: basestring - """ +# + #def set_uid(self, uid): + #""" + #Set new uid for this message. +# + #:param uid: the new uid + #:type uid: basestring + #""" # XXX dangerous! lock? - self._uid = uid - d = self._fdoc - d.content[self.UID_KEY] = uid - self._soledad.put_doc(d) - - def set_mbox(self, mbox): - """ - Set new mbox for this message. - - :param mbox: the new mbox - :type mbox: basestring - """ + #self._uid = uid + #d = self._fdoc + #d.content[self.UID_KEY] = uid + #self._soledad.put_doc(d) +# + #def set_mbox(self, mbox): + #""" + #Set new mbox for this message. +# + #:param mbox: the new mbox + #:type mbox: basestring + #""" # XXX dangerous! lock? - self._mbox = mbox - d = self._fdoc - d.content[self.MBOX_KEY] = mbox - self._soledad.put_doc(d) + #self._mbox = mbox + #d = self._fdoc + #d.content[self.MBOX_KEY] = mbox + #self._soledad.put_doc(d) # destructor @@ -614,14 +621,13 @@ class LeapMessage(fields, MailParser, MBoxParser): def remove(self): """ Remove all docs associated with this message. + Currently it removes only the flags doc. """ # XXX For the moment we are only removing the flags and headers # docs. The rest we leave there polluting your hard disk, # until we think about a good way of deorphaning. # Maybe a crawler of unreferenced docs. - # XXX remove from memory store!!! - # XXX implement elijah's idea of using a PUT document as a # token to ensure consistency in the removal. @@ -632,13 +638,35 @@ class LeapMessage(fields, MailParser, MBoxParser): #bd = self._get_body_doc() #docs = [fd, hd, bd] - docs = [fd] + try: + memstore = self._collection.memstore + except AttributeError: + memstore = False + + if memstore and hasattr(fd, "store", None) == "mem": + key = self._mbox, self._uid + if fd.new: + # it's a new document, so we can remove it and it will not + # be writen. Watch out! We need to be sure it has not been + # just queued to write! + memstore.remove_message(*key) + + if fd.dirty: + doc_id = fd.doc_id + doc = self._soledad.get_doc(doc_id) + try: + self._soledad.delete_doc(doc) + except Exception as exc: + logger.exception(exc) - for d in filter(None, docs): + else: + # we just got a soledad_doc try: - self._soledad.delete_doc(d) + doc_id = fd.doc_id + latest_doc = self._soledad.get_doc(doc_id) + self._soledad.delete_doc(latest_doc) except Exception as exc: - logger.error(exc) + logger.exception(exc) return uid def does_exist(self): @@ -786,8 +814,10 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # okay, all in order, keep going... self.mbox = self._parse_mailbox_name(mbox) + + # XXX get a SoledadStore passed instead self._soledad = soledad - self._memstore = memstore + self.memstore = memstore self.__rflags = None self.__hdocset = None @@ -913,13 +943,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :type chash: basestring :return: False, if it does not exist, or UID. """ - exist = self._get_fdoc_from_chash(chash) + exist = False + if self.memstore is not None: + exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) + + if not exist: + exist = self._get_fdoc_from_chash(chash) + + print "FDOC EXIST?", exist if exist: return exist.content.get(fields.UID_KEY, "unknown-uid") else: return False - @deferred + # not deferring to thread cause this now uses deferred asa retval + #@deferred def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): """ Creates a new message document. @@ -945,6 +983,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # TODO add the linked-from info ! # TODO add reference to the original message + print "ADDING MESSAGE..." logger.debug('adding message') if flags is None: @@ -956,11 +995,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # check for uniqueness. if self._fdoc_already_exists(chash): + print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" + print + print logger.warning("We already have that message in this mailbox.") # note that this operation will leave holes in the UID sequence, # but we're gonna change that all the same for a local-only table. # so not touch it by the moment. - return False + return defer.succeed('already_exists') fd = self._populate_flags(flags, uid, chash, size, multi) hd = self._populate_headr(msg, chash, subject, date) @@ -999,7 +1041,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # TODO ---- add reference to original doc, to be deleted # after writes are done. msg_container = MessageWrapper(fd, hd, cdocs) - self._memstore.create_message(self.mbox, uid, msg_container) + + # XXX Should allow also to dump to disk directly, + # for no-memstore cases. + + # we return a deferred that, by default, will be triggered when + # saved to disk + d = self.memstore.create_message(self.mbox, uid, msg_container) + print "defered-add", d + print "adding message", d + return d def _remove_cb(self, result): return result @@ -1247,17 +1298,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): or None if not found. :rtype: LeapMessage """ - print "getting msg by id!" - msg_container = self._memstore.get_message(self.mbox, uid) - print "msg container", msg_container + msg_container = self.memstore.get_message(self.mbox, uid) if msg_container is not None: - print "getting LeapMessage (from memstore)" # We pass a reference to soledad just to be able to retrieve # missing parts that cannot be found in the container, like # the content docs after a copy. msg = LeapMessage(self._soledad, uid, self.mbox, collection=self, container=msg_container) - print "got msg:", msg else: msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) if not msg.does_exist(): @@ -1303,8 +1350,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox)]) - if self._memstore is not None: - mem_uids = self._memstore.get_uids(self.mbox) + if self.memstore is not None: + mem_uids = self.memstore.get_uids(self.mbox) uids = db_uids.union(set(mem_uids)) else: uids = db_uids @@ -1328,19 +1375,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): Return a dict with all flags documents for this mailbox. """ # XXX get all from memstore and cache it there + # FIXME should get all uids, get them fro memstore, + # and get only the missing ones from disk. + all_flags = dict((( doc.content[self.UID_KEY], doc.content[self.FLAGS_KEY]) for doc in self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox))) - if self._memstore is not None: + if self.memstore is not None: # XXX - uids = self._memstore.get_uids(self.mbox) - fdocs = [(uid, self._memstore.get_message(self.mbox, uid).fdoc) - for uid in uids] - for uid, doc in fdocs: - all_flags[uid] = doc.content[self.FLAGS_KEY] + uids = self.memstore.get_uids(self.mbox) + docs = ((uid, self.memstore.get_message(self.mbox, uid)) + for uid in uids) + for uid, doc in docs: + all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY] return all_flags @@ -1378,8 +1428,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): count = self._soledad.get_count_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) - if self._memstore is not None: - count += self._memstore.count_new() + if self.memstore is not None: + count += self.memstore.count_new() return count # unseen messages diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index d36acae..b321da8 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -81,7 +81,8 @@ class ContentDedup(object): if len(header_docs) != 1: logger.warning("Found more than one copy of chash %s!" % (chash,)) - logger.debug("Found header doc with that hash! Skipping save!") + # XXX re-enable + #logger.debug("Found header doc with that hash! Skipping save!") return True def _content_does_exist(self, doc): @@ -105,7 +106,8 @@ class ContentDedup(object): if len(attach_docs) != 1: logger.warning("Found more than one copy of phash %s!" % (phash,)) - logger.debug("Found attachment doc with that hash! Skipping save!") + # XXX re-enable + #logger.debug("Found attachment doc with that hash! Skipping save!") return True @@ -215,6 +217,7 @@ class SoledadStore(ContentDedup): # If everything went well, we can unset the new flag # in the source store (memory store) msg_wrapper.new = False + msg_wrapper.dirty = False empty = queue.empty() # @@ -261,6 +264,9 @@ class SoledadStore(ContentDedup): # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): if item.part == MessagePartType.fdoc: + + # FIXME add content duplication for HEADERS too! + # (only 1 chash per mailbox!) yield dict(item.content), call elif item.part == MessagePartType.hdoc: @@ -276,18 +282,31 @@ class SoledadStore(ContentDedup): yield dict(item.content), call + # For now, the only thing that will be dirty is + # the flags doc. + + elif msg_wrapper.dirty is True: + print "DIRTY DOC! ----------------------" + call = self._soledad.put_doc + + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + doc_id = item.doc_id # defend! + doc = self._soledad.get_doc(doc_id) + doc.content = item.content + + if item.part == MessagePartType.fdoc: + print "Will PUT the doc: ", doc + yield dict(doc), call + + # XXX also for linkage-doc + # TODO should write back to the queue # with the results of the operation. # We can write there: # (*) MsgWriteACK --> Should remove from incoming queue. # (We should do this here). - # Implement using callbacks for each operation. - # TODO should check for elements with the dirty state - # TODO if new == False and dirty == True, put_doc - # XXX for puts, we will have to retrieve - # the document, change the content, and - # pass the whole document under "content" else: logger.error("Cannot put/delete documents yet!") diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py index ed6abcd..b7fc030 100644 --- a/mail/src/leap/mail/messageflow.py +++ b/mail/src/leap/mail/messageflow.py @@ -126,9 +126,15 @@ class MessageProducer(object): again after the addition of new items. """ self._consumer.consume(self._queue) - if self._queue.empty(): + if self.is_queue_empty(): self.stop() + def is_queue_empty(self): + """ + Return True if queue is empty, False otherwise. + """ + return self._queue.empty() + # public methods: IMessageProducer def push(self, item): diff --git a/mail/src/leap/mail/utils.py b/mail/src/leap/mail/utils.py index 64af04f..bae2898 100644 --- a/mail/src/leap/mail/utils.py +++ b/mail/src/leap/mail/utils.py @@ -36,6 +36,15 @@ def first(things): return None +def empty(thing): + """ + Return True if a thing is None or its length is zero. + """ + if thing is None: + return True + return len(thing) == 0 + + def maybe_call(thing): """ Return the same thing, or the result of its invocation if it is a -- cgit v1.2.3