author     Kali Kaneko <kali@leap.se>    2014-01-15 16:57:18 -0400
committer  Kali Kaneko <kali@leap.se>    2014-01-15 21:32:54 -0400
commit     fc7ef201ea169e76123e15db346ac8d882d93c02 (patch)
tree       c72d331e9e8cf23373a7373bfdeea5d721e1dd7c
parent     2b53238ce5211bc23da8d1e8903335daa12ca02e (diff)
remove use of soledad_writer
Since the soledad client lock now covers writes, there is no point in enqueueing them through the MessageConsumer. The SoledadWriter is left orphaned for now. We might want to reuse it to enqueue low-priority tasks that need a retry strategy in case of revision conflicts. The MessageConsumer abstraction should also be useful for the SMTP queue.
-rw-r--r--    src/leap/mail/imap/messages.py    163
1 file changed, 99 insertions, 64 deletions
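To make the change concrete: once the client itself serializes writes behind a lock, callers can call create_doc directly instead of handing SoledadWriterPayload items to a consumer thread. The stub below is a toy illustration of that shape, not the real soledad client; the lock stands in for the client-side rw_lock mentioned in the commit message.

import threading


class LockedClientStub(object):
    """Toy stand-in for the soledad client; a single lock serializes writes."""

    def __init__(self):
        self._lock = threading.Lock()
        self._docs = []

    def create_doc(self, content):
        # The lock plays the role of the client-side rw_lock mentioned above:
        # callers no longer need a writer queue to avoid concurrent writes.
        with self._lock:
            self._docs.append(dict(content))
            return len(self._docs) - 1


client = LockedClientStub()
# Before this commit a caller would enqueue a SoledadWriterPayload; now it
# writes directly and relies on the client to serialize concurrent writers.
doc_id = client.create_doc({"type": "flags", "uid": 1})
assert doc_id == 0

The trade-off is that a caller now blocks for the duration of each write rather than returning as soon as its payload is enqueued.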
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 7c17dbe..b35b808 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -20,7 +20,6 @@ LeapMessage and MessageCollection.
import copy
import logging
import re
-import threading
import time
import StringIO
@@ -51,8 +50,6 @@ logger = logging.getLogger(__name__)
# [ ] Add linked-from info.
# [ ] Delete incoming mail only after successful write!
# [ ] Remove UID from syncable db. Store only those indexes locally.
-# [ ] Send patch to twisted for bug in imap4.py:5717 (content-type can be
-# none? lower-case?)
def lowerdict(_dict):
@@ -657,10 +654,27 @@ class LeapMessage(fields, MailParser, MBoxParser):
        Return the document that keeps the flags for this
        message.
        """
-        flag_docs = self._soledad.get_from_index(
-            fields.TYPE_MBOX_UID_IDX,
-            fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid))
-        return first(flag_docs)
+        result = {}
+        try:
+            flag_docs = self._soledad.get_from_index(
+                fields.TYPE_MBOX_UID_IDX,
+                fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid))
+            result = first(flag_docs)
+        except Exception as exc:
+            # ugh! Something's broken down there!
+            logger.warning("Error getting flags doc for UID %s" % (self._uid,))
+            logger.exception(exc)
+            try:
+                flag_docs = self._soledad.get_from_index(
+                    fields.TYPE_MBOX_UID_IDX,
+                    fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid))
+                result = first(flag_docs)
+            except Exception as exc:
+                # ugh! Something's broken down there!
+                logger.warning("Error getting flags doc, 2nd try")
+                logger.exception(exc)
+        finally:
+            return result

    def _get_headers_doc(self):
        """
@@ -770,6 +784,51 @@ class LeapMessage(fields, MailParser, MBoxParser):
        return self._fdoc is not None


+class ContentDedup(object):
+    """
+    Message deduplication.
+
+    We do a query for the content hashes before writing to our beloved
+    sqlcipher backend of Soledad. This means, by now, that:
+
+    1. We will not store the same attachment twice, only the hash of it.
+    2. We will not store the same message body twice, only the hash of it.
+
+    The first case is useful if you are always receiving the same old memes
+    from unwary friends that still have not discovered that 4chan is the
+    generator of the internet. The second will save your day if you have
+    initiated a session with the same account on two different machines. I
+    also wonder why you would do that, but let's respect each other's
+    choices, like with the religious celebrations, and assume that one day
+    we'll be able to run Bitmask on completely free phones. Yes, I mean
+    that, the whole GSM stack.
+    """
+
+    def _content_does_exist(self, doc):
+        """
+        Check whether we already have a content document for a payload
+        with this hash in our database.
+
+        :param doc: tentative body document
+        :type doc: dict
+        :returns: True if a content doc with that hash exists, False otherwise.
+        """
+        if not doc:
+            return False
+        phash = doc[fields.PAYLOAD_HASH_KEY]
+        attach_docs = self._soledad.get_from_index(
+            fields.TYPE_P_HASH_IDX,
+            fields.TYPE_CONTENT_VAL, str(phash))
+        if not attach_docs:
+            return False
+
+        if len(attach_docs) != 1:
+            logger.warning("Found more than one copy of phash %s!"
+                           % (phash,))
+        logger.debug("Found attachment doc with that hash! Skipping save!")
+        return True
+
+
SoledadWriterPayload = namedtuple(
'SoledadWriterPayload', ['mode', 'payload'])
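As a rough, self-contained illustration of the deduplication idea in the ContentDedup docstring (look up the payload hash before writing, and skip the save when a content doc already exists), here is a sketch. The sha256 helper and the plain dict standing in for the phash index are assumptions, not the Soledad API; in the real code the lookup goes through get_from_index on TYPE_P_HASH_IDX, as _content_does_exist shows above.

import hashlib


def payload_hash(payload):
    # Hypothetical helper: the real docs carry a precomputed hash under
    # fields.PAYLOAD_HASH_KEY; sha256 is just a plausible stand-in here.
    return hashlib.sha256(payload).hexdigest()


def store_content(index, payload):
    """Create a content doc only if no doc with the same hash exists."""
    phash = payload_hash(payload)
    if phash in index:        # stands in for the TYPE_P_HASH_IDX query
        return index[phash]   # duplicate payload: reuse the existing doc
    index[phash] = {"phash": phash, "raw": payload}
    return index[phash]


index = {}
doc_a = store_content(index, b"same old meme")
doc_b = store_content(index, b"same old meme")
assert doc_a is doc_b  # the attachment body is only stored once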
@@ -781,6 +840,13 @@ SoledadWriterPayload.PUT = 2
SoledadWriterPayload.CONTENT_CREATE = 3
+"""
+SoledadDocWriter was used to avoid writing to the db from multiple threads.
+Its use here has been deprecated in favor of a local rw_lock in the client.
+But we might want to reuse it in the near future to implement priority queues.
+"""
+
+
class SoledadDocWriter(object):
    """
    This writer will create docs serially in the local soledad database.
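The note above hints at reusing the writer as a priority queue for low-priority writes that may hit revision conflicts. A minimal sketch of that idea could look like the following; RevisionConflict and every other name here are invented for illustration and do not come from the codebase.

try:
    import Queue as queue_mod  # Python 2 name, as used in this codebase
except ImportError:
    import queue as queue_mod  # Python 3
import threading


class RevisionConflict(Exception):
    """Invented error type standing in for a soledad revision conflict."""


def consume(tasks, create_doc, max_retries=3):
    # Pull (priority, doc) tuples forever; retry conflicting writes a few
    # times instead of failing the caller, as low-priority tasks can afford.
    while True:
        _priority, doc = tasks.get()
        for _ in range(max_retries):
            try:
                create_doc(doc)
                break
            except RevisionConflict:
                continue
        tasks.task_done()


tasks = queue_mod.PriorityQueue()
worker = threading.Thread(target=consume, args=(tasks, lambda doc: None))
worker.daemon = True
worker.start()
tasks.put((10, {"type": "cnt"}))    # low priority
tasks.put((0, {"type": "flags"}))   # lower number, served earlier when queued
tasks.join()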
@@ -852,51 +918,9 @@ class SoledadDocWriter(object):
            empty = queue.empty()


-    """
-    Message deduplication.
-    We do a query for the content hashes before writing to our beloved
-    sqlcipher backend of Soledad. This means, by now, that:
-
-    1. We will not store the same attachment twice, only the hash of it.
-    2. We will not store the same message body twice, only the hash of it.
-
-    The first case is useful if you are always receiving the same old memes
-    from unwary friends that still have not discovered that 4chan is the
-    generator of the internet. The second will save your day if you have
-    initiated session with the same account in two different machines. I also
-    wonder why would you do that, but let's respect each other choices, like
-    with the religious celebrations, and assume that one day we'll be able
-    to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
-    Stack.
-    """
-
-    def _content_does_exist(self, doc):
-        """
-        Check whether we already have a content document for a payload
-        with this hash in our database.
-
-        :param doc: tentative body document
-        :type doc: dict
-        :returns: True if that happens, False otherwise.
-        """
-        if not doc:
-            return False
-        phash = doc[fields.PAYLOAD_HASH_KEY]
-        attach_docs = self._soledad.get_from_index(
-            fields.TYPE_P_HASH_IDX,
-            fields.TYPE_CONTENT_VAL, str(phash))
-        if not attach_docs:
-            return False
-
-        if len(attach_docs) != 1:
-            logger.warning("Found more than one copy of phash %s!"
-                           % (phash,))
-        logger.debug("Found attachment doc with that hash! Skipping save!")
-        return True
-
-
-class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
+class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser,
+                        ContentDedup):
    """
    A collection of messages, surprisingly.
@@ -1145,23 +1169,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
            hd[key] = parts_map[key]
        del parts_map

-        docs = [fd, hd]
-        cdocs = walk.get_raw_docs(msg, parts)
-
        # Saving
-        logger.debug('enqueuing message docs for write')
-        ptuple = SoledadWriterPayload

        # first, regular docs: flags and headers
-        for doc in docs:
-            self.soledad_writer.put(ptuple(
-                mode=ptuple.CREATE, payload=doc))
+        self._soledad.create_doc(fd)
+
+        # XXX should check for content duplication on headers too
+        # but with chash. !!!
+        self._soledad.create_doc(hd)

        # and last, but not least, try to create
        # content docs if not already there.
-        for cd in cdocs:
-            self.soledad_writer.put(ptuple(
-                mode=ptuple.CONTENT_CREATE, payload=cd))
+        cdocs = walk.get_raw_docs(msg, parts)
+        for cdoc in cdocs:
+            if not self._content_does_exist(cdoc):
+                self._soledad.create_doc(cdoc)

    def _remove_cb(self, result):
        return result
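The new save path boils down to: write the flags doc, write the headers doc, then write only those content docs that are not already stored. A stand-alone sketch of that order, with a list and a membership check standing in for Soledad and the phash index:

def save_message(create_doc, content_exists, fd, hd, cdocs):
    # Illustrative only: mirrors the save order in the hunk above, i.e.
    # flags doc, headers doc, then the non-duplicate content docs.
    create_doc(fd)
    create_doc(hd)
    for cdoc in cdocs:
        if not content_exists(cdoc):
            create_doc(cdoc)


stored = []
save_message(stored.append, lambda doc: doc in stored,
             {"type": "flags"}, {"type": "head"},
             [{"type": "cnt", "phash": "abc"},
              {"type": "cnt", "phash": "abc"}])
assert len(stored) == 3  # the duplicate content doc was skipped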
@@ -1312,17 +1334,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
        # XXX FIXINDEX -- should implement order by in soledad
        return sorted(all_docs, key=lambda item: item.content['uid'])

-    def all_msg_iter(self):
+    def all_uid_iter(self):
        """
        Return an iterator through the UIDs of all messages, sorted in
        ascending order.
        """
+        # XXX we should get this from the uid table, local-only
        all_uids = (doc.content[self.UID_KEY] for doc in
                    self._soledad.get_from_index(
                        fields.TYPE_MBOX_IDX,
                        fields.TYPE_FLAGS_VAL, self.mbox))
        return (u for u in sorted(all_uids))

+    def all_flags(self):
+        """
+        Return a dict mapping message UIDs to their flags for this mailbox.
+        """
+        all_flags = dict(((
+            doc.content[self.UID_KEY],
+            doc.content[self.FLAGS_KEY]) for doc in
+            self._soledad.get_from_index(
+                fields.TYPE_MBOX_IDX,
+                fields.TYPE_FLAGS_VAL, self.mbox)))
+        return all_flags
+
    def count(self):
        """
        Return the count of messages for this mailbox.
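Both new accessors reduce to a single indexed query per mailbox, with the UID list and the uid-to-flags mapping derived in memory from its results. A tiny stand-alone sketch of that shape, using fake documents instead of Soledad:

# Fake results of a single TYPE_MBOX_IDX query; plain dicts stand in for docs.
fake_docs = [
    {"uid": 2, "flags": []},
    {"uid": 1, "flags": ["\\Seen"]},
]

all_uids = sorted(doc["uid"] for doc in fake_docs)                 # all_uid_iter
all_flags = dict((doc["uid"], doc["flags"]) for doc in fake_docs)  # all_flags

assert all_uids == [1, 2]
assert all_flags[1] == ["\\Seen"]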
@@ -1447,7 +1482,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
        :rtype: iterable
        """
        return (LeapMessage(self._soledad, docuid, self.mbox)
-                for docuid in self.all_msg_iter())
+                for docuid in self.all_uid_iter())

    def __repr__(self):
        """