summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/soledadstore.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-23 13:32:01 -0400
committerKali Kaneko <kali@leap.se>2014-01-28 19:38:45 -0400
commitff28e22977db802c87f0b7be99e37c6de29183e9 (patch)
treec75bc293bc171cbc65dd494a8947057f306b8c96 /src/leap/mail/imap/soledadstore.py
parent0754dac293730b02942716991d5edc513c36ff7c (diff)
Unset new flag after successful write
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--src/leap/mail/imap/soledadstore.py84
1 files changed, 70 insertions, 14 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index 62a3c53..d36acae 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -19,6 +19,8 @@ A MessageStore that writes to Soledad.
"""
import logging
+from itertools import chain
+
from u1db import errors as u1db_errors
from zope.interface import implements
@@ -30,6 +32,13 @@ from leap.mail.messageflow import IMessageConsumer
logger = logging.getLogger(__name__)
+# TODO
+# [ ] Delete original message from the incoming queue after all successful
+# writes.
+# [ ] Implement a retry queue.
+# [ ] Consider journaling of operations.
+
+
class ContentDedup(object):
"""
Message deduplication.
@@ -37,8 +46,8 @@ class ContentDedup(object):
We do a query for the content hashes before writing to our beloved
sqlcipher backend of Soledad. This means, by now, that:
- 1. We will not store the same attachment twice, only the hash of it.
- 2. We will not store the same message body twice, only the hash of it.
+ 1. We will not store the same body/attachment twice, only the hash of it.
+ 2. We will not store the same message header twice, only the hash of it.
The first case is useful if you are always receiving the same old memes
from unwary friends that still have not discovered that 4chan is the
@@ -49,6 +58,7 @@ class ContentDedup(object):
to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
Stack.
"""
+ # TODO refactor using unique_query
def _header_does_exist(self, doc):
"""
@@ -99,6 +109,12 @@ class ContentDedup(object):
return True
+class MsgWriteError(Exception):
+ """
+ Raised if any exception is found while saving message parts.
+ """
+
+
class SoledadStore(ContentDedup):
"""
This will create docs in the local Soledad database.
@@ -108,7 +124,7 @@ class SoledadStore(ContentDedup):
def __init__(self, soledad):
"""
- Initialize the writer.
+ Initialize the permanent store that writes to Soledad database.
:param soledad: the soledad instance
:type soledad: Soledad
@@ -165,15 +181,40 @@ class SoledadStore(ContentDedup):
to be inserted.
:type queue: Queue
"""
- # TODO should delete the original message from incoming after
+ # TODO should delete the original message from incoming only after
# the writes are done.
# TODO should handle the delete case
# TODO should handle errors
+ # TODO could generalize this method into a generic consumer
+ # and only implement `process` here
empty = queue.empty()
while not empty:
- for item, call in self._process(queue):
- self._try_call(call, item)
+ items = self._process(queue)
+ # we prime the generator, that should return the
+ # item in the first place.
+ msg_wrapper = items.next()
+
+ # From here, we unpack the subpart items and
+ # the right soledad call.
+ try:
+ failed = False
+ for item, call in items:
+ try:
+ self._try_call(call, item)
+ except Exception:
+ failed = True
+ continue
+ if failed:
+ raise MsgWriteError
+
+ except MsgWriteError:
+ logger.error("Error while processing item.")
+ pass
+ else:
+ # If everything went well, we can unset the new flag
+ # in the source store (memory store)
+ msg_wrapper.new = False
empty = queue.empty()
#
@@ -182,14 +223,16 @@ class SoledadStore(ContentDedup):
def _process(self, queue):
"""
- Return the item and the proper call type for the next
- item in the queue if any.
+ Return an iterator that will yield the msg_wrapper in the first place,
+ followed by the subparts item and the proper call type for every
+ item in the queue, if any.
:param queue: the queue from where we'll pick item.
:type queue: Queue
"""
msg_wrapper = queue.get()
- return self._get_calls_for_msg_parts(msg_wrapper)
+ return chain((msg_wrapper,),
+ self._get_calls_for_msg_parts(msg_wrapper))
def _try_call(self, call, item):
"""
@@ -205,7 +248,7 @@ class SoledadStore(ContentDedup):
def _get_calls_for_msg_parts(self, msg_wrapper):
"""
- Return the proper call type for a given item.
+ Generator that return the proper call type for a given item.
:param msg_wrapper: A MessageWrapper
:type msg_wrapper: IMessageContainer
@@ -220,18 +263,31 @@ class SoledadStore(ContentDedup):
if item.part == MessagePartType.fdoc:
yield dict(item.content), call
- if item.part == MessagePartType.hdoc:
+ elif item.part == MessagePartType.hdoc:
if not self._header_does_exist(item.content):
yield dict(item.content), call
- if item.part == MessagePartType.cdoc:
- if self._content_does_exist(item.content):
+ elif item.part == MessagePartType.cdoc:
+ if not self._content_does_exist(item.content):
+
+ # XXX DEBUG -------------------
+ print "about to write content-doc ",
+ #import pprint; pprint.pprint(item.content)
+
yield dict(item.content), call
+ # TODO should write back to the queue
+ # with the results of the operation.
+ # We can write there:
+ # (*) MsgWriteACK --> Should remove from incoming queue.
+ # (We should do this here).
+
+ # Implement using callbacks for each operation.
+
# TODO should check for elements with the dirty state
# TODO if new == False and dirty == True, put_doc
# XXX for puts, we will have to retrieve
# the document, change the content, and
# pass the whole document under "content"
else:
- logger.error("Cannot put documents yet!")
+ logger.error("Cannot put/delete documents yet!")