diff options
author | Kali Kaneko <kali@leap.se> | 2014-10-16 14:51:53 +0200 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2015-02-11 14:05:42 -0400 |
commit | 80ed7b9b85686b5d10f359114ca703dba4a5820b (patch) | |
tree | f041b51fa949c3b44e9a53a6e3c51b77fa8a6986 /src/leap/mail/imap/messages.py | |
parent | aa619124b9293b12d732e275c7f57b11dcb01f60 (diff) |
adapt to soledad 0.7 async API
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r-- | src/leap/mail/imap/messages.py | 136 |
1 files changed, 77 insertions, 59 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index e8d64d1..c761091 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -71,6 +71,7 @@ def try_unique_query(curried): :param curried: a curried function :type curried: callable """ + # XXX FIXME ---------- convert to deferreds leap_assert(callable(curried), "A callable is expected") try: query = curried() @@ -134,10 +135,11 @@ class LeapMessage(fields, MBoxParser): self.__chash = None self.__bdoc = None - self.reactor = reactor - # XXX make these properties public + # XXX FIXME ------ the documents can be + # deferreds too.... niice. + @property def fdoc(self): """ @@ -506,18 +508,15 @@ class LeapMessage(fields, MBoxParser): Return the document that keeps the flags for this message. """ - result = {} - try: - flag_docs = self._soledad.get_from_index( - fields.TYPE_MBOX_UID_IDX, - fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) - result = first(flag_docs) - except Exception as exc: - # ugh! Something's broken down there! - logger.warning("ERROR while getting flags for UID: %s" % self._uid) - logger.exception(exc) - finally: - return result + def get_first_if_any(docs): + result = first(docs) + return result if result else {} + + d = self._soledad.get_from_index( + fields.TYPE_MBOX_UID_IDX, + fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) + d.addCallback(get_first_if_any) + return d # TODO move to soledadstore instead of accessing soledad directly def _get_headers_doc(self): @@ -525,10 +524,11 @@ class LeapMessage(fields, MBoxParser): Return the document that keeps the headers for this message. """ - head_docs = self._soledad.get_from_index( + d = self._soledad.get_from_index( fields.TYPE_C_HASH_IDX, fields.TYPE_HEADERS_VAL, str(self.chash)) - return first(head_docs) + d.addCallback(lambda docs: first(docs)) + return d # TODO move to soledadstore instead of accessing soledad directly def _get_body_doc(self): @@ -536,6 +536,8 @@ class LeapMessage(fields, MBoxParser): Return the document that keeps the body for this message. """ + # XXX FIXME --- this might need a maybedeferred + # on the receiving side... hdoc_content = self.hdoc.content body_phash = hdoc_content.get( fields.BODY_KEY, None) @@ -554,13 +556,11 @@ class LeapMessage(fields, MBoxParser): return bdoc # no memstore, or no body doc found there - if self._soledad: - body_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(body_phash)) - return first(body_docs) - else: - logger.error("No phash in container, and no soledad found!") + d = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(body_phash)) + d.addCallback(lambda docs: first(docs)) + return d def __getitem__(self, key): """ @@ -739,8 +739,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): else: self._initialized[mbox] = True - self.reactor = reactor - def _get_empty_doc(self, _type=FLAGS_DOC): """ Returns an empty doc for storing different message parts. @@ -887,9 +885,10 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): flags = tuple() leap_assert_type(flags, tuple) + # TODO return soledad deferred instead observer = defer.Deferred() d = self._do_parse(raw) - d.addCallback(lambda result: self.reactor.callInThread( + d.addCallback(lambda result: reactor.callInThread( self._do_add_msg, result, flags, subject, date, notify_on_disk, observer)) return observer @@ -924,17 +923,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): msg = self.get_msg_by_uid(existing_uid) # We can say the observer that we're done - self.reactor.callFromThread(observer.callback, existing_uid) + # TODO return soledad deferred instead + reactor.callFromThread(observer.callback, existing_uid) msg.setFlags((fields.DELETED_FLAG,), -1) return - # XXX get FUCKING UID from autoincremental table + # TODO S2 -- get FUCKING UID from autoincremental table uid = self.memstore.increment_last_soledad_uid(self.mbox) # We can say the observer that we're done at this point, but # before that we should make sure it has no serious consequences # if we're issued, for instance, a fetch command right after... - # self.reactor.callFromThread(observer.callback, uid) + # reactor.callFromThread(observer.callback, uid) # if we did the notify, we need to invalidate the deferred # so not to try to fire it twice. # observer = None @@ -960,6 +960,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): self.set_recent_flag(uid) msg_container = MessageWrapper(fd, hd, cdocs) + + # TODO S1 -- just pass this to memstore and return that deferred. self.memstore.create_message( self.mbox, uid, msg_container, observer=observer, notify_on_disk=notify_on_disk) @@ -1011,6 +1013,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): Get recent-flags document from Soledad for this mailbox. :rtype: SoledadDocument or None """ + # FIXME ----- use deferreds. curried = partial( self._soledad.get_from_index, fields.TYPE_MBOX_IDX, @@ -1029,6 +1032,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :param uids: the uids to unset :type uid: sequence """ + # FIXME ----- use deferreds. with self._rdoc_property_lock[self.mbox]: self.recent_flags.difference_update( set(uids)) @@ -1042,11 +1046,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :param uid: the uid to unset :type uid: int """ + # FIXME ----- use deferreds. with self._rdoc_property_lock[self.mbox]: self.recent_flags.difference_update( set([uid])) - @deferred_to_thread def set_recent_flag(self, uid): """ Set Recent flag for a given uid. @@ -1054,6 +1058,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :param uid: the uid to set :type uid: int """ + # FIXME ----- use deferreds. with self._rdoc_property_lock[self.mbox]: self.recent_flags = self.recent_flags.union( set([uid])) @@ -1068,6 +1073,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): the query failed. :rtype: SoledadDocument or None. """ + # FIXME ----- use deferreds. curried = partial( self._soledad.get_from_index, fields.TYPE_MBOX_C_HASH_IDX, @@ -1125,7 +1131,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): return None return fdoc.content.get(fields.UID_KEY, None) - @deferred_to_thread def _get_uid_from_msgid(self, msgid): """ Return a UID for a given message-id. @@ -1144,7 +1149,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): # XXX is this working? return self._get_uid_from_msgidCb(msgid) - @deferred_to_thread def set_flags(self, mbox, messages, flags, mode, observer): """ Set flags for a sequence of messages. @@ -1162,7 +1166,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): done. :type observer: deferred """ - reactor = self.reactor getmsg = self.get_msg_by_uid def set_flags(uid, flags, mode): @@ -1173,6 +1176,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): setted_flags = [set_flags(uid, flags, mode) for uid in messages] result = dict(filter(None, setted_flags)) + # TODO -- remove reactor.callFromThread(observer.callback, result) # getters: generic for a mailbox @@ -1223,37 +1227,45 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): If you want acess to the content, use __iter__ instead - :return: a list of u1db documents - :rtype: list of SoledadDocument + :return: a Deferred, that will fire with a list of u1db documents + :rtype: Deferred (promise of list of SoledadDocument) """ if _type not in fields.__dict__.values(): raise TypeError("Wrong type passed to get_all_docs") + # FIXME ----- either raise or return a deferred wrapper. if sameProxiedObjects(self._soledad, None): logger.warning('Tried to get messages but soledad is None!') return [] - all_docs = [doc for doc in self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - _type, self.mbox)] + def get_sorted_docs(docs): + all_docs = [doc for doc in docs] + # inneficient, but first let's grok it and then + # let's worry about efficiency. + # XXX FIXINDEX -- should implement order by in soledad + # FIXME ---------------------------------------------- + return sorted(all_docs, key=lambda item: item.content['uid']) - # inneficient, but first let's grok it and then - # let's worry about efficiency. - # XXX FIXINDEX -- should implement order by in soledad - # FIXME ---------------------------------------------- - return sorted(all_docs, key=lambda item: item.content['uid']) + d = self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, _type, self.mbox) + d.addCallback(get_sorted_docs) + return d def all_soledad_uid_iter(self): """ Return an iterator through the UIDs of all messages, sorted in ascending order. """ - db_uids = set([doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox) - if not empty(doc)]) - return db_uids + # XXX FIXME ------ sorted??? + + def get_uids(docs): + return set([ + doc.content[self.UID_KEY] for doc in docs if not empty(doc)]) + + d = self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) + d.addCallback(get_uids) + return d def all_uid_iter(self): """ @@ -1277,16 +1289,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): # XXX we really could return a reduced version with # just {'uid': (flags-tuple,) since the prefetch is # only oriented to get the flag tuples. - all_docs = [( - doc.content[self.UID_KEY], - dict(doc.content)) - for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox) - if not empty(doc.content)] - all_flags = dict(all_docs) - return all_flags + + def get_content(docs): + all_docs = [( + doc.content[self.UID_KEY], + dict(doc.content)) + for doc in docs + if not empty(doc.content)] + all_flags = dict(all_docs) + return all_flags + + d = self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox) + d.addCallback(get_content) + return d def all_headers(self): """ @@ -1339,6 +1356,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): # recent messages # XXX take it from memstore + # XXX Used somewhere? def count_recent(self): """ Count all messages with the `Recent` flag. |