From f5365ae0c2edb8b3e879f876f2f7e42b25f4616a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 27 Jan 2014 16:11:53 -0400 Subject: handle last_uid property in memory store --- src/leap/mail/imap/mailbox.py | 131 +++++------ src/leap/mail/imap/memorystore.py | 236 +++++++++++++++---- src/leap/mail/imap/messageparts.py | 26 ++- src/leap/mail/imap/messages.py | 336 ++++++++++++--------------- src/leap/mail/imap/server.py | 1 + src/leap/mail/imap/soledadstore.py | 129 +++++++--- src/leap/mail/imap/tests/leap_tests_imap.zsh | 7 +- src/leap/mail/utils.py | 5 +- 8 files changed, 532 insertions(+), 339 deletions(-) (limited to 'src/leap') diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index 108d0da..b5c5719 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -26,7 +26,7 @@ import cStringIO from collections import defaultdict from twisted.internet import defer -from twisted.internet.task import deferLater +#from twisted.internet.task import deferLater from twisted.python import log from twisted.mail import imap4 @@ -119,6 +119,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): if not self.getFlags(): self.setFlags(self.INIT_FLAGS) + if self._memstore: + self.prime_last_uid_to_memstore() + @property def listeners(self): """ @@ -132,6 +135,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ return self._listeners[self.mbox] + # TODO this grows too crazily when many instances are fired, like + # during imaptest stress testing. Should have a queue of limited size + # instead. def addListener(self, listener): """ Add a listener to the listeners queue. @@ -153,6 +159,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ self.listeners.remove(listener) + # TODO move completely to soledadstore, under memstore reponsibility. def _get_mbox(self): """ Return mailbox document. @@ -228,52 +235,28 @@ class SoledadMailbox(WithMsgFields, MBoxParser): def _get_last_uid(self): """ Return the last uid for this mailbox. + If we have a memory store, the last UID will be the highest + recorded UID in the message store, or a counter cached from + the mailbox document in soledad if this is higher. :return: the last uid for messages in this mailbox :rtype: bool """ - mbox = self._get_mbox() - if not mbox: - logger.error("We could not get a mbox!") - # XXX It looks like it has been corrupted. - # We need to be able to survive this. - return None - last = mbox.content.get(self.LAST_UID_KEY, 1) - if self._memstore: - last = max(last, self._memstore.get_last_uid(mbox)) + last = self._memstore.get_last_uid(self.mbox) + print "last uid for %s: %s (from memstore)" % (self.mbox, last) return last - def _set_last_uid(self, uid): - """ - Sets the last uid for this mailbox. + last_uid = property( + _get_last_uid, doc="Last_UID attribute.") - :param uid: the uid to be set - :type uid: int + def prime_last_uid_to_memstore(self): """ - leap_assert(isinstance(uid, int), "uid has to be int") - mbox = self._get_mbox() - key = self.LAST_UID_KEY - - count = self.getMessageCount() - - # XXX safety-catch. If we do get duplicates, - # we want to avoid further duplication. - - if uid >= count: - value = uid - else: - # something is wrong, - # just set the last uid - # beyond the max msg count. - logger.debug("WRONG uid < count. Setting last uid to %s", count) - value = count - - mbox.content[key] = value - # XXX this should be set in the memorystore instead!!! - self._soledad.put_doc(mbox) - - last_uid = property( - _get_last_uid, _set_last_uid, doc="Last_UID attribute.") + Prime memstore with last_uid value + """ + set_exist = set(self.messages.all_uid_iter()) + last = max(set_exist) + 1 if set_exist else 1 + logger.info("Priming Soledad last_uid to %s" % (last,)) + self._memstore.set_last_soledad_uid(self.mbox, last) def getUIDValidity(self): """ @@ -315,8 +298,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: int """ with self.next_uid_lock: - self.last_uid += 1 - return self.last_uid + if self._memstore: + return self.last_uid + 1 + else: + # XXX after lock, it should be safe to + # return just the increment here, and + # have a different method that actually increments + # the counter when really adding. + self.last_uid += 1 + return self.last_uid def getMessageCount(self): """ @@ -397,26 +387,26 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: a deferred that evals to None """ + # TODO have a look at the cases for internal date in the rfc if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)): message = message.getvalue() - # XXX we should treat the message as an IMessage from here + + # XXX we could treat the message as an IMessage from here leap_assert_type(message, basestring) - uid_next = self.getUIDNext() - logger.debug('Adding msg with UID :%s' % uid_next) if flags is None: flags = tuple() else: flags = tuple(str(flag) for flag in flags) - d = self._do_add_message(message, flags=flags, date=date, uid=uid_next) + d = self._do_add_message(message, flags=flags, date=date) return d - def _do_add_message(self, message, flags, date, uid): + def _do_add_message(self, message, flags, date): """ - Calls to the messageCollection add_msg method (deferred to thread). + Calls to the messageCollection add_msg method. Invoked from addMessage. """ - d = self.messages.add_msg(message, flags=flags, date=date, uid=uid) + d = self.messages.add_msg(message, flags=flags, date=date) # XXX Removing notify temporarily. # This is interfering with imaptest results. I'm not clear if it's # because we clutter the logging or because the set of listeners is @@ -456,6 +446,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # XXX removing the mailbox in situ for now, # we should postpone the removal + + # XXX move to memory store?? self._soledad.delete_doc(self._get_mbox()) def _close_cb(self, result): @@ -466,8 +458,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): Expunge and mark as closed """ d = self.expunge() - d.addCallback(self._close_cb) - return d + #d.addCallback(self._close_cb) + #return d def _expunge_cb(self, result): return result @@ -479,22 +471,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser): print "EXPUNGE!" if not self.isWriteable(): raise imap4.ReadOnlyMailbox - mstore = self._memstore - if mstore is not None: - deleted = mstore.all_deleted_uid_iter(self.mbox) - print "deleted ", list(deleted) - for uid in deleted: - mstore.remove_message(self.mbox, uid) - - print "now deleting from soledad" - d = self.messages.remove_all_deleted() - d.addCallback(self._expunge_cb) - d.addCallback(self.messages.reset_last_uid) - - # XXX DEBUG ------------------- - # FIXME !!! - # XXX should remove the hdocset too!!! - return d + + return self._memstore.expunge(self.mbox) + + # TODO we can defer this back when it's correct + # but we should make sure the memstore has been synced. + + #d = self._memstore.expunge(self.mbox) + #d.addCallback(self._expunge_cb) + #return d def _bound_seq(self, messages_asked): """ @@ -783,12 +768,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # IMessageCopier @deferred + #@profile def copy(self, messageObject): """ Copy the given message object into this mailbox. """ from twisted.internet import reactor - uid_next = self.getUIDNext() msg = messageObject memstore = self._memstore @@ -796,7 +781,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): fdoc = msg._fdoc hdoc = msg._hdoc if not fdoc: - logger.debug("Tried to copy a MSG with no fdoc") + logger.warning("Tried to copy a MSG with no fdoc") return new_fdoc = copy.deepcopy(fdoc.content) @@ -807,11 +792,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): if exist: print "Destination message already exists!" - else: print "DO COPY MESSAGE!" + mbox = self.mbox + uid_next = memstore.increment_last_soledad_uid(mbox) new_fdoc[self.UID_KEY] = uid_next - new_fdoc[self.MBOX_KEY] = self.mbox + new_fdoc[self.MBOX_KEY] = mbox # XXX set recent! @@ -824,9 +810,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): self._memstore.create_message( self.mbox, uid_next, MessageWrapper( - new_fdoc, hdoc.content)) - - deferLater(reactor, 1, self.notify_new) + new_fdoc, hdoc.content), + notify_on_disk=False) # convenience fun diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 232a2fb..60e98c7 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -19,16 +19,20 @@ In-memory transient store for a LEAPIMAPServer. """ import contextlib import logging +import threading import weakref from collections import defaultdict +from copy import copy from twisted.internet import defer from twisted.internet.task import LoopingCall from twisted.python import log from zope.interface import implements +from leap.common.check import leap_assert_type from leap.mail import size +from leap.mail.decorators import deferred from leap.mail.utils import empty from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces @@ -40,7 +44,10 @@ from leap.mail.imap.messageparts import ReferenciableDict logger = logging.getLogger(__name__) -SOLEDAD_WRITE_PERIOD = 20 + +# The default period to do writebacks to the permanent +# soledad storage, in seconds. +SOLEDAD_WRITE_PERIOD = 10 @contextlib.contextmanager @@ -76,16 +83,11 @@ class MemoryStore(object): implements(interfaces.IMessageStore, interfaces.IMessageStoreWriter) - producer = None - # TODO We will want to index by chash when we transition to local-only # UIDs. - # TODO should store RECENT-FLAGS too - # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass - # TODO do use dirty flag (maybe use namedtuples for that) so we can use it - # also as a read-cache. WRITING_FLAG = "_writing" + _last_uid_lock = threading.Lock() def __init__(self, permanent_store=None, write_period=SOLEDAD_WRITE_PERIOD): @@ -138,17 +140,20 @@ class MemoryStore(object): self._rflags_store = defaultdict( lambda: {'doc_id': None, 'set': set([])}) - # TODO ----------------- implement mailbox-level flags store too? - # XXX maybe we don't need this anymore... - # let's see how good does it prefetch the headers if - # we cache them in the store. - self._hdocset_store = {} - # -------------------------------------------------------------- + """ + last-uid store keeps the count of the highest UID + per mailbox. + + {'mbox-a': 42, + 'mbox-b': 23} + """ + self._last_uid = {} # New and dirty flags, to set MessageWrapper State. self._new = set([]) self._new_deferreds = {} self._dirty = set([]) + self._rflags_dirty = set([]) self._dirty_deferreds = {} # Flag for signaling we're busy writing to the disk storage. @@ -210,14 +215,25 @@ class MemoryStore(object): print "adding new doc to memstore %s (%s)" % (mbox, uid) key = mbox, uid + self._add_message(mbox, uid, message, notify_on_disk) + d = defer.Deferred() d.addCallback(lambda result: log.msg("message save: %s" % result)) - self._new.add(key) + + # We store this deferred so we can keep track of the pending + # operations internally. self._new_deferreds[key] = d - self._add_message(mbox, uid, message, notify_on_disk) - print "create message: ", d - return d + + if notify_on_disk: + # Caller wants to be notified when the message is on disk + # so we pass the deferred that will be fired when the message + # has been written. + return d + else: + # Caller does not care, just fired and forgot, so we pass + # a defer that will inmediately have its callback triggered. + return defer.succeed('fire-and-forget:%s' % str(key)) def put_message(self, mbox, uid, message, notify_on_disk=True): """ @@ -238,13 +254,14 @@ class MemoryStore(object): :rtype: Deferred """ key = mbox, uid - d = defer.Deferred() - d.addCallback(lambda result: log.msg("message save: %s" % result)) + d.addCallback(lambda result: log.msg("message PUT save: %s" % result)) self._dirty.add(key) self._dirty_deferreds[key] = d self._add_message(mbox, uid, message, notify_on_disk) + #print "dirty ", self._dirty + #print "new ", self._new return d def _add_message(self, mbox, uid, message, notify_on_disk=True): @@ -315,6 +332,19 @@ class MemoryStore(object): store.pop(key) prune((FDOC, HDOC, CDOCS, DOCS_ID), store) + #print "after adding: " + #import pprint; pprint.pprint(self._msg_store[key]) + + def get_docid_for_fdoc(self, mbox, uid): + """ + Get Soledad document id for the flags-doc for a given mbox and uid. + """ + fdoc = self._permanent_store.get_flags_doc(mbox, uid) + if not fdoc: + return None + doc_id = fdoc.doc_id + return doc_id + def get_message(self, mbox, uid): """ Get a MessageWrapper for the given mbox and uid combination. @@ -326,6 +356,8 @@ class MemoryStore(object): if msg_dict: new, dirty = self._get_new_dirty_state(key) return MessageWrapper(from_dict=msg_dict, + new=new, + dirty=dirty, memstore=weakref.proxy(self)) else: return None @@ -334,6 +366,13 @@ class MemoryStore(object): """ Remove a Message from this MemoryStore. """ + # XXX For the moment we are only removing the flags and headers + # docs. The rest we leave there polluting your hard disk, + # until we think about a good way of deorphaning. + + # XXX implement elijah's idea of using a PUT document as a + # token to ensure consistency in the removal. + try: key = mbox, uid self._new.discard(key) @@ -348,18 +387,22 @@ class MemoryStore(object): """ Write the message documents in this MemoryStore to a different store. """ - # For now, we pass if the queue is not empty, to avoid duplication. + # For now, we pass if the queue is not empty, to avoid duplicate + # queuing. # We would better use a flag to know when we've already enqueued an # item. + + # XXX this could return the deferred for all the enqueued operations + if not self.producer.is_queue_empty(): return print "Writing messages to Soledad..." with set_bool_flag(self, self.WRITING_FLAG): - for msg_wrapper in self.all_new_dirty_msg_iter(): - self.producer.push(msg_wrapper) for rflags_doc_wrapper in self.all_rdocs_iter(): self.producer.push(rflags_doc_wrapper) + for msg_wrapper in self.all_new_dirty_msg_iter(): + self.producer.push(msg_wrapper) # MemoryStore specific methods. @@ -370,12 +413,61 @@ class MemoryStore(object): all_keys = self._msg_store.keys() return [uid for m, uid in all_keys if m == mbox] + # last_uid + def get_last_uid(self, mbox): """ Get the highest UID for a given mbox. + It will be the highest between the highest uid in the message store for + the mailbox, and the soledad integer cache. """ uids = self.get_uids(mbox) - return uids and max(uids) or 0 + last_mem_uid = uids and max(uids) or 0 + last_soledad_uid = self.get_last_soledad_uid(mbox) + return max(last_mem_uid, last_soledad_uid) + + def get_last_soledad_uid(self, mbox): + """ + Get last uid for a given mbox from the soledad integer cache. + """ + return self._last_uid.get(mbox, 0) + + def set_last_soledad_uid(self, mbox, value): + """ + Set last uid for a given mbox in the soledad integer cache. + SoledadMailbox should prime this value during initialization. + Other methods (during message adding) SHOULD call + `increment_last_soledad_uid` instead. + """ + leap_assert_type(value, int) + print "setting last soledad uid for ", mbox, "to", value + # if we already have a vlue here, don't do anything + with self._last_uid_lock: + if not self._last_uid.get(mbox, None): + self._last_uid[mbox] = value + + def increment_last_soledad_uid(self, mbox): + """ + Increment by one the soledad integer cache for the last_uid for + this mbox, and fire a defer-to-thread to update the soledad value. + The caller should lock the call tho this method. + """ + with self._last_uid_lock: + self._last_uid[mbox] += 1 + value = self._last_uid[mbox] + self.write_last_uid(mbox, value) + return value + + @deferred + def write_last_uid(self, mbox, value): + """ + Increment the soledad cache, + """ + leap_assert_type(value, int) + if self._permanent_store: + self._permanent_store.write_last_uid(mbox, value) + + # Counting sheeps... def count_new_mbox(self, mbox): """ @@ -418,14 +510,12 @@ class MemoryStore(object): docs_dict = self._chash_fdoc_store.get(chash, None) fdoc = docs_dict.get(mbox, None) if docs_dict else None - print "GETTING FDOC BY CHASH:", fdoc - # a couple of special cases. # 1. We might have a doc with empty content... if empty(fdoc): return None - # ...Or the message could exist, but being flagged for deletion. + # 2. ...Or the message could exist, but being flagged for deletion. # We want to create a new one in this case. # Hmmm what if the deletion is un-done?? We would end with a # duplicate... @@ -456,15 +546,22 @@ class MemoryStore(object): for key in sorted(self._msg_store.keys()) if key in self._new or key in self._dirty) + def all_msg_dict_for_mbox(self, mbox): + """ + Return all the message dicts for a given mbox. + """ + return [self._msg_store[(mb, uid)] + for mb, uid in self._msg_store if mb == mbox] + def all_deleted_uid_iter(self, mbox): """ Return generator that iterates through the UIDs for all messags with deleted flag in a given mailbox. """ - all_deleted = ( - msg['fdoc']['uid'] for msg in self._msg_store.values() + all_deleted = [ + msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox) if msg.get('fdoc', None) - and fields.DELETED_FLAG in msg['fdoc']['flags']) + and fields.DELETED_FLAG in msg['fdoc']['flags']] return all_deleted # new, dirty flags @@ -473,6 +570,7 @@ class MemoryStore(object): """ Return `new` and `dirty` flags for a given message. """ + # XXX should return *first* the news, and *then* the dirty... return map(lambda _set: key in _set, (self._new, self._dirty)) def set_new(self, key): @@ -485,7 +583,7 @@ class MemoryStore(object): """ Remove the key value from the `new` set. """ - print "Unsetting NEW for: %s" % str(key) + #print "Unsetting NEW for: %s" % str(key) self._new.discard(key) deferreds = self._new_deferreds d = deferreds.get(key, None) @@ -505,7 +603,7 @@ class MemoryStore(object): """ Remove the key value from the `dirty` set. """ - print "Unsetting DIRTY for: %s" % str(key) + #print "Unsetting DIRTY for: %s" % str(key) self._dirty.discard(key) deferreds = self._dirty_deferreds d = deferreds.get(key, None) @@ -522,6 +620,7 @@ class MemoryStore(object): """ Set the `Recent` flag for a given mailbox and UID. """ + self._rflags_dirty.add(mbox) self._rflags_store[mbox]['set'].add(uid) # TODO --- nice but unused @@ -536,6 +635,7 @@ class MemoryStore(object): Set the value for the set of the recent flags. Used from the property in the MessageCollection. """ + self._rflags_dirty.add(mbox) self._rflags_store[mbox]['set'] = set(value) def load_recent_flags(self, mbox, flags_doc): @@ -568,23 +668,81 @@ class MemoryStore(object): :rtype: generator """ - rflags_store = self._rflags_store - # XXX use enums DOC_ID = "doc_id" SET = "set" - print "LEN RFLAGS_STORE ------->", len(rflags_store) - return ( - RecentFlagsDoc( + rflags_store = self._rflags_store + + def get_rdoc(mbox, rdict): + mbox_rflag_set = rdict[SET] + recent_set = copy(mbox_rflag_set) + # zero it! + mbox_rflag_set.difference_update(mbox_rflag_set) + return RecentFlagsDoc( doc_id=rflags_store[mbox][DOC_ID], content={ fields.TYPE_KEY: fields.TYPE_RECENT_VAL, fields.MBOX_KEY: mbox, - fields.RECENTFLAGS_KEY: list( - rflags_store[mbox][SET]) + fields.RECENTFLAGS_KEY: list(recent_set) }) - for mbox in rflags_store) + + return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items() + if not empty(rdict[SET])) + + # Methods that mirror the IMailbox interface + + def remove_all_deleted(self, mbox): + """ + Remove all messages flagged \\Deleted from this Memory Store only. + Called from `expunge` + """ + mem_deleted = self.all_deleted_uid_iter(mbox) + for uid in mem_deleted: + self.remove_message(mbox, uid) + return mem_deleted + + def expunge(self, mbox): + """ + Remove all messages flagged \\Deleted, from the Memory Store + and from the permanent store also. + """ + # TODO expunge should add itself as a callback to the ongoing + # writes. + soledad_store = self._permanent_store + + try: + # 1. Stop the writing call + self._stop_write_loop() + # 2. Enqueue a last write. + #self.write_messages(soledad_store) + # 3. Should wait on the writebacks to finish ??? + # FIXME wait for this, and add all the rest of the method + # as a callback!!! + except Exception as exc: + logger.exception(exc) + + # Now, we...: + + try: + # 1. Delete all messages marked as deleted in soledad. + + # XXX this could be deferred for faster operation. + if soledad_store: + sol_deleted = soledad_store.remove_all_deleted(mbox) + else: + sol_deleted = [] + + # 2. Delete all messages marked as deleted in memory. + mem_deleted = self.remove_all_deleted(mbox) + + all_deleted = set(mem_deleted).union(set(sol_deleted)) + print "deleted ", all_deleted + except Exception as exc: + logger.exception(exc) + finally: + self._start_write_loop() + return all_deleted # Dump-to-disk controls. diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py index 257d3f0..6d8631a 100644 --- a/src/leap/mail/imap/messageparts.py +++ b/src/leap/mail/imap/messageparts.py @@ -32,7 +32,7 @@ from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail.imap import interfaces from leap.mail.imap.fields import fields -from leap.mail.utils import first +from leap.mail.utils import empty, first MessagePartType = Enum("hdoc", "fdoc", "cdoc", "cdocs", "docs_id") @@ -134,6 +134,13 @@ class MessageWrapper(object): self._dict[self.HDOC] = ReferenciableDict(hdoc) if cdocs is not None: self._dict[self.CDOCS] = ReferenciableDict(cdocs) + + # This will keep references to the doc_ids to be able to put + # messages to soledad. It will be populated during the walk() to avoid + # the overhead of reading from the db. + + # XXX it really *only* make sense for the FDOC, the other parts + # should not be "dirty", just new...!!! self._dict[self.DOCS_ID] = docs_id # properties @@ -201,6 +208,7 @@ class MessageWrapper(object): else: logger.warning("NO FDOC!!!") content_ref = {} + return MessagePartDoc(new=self.new, dirty=self.dirty, store=self._storetype, part=MessagePartType.fdoc, @@ -214,7 +222,6 @@ class MessageWrapper(object): if _hdoc: content_ref = weakref.proxy(_hdoc) else: - logger.warning("NO HDOC!!!!") content_ref = {} return MessagePartDoc(new=self.new, dirty=self.dirty, store=self._storetype, @@ -234,14 +241,21 @@ class MessageWrapper(object): def walk(self): """ Generator that iterates through all the parts, returning - MessagePartDoc. + MessagePartDoc. Used for writing to SoledadStore. """ - if self.fdoc is not None: + if self._dirty: + mbox = self.fdoc.content[fields.MBOX_KEY] + uid = self.fdoc.content[fields.UID_KEY] + docid_dict = self._dict[self.DOCS_ID] + docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( + mbox, uid) + + if not empty(self.fdoc.content): yield self.fdoc - if self.hdoc is not None: + if not empty(self.hdoc.content): yield self.hdoc for cdoc in self.cdocs.values(): - if cdoc is not None: + if not empty(cdoc): content_ref = weakref.proxy(cdoc) yield MessagePartDoc(new=self.new, dirty=self.dirty, store=self._storetype, diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 5de638b..35c07f5 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -202,21 +202,21 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: The flags, represented as strings :rtype: tuple """ - if self._uid is None: - return [] + #if self._uid is None: + #return [] uid = self._uid - flags = [] + flags = set([]) fdoc = self._fdoc if fdoc: - flags = fdoc.content.get(self.FLAGS_KEY, None) + flags = set(fdoc.content.get(self.FLAGS_KEY, None)) msgcol = self._collection # We treat the recent flag specially: gotten from # a mailbox-level document. if msgcol and uid in msgcol.recent_flags: - flags.append(fields.RECENT_FLAG) + flags.add(fields.RECENT_FLAG) if flags: flags = map(str, flags) return tuple(flags) @@ -236,7 +236,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: a SoledadDocument instance :rtype: SoledadDocument """ - # XXX use memory store ...! + # XXX Move logic to memory store ... leap_assert(isinstance(flags, tuple), "flags need to be a tuple") log.msg('setting flags: %s (%s)' % (self._uid, flags)) @@ -252,6 +252,7 @@ class LeapMessage(fields, MailParser, MBoxParser): doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags if self._collection.memstore is not None: + print "putting message in collection" self._collection.memstore.put_message( self._mbox, self._uid, MessageWrapper(fdoc=doc.content, new=False, dirty=True, @@ -508,6 +509,8 @@ class LeapMessage(fields, MailParser, MBoxParser): pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) return pmap[str(part)] + # XXX moved to memory store + # move the rest too. ------------------------------------------ def _get_flags_doc(self): """ Return the document that keeps the flags for this @@ -617,57 +620,38 @@ class LeapMessage(fields, MailParser, MBoxParser): # destructor - @deferred - def remove(self): - """ - Remove all docs associated with this message. - Currently it removes only the flags doc. - """ - # XXX For the moment we are only removing the flags and headers - # docs. The rest we leave there polluting your hard disk, - # until we think about a good way of deorphaning. - # Maybe a crawler of unreferenced docs. - - # XXX implement elijah's idea of using a PUT document as a - # token to ensure consistency in the removal. - - uid = self._uid - - fd = self._get_flags_doc() - #hd = self._get_headers_doc() - #bd = self._get_body_doc() - #docs = [fd, hd, bd] - - try: - memstore = self._collection.memstore - except AttributeError: - memstore = False - - if memstore and hasattr(fd, "store", None) == "mem": - key = self._mbox, self._uid - if fd.new: - # it's a new document, so we can remove it and it will not - # be writen. Watch out! We need to be sure it has not been - # just queued to write! - memstore.remove_message(*key) - - if fd.dirty: - doc_id = fd.doc_id - doc = self._soledad.get_doc(doc_id) - try: - self._soledad.delete_doc(doc) - except Exception as exc: - logger.exception(exc) - - else: + # XXX this logic moved to remove_message in memory store... + #@deferred + #def remove(self): + #""" + #Remove all docs associated with this message. + #Currently it removes only the flags doc. + #""" + #fd = self._get_flags_doc() +# + #if fd.new: + # it's a new document, so we can remove it and it will not + # be writen. Watch out! We need to be sure it has not been + # just queued to write! + #memstore.remove_message(*key) +# + #if fd.dirty: + #doc_id = fd.doc_id + #doc = self._soledad.get_doc(doc_id) + #try: + #self._soledad.delete_doc(doc) + #except Exception as exc: + #logger.exception(exc) +# + #else: # we just got a soledad_doc - try: - doc_id = fd.doc_id - latest_doc = self._soledad.get_doc(doc_id) - self._soledad.delete_doc(latest_doc) - except Exception as exc: - logger.exception(exc) - return uid + #try: + #doc_id = fd.doc_id + #latest_doc = self._soledad.get_doc(doc_id) + #self._soledad.delete_doc(latest_doc) + #except Exception as exc: + #logger.exception(exc) + #return uid def does_exist(self): """ @@ -826,7 +810,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # ensure that we have a recent-flags and a hdocs-sec doc self._get_or_create_rdoc() - self._get_or_create_hdocset() + + # Not for now... + #self._get_or_create_hdocset() def _get_empty_doc(self, _type=FLAGS_DOC): """ @@ -959,7 +945,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # not deferring to thread cause this now uses deferred asa retval #@deferred - def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + #@profile + def add_msg(self, raw, subject=None, flags=None, date=None, uid=None, + notify_on_disk=False): """ Creates a new message document. Here lives the magic of the leap mail. Well, in soledad, really. @@ -994,7 +982,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # parse msg, chash, size, multi = self._do_parse(raw) - # check for uniqueness. + # check for uniqueness -------------------------------- + # XXX profiler says that this test is costly. + # So we probably should just do an in-memory check and + # move the complete check to the soledad writer? + # Watch out! We're reserving a UID right after this! if self._fdoc_already_exists(chash): print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" logger.warning("We already have that message in this mailbox.") @@ -1003,6 +995,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # so not touch it by the moment. return defer.succeed('already_exists') + uid = self.memstore.increment_last_soledad_uid(self.mbox) + print "ADDING MSG WITH UID: %s" % uid + fd = self._populate_flags(flags, uid, chash, size, multi) hd = self._populate_headr(msg, chash, subject, date) @@ -1039,36 +1034,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # XXX Should allow also to dump to disk directly, # for no-memstore cases. - # we return a deferred that, by default, will be triggered when - # saved to disk - d = self.memstore.create_message(self.mbox, uid, msg_container) - print "defered-add", d + # we return a deferred that by default will be triggered + # inmediately. + d = self.memstore.create_message(self.mbox, uid, msg_container, + notify_on_disk=notify_on_disk) print "adding message", d return d - def _remove_cb(self, result): - return result - - def remove_all_deleted(self): - """ - Removes all messages flagged as deleted. - """ - delete_deferl = [] - for msg in self.get_deleted(): - delete_deferl.append(msg.remove()) - d1 = defer.gatherResults(delete_deferl, consumeErrors=True) - d1.addCallback(self._remove_cb) - return d1 - - def remove(self, msg): - """ - Remove a given msg. - :param msg: the message to be removed - :type msg: LeapMessage - """ - d = msg.remove() - d.addCallback(self._remove_cb) - return d + #def remove(self, msg): + #""" + #Remove a given msg. + #:param msg: the message to be removed + #:type msg: LeapMessage + #""" + #d = msg.remove() + #d.addCallback(self._remove_cb) + #return d # # getters: specific queries @@ -1175,76 +1156,76 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # XXX FIXME ------------------------------------- # This should be rewritten to use memory store. - def _get_hdocset(self): - """ - An accessor for the hdocs-set for this mailbox. - """ - if not self.__hdocset: - with self._hdocset_lock: - hdocset_doc = self._get_hdocset_doc() - value = set(hdocset_doc.content.get( - fields.HDOCS_SET_KEY, [])) - self.__hdocset = value - return self.__hdocset - - def _set_hdocset(self, value): - """ - Setter for the hdocs-set for this mailbox. - """ - with self._hdocset_lock: - hdocset_doc = self._get_hdocset_doc() - newv = set(value) - self.__hdocset = newv - hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv) + #def _get_hdocset(self): + #""" + #An accessor for the hdocs-set for this mailbox. + #""" + #if not self.__hdocset: + #with self._hdocset_lock: + #hdocset_doc = self._get_hdocset_doc() + #value = set(hdocset_doc.content.get( + #fields.HDOCS_SET_KEY, [])) + #self.__hdocset = value + #return self.__hdocset +# + #def _set_hdocset(self, value): + #""" + #Setter for the hdocs-set for this mailbox. + #""" + #with self._hdocset_lock: + #hdocset_doc = self._get_hdocset_doc() + #newv = set(value) + #self.__hdocset = newv + #hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv) # XXX should deferLater 0 it? - self._soledad.put_doc(hdocset_doc) - - _hdocset = property( - _get_hdocset, _set_hdocset, - doc="Set of Document-IDs for the headers docs associated " - "with this mailbox.") - - def _get_hdocset_doc(self): - """ - Get hdocs-set document for this mailbox. - """ - curried = partial( - self._soledad.get_from_index, - fields.TYPE_MBOX_IDX, - fields.TYPE_HDOCS_SET_VAL, self.mbox) - curried.expected = "hdocset" - hdocset_doc = try_unique_query(curried) - return hdocset_doc - + #self._soledad.put_doc(hdocset_doc) +# + #_hdocset = property( + #_get_hdocset, _set_hdocset, + #doc="Set of Document-IDs for the headers docs associated " + #"with this mailbox.") +# + #def _get_hdocset_doc(self): + #""" + #Get hdocs-set document for this mailbox. + #""" + #curried = partial( + #self._soledad.get_from_index, + #fields.TYPE_MBOX_IDX, + #fields.TYPE_HDOCS_SET_VAL, self.mbox) + #curried.expected = "hdocset" + #hdocset_doc = try_unique_query(curried) + #return hdocset_doc +# # Property-set modification (protected by a different # lock to give atomicity to the read/write operation) - - def remove_hdocset_docids(self, docids): - """ - Remove the given document IDs from the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.difference( - set(docids)) - - def remove_hdocset_docid(self, docid): - """ - Remove the given document ID from the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.difference( - set([docid])) - - def add_hdocset_docid(self, docid): - """ - Add the given document ID to the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.union( - set([docid])) +# + #def remove_hdocset_docids(self, docids): + #""" + #Remove the given document IDs from the set of + #header-documents associated with this mailbox. + #""" + #with self._hdocset_property_lock: + #self._hdocset = self._hdocset.difference( + #set(docids)) +# + #def remove_hdocset_docid(self, docid): + #""" + #Remove the given document ID from the set of + #header-documents associated with this mailbox. + #""" + #with self._hdocset_property_lock: + #self._hdocset = self._hdocset.difference( + #set([docid])) +# + #def add_hdocset_docid(self, docid): + #""" + #Add the given document ID to the set of + #header-documents associated with this mailbox. + #""" + #with self._hdocset_property_lock: + #self._hdocset = self._hdocset.union( + #set([docid])) # individual doc getters, message layer. @@ -1378,18 +1359,20 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): return (u for u in sorted(uids)) - def reset_last_uid(self, param): - """ - Set the last uid to the highest uid found. - Used while expunging, passed as a callback. - """ - try: - self.last_uid = max(self.all_uid_iter()) + 1 - except ValueError: + # XXX Should be moved to memstore + #def reset_last_uid(self, param): + #""" + #Set the last uid to the highest uid found. + #Used while expunging, passed as a callback. + #""" + #try: + #self.last_uid = max(self.all_uid_iter()) + 1 + #except ValueError: # empty sequence - pass - return param + #pass + #return param + # XXX MOVE to memstore def all_flags(self): """ Return a dict with all flags documents for this mailbox. @@ -1444,7 +1427,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: int """ - # XXX We could cache this in memstore too until next write... + # XXX We should cache this in memstore too until next write... count = self._soledad.get_count_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) @@ -1491,6 +1474,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # recent messages + # XXX take it from memstore def count_recent(self): """ Count all messages with the `Recent` flag. @@ -1503,30 +1487,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ return len(self.recent_flags) - # deleted messages - - def deleted_iter(self): - """ - Get an iterator for the message UIDs with `deleted` flag. - - :return: iterator through deleted message docs - :rtype: iterable - """ - return (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_DEL_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '1')) - - def get_deleted(self): - """ - Get all messages with the `Deleted` flag. - - :returns: a generator of LeapMessages - :rtype: generator - """ - return (LeapMessage(self._soledad, docid, self.mbox) - for docid in self.deleted_iter()) - def __len__(self): """ Returns the number of messages on this mailbox. diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index c95a9be..3a6ac9a 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -199,6 +199,7 @@ class LeapIMAPServer(imap4.IMAP4Server): # XXX fake a delayed operation, to debug problem with messages getting # back to the source mailbox... + print "faking checkpoint..." import time time.sleep(2) return None diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ea5b36e..60576a3 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -18,18 +18,22 @@ A MessageStore that writes to Soledad. """ import logging +import threading from itertools import chain +#from twisted.internet import defer from u1db import errors as u1db_errors from zope.interface import implements +from leap.common.check import leap_assert_type from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.fields import fields from leap.mail.imap.interfaces import IMessageStore from leap.mail.messageflow import IMessageConsumer +from leap.mail.utils import first logger = logging.getLogger(__name__) @@ -123,6 +127,7 @@ class SoledadStore(ContentDedup): """ This will create docs in the local Soledad database. """ + _last_uid_lock = threading.Lock() implements(IMessageConsumer, IMessageStore) @@ -177,6 +182,7 @@ class SoledadStore(ContentDedup): # IMessageConsumer + #@profile def consume(self, queue): """ Creates a new document in soledad db. @@ -220,6 +226,7 @@ class SoledadStore(ContentDedup): if isinstance(doc_wrapper, MessageWrapper): # If everything went well, we can unset the new flag # in the source store (memory store) + print "unsetting new flag!" doc_wrapper.new = False doc_wrapper.dirty = False empty = queue.empty() @@ -243,13 +250,11 @@ class SoledadStore(ContentDedup): return chain((doc_wrapper,), self._get_calls_for_msg_parts(doc_wrapper)) elif isinstance(doc_wrapper, RecentFlagsDoc): - print "getting calls for rflags" return chain((doc_wrapper,), self._get_calls_for_rflags_doc(doc_wrapper)) else: print "********************" print "CANNOT PROCESS ITEM!" - print "item --------------------->", doc_wrapper return (i for i in []) def _try_call(self, call, item): @@ -275,6 +280,7 @@ class SoledadStore(ContentDedup): if msg_wrapper.new is True: call = self._soledad.create_doc + print "NEW DOC ----------------------" # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): @@ -301,30 +307,22 @@ class SoledadStore(ContentDedup): # the flags doc. elif msg_wrapper.dirty is True: - print "DIRTY DOC! ----------------------" call = self._soledad.put_doc - # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): + # XXX FIXME Give error if dirty and not doc_id !!! doc_id = item.doc_id # defend! + if not doc_id: + continue doc = self._soledad.get_doc(doc_id) - doc.content = item.content - + doc.content = dict(item.content) if item.part == MessagePartType.fdoc: - print "Will PUT the doc: ", doc - yield dict(doc), call - - # XXX also for linkage-doc - - # TODO should write back to the queue - # with the results of the operation. - # We can write there: - # (*) MsgWriteACK --> Should remove from incoming queue. - # (We should do this here). - # Implement using callbacks for each operation. + logger.debug("PUT dirty fdoc") + yield doc, call + # XXX also for linkage-doc !!! else: - logger.error("Cannot delete documents yet!") + logger.error("Cannot delete documents yet from the queue...!") def _get_calls_for_rflags_doc(self, rflags_wrapper): """ @@ -334,18 +332,91 @@ class SoledadStore(ContentDedup): rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) payload = rflags_wrapper.content - print "rdoc", rdoc - print "SAVING RFLAGS TO SOLEDAD..." - import pprint; pprint.pprint(payload) + logger.debug("Saving RFLAGS to Soledad...") if payload: rdoc.content = payload - print - print "YIELDING -----", rdoc - print "AND ----------", call yield rdoc, call - else: - print ">>>>>>>>>>>>>>>>>" - print ">>>>>>>>>>>>>>>>>" - print ">>>>>>>>>>>>>>>>>" - print "No payload" + + def _get_mbox_document(self, mbox): + """ + Return mailbox document. + + :return: A SoledadDocument containing this mailbox, or None if + the query failed. + :rtype: SoledadDocument or None. + """ + try: + query = self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_MBOX_VAL, mbox) + if query: + return query.pop() + except Exception as exc: + logger.exception("Unhandled error %r" % exc) + + def get_flags_doc(self, mbox, uid): + """ + Return the SoledadDocument for the given mbox and uid. + """ + try: + flag_docs = self._soledad.get_from_index( + fields.TYPE_MBOX_UID_IDX, + fields.TYPE_FLAGS_VAL, mbox, str(uid)) + result = first(flag_docs) + except Exception as exc: + # ugh! Something's broken down there! + logger.warning("ERROR while getting flags for UID: %s" % uid) + logger.exception(exc) + finally: + return result + + def write_last_uid(self, mbox, value): + """ + Write the `last_uid` integer to the proper mailbox document + in Soledad. + This is called from the deferred triggered by + memorystore.increment_last_soledad_uid, which is expected to + run in a separate thread. + """ + leap_assert_type(value, int) + key = fields.LAST_UID_KEY + + with self._last_uid_lock: + mbox_doc = self._get_mbox_document(mbox) + old_val = mbox_doc.content[key] + if value < old_val: + logger.error("%s:%s Tried to write a UID lesser than what's " + "stored!" % (mbox, value)) + mbox_doc.content[key] = value + self._soledad.put_doc(mbox_doc) + + # deleted messages + + def deleted_iter(self, mbox): + """ + Get an iterator for the SoledadDocuments for messages + with \\Deleted flag for a given mailbox. + + :return: iterator through deleted message docs + :rtype: iterable + """ + return (doc for doc in self._soledad.get_from_index( + fields.TYPE_MBOX_DEL_IDX, + fields.TYPE_FLAGS_VAL, mbox, '1')) + + # TODO can deferToThread this? + def remove_all_deleted(self, mbox): + """ + Remove from Soledad all messages flagged as deleted for a given + mailbox. + """ + print "DELETING ALL DOCS FOR -------", mbox + deleted = [] + for doc in self.deleted_iter(mbox): + deleted.append(doc.content[fields.UID_KEY]) + print + print ">>>>>>>>>>>>>>>>>>>>" + print "deleting doc: ", doc.doc_id, doc.content + self._soledad.delete_doc(doc) + return deleted diff --git a/src/leap/mail/imap/tests/leap_tests_imap.zsh b/src/leap/mail/imap/tests/leap_tests_imap.zsh index 676d1a8..8f0df9f 100755 --- a/src/leap/mail/imap/tests/leap_tests_imap.zsh +++ b/src/leap/mail/imap/tests/leap_tests_imap.zsh @@ -61,7 +61,8 @@ IMAPTEST="imaptest" # These should be kept constant across benchmarking # runs across different machines, for comparability. -DURATION=200 +#DURATION=200 +DURATION=60 NUM_MSG=200 @@ -76,7 +77,7 @@ imaptest_cmd() { } stress_imap() { - mknod imap_pipe p + mkfifo imap_pipe cat imap_pipe | tee output & imaptest_cmd >> imap_pipe } @@ -99,7 +100,7 @@ print_results() { echo "----------------------" echo "\tavg\tstdev" $GREP "avg" ./output | sed -e 's/^ *//g' -e 's/ *$//g' | \ - awk ' + gawk ' function avg(data, count) { sum=0; for( x=0; x <= count-1; x++) { diff --git a/src/leap/mail/utils.py b/src/leap/mail/utils.py index bae2898..1f43947 100644 --- a/src/leap/mail/utils.py +++ b/src/leap/mail/utils.py @@ -42,7 +42,10 @@ def empty(thing): """ if thing is None: return True - return len(thing) == 0 + try: + return len(thing) == 0 + except ReferenceError: + return True def maybe_call(thing): -- cgit v1.2.3