summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-23 13:32:01 -0400
committerKali Kaneko <kali@leap.se>2014-01-28 19:38:45 -0400
commitff28e22977db802c87f0b7be99e37c6de29183e9 (patch)
treec75bc293bc171cbc65dd494a8947057f306b8c96 /src/leap/mail/imap
parent0754dac293730b02942716991d5edc513c36ff7c (diff)
Unset new flag after successful write
Diffstat (limited to 'src/leap/mail/imap')
-rw-r--r--src/leap/mail/imap/memorystore.py16
-rw-r--r--src/leap/mail/imap/messageparts.py33
-rw-r--r--src/leap/mail/imap/messages.py27
-rw-r--r--src/leap/mail/imap/server.py5
-rw-r--r--src/leap/mail/imap/soledadstore.py84
5 files changed, 136 insertions, 29 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index 7cb361f..f0bdab5 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -271,12 +271,28 @@ class MemoryStore(object):
return (self.get_message(*key)
for key in sorted(self._msg_store.keys()))
+ # new, dirty flags
+
def _get_new_dirty_state(self, key):
"""
Return `new` and `dirty` flags for a given message.
"""
return map(lambda _set: key in _set, (self._new, self._dirty))
+ def set_new(self, key):
+ """
+ Add the key value to the `new` set.
+ """
+ self._new.add(key)
+
+ def unset_new(self, key):
+ """
+ Remove the key value from the `new` set.
+ """
+ print "******************"
+ print "UNSETTING NEW FOR: %s" % str(key)
+ self._new.discard(key)
+
@property
def is_writing(self):
"""
diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py
index 3f89193..42eef02 100644
--- a/src/leap/mail/imap/messageparts.py
+++ b/src/leap/mail/imap/messageparts.py
@@ -125,20 +125,41 @@ class MessageWrapper(object):
# properties
- @property
- def new(self):
+ def _get_new(self):
+ """
+ Get the value for the `new` flag.
+ """
return self._new
- def set_new(self, value=True):
+ def _set_new(self, value=True):
+ """
+ Set the value for the `new` flag, and propagate it
+ to the memory store if any.
+ """
self._new = value
+ if self.memstore:
+ mbox = self.fdoc.content['mbox']
+ uid = self.fdoc.content['uid']
+ key = mbox, uid
+ fun = [self.memstore.unset_new,
+ self.memstore.set_new][int(value)]
+ fun(key)
+ else:
+ logger.warning("Could not find a memstore referenced from this "
+ "MessageWrapper. The value for new will not be "
+ "propagated")
- @property
- def dirty(self):
+ new = property(_get_new, _set_new,
+ doc="The `new` flag for this MessageWrapper")
+
+ def _get_dirty(self):
return self._dirty
- def set_dirty(self, value=True):
+ def _set_dirty(self, value=True):
self._dirty = value
+ dirty = property(_get_dirty, _set_dirty)
+
# IMessageContainer
@property
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 3c30aa8..94bd714 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -42,6 +42,7 @@ from leap.mail.decorators import deferred
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
from leap.mail.imap.memorystore import MessageWrapper
+from leap.mail.imap.messageparts import MessagePart
from leap.mail.imap.parser import MailParser, MBoxParser
logger = logging.getLogger(__name__)
@@ -306,15 +307,25 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: file-like object opened for reading
:rtype: StringIO
"""
+ def write_fd(body):
+ fd.write(body)
+ fd.seek(0)
+ return fd
+
# TODO refactor with getBodyFile in MessagePart
fd = StringIO.StringIO()
if self._bdoc is not None:
bdoc_content = self._bdoc.content
+ if bdoc_content is None:
+ logger.warning("No BODC content found for message!!!")
+ return write_fd(str(""))
+
body = bdoc_content.get(self.RAW_KEY, "")
content_type = bdoc_content.get('content-type', "")
charset = find_charset(content_type)
logger.debug('got charset from content-type: %s' % charset)
if charset is None:
+ # XXX change for find_charset utility
charset = self._get_charset(body)
try:
body = body.encode(charset)
@@ -328,15 +339,13 @@ class LeapMessage(fields, MailParser, MBoxParser):
body = body.encode('utf-8', 'replace')
except:
pass
+ finally:
+ return write_fd(body)
# We are still returning funky characters from here.
else:
logger.warning("No BDOC found for message.")
- body = str("")
-
- fd.write(body)
- fd.seek(0)
- return fd
+ return write_fd(str(""))
@memoized_method
def _get_charset(self, stuff):
@@ -524,7 +533,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
message.
"""
hdoc_content = self._hdoc.content
- print "hdoc: ", hdoc_content
+ #print "hdoc: ", hdoc_content
body_phash = hdoc_content.get(
fields.BODY_KEY, None)
print "body phash: ", body_phash
@@ -540,10 +549,10 @@ class LeapMessage(fields, MailParser, MBoxParser):
if self._container is not None:
bdoc = self._container.memstore.get_by_phash(body_phash)
print "bdoc from container -->", bdoc
- if bdoc:
+ if bdoc and bdoc.content is not None:
return bdoc
else:
- print "no doc for that phash found!"
+ print "no doc or not bdoc content for that phash found!"
print "nuthing. soledad?"
# no memstore or no doc found there
@@ -551,7 +560,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
body_docs = self._soledad.get_from_index(
fields.TYPE_P_HASH_IDX,
fields.TYPE_CONTENT_VAL, str(body_phash))
- print "returning body docs,,,", body_docs
+ print "returning body docs...", body_docs
return first(body_docs)
else:
logger.error("No phash in container, and no soledad found!")
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
index 8bd875b..c95a9be 100644
--- a/src/leap/mail/imap/server.py
+++ b/src/leap/mail/imap/server.py
@@ -196,4 +196,9 @@ class LeapIMAPServer(imap4.IMAP4Server):
"""
# TODO return the output of _memstore.is_writing
# XXX and that should return a deferred!
+
+ # XXX fake a delayed operation, to debug problem with messages getting
+ # back to the source mailbox...
+ import time
+ time.sleep(2)
return None
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index 62a3c53..d36acae 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -19,6 +19,8 @@ A MessageStore that writes to Soledad.
"""
import logging
+from itertools import chain
+
from u1db import errors as u1db_errors
from zope.interface import implements
@@ -30,6 +32,13 @@ from leap.mail.messageflow import IMessageConsumer
logger = logging.getLogger(__name__)
+# TODO
+# [ ] Delete original message from the incoming queue after all successful
+# writes.
+# [ ] Implement a retry queue.
+# [ ] Consider journaling of operations.
+
+
class ContentDedup(object):
"""
Message deduplication.
@@ -37,8 +46,8 @@ class ContentDedup(object):
We do a query for the content hashes before writing to our beloved
sqlcipher backend of Soledad. This means, by now, that:
- 1. We will not store the same attachment twice, only the hash of it.
- 2. We will not store the same message body twice, only the hash of it.
+ 1. We will not store the same body/attachment twice, only the hash of it.
+ 2. We will not store the same message header twice, only the hash of it.
The first case is useful if you are always receiving the same old memes
from unwary friends that still have not discovered that 4chan is the
@@ -49,6 +58,7 @@ class ContentDedup(object):
to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
Stack.
"""
+ # TODO refactor using unique_query
def _header_does_exist(self, doc):
"""
@@ -99,6 +109,12 @@ class ContentDedup(object):
return True
+class MsgWriteError(Exception):
+ """
+ Raised if any exception is found while saving message parts.
+ """
+
+
class SoledadStore(ContentDedup):
"""
This will create docs in the local Soledad database.
@@ -108,7 +124,7 @@ class SoledadStore(ContentDedup):
def __init__(self, soledad):
"""
- Initialize the writer.
+ Initialize the permanent store that writes to Soledad database.
:param soledad: the soledad instance
:type soledad: Soledad
@@ -165,15 +181,40 @@ class SoledadStore(ContentDedup):
to be inserted.
:type queue: Queue
"""
- # TODO should delete the original message from incoming after
+ # TODO should delete the original message from incoming only after
# the writes are done.
# TODO should handle the delete case
# TODO should handle errors
+ # TODO could generalize this method into a generic consumer
+ # and only implement `process` here
empty = queue.empty()
while not empty:
- for item, call in self._process(queue):
- self._try_call(call, item)
+ items = self._process(queue)
+ # we prime the generator, that should return the
+ # item in the first place.
+ msg_wrapper = items.next()
+
+ # From here, we unpack the subpart items and
+ # the right soledad call.
+ try:
+ failed = False
+ for item, call in items:
+ try:
+ self._try_call(call, item)
+ except Exception:
+ failed = True
+ continue
+ if failed:
+ raise MsgWriteError
+
+ except MsgWriteError:
+ logger.error("Error while processing item.")
+ pass
+ else:
+ # If everything went well, we can unset the new flag
+ # in the source store (memory store)
+ msg_wrapper.new = False
empty = queue.empty()
#
@@ -182,14 +223,16 @@ class SoledadStore(ContentDedup):
def _process(self, queue):
"""
- Return the item and the proper call type for the next
- item in the queue if any.
+ Return an iterator that will yield the msg_wrapper in the first place,
+ followed by the subparts item and the proper call type for every
+ item in the queue, if any.
:param queue: the queue from where we'll pick item.
:type queue: Queue
"""
msg_wrapper = queue.get()
- return self._get_calls_for_msg_parts(msg_wrapper)
+ return chain((msg_wrapper,),
+ self._get_calls_for_msg_parts(msg_wrapper))
def _try_call(self, call, item):
"""
@@ -205,7 +248,7 @@ class SoledadStore(ContentDedup):
def _get_calls_for_msg_parts(self, msg_wrapper):
"""
- Return the proper call type for a given item.
+ Generator that return the proper call type for a given item.
:param msg_wrapper: A MessageWrapper
:type msg_wrapper: IMessageContainer
@@ -220,18 +263,31 @@ class SoledadStore(ContentDedup):
if item.part == MessagePartType.fdoc:
yield dict(item.content), call
- if item.part == MessagePartType.hdoc:
+ elif item.part == MessagePartType.hdoc:
if not self._header_does_exist(item.content):
yield dict(item.content), call
- if item.part == MessagePartType.cdoc:
- if self._content_does_exist(item.content):
+ elif item.part == MessagePartType.cdoc:
+ if not self._content_does_exist(item.content):
+
+ # XXX DEBUG -------------------
+ print "about to write content-doc ",
+ #import pprint; pprint.pprint(item.content)
+
yield dict(item.content), call
+ # TODO should write back to the queue
+ # with the results of the operation.
+ # We can write there:
+ # (*) MsgWriteACK --> Should remove from incoming queue.
+ # (We should do this here).
+
+ # Implement using callbacks for each operation.
+
# TODO should check for elements with the dirty state
# TODO if new == False and dirty == True, put_doc
# XXX for puts, we will have to retrieve
# the document, change the content, and
# pass the whole document under "content"
else:
- logger.error("Cannot put documents yet!")
+ logger.error("Cannot put/delete documents yet!")