diff options
author | Kali Kaneko <kali@leap.se> | 2014-01-21 19:22:09 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-01-28 19:38:09 -0400 |
commit | 4ae6ad57a0f80143e3ded867c1fdd2264804a775 (patch) | |
tree | cb3d3a7ad91af220a292cf8ab3de332b6138ab32 /src/leap/mail/imap/messages.py | |
parent | 781bd2f4d2a047088d1a0ecd673a38c80ea0c0c0 (diff) |
memory store for append/fetch/copy
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r-- | src/leap/mail/imap/messages.py | 206 |
1 files changed, 155 insertions, 51 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 34304ea..ef0b0a1 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -42,6 +42,7 @@ from leap.mail.utils import first, find_charset from leap.mail.decorators import deferred from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields +from leap.mail.imap.memorystore import MessageDict from leap.mail.imap.parser import MailParser, MBoxParser from leap.mail.messageflow import IMessageConsumer @@ -49,11 +50,20 @@ logger = logging.getLogger(__name__) # TODO ------------------------------------------------------------ +# [ ] Add ref to incoming message during add_msg # [ ] Add linked-from info. # [ ] Delete incoming mail only after successful write! # [ ] Remove UID from syncable db. Store only those indexes locally. +# XXX no longer needed, since i'm using proxies instead of direct weakrefs +def maybe_call(thing): + """ + Return the same thing, or the result of its invocation if it is a callable. + """ + return thing() if callable(thing) else thing + + def lowerdict(_dict): """ Return a dict with the keys in lowercase. @@ -333,7 +343,7 @@ class LeapMessage(fields, MailParser, MBoxParser): implements(imap4.IMessage) - def __init__(self, soledad, uid, mbox, collection=None): + def __init__(self, soledad, uid, mbox, collection=None, container=None): """ Initializes a LeapMessage. @@ -345,12 +355,15 @@ class LeapMessage(fields, MailParser, MBoxParser): :type mbox: basestring :param collection: a reference to the parent collection object :type collection: MessageCollection + :param container: a IMessageContainer implementor instance + :type container: IMessageContainer """ MailParser.__init__(self) self._soledad = soledad self._uid = int(uid) self._mbox = self._parse_mailbox_name(mbox) self._collection = collection + self._container = container self.__chash = None self.__bdoc = None @@ -361,13 +374,29 @@ class LeapMessage(fields, MailParser, MBoxParser): An accessor to the flags document. """ if all(map(bool, (self._uid, self._mbox))): - fdoc = self._get_flags_doc() + fdoc = None + if self._container is not None: + fdoc = self._container.fdoc + if not fdoc: + fdoc = self._get_flags_doc() if fdoc: - self.__chash = fdoc.content.get( + fdoc_content = maybe_call(fdoc.content) + self.__chash = fdoc_content.get( fields.CONTENT_HASH_KEY, None) return fdoc @property + def _hdoc(self): + """ + An accessor to the headers document. + """ + if self._container is not None: + hdoc = self._container.hdoc + if hdoc: + return hdoc + return self._get_headers_doc() + + @property def _chash(self): """ An accessor to the content hash for this message. @@ -375,18 +404,11 @@ class LeapMessage(fields, MailParser, MBoxParser): if not self._fdoc: return None if not self.__chash and self._fdoc: - self.__chash = self._fdoc.content.get( + self.__chash = maybe_call(self._fdoc.content).get( fields.CONTENT_HASH_KEY, None) return self.__chash @property - def _hdoc(self): - """ - An accessor to the headers document. - """ - return self._get_headers_doc() - - @property def _bdoc(self): """ An accessor to the body document. @@ -422,7 +444,7 @@ class LeapMessage(fields, MailParser, MBoxParser): flags = [] fdoc = self._fdoc if fdoc: - flags = fdoc.content.get(self.FLAGS_KEY, None) + flags = maybe_call(fdoc.content).get(self.FLAGS_KEY, None) msgcol = self._collection @@ -449,6 +471,8 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: a SoledadDocument instance :rtype: SoledadDocument """ + # XXX use memory store ...! + leap_assert(isinstance(flags, tuple), "flags need to be a tuple") log.msg('setting flags: %s (%s)' % (self._uid, flags)) @@ -461,7 +485,9 @@ class LeapMessage(fields, MailParser, MBoxParser): doc.content[self.FLAGS_KEY] = flags doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - self._soledad.put_doc(doc) + + if getattr(doc, 'store', None) != "mem": + self._soledad.put_doc(doc) def addFlags(self, flags): """ @@ -521,18 +547,26 @@ class LeapMessage(fields, MailParser, MBoxParser): """ # TODO refactor with getBodyFile in MessagePart fd = StringIO.StringIO() - bdoc = self._bdoc - if bdoc: - body = self._bdoc.content.get(self.RAW_KEY, "") - content_type = bdoc.content.get('content-type', "") + if self._bdoc is not None: + bdoc_content = self._bdoc.content + body = bdoc_content.get(self.RAW_KEY, "") + content_type = bdoc_content.get('content-type', "") charset = find_charset(content_type) + logger.debug('got charset from content-type: %s' % charset) if charset is None: charset = self._get_charset(body) try: body = body.encode(charset) except UnicodeError as e: - logger.error("Unicode error, using 'replace'. {0!r}".format(e)) - body = body.encode(charset, 'replace') + logger.error("Unicode error {0}".format(e)) + logger.debug("Attempted to encode with: %s" % charset) + try: + body = body.encode(charset, 'replace') + except UnicodeError as e: + try: + body = body.encode('utf-8', 'replace') + except: + pass # We are still returning funky characters from here. else: @@ -567,7 +601,8 @@ class LeapMessage(fields, MailParser, MBoxParser): """ size = None if self._fdoc: - size = self._fdoc.content.get(self.SIZE_KEY, False) + fdoc_content = maybe_call(self._fdoc.content) + size = fdoc_content.get(self.SIZE_KEY, False) else: logger.warning("No FLAGS doc for %s:%s" % (self._mbox, self._uid)) @@ -632,7 +667,8 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the headers dict for this message. """ if self._hdoc is not None: - headers = self._hdoc.content.get(self.HEADERS_KEY, {}) + hdoc_content = maybe_call(self._hdoc.content) + headers = hdoc_content.get(self.HEADERS_KEY, {}) return headers else: @@ -646,7 +682,8 @@ class LeapMessage(fields, MailParser, MBoxParser): Return True if this message is multipart. """ if self._fdoc: - is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False) + fdoc_content = maybe_call(self._fdoc.content) + is_multipart = fdoc_content.get(self.MULTIPART_KEY, False) return is_multipart else: logger.warning( @@ -688,7 +725,8 @@ class LeapMessage(fields, MailParser, MBoxParser): logger.warning("Tried to get part but no HDOC found!") return None - pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {}) + hdoc_content = maybe_call(self._hdoc.content) + pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) return pmap[str(part)] def _get_flags_doc(self): @@ -724,16 +762,33 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the document that keeps the body for this message. """ - body_phash = self._hdoc.content.get( + hdoc_content = maybe_call(self._hdoc.content) + body_phash = hdoc_content.get( fields.BODY_KEY, None) if not body_phash: logger.warning("No body phash for this document!") return None - body_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(body_phash)) - return first(body_docs) + # XXX get from memstore too... + # if memstore: memstore.get_phrash + # memstore should keep a dict with weakrefs to the + # phash doc... + + if self._container is not None: + bdoc = self._container.memstore.get_by_phash(body_phash) + if bdoc: + return bdoc + else: + print "no doc for that phash found!" + + # no memstore or no doc found there + if self._soledad: + body_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(body_phash)) + return first(body_docs) + else: + logger.error("No phash in container, and no soledad found!") def __getitem__(self, key): """ @@ -746,7 +801,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: The content value indexed by C{key} or None :rtype: str """ - return self._fdoc.content.get(key, None) + return maybe_call(self._fdoc.content).get(key, None) # setters @@ -790,6 +845,8 @@ class LeapMessage(fields, MailParser, MBoxParser): # until we think about a good way of deorphaning. # Maybe a crawler of unreferenced docs. + # XXX remove from memory store!!! + # XXX implement elijah's idea of using a PUT document as a # token to ensure consistency in the removal. @@ -957,7 +1014,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, """ A collection of messages, surprisingly. - It is tied to a selected mailbox name that is passed to constructor. + It is tied to a selected mailbox name that is passed to its constructor. Implements a filter query over the messages contained in a soledad database. """ @@ -1058,7 +1115,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, _hdocset_lock = threading.Lock() _hdocset_property_lock = threading.Lock() - def __init__(self, mbox=None, soledad=None): + def __init__(self, mbox=None, soledad=None, memstore=None): """ Constructor for MessageCollection. @@ -1068,13 +1125,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, MessageCollection for each mailbox, instead of treating them as a property of each message. + We are passed an instance of MemoryStore, the same for the + SoledadBackedAccount, that we use as a read cache and a buffer + for writes. + :param mbox: the name of the mailbox. It is the name with which we filter the query over the - messages database + messages database. :type mbox: str - :param soledad: Soledad database :type soledad: Soledad instance + :param memstore: a MemoryStore instance + :type memstore: MemoryStore """ MailParser.__init__(self) leap_assert(mbox, "Need a mailbox name to initialize") @@ -1086,6 +1148,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # okay, all in order, keep going... self.mbox = self._parse_mailbox_name(mbox) self._soledad = soledad + self._memstore = memstore + self.__rflags = None self.__hdocset = None self.initialize_db() @@ -1241,6 +1305,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # when all the processing is done. # TODO add the linked-from info ! + # TODO add reference to the original message logger.debug('adding message') if flags is None: @@ -1273,24 +1338,29 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, hd[key] = parts_map[key] del parts_map - # Saving ---------------------------------------- - self.set_recent_flag(uid) + # The MessageContainer expects a dict, zero-indexed + # XXX review-me + cdocs = dict((index, doc) for index, doc in + enumerate(walk.get_raw_docs(msg, parts))) + print "cdocs is", cdocs - # first, regular docs: flags and headers - self._soledad.create_doc(fd) + # Saving ---------------------------------------- # XXX should check for content duplication on headers too # but with chash. !!! - hdoc = self._soledad.create_doc(hd) + + # XXX adapt hdocset to use memstore + #hdoc = self._soledad.create_doc(hd) # We add the newly created hdoc to the fast-access set of # headers documents associated with the mailbox. - self.add_hdocset_docid(hdoc.doc_id) + #self.add_hdocset_docid(hdoc.doc_id) - # and last, but not least, try to create - # content docs if not already there. - cdocs = walk.get_raw_docs(msg, parts) - for cdoc in cdocs: - if not self._content_does_exist(cdoc): - self._soledad.create_doc(cdoc) + # XXX move to memory store too + # self.set_recent_flag(uid) + + # TODO ---- add reference to original doc, to be deleted + # after writes are done. + msg_container = MessageDict(fd, hd, cdocs) + self._memstore.put(self.mbox, uid, msg_container) def _remove_cb(self, result): return result @@ -1321,6 +1391,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # # recent flags + # XXX FIXME ------------------------------------- + # This should be rewritten to use memory store. def _get_recent_flags(self): """ @@ -1390,6 +1462,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # headers-docs-set + # XXX FIXME ------------------------------------- + # This should be rewritten to use memory store. + def _get_hdocset(self): """ An accessor for the hdocs-set for this mailbox. @@ -1532,7 +1607,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, or None if not found. :rtype: LeapMessage """ - msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) + print "getting msg by id!" + msg_container = self._memstore.get(self.mbox, uid) + print "msg container", msg_container + if msg_container is not None: + print "getting LeapMessage (from memstore)" + msg = LeapMessage(None, uid, self.mbox, collection=self, + container=msg_container) + print "got msg:", msg + else: + msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) if not msg.does_exist(): return None return msg @@ -1570,11 +1654,19 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, ascending order. """ # XXX we should get this from the uid table, local-only - all_uids = (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox)) - return (u for u in sorted(all_uids)) + # XXX FIXME ------------- + # This should be cached in the memstoretoo + db_uids = set([doc.content[self.UID_KEY] for doc in + self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox)]) + if self._memstore is not None: + mem_uids = self._memstore.get_uids(self.mbox) + uids = db_uids.union(set(mem_uids)) + else: + uids = db_uids + + return (u for u in sorted(uids)) def reset_last_uid(self, param): """ @@ -1592,12 +1684,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, """ Return a dict with all flags documents for this mailbox. """ + # XXX get all from memstore and cahce it there all_flags = dict((( doc.content[self.UID_KEY], doc.content[self.FLAGS_KEY]) for doc in self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox))) + if self._memstore is not None: + # XXX + uids = self._memstore.get_uids(self.mbox) + fdocs = [(uid, self._memstore.get(self.mbox, uid).fdoc) + for uid in uids] + for uid, doc in fdocs: + all_flags[uid] = doc.content[self.FLAGS_KEY] + return all_flags def all_flags_chash(self): @@ -1630,9 +1731,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :rtype: int """ + # XXX We could cache this in memstore too until next write... count = self._soledad.get_count_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) + if self._memstore is not None: + count += self._memstore.count_new() return count # unseen messages |