From ddad3391ba8ad611a9bdaaf689b408d44eec9cc6 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 11 Dec 2013 12:11:21 -0400 Subject: consume messages eagerly --- src/leap/mail/imap/server.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) (limited to 'src/leap/mail/imap/server.py') diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 6320a51..73ec223 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -834,14 +834,19 @@ class SoledadDocWriter(object): """ self._soledad = soledad - def consume(self, item): + def consume(self, queue): """ Creates a new document in soledad db. - :param item: object to update. content of the document to be inserted. - :type item: dict + :param queue: queue to get item from, with content of the document + to be inserted. + :type queue: Queue """ - self._soledad.create_doc(item) + empty = queue.empty() + while not empty: + item = queue.get() + self._soledad.create_doc(item) + empty = queue.empty() class MessageCollection(WithMsgFields, IndexedDB): @@ -911,7 +916,7 @@ class MessageCollection(WithMsgFields, IndexedDB): self._soledad_writer = MessageProducer( SoledadDocWriter(soledad), - period=0.2) + period=0.1) def _get_empty_msg(self): """ @@ -941,6 +946,7 @@ class MessageCollection(WithMsgFields, IndexedDB): :param uid: the message uid for this mailbox :type uid: int """ + logger.debug('adding message') if flags is None: flags = tuple() leap_assert_type(flags, tuple) @@ -985,6 +991,7 @@ class MessageCollection(WithMsgFields, IndexedDB): # ...should get a sanity check here. content[self.UID_KEY] = uid + logger.debug('enqueuing message for write') self._soledad_writer.put(content) # XXX have to decide what shall we do with errors with this change... #return self._soledad.create_doc(content) @@ -1518,9 +1525,9 @@ class SoledadMailbox(WithMsgFields): """ if not self.isWriteable(): raise imap4.ReadOnlyMailbox - delete = [] deleted = [] + for m in self.messages.get_all(): if self.DELETED_FLAG in m.content[self.FLAGS_KEY]: delete.append(m) -- cgit v1.2.3 From d1719ca40f6c7838fb41915706960c822f081237 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 5 Dec 2013 11:24:23 -0400 Subject: count_foo uses expanded u1db count method. Other fixes in the commit: * Correct the semantic for the recent flag (reset) * Minor unicode fixes. * Use a field for tracking the last_uid In general, this tries to squash all the quick and naive methods that were relying on evaluating all the message objects before returning a result. Further work is still needed, planned also for 0.5 release. get_by_index needs to be indexed too. --- src/leap/mail/imap/server.py | 184 +++++++++++++++++++++++++++++++------------ 1 file changed, 135 insertions(+), 49 deletions(-) (limited to 'src/leap/mail/imap/server.py') diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 73ec223..b79e691 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -74,6 +74,7 @@ class WithMsgFields(object): CREATED_KEY = "created" SUBSCRIBED_KEY = "subscribed" RW_KEY = "rw" + LAST_UID_KEY = "lastuid" # Document Type, for indexing TYPE_KEY = "type" @@ -165,6 +166,8 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB): TYPE_SUBS_IDX = 'by-type-and-subscribed' TYPE_MBOX_SEEN_IDX = 'by-type-and-mbox-and-seen' TYPE_MBOX_RECT_IDX = 'by-type-and-mbox-and-recent' + # Tomas created the `recent and seen index`, but the semantic is not too + # correct since the recent flag is volatile. TYPE_MBOX_RECT_SEEN_IDX = 'by-type-and-mbox-and-recent-and-seen' KTYPE = WithMsgFields.TYPE_KEY @@ -197,6 +200,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB): WithMsgFields.CLOSED_KEY: False, WithMsgFields.SUBSCRIBED_KEY: False, WithMsgFields.RW_KEY: 1, + WithMsgFields.LAST_UID_KEY: 0 } def __init__(self, account_name, soledad=None): @@ -618,14 +622,14 @@ class LeapMessage(WithMsgFields): Retrieve the flags associated with this message :return: The flags, represented as strings - :rtype: iterable + :rtype: tuple """ if self._doc is None: return [] flags = self._doc.content.get(self.FLAGS_KEY, None) if flags: flags = map(str, flags) - return flags + return tuple(flags) # setFlags, addFlags, removeFlags are not in the interface spec # but we use them with store command. @@ -637,11 +641,12 @@ class LeapMessage(WithMsgFields): Returns a SoledadDocument that needs to be updated by the caller. :param flags: the flags to update in the message. - :type flags: sequence of str + :type flags: tuple of str :return: a SoledadDocument instance :rtype: SoledadDocument """ + leap_assert(isinstance(flags, tuple), "flags need to be a tuple") log.msg('setting flags') doc = self._doc doc.content[self.FLAGS_KEY] = flags @@ -656,13 +661,14 @@ class LeapMessage(WithMsgFields): Returns a SoledadDocument that needs to be updated by the caller. :param flags: the flags to add to the message. - :type flags: sequence of str + :type flags: tuple of str :return: a SoledadDocument instance :rtype: SoledadDocument """ + leap_assert(isinstance(flags, tuple), "flags need to be a tuple") oldflags = self.getFlags() - return self.setFlags(list(set(flags + oldflags))) + return self.setFlags(tuple(set(flags + oldflags))) def removeFlags(self, flags): """ @@ -671,20 +677,21 @@ class LeapMessage(WithMsgFields): Returns a SoledadDocument that needs to be updated by the caller. :param flags: the flags to be removed from the message. - :type flags: sequence of str + :type flags: tuple of str :return: a SoledadDocument instance :rtype: SoledadDocument """ + leap_assert(isinstance(flags, tuple), "flags need to be a tuple") oldflags = self.getFlags() - return self.setFlags(list(set(oldflags) - set(flags))) + return self.setFlags(tuple(set(oldflags) - set(flags))) def getInternalDate(self): """ Retrieve the date internally associated with this message - @rtype: C{str} - @retur: An RFC822-formatted date string. + :rtype: C{str} + :return: An RFC822-formatted date string. """ return str(self._doc.content.get(self.DATE_KEY, '')) @@ -710,8 +717,9 @@ class LeapMessage(WithMsgFields): :rtype: StringIO """ fd = cStringIO.StringIO() - charset = get_email_charset(self._doc.content.get(self.RAW_KEY, '')) content = self._doc.content.get(self.RAW_KEY, '') + charset = get_email_charset( + unicode(self._doc.content.get(self.RAW_KEY, ''))) try: content = content.encode(charset) except (UnicodeEncodeError, UnicodeDecodeError) as e: @@ -736,8 +744,9 @@ class LeapMessage(WithMsgFields): :rtype: StringIO """ fd = StringIO.StringIO() - charset = get_email_charset(self._doc.content.get(self.RAW_KEY, '')) content = self._doc.content.get(self.RAW_KEY, '') + charset = get_email_charset( + unicode(self._doc.content.get(self.RAW_KEY, ''))) try: content = content.encode(charset) except (UnicodeEncodeError, UnicodeDecodeError) as e: @@ -1046,6 +1055,8 @@ class MessageCollection(WithMsgFields, IndexedDB): :param index: the index of the sequence (zero-indexed) :type index: int """ + # XXX inneficient! ---- we should keep an index document + # with uid -- doc_uuid :) try: return self.get_all()[index] except IndexError: @@ -1071,15 +1082,6 @@ class MessageCollection(WithMsgFields, IndexedDB): """ return self.DELETED_FLAG in doc.content[self.FLAGS_KEY] - def get_last(self): - """ - Gets the last LeapMessage - """ - _all = self.get_all() - if not _all: - return None - return LeapMessage(_all[-1]) - def get_all(self): """ Get all message documents for the selected mailbox. @@ -1096,11 +1098,25 @@ class MessageCollection(WithMsgFields, IndexedDB): all_docs = [doc for doc in self._soledad.get_from_index( SoledadBackedAccount.TYPE_MBOX_IDX, self.TYPE_MESSAGE_VAL, self.mbox)] - #if not self.is_deleted(doc)] # highly inneficient, but first let's grok it and then # let's worry about efficiency. + + # XXX FIXINDEX return sorted(all_docs, key=lambda item: item.content['uid']) + def count(self): + """ + Return the count of messages for this mailbox. + + :rtype: int + """ + count = self._soledad.get_count_from_index( + SoledadBackedAccount.TYPE_MBOX_IDX, + self.TYPE_MESSAGE_VAL, self.mbox) + return count + + # unseen messages + def unseen_iter(self): """ Get an iterator for the message docs with no `seen` flag @@ -1110,8 +1126,20 @@ class MessageCollection(WithMsgFields, IndexedDB): """ return (doc for doc in self._soledad.get_from_index( - SoledadBackedAccount.TYPE_MBOX_RECT_SEEN_IDX, - self.TYPE_MESSAGE_VAL, self.mbox, '1', '0')) + SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, + self.TYPE_MESSAGE_VAL, self.mbox, '0')) + + def count_unseen(self): + """ + Count all messages with the `Unseen` flag. + + :returns: count + :rtype: int + """ + count = self._soledad.get_count_from_index( + SoledadBackedAccount.TYPE_MBOX_SEEN_IDX, + self.TYPE_MESSAGE_VAL, self.mbox, '0') + return count def get_unseen(self): """ @@ -1122,6 +1150,8 @@ class MessageCollection(WithMsgFields, IndexedDB): """ return [LeapMessage(doc) for doc in self.unseen_iter()] + # recent messages + def recent_iter(self): """ Get an iterator for the message docs with `recent` flag. @@ -1143,13 +1173,17 @@ class MessageCollection(WithMsgFields, IndexedDB): """ return [LeapMessage(doc) for doc in self.recent_iter()] - def count(self): + def count_recent(self): """ - Return the count of messages for this mailbox. + Count all messages with the `Recent` flag. + :returns: count :rtype: int """ - return len(self.get_all()) + count = self._soledad.get_count_from_index( + SoledadBackedAccount.TYPE_MBOX_RECT_IDX, + self.TYPE_MESSAGE_VAL, self.mbox, '1') + return count def __len__(self): """ @@ -1179,8 +1213,7 @@ class MessageCollection(WithMsgFields, IndexedDB): :return: LeapMessage or None if not found. :rtype: LeapMessage """ - #try: - #return self.get_msg_by_uid(uid) + # XXX FIXME inneficcient, we are evaulating. try: return [doc for doc in self.get_all()][uid - 1] @@ -1252,7 +1285,7 @@ class SoledadMailbox(WithMsgFields): self._soledad = soledad self.messages = MessageCollection( - mbox=mbox, soledad=soledad) + mbox=mbox, soledad=self._soledad) if not self.getFlags(): self.setFlags(self.INIT_FLAGS) @@ -1367,6 +1400,32 @@ class SoledadMailbox(WithMsgFields): closed = property( _get_closed, _set_closed, doc="Closed attribute.") + def _get_last_uid(self): + """ + Return the last uid for this mailbox. + + :return: the last uid for messages in this mailbox + :rtype: bool + """ + mbox = self._get_mbox() + return mbox.content.get(self.LAST_UID_KEY, 1) + + def _set_last_uid(self, uid): + """ + Sets the last uid for this mailbox. + + :param uid: the uid to be set + :type uid: int + """ + leap_assert(isinstance(uid, int), "uid has to be int") + mbox = self._get_mbox() + key = self.LAST_UID_KEY + mbox.content[key] = uid + self._soledad.put_doc(mbox) + + last_uid = property( + _get_last_uid, _set_last_uid, doc="Last_UID attribute.") + def getUIDValidity(self): """ Return the unique validity identifier for this mailbox. @@ -1396,17 +1455,18 @@ class SoledadMailbox(WithMsgFields): def getUIDNext(self): """ Return the likely UID for the next message added to this - mailbox. Currently it returns the current length incremented - by one. + mailbox. Currently it returns the higher UID incremented by + one. + + We increment the next uid *each* time this function gets called. + In this way, there will be gaps if the message with the allocated + uid cannot be saved. But that is preferable to having race conditions + if we get to parallel message adding. :rtype: int """ - last = self.messages.get_last() - if last: - nextuid = last.getUID() + 1 - else: - nextuid = 1 - return nextuid + self.last_uid += 1 + return self.last_uid def getMessageCount(self): """ @@ -1423,7 +1483,7 @@ class SoledadMailbox(WithMsgFields): :return: count of messages flagged `unseen` :rtype: int """ - return len(self.messages.get_unseen()) + return self.messages.count_unseen() def getRecentCount(self): """ @@ -1432,7 +1492,7 @@ class SoledadMailbox(WithMsgFields): :return: count of messages flagged `recent` :rtype: int """ - return len(self.messages.get_recent()) + return self.messages.count_recent() def isWriteable(self): """ @@ -1489,6 +1549,7 @@ class SoledadMailbox(WithMsgFields): """ # XXX we should treat the message as an IMessage from here uid_next = self.getUIDNext() + logger.debug('Adding msg with UID :%s' % uid_next) if flags is None: flags = tuple() else: @@ -1497,8 +1558,11 @@ class SoledadMailbox(WithMsgFields): self.messages.add_msg(message, flags=flags, date=date, uid=uid_next) - exists = len(self.messages) - recent = len(self.messages.get_recent()) + exists = self.getMessageCount() + recent = self.getRecentCount() + logger.debug("there are %s messages, %s recent" % ( + exists, + recent)) for listener in self.listeners: listener.newMessages(exists, recent) return defer.succeed(None) @@ -1564,12 +1628,7 @@ class SoledadMailbox(WithMsgFields): iter(messages) except TypeError: # looks like we cannot iterate - last = self.messages.get_last() - if last is None: - uid_last = 1 - else: - uid_last = last.getUID() - messages.last = uid_last + messages.last = self.last_uid # for sequence numbers (uid = 0) if sequence: @@ -1588,14 +1647,37 @@ class SoledadMailbox(WithMsgFields): else: print "fetch %s, no msg found!!!" % msg_id + if self.isWriteable(): + self._unset_recent_flag() return tuple(result) + def _unset_recent_flag(self): + """ + Unsets `Recent` flag from a tuple of messages. + Called from fetch. + + From RFC, about `Recent`: + + Message is "recently" arrived in this mailbox. This session + is the first session to have been notified about this + message; if the session is read-write, subsequent sessions + will not see \Recent set for this message. This flag can not + be altered by the client. + + If it is not possible to determine whether or not this + session is the first session to be notified about a message, + then that message SHOULD be considered recent. + """ + for msg in (LeapMessage(doc) for doc in self.messages.recent_iter()): + newflags = msg.removeFlags((WithMsgFields.RECENT_FLAG,)) + self._update(newflags) + def _signal_unread_to_ui(self): """ Sends unread event to ui. """ - leap_events.signal( - IMAP_UNREAD_MAIL, str(self.getUnseenCount())) + unseen = self.getUnseenCount() + leap_events.signal(IMAP_UNREAD_MAIL, str(unseen)) def store(self, messages, flags, mode, uid): """ @@ -1627,6 +1709,10 @@ class SoledadMailbox(WithMsgFields): read-write. """ # XXX implement also sequence (uid = 0) + # XXX we should prevent cclient from setting Recent flag. + leap_assert(not isinstance(flags, basestring), + "flags cannot be a string") + flags = tuple(flags) if not self.isWriteable(): log.msg('read only mailbox!') -- cgit v1.2.3 From 0de2307c11338bf4c5e36dd9fe76f445b700c288 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 19 Dec 2013 13:30:10 -0400 Subject: deferToThread unsetting recent flag --- src/leap/mail/imap/server.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/server.py') diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index b79e691..c79cf85 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -31,8 +31,10 @@ from zope.proxy import sameProxiedObjects from twisted.mail import imap4 from twisted.internet import defer +from twisted.internet.threads import deferToThread from twisted.python import log + from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type @@ -1648,7 +1650,8 @@ class SoledadMailbox(WithMsgFields): print "fetch %s, no msg found!!!" % msg_id if self.isWriteable(): - self._unset_recent_flag() + deferToThread(self._unset_recent_flag) + return tuple(result) def _unset_recent_flag(self): @@ -1668,6 +1671,7 @@ class SoledadMailbox(WithMsgFields): session is the first session to be notified about a message, then that message SHOULD be considered recent. """ + log.msg('unsetting recent flags...') for msg in (LeapMessage(doc) for doc in self.messages.recent_iter()): newflags = msg.removeFlags((WithMsgFields.RECENT_FLAG,)) self._update(newflags) -- cgit v1.2.3 From 9460ea1bb8fd7e536aa3dcf3ed746e3765c96fa1 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 20 Dec 2013 13:41:34 -0400 Subject: use soledad_writer for puts also --- src/leap/mail/imap/server.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) (limited to 'src/leap/mail/imap/server.py') diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index c79cf85..f77bf2c 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -755,7 +755,7 @@ class LeapMessage(WithMsgFields): logger.error("Unicode error {0}".format(e)) content = content.encode(charset, 'replace') fd.write(content) - # SHOULD use a separate BODY FIELD ... + # XXX SHOULD use a separate BODY FIELD ... fd.seek(0) return fd @@ -856,7 +856,12 @@ class SoledadDocWriter(object): empty = queue.empty() while not empty: item = queue.get() - self._soledad.create_doc(item) + payload = item['payload'] + mode = item['mode'] + if mode == "create": + self._soledad.create_doc(payload) + elif mode == "put": + self._soledad.put_doc(payload) empty = queue.empty() @@ -925,7 +930,7 @@ class MessageCollection(WithMsgFields, IndexedDB): # to be processed serially by the consumer (the writer). We just # need to `put` the new material on its plate. - self._soledad_writer = MessageProducer( + self.soledad_writer = MessageProducer( SoledadDocWriter(soledad), period=0.1) @@ -1003,7 +1008,10 @@ class MessageCollection(WithMsgFields, IndexedDB): content[self.UID_KEY] = uid logger.debug('enqueuing message for write') - self._soledad_writer.put(content) + + # XXX create namedtuple + self.soledad_writer.put({"mode": "create", + "payload": content}) # XXX have to decide what shall we do with errors with this change... #return self._soledad.create_doc(content) @@ -1650,7 +1658,7 @@ class SoledadMailbox(WithMsgFields): print "fetch %s, no msg found!!!" % msg_id if self.isWriteable(): - deferToThread(self._unset_recent_flag) + self._unset_recent_flag() return tuple(result) @@ -1761,8 +1769,9 @@ class SoledadMailbox(WithMsgFields): """ Updates document in u1db database """ - #log.msg('updating doc... %s ' % doc) - self._soledad.put_doc(doc) + # XXX create namedtuple + self.messages.soledad_writer.put({"mode": "put", + "payload": doc}) def __repr__(self): """ -- cgit v1.2.3 From ee1fa7da3bdc2de2bd12c55a4da9ccc291d3e82c Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 20 Dec 2013 16:36:21 -0400 Subject: safety catch against wrong last_uid --- src/leap/mail/imap/server.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/imap/server.py') diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index f77bf2c..d92ab9d 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -1430,7 +1430,22 @@ class SoledadMailbox(WithMsgFields): leap_assert(isinstance(uid, int), "uid has to be int") mbox = self._get_mbox() key = self.LAST_UID_KEY - mbox.content[key] = uid + + count = mbox.getMessageCount() + + # XXX safety-catch. If we do get duplicates, + # we want to avoid further duplication. + + if uid >= count: + value = uid + else: + # something is wrong, + # just set the last uid + # beyond the max msg count. + logger.debug("WRONG uid < count. Setting last uid to ", count) + value = count + + mbox.content[key] = value self._soledad.put_doc(mbox) last_uid = property( -- cgit v1.2.3 From aaedde4a6a04a77e06a332e1ab917afd60f72205 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 20 Dec 2013 17:06:57 -0400 Subject: fix wrong object call --- src/leap/mail/imap/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/leap/mail/imap/server.py') diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index d92ab9d..5672e25 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -1431,7 +1431,7 @@ class SoledadMailbox(WithMsgFields): mbox = self._get_mbox() key = self.LAST_UID_KEY - count = mbox.getMessageCount() + count = self.getMessageCount() # XXX safety-catch. If we do get duplicates, # we want to avoid further duplication. -- cgit v1.2.3 From 8d4e17a279218de99b495955e96672587cb237e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Touceda?= Date: Fri, 20 Dec 2013 19:20:50 -0300 Subject: Limit the size of the returned messages from IMAP to MUA to 100 --- src/leap/mail/imap/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/leap/mail/imap/server.py') diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 5672e25..2739f8c 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -1675,7 +1675,7 @@ class SoledadMailbox(WithMsgFields): if self.isWriteable(): self._unset_recent_flag() - return tuple(result) + return tuple(result[:100]) def _unset_recent_flag(self): """ -- cgit v1.2.3