summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-02 09:26:37 -0400
committerKali Kaneko <kali@leap.se>2014-02-02 18:08:31 -0400
commit18fed49c4143eb764ae9e806882d24f8f4e95744 (patch)
tree2c0c56a28ac9bf4839623b28de746670e6e4b499
parent3a8fda3aa4645adbba228e7d2f204bfe6d400321 (diff)
fix missing content after in-memory add
because THE KEYS WILL BE STRINGS AFTER ADDED TO SOLEDAD Can I remember that? * Fix copy from local folders * Fix copy when we already have a copy of the message in the inbox, marked as deleted. * Fix also bad deferred.succeed in add_msg when it already exist.
-rw-r--r--src/leap/mail/imap/mailbox.py5
-rw-r--r--src/leap/mail/imap/memorystore.py6
-rw-r--r--src/leap/mail/imap/messageparts.py12
-rw-r--r--src/leap/mail/imap/messages.py88
-rw-r--r--src/leap/mail/imap/server.py13
-rw-r--r--src/leap/mail/utils.py38
6 files changed, 110 insertions, 52 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 79fb476..688f941 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -162,6 +162,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
if not NOTIFY_NEW:
return
+
logger.debug('adding mailbox listener: %s' % listener)
self.listeners.add(listener)
@@ -801,7 +802,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
from twisted.internet import reactor
print "COPY :", message
d = defer.Deferred()
-
# XXX this should not happen ... track it down,
# probably to FETCH...
if message is None:
@@ -810,7 +810,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
deferLater(reactor, 0, self._do_copy, message, d)
return d
- #@profile
def _do_copy(self, message, observer):
"""
Call invoked from the deferLater in `copy`. This will
@@ -851,7 +850,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
logger.warning("Destination message already exists!")
# XXX I'm still not clear if we should raise the
- # callback. This actually rases an ugly warning
+ # errback. This actually rases an ugly warning
# in some muas like thunderbird. I guess the user does
# not deserve that.
#observer.errback(MessageCopyError("Already exists!"))
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index 211d282..542e227 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -318,7 +318,7 @@ class MemoryStore(object):
store[FDOC])
hdoc = msg_dict.get(HDOC, None)
- if hdoc:
+ if hdoc is not None:
if not store.get(HDOC, None):
store[HDOC] = ReferenciableDict({})
store[HDOC].update(hdoc)
@@ -438,7 +438,8 @@ class MemoryStore(object):
if not self.producer.is_queue_empty():
return
- logger.info("Writing messages to Soledad...")
+ if any(map(lambda i: not empty(i), (self._new, self._dirty))):
+ logger.info("Writing messages to Soledad...")
# TODO change for lock, and make the property access
# is accquired
@@ -885,6 +886,7 @@ class MemoryStore(object):
# TODO expunge should add itself as a callback to the ongoing
# writes.
soledad_store = self._permanent_store
+ all_deleted = []
try:
# 1. Stop the writing call
diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py
index 5067263..b07681b 100644
--- a/src/leap/mail/imap/messageparts.py
+++ b/src/leap/mail/imap/messageparts.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# messageparts.py
# Copyright (C) 2014 LEAP
#
@@ -315,6 +314,7 @@ class MessageWrapper(object):
fdoc, hdoc, cdocs = map(
lambda part: msg_dict.get(part, None),
[self.FDOC, self.HDOC, self.CDOCS])
+
for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc),
(self.CDOCS, cdocs)):
self._dict[t] = ReferenciableDict(doc) if doc else None
@@ -390,8 +390,10 @@ class MessagePart(object):
first_part = pmap.get('1', None)
if not empty(first_part):
phash = first_part['phash']
+ else:
+ phash = None
- if not phash:
+ if phash is None:
logger.warning("Could not find phash for this subpart!")
payload = ""
else:
@@ -435,11 +437,13 @@ class MessagePart(object):
fields.TYPE_CONTENT_VAL, str(phash))
cdoc = first(cdocs)
- if not cdoc:
+ if cdoc is None:
logger.warning(
"Could not find the content doc "
"for phash %s" % (phash,))
- payload = cdoc.content.get(fields.RAW_KEY, "")
+ payload = ""
+ else:
+ payload = cdoc.content.get(fields.RAW_KEY, "")
return payload
# TODO should memory-bound this memoize!!!
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 4a07ef7..6f822db 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -37,6 +37,7 @@ from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
from leap.mail import walk
from leap.mail.utils import first, find_charset, lowerdict, empty
+from leap.mail.utils import stringify_parts_map
from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
@@ -219,7 +220,6 @@ class LeapMessage(fields, MailParser, MBoxParser):
# setFlags not in the interface spec but we use it with store command.
- #@profile
def setFlags(self, flags, mode):
"""
Sets the flags for this message
@@ -243,30 +243,30 @@ class LeapMessage(fields, MailParser, MBoxParser):
REMOVE = -1
SET = 0
- #with self.flags_lock:
- current = doc.content[self.FLAGS_KEY]
- if mode == APPEND:
- newflags = tuple(set(tuple(current) + flags))
- elif mode == REMOVE:
- newflags = tuple(set(current).difference(set(flags)))
- elif mode == SET:
- newflags = flags
-
- # We could defer this, but I think it's better
- # to put it under the lock...
- doc.content[self.FLAGS_KEY] = newflags
- doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
- doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
-
- if self._collection.memstore is not None:
- log.msg("putting message in collection")
- self._collection.memstore.put_message(
- self._mbox, self._uid,
- MessageWrapper(fdoc=doc.content, new=False, dirty=True,
- docs_id={'fdoc': doc.doc_id}))
- else:
- # fallback for non-memstore initializations.
- self._soledad.put_doc(doc)
+ with self.flags_lock:
+ current = doc.content[self.FLAGS_KEY]
+ if mode == APPEND:
+ newflags = tuple(set(tuple(current) + flags))
+ elif mode == REMOVE:
+ newflags = tuple(set(current).difference(set(flags)))
+ elif mode == SET:
+ newflags = flags
+
+ # We could defer this, but I think it's better
+ # to put it under the lock...
+ doc.content[self.FLAGS_KEY] = newflags
+ doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
+ doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
+
+ if self._collection.memstore is not None:
+ log.msg("putting message in collection")
+ self._collection.memstore.put_message(
+ self._mbox, self._uid,
+ MessageWrapper(fdoc=doc.content, new=False, dirty=True,
+ docs_id={'fdoc': doc.doc_id}))
+ else:
+ # fallback for non-memstore initializations.
+ self._soledad.put_doc(doc)
return map(str, newflags)
def getInternalDate(self):
@@ -483,6 +483,9 @@ class LeapMessage(fields, MailParser, MBoxParser):
hdoc_content = self._hdoc.content
pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
+
+ # remember, lads, soledad is using strings in its keys,
+ # not integers!
return pmap[str(part)]
# XXX moved to memory store
@@ -534,10 +537,10 @@ class LeapMessage(fields, MailParser, MBoxParser):
if self._container is not None:
bdoc = self._container.memstore.get_cdoc_from_phash(body_phash)
- if bdoc and bdoc.content is not None:
+ if not empty(bdoc) and not empty(bdoc.content):
return bdoc
- # no memstore or no doc found there
+ # no memstore, or no body doc found there
if self._soledad:
body_docs = self._soledad.get_from_index(
fields.TYPE_P_HASH_IDX,
@@ -847,7 +850,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
else:
return False
- #@profile
def add_msg(self, raw, subject=None, flags=None, date=None, uid=None,
notify_on_disk=False):
"""
@@ -881,7 +883,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self._do_add_msg(raw, flags, subject, date, notify_on_disk, d)
return d
- @deferred_to_thread
+ # We SHOULD defer this (or the heavy load here) to the thread pool,
+ # but it gives troubles with the QSocketNotifier used by Qt...
def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer):
"""
Helper that creates a new message document.
@@ -907,9 +910,19 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# So we probably should just do an in-memory check and
# move the complete check to the soledad writer?
# Watch out! We're reserving a UID right after this!
- if self._fdoc_already_exists(chash):
- logger.warning("We already have that message in this mailbox.")
- return defer.succeed('already_exists')
+ existing_uid = self._fdoc_already_exists(chash)
+ if existing_uid:
+ logger.warning("We already have that message in this "
+ "mailbox, unflagging as deleted")
+ uid = existing_uid
+ msg = self.get_msg_by_uid(uid)
+ msg.setFlags((fields.DELETED_FLAG,), -1)
+
+ # XXX if this is deferred to thread again we should not use
+ # the callback in the deferred thread, but return and
+ # call the callback from the caller fun...
+ observer.callback(uid)
+ return
uid = self.memstore.increment_last_soledad_uid(self.mbox)
logger.info("ADDING MSG WITH UID: %s" % uid)
@@ -929,17 +942,15 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
hd[key] = parts_map[key]
del parts_map
+ hd = stringify_parts_map(hd)
+
# The MessageContainer expects a dict, one-indexed
# XXX review-me
cdocs = dict(((key + 1, doc) for key, doc in
enumerate(walk.get_raw_docs(msg, parts))))
self.set_recent_flag(uid)
-
- # TODO ---- add reference to original doc, to be deleted
- # after writes are done.
msg_container = MessageWrapper(fd, hd, cdocs)
-
self.memstore.create_message(self.mbox, uid, msg_container,
observer=observer,
notify_on_disk=notify_on_disk)
@@ -950,7 +961,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# recent flags
- #@profile
def _get_recent_flags(self):
"""
An accessor for the recent-flags set for this mailbox.
@@ -1004,7 +1014,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
doc="Set of UIDs with the recent flag for this mailbox.")
# XXX change naming, indicate soledad query.
- #@profile
def _get_recent_doc(self):
"""
Get recent-flags document from Soledad for this mailbox.
@@ -1114,7 +1123,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# XXX is this working?
return self._get_uid_from_msgidCb(msgid)
- #@profile
def set_flags(self, mbox, messages, flags, mode, observer):
"""
Set flags for a sequence of messages.
@@ -1220,7 +1228,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# FIXME ----------------------------------------------
return sorted(all_docs, key=lambda item: item.content['uid'])
- #@profile
def all_soledad_uid_iter(self):
"""
Return an iterator through the UIDs of all messages, sorted in
@@ -1232,7 +1239,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
fields.TYPE_FLAGS_VAL, self.mbox)])
return db_uids
- #@profile
def all_uid_iter(self):
"""
Return an iterator through the UIDs of all messages, from memory.
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
index 7bca39d..ba63846 100644
--- a/src/leap/mail/imap/server.py
+++ b/src/leap/mail/imap/server.py
@@ -139,14 +139,22 @@ class LeapIMAPServer(imap4.IMAP4Server):
def on_fetch_finished(self, _, messages):
from twisted.internet import reactor
+
+ print "FETCH FINISHED -- NOTIFY NEW"
deferLater(reactor, 0, self.notifyNew)
deferLater(reactor, 0, self.mbox.unset_recent_flags, messages)
deferLater(reactor, 0, self.mbox.signal_unread_to_ui)
def on_copy_finished(self, defers):
d = defer.gatherResults(filter(None, defers))
- d.addCallback(self.notifyNew)
- d.addCallback(self.mbox.signal_unread_to_ui)
+
+ def when_finished(result):
+ log.msg("COPY FINISHED")
+ self.notifyNew()
+ self.mbox.signal_unread_to_ui()
+ d.addCallback(when_finished)
+ #d.addCallback(self.notifyNew)
+ #d.addCallback(self.mbox.signal_unread_to_ui)
def do_COPY(self, tag, messages, mailbox, uid=0):
from twisted.internet import reactor
@@ -162,6 +170,7 @@ class LeapIMAPServer(imap4.IMAP4Server):
"""
Notify new messages to listeners.
"""
+ print "TRYING TO NOTIFY NEW"
self.mbox.notify_new()
def _cbSelectWork(self, mbox, cmdName, tag):
diff --git a/src/leap/mail/utils.py b/src/leap/mail/utils.py
index 6a1fcde..942acfb 100644
--- a/src/leap/mail/utils.py
+++ b/src/leap/mail/utils.py
@@ -17,6 +17,7 @@
"""
Mail utilities.
"""
+import copy
import json
import re
import traceback
@@ -92,6 +93,43 @@ def lowerdict(_dict):
for key, value in _dict.items())
+PART_MAP = "part_map"
+
+
+def _str_dict(d, k):
+ """
+ Convert the dictionary key to string if it was a string.
+
+ :param d: the dict
+ :type d: dict
+ :param k: the key
+ :type k: object
+ """
+ if isinstance(k, int):
+ val = d[k]
+ d[str(k)] = val
+ del(d[k])
+
+
+def stringify_parts_map(d):
+ """
+ Modify a dictionary making all the nested dicts under "part_map" keys
+ having strings as keys.
+
+ :param d: the dictionary to modify
+ :type d: dictionary
+ :rtype: dictionary
+ """
+ for k in d:
+ if k == PART_MAP:
+ pmap = d[k]
+ for kk in pmap.keys():
+ _str_dict(d[k], kk)
+ for kk in pmap.keys():
+ stringify_parts_map(d[k][str(kk)])
+ return d
+
+
class CustomJsonScanner(object):
"""
This class is a context manager definition used to monkey patch the default