summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/soledadstore.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--src/leap/mail/imap/soledadstore.py129
1 files changed, 100 insertions, 29 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index ea5b36e..60576a3 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -18,18 +18,22 @@
A MessageStore that writes to Soledad.
"""
import logging
+import threading
from itertools import chain
+#from twisted.internet import defer
from u1db import errors as u1db_errors
from zope.interface import implements
+from leap.common.check import leap_assert_type
from leap.mail.imap.messageparts import MessagePartType
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.fields import fields
from leap.mail.imap.interfaces import IMessageStore
from leap.mail.messageflow import IMessageConsumer
+from leap.mail.utils import first
logger = logging.getLogger(__name__)
@@ -123,6 +127,7 @@ class SoledadStore(ContentDedup):
"""
This will create docs in the local Soledad database.
"""
+ _last_uid_lock = threading.Lock()
implements(IMessageConsumer, IMessageStore)
@@ -177,6 +182,7 @@ class SoledadStore(ContentDedup):
# IMessageConsumer
+ #@profile
def consume(self, queue):
"""
Creates a new document in soledad db.
@@ -220,6 +226,7 @@ class SoledadStore(ContentDedup):
if isinstance(doc_wrapper, MessageWrapper):
# If everything went well, we can unset the new flag
# in the source store (memory store)
+ print "unsetting new flag!"
doc_wrapper.new = False
doc_wrapper.dirty = False
empty = queue.empty()
@@ -243,13 +250,11 @@ class SoledadStore(ContentDedup):
return chain((doc_wrapper,),
self._get_calls_for_msg_parts(doc_wrapper))
elif isinstance(doc_wrapper, RecentFlagsDoc):
- print "getting calls for rflags"
return chain((doc_wrapper,),
self._get_calls_for_rflags_doc(doc_wrapper))
else:
print "********************"
print "CANNOT PROCESS ITEM!"
- print "item --------------------->", doc_wrapper
return (i for i in [])
def _try_call(self, call, item):
@@ -275,6 +280,7 @@ class SoledadStore(ContentDedup):
if msg_wrapper.new is True:
call = self._soledad.create_doc
+ print "NEW DOC ----------------------"
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
@@ -301,30 +307,22 @@ class SoledadStore(ContentDedup):
# the flags doc.
elif msg_wrapper.dirty is True:
- print "DIRTY DOC! ----------------------"
call = self._soledad.put_doc
-
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
+ # XXX FIXME Give error if dirty and not doc_id !!!
doc_id = item.doc_id # defend!
+ if not doc_id:
+ continue
doc = self._soledad.get_doc(doc_id)
- doc.content = item.content
-
+ doc.content = dict(item.content)
if item.part == MessagePartType.fdoc:
- print "Will PUT the doc: ", doc
- yield dict(doc), call
-
- # XXX also for linkage-doc
-
- # TODO should write back to the queue
- # with the results of the operation.
- # We can write there:
- # (*) MsgWriteACK --> Should remove from incoming queue.
- # (We should do this here).
- # Implement using callbacks for each operation.
+ logger.debug("PUT dirty fdoc")
+ yield doc, call
+ # XXX also for linkage-doc !!!
else:
- logger.error("Cannot delete documents yet!")
+ logger.error("Cannot delete documents yet from the queue...!")
def _get_calls_for_rflags_doc(self, rflags_wrapper):
"""
@@ -334,18 +332,91 @@ class SoledadStore(ContentDedup):
rdoc = self._soledad.get_doc(rflags_wrapper.doc_id)
payload = rflags_wrapper.content
- print "rdoc", rdoc
- print "SAVING RFLAGS TO SOLEDAD..."
- import pprint; pprint.pprint(payload)
+ logger.debug("Saving RFLAGS to Soledad...")
if payload:
rdoc.content = payload
- print
- print "YIELDING -----", rdoc
- print "AND ----------", call
yield rdoc, call
- else:
- print ">>>>>>>>>>>>>>>>>"
- print ">>>>>>>>>>>>>>>>>"
- print ">>>>>>>>>>>>>>>>>"
- print "No payload"
+
+ def _get_mbox_document(self, mbox):
+ """
+ Return mailbox document.
+
+ :return: A SoledadDocument containing this mailbox, or None if
+ the query failed.
+ :rtype: SoledadDocument or None.
+ """
+ try:
+ query = self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_MBOX_VAL, mbox)
+ if query:
+ return query.pop()
+ except Exception as exc:
+ logger.exception("Unhandled error %r" % exc)
+
+ def get_flags_doc(self, mbox, uid):
+ """
+ Return the SoledadDocument for the given mbox and uid.
+ """
+ try:
+ flag_docs = self._soledad.get_from_index(
+ fields.TYPE_MBOX_UID_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, str(uid))
+ result = first(flag_docs)
+ except Exception as exc:
+ # ugh! Something's broken down there!
+ logger.warning("ERROR while getting flags for UID: %s" % uid)
+ logger.exception(exc)
+ finally:
+ return result
+
+ def write_last_uid(self, mbox, value):
+ """
+ Write the `last_uid` integer to the proper mailbox document
+ in Soledad.
+ This is called from the deferred triggered by
+ memorystore.increment_last_soledad_uid, which is expected to
+ run in a separate thread.
+ """
+ leap_assert_type(value, int)
+ key = fields.LAST_UID_KEY
+
+ with self._last_uid_lock:
+ mbox_doc = self._get_mbox_document(mbox)
+ old_val = mbox_doc.content[key]
+ if value < old_val:
+ logger.error("%s:%s Tried to write a UID lesser than what's "
+ "stored!" % (mbox, value))
+ mbox_doc.content[key] = value
+ self._soledad.put_doc(mbox_doc)
+
+ # deleted messages
+
+ def deleted_iter(self, mbox):
+ """
+ Get an iterator for the SoledadDocuments for messages
+ with \\Deleted flag for a given mailbox.
+
+ :return: iterator through deleted message docs
+ :rtype: iterable
+ """
+ return (doc for doc in self._soledad.get_from_index(
+ fields.TYPE_MBOX_DEL_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, '1'))
+
+ # TODO can deferToThread this?
+ def remove_all_deleted(self, mbox):
+ """
+ Remove from Soledad all messages flagged as deleted for a given
+ mailbox.
+ """
+ print "DELETING ALL DOCS FOR -------", mbox
+ deleted = []
+ for doc in self.deleted_iter(mbox):
+ deleted.append(doc.content[fields.UID_KEY])
+ print
+ print ">>>>>>>>>>>>>>>>>>>>"
+ print "deleting doc: ", doc.doc_id, doc.content
+ self._soledad.delete_doc(doc)
+ return deleted