diff options
author | Kali Kaneko <kali@leap.se> | 2015-01-28 02:03:12 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2015-01-28 02:04:19 -0400 |
commit | b5cecfd9ad09e5e82eea02994726eeaca0866155 (patch) | |
tree | 01cc37a6752f82722874b7a8fcefcbbf437ff68f | |
parent | 8f8f4bc1efe9b20c912969a9b18073a91612662e (diff) |
append/copy performance improvement
-rw-r--r-- | src/leap/mail/adaptors/soledad.py | 63 | ||||
-rw-r--r-- | src/leap/mail/imap/mailbox.py | 55 | ||||
-rw-r--r-- | src/leap/mail/mail.py | 26 |
3 files changed, 86 insertions, 58 deletions
diff --git a/src/leap/mail/adaptors/soledad.py b/src/leap/mail/adaptors/soledad.py index 470562d..490e014 100644 --- a/src/leap/mail/adaptors/soledad.py +++ b/src/leap/mail/adaptors/soledad.py @@ -136,6 +136,7 @@ class SoledadDocumentWrapper(models.DocumentWrapper): d = store.create_doc(self.serialize(), doc_id=self.future_doc_id) d.addCallback(update_doc_id) + d.addErrback(self._catch_revision_conflict, self.future_doc_id) return d def update(self, store): @@ -447,7 +448,7 @@ class MessageWrapper(object): implements(IMessageWrapper) - def __init__(self, mdoc, fdoc, hdoc, cdocs=None): + def __init__(self, mdoc, fdoc, hdoc, cdocs=None, is_copy=False): """ Need at least a metamsg-document, a flag-document and a header-document to instantiate a MessageWrapper. Content-documents can be retrieved @@ -456,7 +457,11 @@ class MessageWrapper(object): cdocs, if any, should be a dictionary in which the keys are ascending integers, beginning at one, and the values are dictionaries with the content of the content-docs. + + is_copy, if set to True, will only attempt to create mdoc and fdoc + (because hdoc and cdocs are supposed to exist already) """ + self._is_copy = is_copy def get_doc_wrapper(doc, cls): if isinstance(doc, SoledadDocument): @@ -486,9 +491,33 @@ class MessageWrapper(object): for doc_id, cdoc in zip(self.mdoc.cdocs, self.cdocs.values()): cdoc.set_future_doc_id(doc_id) - def create(self, store): + def create(self, store, notify_just_mdoc=False): """ Create all the parts for this message in the store. + + :param store: an instance of Soledad + + :param notify_just_mdoc: + if set to True, this method will return *only* the deferred + corresponding to the creation of the meta-message document. + Be warned that in that case there will be no record of failures + when creating the other part-documents. + + Other-wise, this method will return a deferred that will wait for + the creation of all the part documents. + + Setting this flag to True is mostly a convenient workaround for the + fact that massive serial appends will take too much time, and in + most of the cases the MUA will only switch to the mailbox where the + appends have happened after a certain time, which in most of the + times will be enough to have all the queued insert operations + finished. + :type notify_just_mdoc: bool + + :return: a deferred whose callback will be called when either all the + part documents have been written, or just the metamsg-doc, + depending on the value of the notify_just_mdoc flag + :rtype: defer.Deferred """ leap_assert(self.cdocs, "Need non empty cdocs to create the " @@ -500,17 +529,24 @@ class MessageWrapper(object): # TODO check that the doc_ids in the mdoc are coherent d = [] - d.append(self.mdoc.create(store)) + mdoc_created = self.mdoc.create(store) + d.append(mdoc_created) d.append(self.fdoc.create(store)) - if self.hdoc.doc_id is None: - d.append(self.hdoc.create(store)) - for cdoc in self.cdocs.values(): - if cdoc.doc_id is not None: - # we could be just linking to an existing - # content-doc. - continue - d.append(cdoc.create(store)) - return defer.gatherResults(d) + + if not self._is_copy: + if self.hdoc.doc_id is None: + d.append(self.hdoc.create(store)) + for cdoc in self.cdocs.values(): + if cdoc.doc_id is not None: + # we could be just linking to an existing + # content-doc. + continue + d.append(cdoc.create(store)) + + if notify_just_mdoc: + return mdoc_created + else: + return defer.gatherResults(d) def update(self, store): """ @@ -544,7 +580,8 @@ class MessageWrapper(object): # the future doc_ids is properly set because we modified # the pointers in mdoc, which has precedence. - new_wrapper = MessageWrapper(new_mdoc, new_fdoc, None, None) + new_wrapper = MessageWrapper(new_mdoc, new_fdoc, None, None, + is_copy=True) new_wrapper.hdoc = self.hdoc new_wrapper.cdocs = self.cdocs new_wrapper.set_mbox_uuid(new_mbox_uuid) diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index 1bc530e..9ec6ea8 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -158,7 +158,8 @@ class IMAPMailbox(object): if not NOTIFY_NEW: return - logger.debug('adding mailbox listener: %s' % listener) + logger.debug('adding mailbox listener: %s. Total: %s' % ( + listener, len(self.listeners))) self.listeners.add(listener) def removeListener(self, listener): @@ -196,29 +197,6 @@ class IMAPMailbox(object): "flags expected to be a tuple") return self.collection.set_mbox_attr("flags", flags) - # TODO - not used? - @property - def is_closed(self): - """ - Return the closed attribute for this mailbox. - - :return: True if the mailbox is closed - :rtype: bool - """ - return self.collection.get_mbox_attr("closed") - - # TODO - not used? - def set_closed(self, closed): - """ - Set the closed attribute for this mailbox. - - :param closed: the state to be set - :type closed: bool - - :rtype: Deferred - """ - return self.collection.set_mbox_attr("closed", closed) - def getUIDValidity(self): """ Return the unique validity identifier for this mailbox. @@ -345,8 +323,10 @@ class IMAPMailbox(object): :param date: timestamp :type date: str - :return: a deferred that evals to None + :return: a deferred that will be triggered with the UID of the added + message. """ + # TODO should raise ReadOnlyMailbox if not rw. # TODO have a look at the cases for internal date in the rfc if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)): message = message.getvalue() @@ -362,20 +342,23 @@ class IMAPMailbox(object): if date is None: date = formatdate(time.time()) + # A better place for this would be the COPY/APPEND dispatcher # if PROFILE_CMD: # do_profile_cmd(d, "APPEND") - # XXX should review now that we're not using qtreactor. - # A better place for this would be the COPY/APPEND dispatcher - # in server.py, but qtreactor hangs when I do that, so this seems - # to work fine for now. + # just_mdoc=True: feels HACKY, but improves a *lot* the responsiveness + # of the APPENDS: we just need to be notified when the mdoc + # is saved, and let's hope that the other parts are doing just fine. + # This will not catch any errors when the inserts of the other parts + # fail, but on the other hand allows us to return very quickly, which + # seems a good compromise given that we have to serialize the appends. + # A better solution will probably involve implementing MULTIAPPEND + # or patching imap server to support pipelining. - def notifyCallback(x): - reactor.callLater(0, self.notify_new) - return x + d = self.collection.add_msg(message, flags=flags, date=date, + notify_just_mdoc=True) - d = self.collection.add_msg(message, flags=flags, date=date) - d.addCallback(notifyCallback) + # XXX signal to UI? --- should do it only if INBOX... d.addErrback(lambda f: log.msg(f.getTraceback())) return d @@ -486,9 +469,9 @@ class IMAPMailbox(object): """ # TODO we could pass the asked sequence to the indexer # all_uid_iter, and bound the sql query instead. - def filter_by_asked(sequence): + def filter_by_asked(all_msg_uid): set_asked = set(messages_asked) - set_exist = set(sequence) + set_exist = set(all_msg_uid) return set_asked.intersection(set_exist) d = self.collection.all_uid_iter() diff --git a/src/leap/mail/mail.py b/src/leap/mail/mail.py index b46d223..d74f6b8 100644 --- a/src/leap/mail/mail.py +++ b/src/leap/mail/mail.py @@ -478,10 +478,24 @@ class MessageCollection(object): # Manipulate messages - def add_msg(self, raw_msg, flags=tuple(), tags=tuple(), date=""): + def add_msg(self, raw_msg, flags=tuple(), tags=tuple(), date="", + notify_just_mdoc=False): """ Add a message to this collection. + + :param notify_just_mdoc: + boolean passed to the wrapper.create method, + to indicate whether we're interested in being notified when only + the mdoc has been written (faster, but potentially unsafe), or we + want to wait untill all the parts have been written. + Used by the imap mailbox implementation to get faster responses. + :type notify_just_mdoc: bool + + :returns: a deferred that will fire with the UID of the inserted + message. + :rtype: deferred """ + # XXX mdoc ref is a leaky abstraction here. generalize. leap_assert_type(flags, tuple) leap_assert_type(date, str) @@ -503,10 +517,9 @@ class MessageCollection(object): return self.mbox_indexer.insert_doc( self.mbox_uuid, doc_id) - d = wrapper.create(self.store) + d = wrapper.create(self.store, notify_just_mdoc=notify_just_mdoc) d.addCallback(insert_mdoc_id, wrapper) d.addErrback(lambda f: f.printTraceback()) - return d def copy_msg(self, msg, new_mbox_uuid): @@ -519,17 +532,12 @@ class MessageCollection(object): def insert_copied_mdoc_id(wrapper_new_msg): return self.mbox_indexer.insert_doc( - new_mbox_uuid, wrapper.mdoc.doc_id) + new_mbox_uuid, wrapper_new_msg.mdoc.doc_id) wrapper = msg.get_wrapper() - def print_result(result): - print "COPY CALLBACK:>>>", result - return result - d = wrapper.copy(self.store, new_mbox_uuid) d.addCallback(insert_copied_mdoc_id) - d.addCallback(print_result) return d def delete_msg(self, msg): |