summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2015-01-28 02:03:12 -0400
committerKali Kaneko <kali@leap.se>2015-02-11 14:05:44 -0400
commit9d2cf05fa575aa5a01e255837738f3aa2740b19e (patch)
tree01cc37a6752f82722874b7a8fcefcbbf437ff68f /src
parentfdbc6a7a448ca5f329bb452dc4517bd6be8804b4 (diff)
append/copy performance improvement
Diffstat (limited to 'src')
-rw-r--r--src/leap/mail/adaptors/soledad.py63
-rw-r--r--src/leap/mail/imap/mailbox.py55
-rw-r--r--src/leap/mail/mail.py26
3 files changed, 86 insertions, 58 deletions
diff --git a/src/leap/mail/adaptors/soledad.py b/src/leap/mail/adaptors/soledad.py
index 470562d..490e014 100644
--- a/src/leap/mail/adaptors/soledad.py
+++ b/src/leap/mail/adaptors/soledad.py
@@ -136,6 +136,7 @@ class SoledadDocumentWrapper(models.DocumentWrapper):
d = store.create_doc(self.serialize(),
doc_id=self.future_doc_id)
d.addCallback(update_doc_id)
+ d.addErrback(self._catch_revision_conflict, self.future_doc_id)
return d
def update(self, store):
@@ -447,7 +448,7 @@ class MessageWrapper(object):
implements(IMessageWrapper)
- def __init__(self, mdoc, fdoc, hdoc, cdocs=None):
+ def __init__(self, mdoc, fdoc, hdoc, cdocs=None, is_copy=False):
"""
Need at least a metamsg-document, a flag-document and a header-document
to instantiate a MessageWrapper. Content-documents can be retrieved
@@ -456,7 +457,11 @@ class MessageWrapper(object):
cdocs, if any, should be a dictionary in which the keys are ascending
integers, beginning at one, and the values are dictionaries with the
content of the content-docs.
+
+ is_copy, if set to True, will only attempt to create mdoc and fdoc
+ (because hdoc and cdocs are supposed to exist already)
"""
+ self._is_copy = is_copy
def get_doc_wrapper(doc, cls):
if isinstance(doc, SoledadDocument):
@@ -486,9 +491,33 @@ class MessageWrapper(object):
for doc_id, cdoc in zip(self.mdoc.cdocs, self.cdocs.values()):
cdoc.set_future_doc_id(doc_id)
- def create(self, store):
+ def create(self, store, notify_just_mdoc=False):
"""
Create all the parts for this message in the store.
+
+ :param store: an instance of Soledad
+
+ :param notify_just_mdoc:
+ if set to True, this method will return *only* the deferred
+ corresponding to the creation of the meta-message document.
+ Be warned that in that case there will be no record of failures
+ when creating the other part-documents.
+
+ Other-wise, this method will return a deferred that will wait for
+ the creation of all the part documents.
+
+ Setting this flag to True is mostly a convenient workaround for the
+ fact that massive serial appends will take too much time, and in
+ most of the cases the MUA will only switch to the mailbox where the
+ appends have happened after a certain time, which in most of the
+ times will be enough to have all the queued insert operations
+ finished.
+ :type notify_just_mdoc: bool
+
+ :return: a deferred whose callback will be called when either all the
+ part documents have been written, or just the metamsg-doc,
+ depending on the value of the notify_just_mdoc flag
+ :rtype: defer.Deferred
"""
leap_assert(self.cdocs,
"Need non empty cdocs to create the "
@@ -500,17 +529,24 @@ class MessageWrapper(object):
# TODO check that the doc_ids in the mdoc are coherent
d = []
- d.append(self.mdoc.create(store))
+ mdoc_created = self.mdoc.create(store)
+ d.append(mdoc_created)
d.append(self.fdoc.create(store))
- if self.hdoc.doc_id is None:
- d.append(self.hdoc.create(store))
- for cdoc in self.cdocs.values():
- if cdoc.doc_id is not None:
- # we could be just linking to an existing
- # content-doc.
- continue
- d.append(cdoc.create(store))
- return defer.gatherResults(d)
+
+ if not self._is_copy:
+ if self.hdoc.doc_id is None:
+ d.append(self.hdoc.create(store))
+ for cdoc in self.cdocs.values():
+ if cdoc.doc_id is not None:
+ # we could be just linking to an existing
+ # content-doc.
+ continue
+ d.append(cdoc.create(store))
+
+ if notify_just_mdoc:
+ return mdoc_created
+ else:
+ return defer.gatherResults(d)
def update(self, store):
"""
@@ -544,7 +580,8 @@ class MessageWrapper(object):
# the future doc_ids is properly set because we modified
# the pointers in mdoc, which has precedence.
- new_wrapper = MessageWrapper(new_mdoc, new_fdoc, None, None)
+ new_wrapper = MessageWrapper(new_mdoc, new_fdoc, None, None,
+ is_copy=True)
new_wrapper.hdoc = self.hdoc
new_wrapper.cdocs = self.cdocs
new_wrapper.set_mbox_uuid(new_mbox_uuid)
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 1bc530e..9ec6ea8 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -158,7 +158,8 @@ class IMAPMailbox(object):
if not NOTIFY_NEW:
return
- logger.debug('adding mailbox listener: %s' % listener)
+ logger.debug('adding mailbox listener: %s. Total: %s' % (
+ listener, len(self.listeners)))
self.listeners.add(listener)
def removeListener(self, listener):
@@ -196,29 +197,6 @@ class IMAPMailbox(object):
"flags expected to be a tuple")
return self.collection.set_mbox_attr("flags", flags)
- # TODO - not used?
- @property
- def is_closed(self):
- """
- Return the closed attribute for this mailbox.
-
- :return: True if the mailbox is closed
- :rtype: bool
- """
- return self.collection.get_mbox_attr("closed")
-
- # TODO - not used?
- def set_closed(self, closed):
- """
- Set the closed attribute for this mailbox.
-
- :param closed: the state to be set
- :type closed: bool
-
- :rtype: Deferred
- """
- return self.collection.set_mbox_attr("closed", closed)
-
def getUIDValidity(self):
"""
Return the unique validity identifier for this mailbox.
@@ -345,8 +323,10 @@ class IMAPMailbox(object):
:param date: timestamp
:type date: str
- :return: a deferred that evals to None
+ :return: a deferred that will be triggered with the UID of the added
+ message.
"""
+ # TODO should raise ReadOnlyMailbox if not rw.
# TODO have a look at the cases for internal date in the rfc
if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)):
message = message.getvalue()
@@ -362,20 +342,23 @@ class IMAPMailbox(object):
if date is None:
date = formatdate(time.time())
+ # A better place for this would be the COPY/APPEND dispatcher
# if PROFILE_CMD:
# do_profile_cmd(d, "APPEND")
- # XXX should review now that we're not using qtreactor.
- # A better place for this would be the COPY/APPEND dispatcher
- # in server.py, but qtreactor hangs when I do that, so this seems
- # to work fine for now.
+ # just_mdoc=True: feels HACKY, but improves a *lot* the responsiveness
+ # of the APPENDS: we just need to be notified when the mdoc
+ # is saved, and let's hope that the other parts are doing just fine.
+ # This will not catch any errors when the inserts of the other parts
+ # fail, but on the other hand allows us to return very quickly, which
+ # seems a good compromise given that we have to serialize the appends.
+ # A better solution will probably involve implementing MULTIAPPEND
+ # or patching imap server to support pipelining.
- def notifyCallback(x):
- reactor.callLater(0, self.notify_new)
- return x
+ d = self.collection.add_msg(message, flags=flags, date=date,
+ notify_just_mdoc=True)
- d = self.collection.add_msg(message, flags=flags, date=date)
- d.addCallback(notifyCallback)
+ # XXX signal to UI? --- should do it only if INBOX...
d.addErrback(lambda f: log.msg(f.getTraceback()))
return d
@@ -486,9 +469,9 @@ class IMAPMailbox(object):
"""
# TODO we could pass the asked sequence to the indexer
# all_uid_iter, and bound the sql query instead.
- def filter_by_asked(sequence):
+ def filter_by_asked(all_msg_uid):
set_asked = set(messages_asked)
- set_exist = set(sequence)
+ set_exist = set(all_msg_uid)
return set_asked.intersection(set_exist)
d = self.collection.all_uid_iter()
diff --git a/src/leap/mail/mail.py b/src/leap/mail/mail.py
index b46d223..d74f6b8 100644
--- a/src/leap/mail/mail.py
+++ b/src/leap/mail/mail.py
@@ -478,10 +478,24 @@ class MessageCollection(object):
# Manipulate messages
- def add_msg(self, raw_msg, flags=tuple(), tags=tuple(), date=""):
+ def add_msg(self, raw_msg, flags=tuple(), tags=tuple(), date="",
+ notify_just_mdoc=False):
"""
Add a message to this collection.
+
+ :param notify_just_mdoc:
+ boolean passed to the wrapper.create method,
+ to indicate whether we're interested in being notified when only
+ the mdoc has been written (faster, but potentially unsafe), or we
+ want to wait untill all the parts have been written.
+ Used by the imap mailbox implementation to get faster responses.
+ :type notify_just_mdoc: bool
+
+ :returns: a deferred that will fire with the UID of the inserted
+ message.
+ :rtype: deferred
"""
+ # XXX mdoc ref is a leaky abstraction here. generalize.
leap_assert_type(flags, tuple)
leap_assert_type(date, str)
@@ -503,10 +517,9 @@ class MessageCollection(object):
return self.mbox_indexer.insert_doc(
self.mbox_uuid, doc_id)
- d = wrapper.create(self.store)
+ d = wrapper.create(self.store, notify_just_mdoc=notify_just_mdoc)
d.addCallback(insert_mdoc_id, wrapper)
d.addErrback(lambda f: f.printTraceback())
-
return d
def copy_msg(self, msg, new_mbox_uuid):
@@ -519,17 +532,12 @@ class MessageCollection(object):
def insert_copied_mdoc_id(wrapper_new_msg):
return self.mbox_indexer.insert_doc(
- new_mbox_uuid, wrapper.mdoc.doc_id)
+ new_mbox_uuid, wrapper_new_msg.mdoc.doc_id)
wrapper = msg.get_wrapper()
- def print_result(result):
- print "COPY CALLBACK:>>>", result
- return result
-
d = wrapper.copy(self.store, new_mbox_uuid)
d.addCallback(insert_copied_mdoc_id)
- d.addCallback(print_result)
return d
def delete_msg(self, msg):