summaryrefslogtreecommitdiff
path: root/src/leap/mail
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-11 01:43:14 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:39:48 -0400
commit4338368aa2ba0efaee742e9000e21b81af34d3db (patch)
tree3d3fff4030567a6f9a0d90682335de6c213f7d08 /src/leap/mail
parentde762b5c6e529f4e668bee1ec848eb1f6380369b (diff)
separate new and dirty queues
Diffstat (limited to 'src/leap/mail')
-rw-r--r--src/leap/mail/imap/memorystore.py80
-rw-r--r--src/leap/mail/imap/messageparts.py25
-rw-r--r--src/leap/mail/imap/soledadstore.py20
-rw-r--r--src/leap/mail/messageflow.py26
4 files changed, 102 insertions, 49 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index 786a9c4..a053f3f 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -24,7 +24,6 @@ import weakref
from collections import defaultdict
from copy import copy
-from itertools import chain
from twisted.internet import defer
from twisted.internet.task import LoopingCall
@@ -33,7 +32,6 @@ from zope.interface import implements
from leap.common.check import leap_assert_type
from leap.mail import size
-from leap.mail.decorators import deferred_to_thread
from leap.mail.utils import empty, phash_iter
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
@@ -48,7 +46,7 @@ logger = logging.getLogger(__name__)
# The default period to do writebacks to the permanent
# soledad storage, in seconds.
-SOLEDAD_WRITE_PERIOD = 30
+SOLEDAD_WRITE_PERIOD = 15
FDOC = MessagePartType.fdoc.key
HDOC = MessagePartType.hdoc.key
@@ -106,6 +104,9 @@ class MemoryStore(object):
:param write_period: the interval to dump messages to disk, in seconds.
:type write_period: int
"""
+ from twisted.internet import reactor
+ self.reactor = reactor
+
self._permanent_store = permanent_store
self._write_period = write_period
@@ -195,11 +196,15 @@ class MemoryStore(object):
# New and dirty flags, to set MessageWrapper State.
self._new = set([])
+ self._new_queue = set([])
self._new_deferreds = {}
+
self._dirty = set([])
- self._rflags_dirty = set([])
+ self._dirty_queue = set([])
self._dirty_deferreds = {}
+ self._rflags_dirty = set([])
+
# Flag for signaling we're busy writing to the disk storage.
setattr(self, self.WRITING_FLAG, False)
@@ -297,7 +302,7 @@ class MemoryStore(object):
"""
Put an existing message.
- This will set the dirty flag on the MemoryStore.
+ This will also set the dirty flag on the MemoryStore.
:param mbox: the mailbox
:type mbox: str or unicode
@@ -498,9 +503,14 @@ class MemoryStore(object):
# is accquired
with set_bool_flag(self, self.WRITING_FLAG):
for rflags_doc_wrapper in self.all_rdocs_iter():
- self.producer.push(rflags_doc_wrapper)
- for msg_wrapper in self.all_new_dirty_msg_iter():
- self.producer.push(msg_wrapper)
+ self.producer.push(rflags_doc_wrapper,
+ state=self.producer.STATE_DIRTY)
+ for msg_wrapper in self.all_new_msg_iter():
+ self.producer.push(msg_wrapper,
+ state=self.producer.STATE_NEW)
+ for msg_wrapper in self.all_dirty_msg_iter():
+ self.producer.push(msg_wrapper,
+ state=self.producer.STATE_DIRTY)
# MemoryStore specific methods.
@@ -784,17 +794,34 @@ class MemoryStore(object):
for uid in fdoc_store[mbox]:
yield mbox, uid
- def all_new_dirty_msg_iter(self):
+ def all_new_msg_iter(self):
"""
- Return generator that iterates through all new and dirty messages.
+ Return generator that iterates through all new messages.
:return: generator of MessageWrappers
:rtype: generator
"""
gm = self.get_message
- new = (gm(*key) for key in self._new)
- dirty = (gm(*key, flags_only=True) for key in self._dirty)
- return chain(new, dirty)
+ new = [gm(*key) for key in self._new]
+ # move content from new set to the queue
+ self._new_queue.update(self._new)
+ self._new.difference_update(self._new)
+ return new
+
+ def all_dirty_msg_iter(self):
+ """
+ Return generator that iterates through all dirty messages.
+
+ :return: generator of MessageWrappers
+ :rtype: generator
+ """
+ gm = self.get_message
+ dirty = [gm(*key, flags_only=True) for key in self._dirty]
+ # move content from new and dirty sets to the queue
+
+ self._dirty_queue.update(self._dirty)
+ self._dirty.difference_update(self._dirty)
+ return dirty
def all_deleted_uid_iter(self, mbox):
"""
@@ -826,25 +853,28 @@ class MemoryStore(object):
"""
# TODO change indexing of sets to [mbox][key] too.
# XXX should return *first* the news, and *then* the dirty...
+
+ # TODO should query in queues too , true?
+ #
return map(lambda _set: key in _set, (self._new, self._dirty))
- def set_new(self, key):
+ def set_new_queued(self, key):
"""
- Add the key value to the `new` set.
+ Add the key value to the `new-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._new.add(key)
+ self._new_queue.add(key)
- def unset_new(self, key):
+ def unset_new_queued(self, key):
"""
- Remove the key value from the `new` set.
+ Remove the key value from the `new-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._new.discard(key)
+ self._new_queue.discard(key)
deferreds = self._new_deferreds
d = deferreds.get(key, None)
if d:
@@ -853,23 +883,23 @@ class MemoryStore(object):
d.callback('%s, ok' % str(key))
deferreds.pop(key)
- def set_dirty(self, key):
+ def set_dirty_queued(self, key):
"""
- Add the key value to the `dirty` set.
+ Add the key value to the `dirty-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._dirty.add(key)
+ self._dirty_queue.add(key)
- def unset_dirty(self, key):
+ def unset_dirty_queued(self, key):
"""
- Remove the key value from the `dirty` set.
+ Remove the key value from the `dirty-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._dirty.discard(key)
+ self._dirty_queue.discard(key)
deferreds = self._dirty_deferreds
d = deferreds.get(key, None)
if d:
diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py
index b1f333a..9b7de86 100644
--- a/src/leap/mail/imap/messageparts.py
+++ b/src/leap/mail/imap/messageparts.py
@@ -98,7 +98,7 @@ class MessageWrapper(object):
CDOCS = "cdocs"
DOCS_ID = "docs_id"
- # Using slots to limit some the memory footprint,
+ # Using slots to limit some the memory use,
# Add your attribute here.
__slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"]
@@ -148,7 +148,7 @@ class MessageWrapper(object):
"""
return self._new
- def _set_new(self, value=True):
+ def _set_new(self, value=False):
"""
Set the value for the `new` flag, and propagate it
to the memory store if any.
@@ -161,8 +161,8 @@ class MessageWrapper(object):
mbox = self.fdoc.content['mbox']
uid = self.fdoc.content['uid']
key = mbox, uid
- fun = [self.memstore.unset_new,
- self.memstore.set_new][int(value)]
+ fun = [self.memstore.unset_new_queued,
+ self.memstore.set_new_queued][int(value)]
fun(key)
else:
logger.warning("Could not find a memstore referenced from this "
@@ -193,8 +193,8 @@ class MessageWrapper(object):
mbox = self.fdoc.content['mbox']
uid = self.fdoc.content['uid']
key = mbox, uid
- fun = [self.memstore.unset_dirty,
- self.memstore.set_dirty][int(value)]
+ fun = [self.memstore.unset_dirty_queued,
+ self.memstore.set_dirty_queued][int(value)]
fun(key)
else:
logger.warning("Could not find a memstore referenced from this "
@@ -271,11 +271,14 @@ class MessageWrapper(object):
:rtype: generator
"""
if self._dirty:
- mbox = self.fdoc.content[fields.MBOX_KEY]
- uid = self.fdoc.content[fields.UID_KEY]
- docid_dict = self._dict[self.DOCS_ID]
- docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc(
- mbox, uid)
+ try:
+ mbox = self.fdoc.content[fields.MBOX_KEY]
+ uid = self.fdoc.content[fields.UID_KEY]
+ docid_dict = self._dict[self.DOCS_ID]
+ docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc(
+ mbox, uid)
+ except Exception as exc:
+ logger.exception(exc)
if not empty(self.fdoc.content):
yield self.fdoc
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index e7c6b29..667e64d 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -220,12 +220,15 @@ class SoledadStore(ContentDedup):
to be inserted.
:type queue: Queue
"""
- from twisted.internet import reactor
-
- while not queue.empty():
- doc_wrapper = queue.get()
- reactor.callInThread(self._consume_doc, doc_wrapper,
- self.docs_notify_queue)
+ new, dirty = queue
+ while not new.empty():
+ doc_wrapper = new.get()
+ self.reactor.callInThread(self._consume_doc, doc_wrapper,
+ self.docs_notify_queue)
+ while not dirty.empty():
+ doc_wrapper = dirty.get()
+ self.reactor.callInThread(self._consume_doc, doc_wrapper,
+ self.docs_notify_queue)
# Queue empty, flush the notifications queue.
self.docs_notify_queue(None, flush=True)
@@ -239,7 +242,8 @@ class SoledadStore(ContentDedup):
:type doc_wrapper: MessageWrapper
"""
if isinstance(doc_wrapper, MessageWrapper):
- logger.info("unsetting new flag!")
+ # XXX still needed for debug quite often
+ #logger.info("unsetting new flag!")
doc_wrapper.new = False
doc_wrapper.dirty = False
@@ -284,6 +288,8 @@ class SoledadStore(ContentDedup):
try:
self._try_call(call, item)
except Exception as exc:
+ logger.debug("ITEM WAS: %s" % str(item))
+ logger.debug("ITEM CONTENT WAS: %s" % str(item.content))
logger.exception(exc)
failed = True
continue
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index 80121c8..c8f224c 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -49,7 +49,7 @@ class IMessageProducer(Interface):
entities.
"""
- def push(self, item):
+ def push(self, item, state=None):
"""
Push a new item in the queue.
"""
@@ -101,6 +101,10 @@ class MessageProducer(object):
# and consumption is not likely (?) to consume huge amounts of memory in
# our current settings, so the need to pause the stream is not urgent now.
+ # TODO use enum
+ STATE_NEW = 1
+ STATE_DIRTY = 2
+
def __init__(self, consumer, queue=Queue.Queue, period=1):
"""
Initializes the MessageProducer
@@ -115,7 +119,8 @@ class MessageProducer(object):
# it should implement a `consume` method
self._consumer = consumer
- self._queue = queue()
+ self._queue_new = queue()
+ self._queue_dirty = queue()
self._period = period
self._loop = LoopingCall(self._check_for_new)
@@ -130,7 +135,7 @@ class MessageProducer(object):
If the queue is found empty, the loop is stopped. It will be started
again after the addition of new items.
"""
- self._consumer.consume(self._queue)
+ self._consumer.consume((self._queue_new, self._queue_dirty))
if self.is_queue_empty():
self.stop()
@@ -138,11 +143,13 @@ class MessageProducer(object):
"""
Return True if queue is empty, False otherwise.
"""
- return self._queue.empty()
+ new = self._queue_new
+ dirty = self._queue_dirty
+ return new.empty() and dirty.empty()
# public methods: IMessageProducer
- def push(self, item):
+ def push(self, item, state=None):
"""
Push a new item in the queue.
@@ -150,7 +157,14 @@ class MessageProducer(object):
"""
# XXX this might raise if the queue does not accept any new
# items. what to do then?
- self._queue.put(item)
+ queue = self._queue_new
+
+ if state == self.STATE_NEW:
+ queue = self._queue_new
+ if state == self.STATE_DIRTY:
+ queue = self._queue_dirty
+
+ queue.put(item)
self.start()
def start(self):