summaryrefslogtreecommitdiff
path: root/mail
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-29 00:54:20 -0400
committerKali Kaneko <kali@leap.se>2014-01-30 14:20:56 -0400
commitac43a4fff07950fa16a2e8f6c4948bc78f1af3c5 (patch)
treec29ceac5b160a840d5d68417ff4c4016acf3ce71 /mail
parentc6d010a151c1e1d1789e6b1227d54f7254d562a2 (diff)
Fix copy and deletion problems
* reorganize and simplify STORE command processing * add the notification after the processing of the whole sequence
Diffstat (limited to 'mail')
-rw-r--r--mail/src/leap/mail/imap/mailbox.py24
-rw-r--r--mail/src/leap/mail/imap/memorystore.py20
-rw-r--r--mail/src/leap/mail/imap/messages.py156
-rw-r--r--mail/src/leap/mail/imap/server.py26
-rw-r--r--mail/src/leap/mail/imap/soledadstore.py5
5 files changed, 118 insertions, 113 deletions
diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py
index a0eb0a97..3a6937f8 100644
--- a/mail/src/leap/mail/imap/mailbox.py
+++ b/mail/src/leap/mail/imap/mailbox.py
@@ -654,7 +654,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
unseen = self.getUnseenCount()
leap_events.signal(IMAP_UNREAD_MAIL, str(unseen))
- @deferred
def store(self, messages_asked, flags, mode, uid):
"""
Sets the flags of one or more messages.
@@ -697,28 +696,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
log.msg('read only mailbox!')
raise imap4.ReadOnlyMailbox
- result = {}
- for msg_id in seq_messg:
- log.msg("MSG ID = %s" % msg_id)
- msg = self.messages.get_msg_by_uid(msg_id)
- if not msg:
- continue
- # We duplicate the set operations here
- # to return the result because it's less costly than
- # retrieving the flags again.
- newflags = set(msg.getFlags())
-
- if mode == 1:
- msg.addFlags(flags)
- newflags = newflags.union(set(flags))
- elif mode == -1:
- msg.removeFlags(flags)
- newflags.difference_update(flags)
- elif mode == 0:
- msg.setFlags(flags)
- newflags = set(flags)
- result[msg_id] = newflags
- return result
+ return self.messages.set_flags(self.mbox, seq_messg, flags, mode)
# ISearchableMailbox
diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py
index 2d60b13f..fac66adb 100644
--- a/mail/src/leap/mail/imap/memorystore.py
+++ b/mail/src/leap/mail/imap/memorystore.py
@@ -357,7 +357,7 @@ class MemoryStore(object):
doc_id = fdoc.doc_id
return doc_id
- def get_message(self, mbox, uid):
+ def get_message(self, mbox, uid, flags_only=False):
"""
Get a MessageWrapper for the given mbox and uid combination.
@@ -365,17 +365,27 @@ class MemoryStore(object):
:type mbox: str or unicode
:param uid: the message UID
:type uid: int
+ :param flags_only: whether the message should carry only a reference
+ to the flags document.
+ :type flags_only: bool
:return: MessageWrapper or None
"""
key = mbox, uid
+ FDOC = MessagePartType.fdoc.key
+
msg_dict = self._msg_store.get(key, None)
if empty(msg_dict):
return None
new, dirty = self._get_new_dirty_state(key)
- return MessageWrapper(from_dict=msg_dict,
- new=new, dirty=dirty,
- memstore=weakref.proxy(self))
+ if flags_only:
+ return MessageWrapper(fdoc=msg_dict[FDOC],
+ new=new, dirty=dirty,
+ memstore=weakref.proxy(self))
+ else:
+ return MessageWrapper(from_dict=msg_dict,
+ new=new, dirty=dirty,
+ memstore=weakref.proxy(self))
def remove_message(self, mbox, uid):
"""
@@ -590,7 +600,7 @@ class MemoryStore(object):
if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]:
return None
- uid = fdoc.content[fields.UID_KEY]
+ uid = fdoc[fields.UID_KEY]
key = mbox, uid
new = key in self._new
dirty = key in self._dirty
diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py
index 315cdda6..5770868b 100644
--- a/mail/src/leap/mail/imap/messages.py
+++ b/mail/src/leap/mail/imap/messages.py
@@ -20,7 +20,6 @@ LeapMessage and MessageCollection.
import copy
import logging
import re
-import time
import threading
import StringIO
@@ -97,11 +96,13 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
# TODO this has to change.
- # Should index primarily by chash, and keep a local-lonly
+ # Should index primarily by chash, and keep a local-only
# UID table.
implements(imap4.IMessage)
+ flags_lock = threading.Lock()
+
def __init__(self, soledad, uid, mbox, collection=None, container=None):
"""
Initializes a LeapMessage.
@@ -111,7 +112,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
:param uid: the UID for the message.
:type uid: int or basestring
:param mbox: the mbox this message belongs to
- :type mbox: basestring
+ :type mbox: str or unicode
:param collection: a reference to the parent collection object
:type collection: MessageCollection
:param container: a IMessageContainer implementor instance
@@ -216,23 +217,17 @@ class LeapMessage(fields, MailParser, MBoxParser):
flags = map(str, flags)
return tuple(flags)
- # setFlags, addFlags, removeFlags are not in the interface spec
- # but we use them with store command.
+ # setFlags not in the interface spec but we use it with store command.
- def setFlags(self, flags):
+ def setFlags(self, flags, mode):
"""
Sets the flags for this message
- Returns a SoledadDocument that needs to be updated by the caller.
-
:param flags: the flags to update in the message.
:type flags: tuple of str
-
- :return: a SoledadDocument instance
- :rtype: SoledadDocument
+ :param mode: the mode for setting. 1 is append, -1 is remove, 0 set.
+ :type mode: int
"""
- # XXX Move logic to memory store ...
-
leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
log.msg('setting flags: %s (%s)' % (self._uid, flags))
@@ -242,51 +237,36 @@ class LeapMessage(fields, MailParser, MBoxParser):
"Could not find FDOC for %s:%s while setting flags!" %
(self._mbox, self._uid))
return
- doc.content[self.FLAGS_KEY] = flags
- doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
- doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
-
- if self._collection.memstore is not None:
- log.msg("putting message in collection")
- self._collection.memstore.put_message(
- self._mbox, self._uid,
- MessageWrapper(fdoc=doc.content, new=False, dirty=True,
- docs_id={'fdoc': doc.doc_id}))
- else:
- # fallback for non-memstore initializations.
- self._soledad.put_doc(doc)
-
- def addFlags(self, flags):
- """
- Adds flags to this message.
-
- Returns a SoledadDocument that needs to be updated by the caller.
-
- :param flags: the flags to add to the message.
- :type flags: tuple of str
-
- :return: a SoledadDocument instance
- :rtype: SoledadDocument
- """
- leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- oldflags = self.getFlags()
- self.setFlags(tuple(set(flags + oldflags)))
-
- def removeFlags(self, flags):
- """
- Remove flags from this message.
-
- Returns a SoledadDocument that needs to be updated by the caller.
- :param flags: the flags to be removed from the message.
- :type flags: tuple of str
-
- :return: a SoledadDocument instance
- :rtype: SoledadDocument
- """
- leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- oldflags = self.getFlags()
- self.setFlags(tuple(set(oldflags) - set(flags)))
+ APPEND = 1
+ REMOVE = -1
+ SET = 0
+
+ with self.flags_lock:
+ current = doc.content[self.FLAGS_KEY]
+ if mode == APPEND:
+ newflags = tuple(set(tuple(current) + flags))
+ elif mode == REMOVE:
+ newflags = tuple(set(current).difference(set(flags)))
+ elif mode == SET:
+ newflags = flags
+
+ # We could defer this, but I think it's better
+ # to put it under the lock...
+ doc.content[self.FLAGS_KEY] = newflags
+ doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
+ doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
+
+ if self._collection.memstore is not None:
+ log.msg("putting message in collection")
+ self._collection.memstore.put_message(
+ self._mbox, self._uid,
+ MessageWrapper(fdoc=doc.content, new=False, dirty=True,
+ docs_id={'fdoc': doc.doc_id}))
+ else:
+ # fallback for non-memstore initializations.
+ self._soledad.put_doc(doc)
+ return map(str, newflags)
def getInternalDate(self):
"""
@@ -1022,6 +1002,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
def unset_recent_flags(self, uids):
"""
Unset Recent flag for a sequence of uids.
+
+ :param uids: the uids to unset
+ :type uid: sequence
"""
with self._rdoc_property_lock:
self.recent_flags.difference_update(
@@ -1032,6 +1015,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
def unset_recent_flag(self, uid):
"""
Unset Recent flag for a given uid.
+
+ :param uid: the uid to unset
+ :type uid: int
"""
with self._rdoc_property_lock:
self.recent_flags.difference_update(
@@ -1040,6 +1026,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
def set_recent_flag(self, uid):
"""
Set Recent flag for a given uid.
+
+ :param uid: the uid to set
+ :type uid: int
"""
with self._rdoc_property_lock:
self.recent_flags = self.recent_flags.union(
@@ -1099,31 +1088,64 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# and we cannot find it otherwise. This seems to be enough.
# XXX do a deferLater instead ??
- # FIXME this won't be needed after the CHECK command is implemented.
- time.sleep(0.3)
+ # XXX is this working?
return self._get_uid_from_msgidCb(msgid)
+ def set_flags(self, mbox, messages, flags, mode):
+ """
+ Set flags for a sequence of messages.
+
+ :param mbox: the mbox this message belongs to
+ :type mbox: str or unicode
+ :param messages: the messages to iterate through
+ :type messages: sequence
+ :flags: the flags to be set
+ :type flags: tuple
+ :param mode: the mode for setting. 1 is append, -1 is remove, 0 set.
+ :type mode: int
+ """
+ result = {}
+ for msg_id in messages:
+ log.msg("MSG ID = %s" % msg_id)
+ msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True)
+ if not msg:
+ continue
+ result[msg_id] = msg.setFlags(flags, mode)
+
+ return result
+
# getters: generic for a mailbox
- def get_msg_by_uid(self, uid):
+ def get_msg_by_uid(self, uid, mem_only=False, flags_only=False):
"""
Retrieves a LeapMessage by UID.
This is used primarity in the Mailbox fetch and store methods.
:param uid: the message uid to query by
:type uid: int
+ :param mem_only: a flag that indicates whether this Message should
+ pass a reference to soledad to retrieve missing pieces
+ or not.
+ :type mem_only: bool
+ :param flags_only: whether the message should carry only a reference
+ to the flags document.
+ :type flags_only: bool
:return: A LeapMessage instance matching the query,
or None if not found.
:rtype: LeapMessage
"""
- msg_container = self.memstore.get_message(self.mbox, uid)
+ msg_container = self.memstore.get_message(self.mbox, uid, flags_only)
if msg_container is not None:
- # We pass a reference to soledad just to be able to retrieve
- # missing parts that cannot be found in the container, like
- # the content docs after a copy.
- msg = LeapMessage(self._soledad, uid, self.mbox, collection=self,
- container=msg_container)
+ if mem_only:
+ msg = LeapMessage(None, uid, self.mbox, collection=self,
+ container=msg_container)
+ else:
+ # We pass a reference to soledad just to be able to retrieve
+ # missing parts that cannot be found in the container, like
+ # the content docs after a copy.
+ msg = LeapMessage(self._soledad, uid, self.mbox,
+ collection=self, container=msg_container)
else:
msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
if not msg.does_exist():
@@ -1159,7 +1181,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
def all_uid_iter(self):
"""
- Return an iterator trhough the UIDs of all messages, sorted in
+ Return an iterator through the UIDs of all messages, sorted in
ascending order.
"""
# XXX we should get this from the uid table, local-only
diff --git a/mail/src/leap/mail/imap/server.py b/mail/src/leap/mail/imap/server.py
index b77678a4..7bca39db 100644
--- a/mail/src/leap/mail/imap/server.py
+++ b/mail/src/leap/mail/imap/server.py
@@ -99,10 +99,9 @@ class LeapIMAPServer(imap4.IMAP4Server):
Overwritten fetch dispatcher to use the fast fetch_flags
method
"""
- from twisted.internet import reactor
if not query:
self.sendPositiveResponse(tag, 'FETCH complete')
- return # XXX ???
+ return
cbFetch = self._IMAP4Server__cbFetch
ebFetch = self._IMAP4Server__ebFetch
@@ -131,16 +130,19 @@ class LeapIMAPServer(imap4.IMAP4Server):
).addCallback(
cbFetch, tag, query, uid
).addErrback(
- ebFetch, tag)
-
- # XXX should be a callback
- deferLater(reactor,
- 2, self.mbox.unset_recent_flags, messages)
- deferLater(reactor, 1, self.mbox.signal_unread_to_ui)
+ ebFetch, tag
+ ).addCallback(
+ self.on_fetch_finished, messages)
select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset,
imap4.IMAP4Server.arg_fetchatt)
+ def on_fetch_finished(self, _, messages):
+ from twisted.internet import reactor
+ deferLater(reactor, 0, self.notifyNew)
+ deferLater(reactor, 0, self.mbox.unset_recent_flags, messages)
+ deferLater(reactor, 0, self.mbox.signal_unread_to_ui)
+
def on_copy_finished(self, defers):
d = defer.gatherResults(filter(None, defers))
d.addCallback(self.notifyNew)
@@ -156,7 +158,7 @@ class LeapIMAPServer(imap4.IMAP4Server):
select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset,
imap4.IMAP4Server.arg_astring)
- def notifyNew(self, ignored):
+ def notifyNew(self, ignored=None):
"""
Notify new messages to listeners.
"""
@@ -203,10 +205,4 @@ class LeapIMAPServer(imap4.IMAP4Server):
"""
# TODO return the output of _memstore.is_writing
# XXX and that should return a deferred!
-
- # XXX fake a delayed operation, to debug problem with messages getting
- # back to the source mailbox...
- print "faking checkpoint..."
- import time
- time.sleep(5)
return None
diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py
index f64ed233..ae5c583e 100644
--- a/mail/src/leap/mail/imap/soledadstore.py
+++ b/mail/src/leap/mail/imap/soledadstore.py
@@ -26,6 +26,7 @@ from u1db import errors as u1db_errors
from zope.interface import implements
from leap.common.check import leap_assert_type
+from leap.mail.decorators import deferred
from leap.mail.imap.messageparts import MessagePartType
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import RecentFlagsDoc
@@ -191,6 +192,7 @@ class SoledadStore(ContentDedup):
# IMessageConsumer
+ @deferred
def consume(self, queue):
"""
Creates a new document in soledad db.
@@ -297,9 +299,6 @@ class SoledadStore(ContentDedup):
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
if item.part == MessagePartType.fdoc:
-
- # FIXME add content duplication for HEADERS too!
- # (only 1 chash per mailbox!)
yield dict(item.content), call
elif item.part == MessagePartType.hdoc: