summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/memorystore.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-11 01:43:14 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:39:48 -0400
commit4338368aa2ba0efaee742e9000e21b81af34d3db (patch)
tree3d3fff4030567a6f9a0d90682335de6c213f7d08 /src/leap/mail/imap/memorystore.py
parentde762b5c6e529f4e668bee1ec848eb1f6380369b (diff)
separate new and dirty queues
Diffstat (limited to 'src/leap/mail/imap/memorystore.py')
-rw-r--r--src/leap/mail/imap/memorystore.py80
1 files changed, 55 insertions, 25 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index 786a9c4..a053f3f 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -24,7 +24,6 @@ import weakref
from collections import defaultdict
from copy import copy
-from itertools import chain
from twisted.internet import defer
from twisted.internet.task import LoopingCall
@@ -33,7 +32,6 @@ from zope.interface import implements
from leap.common.check import leap_assert_type
from leap.mail import size
-from leap.mail.decorators import deferred_to_thread
from leap.mail.utils import empty, phash_iter
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
@@ -48,7 +46,7 @@ logger = logging.getLogger(__name__)
# The default period to do writebacks to the permanent
# soledad storage, in seconds.
-SOLEDAD_WRITE_PERIOD = 30
+SOLEDAD_WRITE_PERIOD = 15
FDOC = MessagePartType.fdoc.key
HDOC = MessagePartType.hdoc.key
@@ -106,6 +104,9 @@ class MemoryStore(object):
:param write_period: the interval to dump messages to disk, in seconds.
:type write_period: int
"""
+ from twisted.internet import reactor
+ self.reactor = reactor
+
self._permanent_store = permanent_store
self._write_period = write_period
@@ -195,11 +196,15 @@ class MemoryStore(object):
# New and dirty flags, to set MessageWrapper State.
self._new = set([])
+ self._new_queue = set([])
self._new_deferreds = {}
+
self._dirty = set([])
- self._rflags_dirty = set([])
+ self._dirty_queue = set([])
self._dirty_deferreds = {}
+ self._rflags_dirty = set([])
+
# Flag for signaling we're busy writing to the disk storage.
setattr(self, self.WRITING_FLAG, False)
@@ -297,7 +302,7 @@ class MemoryStore(object):
"""
Put an existing message.
- This will set the dirty flag on the MemoryStore.
+ This will also set the dirty flag on the MemoryStore.
:param mbox: the mailbox
:type mbox: str or unicode
@@ -498,9 +503,14 @@ class MemoryStore(object):
# is accquired
with set_bool_flag(self, self.WRITING_FLAG):
for rflags_doc_wrapper in self.all_rdocs_iter():
- self.producer.push(rflags_doc_wrapper)
- for msg_wrapper in self.all_new_dirty_msg_iter():
- self.producer.push(msg_wrapper)
+ self.producer.push(rflags_doc_wrapper,
+ state=self.producer.STATE_DIRTY)
+ for msg_wrapper in self.all_new_msg_iter():
+ self.producer.push(msg_wrapper,
+ state=self.producer.STATE_NEW)
+ for msg_wrapper in self.all_dirty_msg_iter():
+ self.producer.push(msg_wrapper,
+ state=self.producer.STATE_DIRTY)
# MemoryStore specific methods.
@@ -784,17 +794,34 @@ class MemoryStore(object):
for uid in fdoc_store[mbox]:
yield mbox, uid
- def all_new_dirty_msg_iter(self):
+ def all_new_msg_iter(self):
"""
- Return generator that iterates through all new and dirty messages.
+ Return generator that iterates through all new messages.
:return: generator of MessageWrappers
:rtype: generator
"""
gm = self.get_message
- new = (gm(*key) for key in self._new)
- dirty = (gm(*key, flags_only=True) for key in self._dirty)
- return chain(new, dirty)
+ new = [gm(*key) for key in self._new]
+ # move content from new set to the queue
+ self._new_queue.update(self._new)
+ self._new.difference_update(self._new)
+ return new
+
+ def all_dirty_msg_iter(self):
+ """
+ Return generator that iterates through all dirty messages.
+
+ :return: generator of MessageWrappers
+ :rtype: generator
+ """
+ gm = self.get_message
+ dirty = [gm(*key, flags_only=True) for key in self._dirty]
+ # move content from new and dirty sets to the queue
+
+ self._dirty_queue.update(self._dirty)
+ self._dirty.difference_update(self._dirty)
+ return dirty
def all_deleted_uid_iter(self, mbox):
"""
@@ -826,25 +853,28 @@ class MemoryStore(object):
"""
# TODO change indexing of sets to [mbox][key] too.
# XXX should return *first* the news, and *then* the dirty...
+
+ # TODO should query in queues too , true?
+ #
return map(lambda _set: key in _set, (self._new, self._dirty))
- def set_new(self, key):
+ def set_new_queued(self, key):
"""
- Add the key value to the `new` set.
+ Add the key value to the `new-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._new.add(key)
+ self._new_queue.add(key)
- def unset_new(self, key):
+ def unset_new_queued(self, key):
"""
- Remove the key value from the `new` set.
+ Remove the key value from the `new-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._new.discard(key)
+ self._new_queue.discard(key)
deferreds = self._new_deferreds
d = deferreds.get(key, None)
if d:
@@ -853,23 +883,23 @@ class MemoryStore(object):
d.callback('%s, ok' % str(key))
deferreds.pop(key)
- def set_dirty(self, key):
+ def set_dirty_queued(self, key):
"""
- Add the key value to the `dirty` set.
+ Add the key value to the `dirty-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._dirty.add(key)
+ self._dirty_queue.add(key)
- def unset_dirty(self, key):
+ def unset_dirty_queued(self, key):
"""
- Remove the key value from the `dirty` set.
+ Remove the key value from the `dirty-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._dirty.discard(key)
+ self._dirty_queue.discard(key)
deferreds = self._dirty_deferreds
d = deferreds.get(key, None)
if d: