summaryrefslogtreecommitdiff
path: root/src/leap/mail
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-23 02:36:38 -0400
committerKali Kaneko <kali@leap.se>2014-01-28 19:38:44 -0400
commite2218eec4fd91e4648160a05e3debc05efa0d0d9 (patch)
treec3d070f5e56c11f481da7a1dc541c11d8a8bc8c6 /src/leap/mail
parentd7a167e1ba5ea9bb8167e6255a81d4c96fdffef9 (diff)
add soledadstore class
move parts-related bits to messageparts pass soledad in initialization for memory messages
Diffstat (limited to 'src/leap/mail')
-rw-r--r--src/leap/mail/imap/mailbox.py29
-rw-r--r--src/leap/mail/imap/memorystore.py185
-rw-r--r--src/leap/mail/imap/messageparts.py183
-rw-r--r--src/leap/mail/imap/messages.py16
-rw-r--r--src/leap/mail/imap/service/imap.py4
-rw-r--r--src/leap/mail/imap/soledadstore.py237
6 files changed, 446 insertions, 208 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 9babe6b..5e16b4b 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -37,8 +37,8 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
from leap.mail.decorators import deferred
from leap.mail.imap.fields import WithMsgFields, fields
-from leap.mail.imap.memorystore import MessageDict
from leap.mail.imap.messages import MessageCollection
+from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.parser import MBoxParser
logger = logging.getLogger(__name__)
@@ -549,10 +549,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
#sequence = True if uid == 0 else False
messages_asked = self._bound_seq(messages_asked)
- print "asked: ", messages_asked
seq_messg = self._filter_msg_seq(messages_asked)
-
- print "seq: ", seq_messg
getmsg = lambda uid: self.messages.get_msg_by_uid(uid)
# for sequence numbers (uid = 0)
@@ -791,29 +788,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
logger.debug("Tried to copy a MSG with no fdoc")
return
- #old_mbox = fdoc.content[self.MBOX_KEY]
- #old_uid = fdoc.content[self.UID_KEY]
- #old_key = old_mbox, old_uid
- #print "copying from OLD MBOX ", old_mbox
-
- # XXX bit doubt... to duplicate in memory
- # or not to...?
- # I think it should be ok to duplicate as long as we're
- # careful at the hour of writes...
- # We could use also proxies, but it will break when
- # the original mailbox is flushed.
-
- # XXX DEBUG ----------------------------------------
- #print "copying MESSAGE from %s (%s) to %s (%s)" % (
- #msg._mbox, msg._uid, self.mbox, uid_next)
-
new_fdoc = copy.deepcopy(fdoc.content)
new_fdoc[self.UID_KEY] = uid_next
new_fdoc[self.MBOX_KEY] = self.mbox
- self._memstore.put(self.mbox, uid_next, MessageDict(
- new_fdoc, hdoc.content))
+ self._memstore.create_message(
+ self.mbox, uid_next,
+ MessageWrapper(
+ new_fdoc, hdoc.content))
- # XXX use memory store
+ # XXX use memory store !!!
if hasattr(hdoc, 'doc_id'):
self.messages.add_hdocset_docid(hdoc.doc_id)
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index b8829e0..7cb361f 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -21,187 +21,20 @@ import contextlib
import logging
import weakref
-from collections import namedtuple
-
from twisted.internet.task import LoopingCall
from zope.interface import implements
from leap.mail import size
from leap.mail.messageflow import MessageProducer
-from leap.mail.messageparts import MessagePartType
from leap.mail.imap import interfaces
from leap.mail.imap.fields import fields
+from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc
+from leap.mail.imap.messageparts import MessageWrapper
+from leap.mail.imap.messageparts import ReferenciableDict
logger = logging.getLogger(__name__)
-"""
-A MessagePartDoc is a light wrapper around the dictionary-like
-data that we pass along for message parts. It can be used almost everywhere
-that you would expect a SoledadDocument, since it has a dict under the
-`content` attribute.
-
-We also keep some metadata on it, relative in part to the message as a whole,
-and sometimes to a part in particular only.
-
-* `new` indicates that the document has just been created. SoledadStore
- should just create a new doc for all the related message parts.
-* `store` indicates the type of store a given MessagePartDoc lives in.
- We currently use this to indicate that the document comes from memeory,
- but we should probably get rid of it as soon as we extend the use of the
- SoledadStore interface along LeapMessage, MessageCollection and Mailbox.
-* `part` is one of the MessagePartType enums.
-
-* `dirty` indicates that, while we already have the document in Soledad,
- we have modified its state in memory, so we need to put_doc instead while
- dumping the MemoryStore contents.
- `dirty` attribute would only apply to flags-docs and linkage-docs.
-
-
- XXX this is still not implemented!
-
-"""
-
-MessagePartDoc = namedtuple(
- 'MessagePartDoc',
- ['new', 'dirty', 'part', 'store', 'content'])
-
-
-class ReferenciableDict(dict):
- """
- A dict that can be weak-referenced.
-
- Some builtin objects are not weak-referenciable unless
- subclassed. So we do.
-
- Used to return pointers to the items in the MemoryStore.
- """
-
-
-class MessageWrapper(object):
- """
- A simple nested dictionary container around the different message subparts.
- """
- implements(interfaces.IMessageContainer)
-
- FDOC = "fdoc"
- HDOC = "hdoc"
- CDOCS = "cdocs"
-
- # XXX can use this to limit the memory footprint,
- # or is it too premature to optimize?
- # Does it work well together with the interfaces.implements?
-
- #__slots__ = ["_dict", "_new", "_dirty", "memstore"]
-
- def __init__(self, fdoc=None, hdoc=None, cdocs=None,
- from_dict=None, memstore=None,
- new=True, dirty=False):
- self._dict = {}
-
- self._new = new
- self._dirty = dirty
- self.memstore = memstore
-
- if from_dict is not None:
- self.from_dict(from_dict)
- else:
- if fdoc is not None:
- self._dict[self.FDOC] = ReferenciableDict(fdoc)
- if hdoc is not None:
- self._dict[self.HDOC] = ReferenciableDict(hdoc)
- if cdocs is not None:
- self._dict[self.CDOCS] = ReferenciableDict(cdocs)
-
- # properties
-
- @property
- def new(self):
- return self._new
-
- def set_new(self, value=True):
- self._new = value
-
- @property
- def dirty(self):
- return self._dirty
-
- def set_dirty(self, value=True):
- self._dirty = value
-
- # IMessageContainer
-
- @property
- def fdoc(self):
- _fdoc = self._dict.get(self.FDOC, None)
- if _fdoc:
- content_ref = weakref.proxy(_fdoc)
- else:
- logger.warning("NO FDOC!!!")
- content_ref = {}
- return MessagePartDoc(new=self.new, dirty=self.dirty,
- store=self._storetype,
- part=MessagePartType.fdoc,
- content=content_ref)
-
- @property
- def hdoc(self):
- _hdoc = self._dict.get(self.HDOC, None)
- if _hdoc:
- content_ref = weakref.proxy(_hdoc)
- else:
- logger.warning("NO HDOC!!!!")
- content_ref = {}
- return MessagePartDoc(new=self.new, dirty=self.dirty,
- store=self._storetype,
- part=MessagePartType.hdoc,
- content=content_ref)
-
- @property
- def cdocs(self):
- _cdocs = self._dict.get(self.CDOCS, None)
- if _cdocs:
- return weakref.proxy(_cdocs)
- else:
- return {}
-
- def walk(self):
- """
- Generator that iterates through all the parts, returning
- MessagePartDoc.
- """
- yield self.fdoc
- yield self.hdoc
- for cdoc in self.cdocs.values():
- # XXX this will break ----
- content_ref = weakref.proxy(cdoc)
- yield MessagePartDoc(new=self.new, dirty=self.dirty,
- store=self._storetype,
- part=MessagePartType.cdoc,
- content=content_ref)
-
- # i/o
-
- def as_dict(self):
- """
- Return a dict representation of the parts contained.
- """
- return self._dict
-
- def from_dict(self, msg_dict):
- """
- Populate MessageWrapper parts from a dictionary.
- It expects the same format that we use in a
- MessageWrapper.
- """
- fdoc, hdoc, cdocs = map(
- lambda part: msg_dict.get(part, None),
- [self.FDOC, self.HDOC, self.CDOCS])
- self._dict[self.FDOC] = fdoc
- self._dict[self.HDOC] = hdoc
- self._dict[self.CDOCS] = cdocs
-
-
@contextlib.contextmanager
def set_bool_flag(obj, att):
"""
@@ -232,8 +65,8 @@ class MemoryStore(object):
writes to the permanent storage is controled by the write_period parameter
in the constructor.
"""
- implements(interfaces.IMessageStore)
- implements(interfaces.IMessageStoreWriter)
+ implements(interfaces.IMessageStore,
+ interfaces.IMessageStoreWriter)
producer = None
@@ -332,7 +165,7 @@ class MemoryStore(object):
print "saving cdoc"
cdoc = self._msg_store[key]['cdocs'][cdoc_key]
- # XXX this should be done in the MessageWrapper constructor
+ # FIXME this should be done in the MessageWrapper constructor
# instead...
# first we make it weak-referenciable
referenciable_cdoc = ReferenciableDict(cdoc)
@@ -399,10 +232,8 @@ class MemoryStore(object):
"""
Get the highest UID for a given mbox.
"""
- # XXX should get from msg_store keys instead!
- if not self._new:
- return 0
- return max(self.get_uids(mbox))
+ uids = self.get_uids(mbox)
+ return uids and max(uids) or 0
def count_new_mbox(self, mbox):
"""
diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py
index a47ea1d..3f89193 100644
--- a/src/leap/mail/imap/messageparts.py
+++ b/src/leap/mail/imap/messageparts.py
@@ -20,6 +20,9 @@ MessagePart implementation. Used from LeapMessage.
import logging
import re
import StringIO
+import weakref
+
+from collections import namedtuple
from enum import Enum
from zope.interface import implements
@@ -27,6 +30,7 @@ from twisted.mail import imap4
from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
+from leap.mail.imap import interfaces
from leap.mail.imap.fields import fields
from leap.mail.utils import first
@@ -36,13 +40,188 @@ MessagePartType = Enum("hdoc", "fdoc", "cdoc")
logger = logging.getLogger(__name__)
+# XXX not needed anymoar ...
CHARSET_PATTERN = r"""charset=([\w-]+)"""
CHARSET_RE = re.compile(CHARSET_PATTERN, re.IGNORECASE)
+"""
+A MessagePartDoc is a light wrapper around the dictionary-like
+data that we pass along for message parts. It can be used almost everywhere
+that you would expect a SoledadDocument, since it has a dict under the
+`content` attribute.
+
+We also keep some metadata on it, relative in part to the message as a whole,
+and sometimes to a part in particular only.
+
+* `new` indicates that the document has just been created. SoledadStore
+ should just create a new doc for all the related message parts.
+* `store` indicates the type of store a given MessagePartDoc lives in.
+ We currently use this to indicate that the document comes from memeory,
+ but we should probably get rid of it as soon as we extend the use of the
+ SoledadStore interface along LeapMessage, MessageCollection and Mailbox.
+* `part` is one of the MessagePartType enums.
+
+* `dirty` indicates that, while we already have the document in Soledad,
+ we have modified its state in memory, so we need to put_doc instead while
+ dumping the MemoryStore contents.
+ `dirty` attribute would only apply to flags-docs and linkage-docs.
+
+
+ XXX this is still not implemented!
+
+"""
+
+MessagePartDoc = namedtuple(
+ 'MessagePartDoc',
+ ['new', 'dirty', 'part', 'store', 'content'])
+
+
+class ReferenciableDict(dict):
+ """
+ A dict that can be weak-referenced.
+
+ Some builtin objects are not weak-referenciable unless
+ subclassed. So we do.
+
+ Used to return pointers to the items in the MemoryStore.
+ """
+
+
+class MessageWrapper(object):
+ """
+ A simple nested dictionary container around the different message subparts.
+ """
+ implements(interfaces.IMessageContainer)
+
+ FDOC = "fdoc"
+ HDOC = "hdoc"
+ CDOCS = "cdocs"
+
+ # XXX can use this to limit the memory footprint,
+ # or is it too premature to optimize?
+ # Does it work well together with the interfaces.implements?
+
+ #__slots__ = ["_dict", "_new", "_dirty", "memstore"]
+
+ def __init__(self, fdoc=None, hdoc=None, cdocs=None,
+ from_dict=None, memstore=None,
+ new=True, dirty=False):
+ self._dict = {}
+ self.memstore = memstore
+
+ self._new = new
+ self._dirty = dirty
+ self._storetype = "mem"
+
+ if from_dict is not None:
+ self.from_dict(from_dict)
+ else:
+ if fdoc is not None:
+ self._dict[self.FDOC] = ReferenciableDict(fdoc)
+ if hdoc is not None:
+ self._dict[self.HDOC] = ReferenciableDict(hdoc)
+ if cdocs is not None:
+ self._dict[self.CDOCS] = ReferenciableDict(cdocs)
+
+ # properties
+
+ @property
+ def new(self):
+ return self._new
+
+ def set_new(self, value=True):
+ self._new = value
+
+ @property
+ def dirty(self):
+ return self._dirty
+
+ def set_dirty(self, value=True):
+ self._dirty = value
+
+ # IMessageContainer
+
+ @property
+ def fdoc(self):
+ _fdoc = self._dict.get(self.FDOC, None)
+ if _fdoc:
+ content_ref = weakref.proxy(_fdoc)
+ else:
+ logger.warning("NO FDOC!!!")
+ content_ref = {}
+ return MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.fdoc,
+ content=content_ref)
+
+ @property
+ def hdoc(self):
+ _hdoc = self._dict.get(self.HDOC, None)
+ if _hdoc:
+ content_ref = weakref.proxy(_hdoc)
+ else:
+ logger.warning("NO HDOC!!!!")
+ content_ref = {}
+ return MessagePartDoc(new=self.new, dirty=self.dirty,
+ store=self._storetype,
+ part=MessagePartType.hdoc,
+ content=content_ref)
+
+ @property
+ def cdocs(self):
+ _cdocs = self._dict.get(self.CDOCS, None)
+ if _cdocs:
+ return weakref.proxy(_cdocs)
+ else:
+ return {}
+
+ def walk(self):
+ """
+ Generator that iterates through all the parts, returning
+ MessagePartDoc.
+ """
+ yield self.fdoc
+ yield self.hdoc
+ for cdoc in self.cdocs.values():
+ # XXX this will break ----
+ #content_ref = weakref.proxy(cdoc)
+ #yield MessagePartDoc(new=self.new, dirty=self.dirty,
+ #store=self._storetype,
+ #part=MessagePartType.cdoc,
+ #content=content_ref)
+
+ # the put is handling this for us, so
+ # we already have stored a MessagePartDoc
+ # but we should really do it while adding in the
+ # constructor or the from_dict method
+ yield cdoc
+
+ # i/o
+
+ def as_dict(self):
+ """
+ Return a dict representation of the parts contained.
+ """
+ return self._dict
+
+ def from_dict(self, msg_dict):
+ """
+ Populate MessageWrapper parts from a dictionary.
+ It expects the same format that we use in a
+ MessageWrapper.
+ """
+ fdoc, hdoc, cdocs = map(
+ lambda part: msg_dict.get(part, None),
+ [self.FDOC, self.HDOC, self.CDOCS])
+ self._dict[self.FDOC] = fdoc
+ self._dict[self.HDOC] = hdoc
+ self._dict[self.CDOCS] = cdocs
+
class MessagePart(object):
"""
- IMessagePart implementor.
+ IMessagePart implementor, to be passed to several methods
+ of the IMAP4Server.
It takes a subpart message and is able to find
the inner parts.
@@ -117,6 +296,8 @@ class MessagePart(object):
payload = str("")
if payload:
+ # XXX use find_charset instead --------------------------
+ # bad rebase???
content_type = self._get_ctype_from_document(phash)
charset = first(CHARSET_RE.findall(content_type))
logger.debug("Got charset from header: %s" % (charset,))
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 67e5a41..46c9dc9 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -41,7 +41,7 @@ from leap.mail.utils import first, find_charset, lowerdict
from leap.mail.decorators import deferred
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
-from leap.mail.imap.memorystore import MessageDict
+from leap.mail.imap.memorystore import MessageWrapper
from leap.mail.imap.parser import MailParser, MBoxParser
logger = logging.getLogger(__name__)
@@ -984,7 +984,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# TODO ---- add reference to original doc, to be deleted
# after writes are done.
- msg_container = MessageDict(fd, hd, cdocs)
+ msg_container = MessageWrapper(fd, hd, cdocs)
self._memstore.create_message(self.mbox, uid, msg_container)
def _remove_cb(self, result):
@@ -1215,6 +1215,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# and we cannot find it otherwise. This seems to be enough.
# XXX do a deferLater instead ??
+ # FIXME this won't be needed after the CHECK command is implemented.
time.sleep(0.3)
return self._get_uid_from_msgidCb(msgid)
@@ -1233,11 +1234,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:rtype: LeapMessage
"""
print "getting msg by id!"
- msg_container = self._memstore.get(self.mbox, uid)
+ msg_container = self._memstore.get_message(self.mbox, uid)
print "msg container", msg_container
if msg_container is not None:
print "getting LeapMessage (from memstore)"
- msg = LeapMessage(None, uid, self.mbox, collection=self,
+ # We pass a reference to soledad just to be able to retrieve
+ # missing parts that cannot be found in the container, like
+ # the content docs after a copy.
+ msg = LeapMessage(self._soledad, uid, self.mbox, collection=self,
container=msg_container)
print "got msg:", msg
else:
@@ -1309,7 +1313,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
"""
Return a dict with all flags documents for this mailbox.
"""
- # XXX get all from memstore and cahce it there
+ # XXX get all from memstore and cache it there
all_flags = dict(((
doc.content[self.UID_KEY],
doc.content[self.FLAGS_KEY]) for doc in
@@ -1319,7 +1323,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
if self._memstore is not None:
# XXX
uids = self._memstore.get_uids(self.mbox)
- fdocs = [(uid, self._memstore.get(self.mbox, uid).fdoc)
+ fdocs = [(uid, self._memstore.get_message(self.mbox, uid).fdoc)
for uid in uids]
for uid, doc in fdocs:
all_flags[uid] = doc.content[self.FLAGS_KEY]
diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py
index 3f99da6..8350988 100644
--- a/src/leap/mail/imap/service/imap.py
+++ b/src/leap/mail/imap/service/imap.py
@@ -32,6 +32,7 @@ from leap.mail.imap.account import SoledadBackedAccount
from leap.mail.imap.fetch import LeapIncomingMail
from leap.mail.imap.memorystore import MemoryStore
from leap.mail.imap.server import LeapIMAPServer
+from leap.mail.imap.soledadstore import SoledadStore
from leap.soledad.client import Soledad
# The default port in which imap service will run
@@ -96,7 +97,8 @@ class LeapIMAPFactory(ServerFactory):
self._uuid = uuid
self._userid = userid
self._soledad = soledad
- self._memstore = MemoryStore()
+ self._memstore = MemoryStore(
+ permanent_store=SoledadStore(soledad))
theAccount = SoledadBackedAccount(
uuid, soledad=soledad,
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
new file mode 100644
index 0000000..62a3c53
--- /dev/null
+++ b/src/leap/mail/imap/soledadstore.py
@@ -0,0 +1,237 @@
+# -*- coding: utf-8 -*-
+# soledadstore.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A MessageStore that writes to Soledad.
+"""
+import logging
+
+from u1db import errors as u1db_errors
+from zope.interface import implements
+
+from leap.mail.imap.messageparts import MessagePartType
+from leap.mail.imap.fields import fields
+from leap.mail.imap.interfaces import IMessageStore
+from leap.mail.messageflow import IMessageConsumer
+
+logger = logging.getLogger(__name__)
+
+
+class ContentDedup(object):
+ """
+ Message deduplication.
+
+ We do a query for the content hashes before writing to our beloved
+ sqlcipher backend of Soledad. This means, by now, that:
+
+ 1. We will not store the same attachment twice, only the hash of it.
+ 2. We will not store the same message body twice, only the hash of it.
+
+ The first case is useful if you are always receiving the same old memes
+ from unwary friends that still have not discovered that 4chan is the
+ generator of the internet. The second will save your day if you have
+ initiated session with the same account in two different machines. I also
+ wonder why would you do that, but let's respect each other choices, like
+ with the religious celebrations, and assume that one day we'll be able
+ to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
+ Stack.
+ """
+
+ def _header_does_exist(self, doc):
+ """
+ Check whether we already have a header document for this
+ content hash in our database.
+
+ :param doc: tentative header document
+ :type doc: dict
+ :returns: True if it exists, False otherwise.
+ """
+ if not doc:
+ return False
+ chash = doc[fields.CONTENT_HASH_KEY]
+ header_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(chash))
+ if not header_docs:
+ return False
+
+ if len(header_docs) != 1:
+ logger.warning("Found more than one copy of chash %s!"
+ % (chash,))
+ logger.debug("Found header doc with that hash! Skipping save!")
+ return True
+
+ def _content_does_exist(self, doc):
+ """
+ Check whether we already have a content document for a payload
+ with this hash in our database.
+
+ :param doc: tentative content document
+ :type doc: dict
+ :returns: True if it exists, False otherwise.
+ """
+ if not doc:
+ return False
+ phash = doc[fields.PAYLOAD_HASH_KEY]
+ attach_docs = self._soledad.get_from_index(
+ fields.TYPE_P_HASH_IDX,
+ fields.TYPE_CONTENT_VAL, str(phash))
+ if not attach_docs:
+ return False
+
+ if len(attach_docs) != 1:
+ logger.warning("Found more than one copy of phash %s!"
+ % (phash,))
+ logger.debug("Found attachment doc with that hash! Skipping save!")
+ return True
+
+
+class SoledadStore(ContentDedup):
+ """
+ This will create docs in the local Soledad database.
+ """
+
+ implements(IMessageConsumer, IMessageStore)
+
+ def __init__(self, soledad):
+ """
+ Initialize the writer.
+
+ :param soledad: the soledad instance
+ :type soledad: Soledad
+ """
+ self._soledad = soledad
+
+ # IMessageStore
+
+ # -------------------------------------------------------------------
+ # We are not yet using this interface, but it would make sense
+ # to implement it.
+
+ def create_message(self, mbox, uid, message):
+ """
+ Create the passed message into this SoledadStore.
+
+ :param mbox: the mbox this message belongs.
+ :param uid: the UID that identifies this message in this mailbox.
+ :param message: a IMessageContainer implementor.
+ """
+
+ def put_message(self, mbox, uid, message):
+ """
+ Put the passed existing message into this SoledadStore.
+
+ :param mbox: the mbox this message belongs.
+ :param uid: the UID that identifies this message in this mailbox.
+ :param message: a IMessageContainer implementor.
+ """
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove the given message from this SoledadStore.
+
+ :param mbox: the mbox this message belongs.
+ :param uid: the UID that identifies this message in this mailbox.
+ """
+
+ def get_message(self, mbox, uid):
+ """
+ Get a IMessageContainer for the given mbox and uid combination.
+
+ :param mbox: the mbox this message belongs.
+ :param uid: the UID that identifies this message in this mailbox.
+ """
+
+ # IMessageConsumer
+
+ def consume(self, queue):
+ """
+ Creates a new document in soledad db.
+
+ :param queue: queue to get item from, with content of the document
+ to be inserted.
+ :type queue: Queue
+ """
+ # TODO should delete the original message from incoming after
+ # the writes are done.
+ # TODO should handle the delete case
+ # TODO should handle errors
+
+ empty = queue.empty()
+ while not empty:
+ for item, call in self._process(queue):
+ self._try_call(call, item)
+ empty = queue.empty()
+
+ #
+ # SoledadStore specific methods.
+ #
+
+ def _process(self, queue):
+ """
+ Return the item and the proper call type for the next
+ item in the queue if any.
+
+ :param queue: the queue from where we'll pick item.
+ :type queue: Queue
+ """
+ msg_wrapper = queue.get()
+ return self._get_calls_for_msg_parts(msg_wrapper)
+
+ def _try_call(self, call, item):
+ """
+ Try to invoke a given call with item as a parameter.
+ """
+ if not call:
+ return
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.error("Error: %r" % (exc,))
+ raise exc
+
+ def _get_calls_for_msg_parts(self, msg_wrapper):
+ """
+ Return the proper call type for a given item.
+
+ :param msg_wrapper: A MessageWrapper
+ :type msg_wrapper: IMessageContainer
+ """
+ call = None
+
+ if msg_wrapper.new is True:
+ call = self._soledad.create_doc
+
+ # item is expected to be a MessagePartDoc
+ for item in msg_wrapper.walk():
+ if item.part == MessagePartType.fdoc:
+ yield dict(item.content), call
+
+ if item.part == MessagePartType.hdoc:
+ if not self._header_does_exist(item.content):
+ yield dict(item.content), call
+
+ if item.part == MessagePartType.cdoc:
+ if self._content_does_exist(item.content):
+ yield dict(item.content), call
+
+ # TODO should check for elements with the dirty state
+ # TODO if new == False and dirty == True, put_doc
+ # XXX for puts, we will have to retrieve
+ # the document, change the content, and
+ # pass the whole document under "content"
+ else:
+ logger.error("Cannot put documents yet!")