summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap')
-rw-r--r--src/leap/mail/imap/mailbox.py131
-rw-r--r--src/leap/mail/imap/memorystore.py236
-rw-r--r--src/leap/mail/imap/messageparts.py26
-rw-r--r--src/leap/mail/imap/messages.py336
-rw-r--r--src/leap/mail/imap/server.py1
-rw-r--r--src/leap/mail/imap/soledadstore.py129
-rwxr-xr-xsrc/leap/mail/imap/tests/leap_tests_imap.zsh7
7 files changed, 528 insertions, 338 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 108d0da..b5c5719 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -26,7 +26,7 @@ import cStringIO
from collections import defaultdict
from twisted.internet import defer
-from twisted.internet.task import deferLater
+#from twisted.internet.task import deferLater
from twisted.python import log
from twisted.mail import imap4
@@ -119,6 +119,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
if not self.getFlags():
self.setFlags(self.INIT_FLAGS)
+ if self._memstore:
+ self.prime_last_uid_to_memstore()
+
@property
def listeners(self):
"""
@@ -132,6 +135,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
return self._listeners[self.mbox]
+ # TODO this grows too crazily when many instances are fired, like
+ # during imaptest stress testing. Should have a queue of limited size
+ # instead.
def addListener(self, listener):
"""
Add a listener to the listeners queue.
@@ -153,6 +159,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
self.listeners.remove(listener)
+ # TODO move completely to soledadstore, under memstore reponsibility.
def _get_mbox(self):
"""
Return mailbox document.
@@ -228,52 +235,28 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
def _get_last_uid(self):
"""
Return the last uid for this mailbox.
+ If we have a memory store, the last UID will be the highest
+ recorded UID in the message store, or a counter cached from
+ the mailbox document in soledad if this is higher.
:return: the last uid for messages in this mailbox
:rtype: bool
"""
- mbox = self._get_mbox()
- if not mbox:
- logger.error("We could not get a mbox!")
- # XXX It looks like it has been corrupted.
- # We need to be able to survive this.
- return None
- last = mbox.content.get(self.LAST_UID_KEY, 1)
- if self._memstore:
- last = max(last, self._memstore.get_last_uid(mbox))
+ last = self._memstore.get_last_uid(self.mbox)
+ print "last uid for %s: %s (from memstore)" % (self.mbox, last)
return last
- def _set_last_uid(self, uid):
- """
- Sets the last uid for this mailbox.
+ last_uid = property(
+ _get_last_uid, doc="Last_UID attribute.")
- :param uid: the uid to be set
- :type uid: int
+ def prime_last_uid_to_memstore(self):
"""
- leap_assert(isinstance(uid, int), "uid has to be int")
- mbox = self._get_mbox()
- key = self.LAST_UID_KEY
-
- count = self.getMessageCount()
-
- # XXX safety-catch. If we do get duplicates,
- # we want to avoid further duplication.
-
- if uid >= count:
- value = uid
- else:
- # something is wrong,
- # just set the last uid
- # beyond the max msg count.
- logger.debug("WRONG uid < count. Setting last uid to %s", count)
- value = count
-
- mbox.content[key] = value
- # XXX this should be set in the memorystore instead!!!
- self._soledad.put_doc(mbox)
-
- last_uid = property(
- _get_last_uid, _set_last_uid, doc="Last_UID attribute.")
+ Prime memstore with last_uid value
+ """
+ set_exist = set(self.messages.all_uid_iter())
+ last = max(set_exist) + 1 if set_exist else 1
+ logger.info("Priming Soledad last_uid to %s" % (last,))
+ self._memstore.set_last_soledad_uid(self.mbox, last)
def getUIDValidity(self):
"""
@@ -315,8 +298,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:rtype: int
"""
with self.next_uid_lock:
- self.last_uid += 1
- return self.last_uid
+ if self._memstore:
+ return self.last_uid + 1
+ else:
+ # XXX after lock, it should be safe to
+ # return just the increment here, and
+ # have a different method that actually increments
+ # the counter when really adding.
+ self.last_uid += 1
+ return self.last_uid
def getMessageCount(self):
"""
@@ -397,26 +387,26 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: a deferred that evals to None
"""
+ # TODO have a look at the cases for internal date in the rfc
if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)):
message = message.getvalue()
- # XXX we should treat the message as an IMessage from here
+
+ # XXX we could treat the message as an IMessage from here
leap_assert_type(message, basestring)
- uid_next = self.getUIDNext()
- logger.debug('Adding msg with UID :%s' % uid_next)
if flags is None:
flags = tuple()
else:
flags = tuple(str(flag) for flag in flags)
- d = self._do_add_message(message, flags=flags, date=date, uid=uid_next)
+ d = self._do_add_message(message, flags=flags, date=date)
return d
- def _do_add_message(self, message, flags, date, uid):
+ def _do_add_message(self, message, flags, date):
"""
- Calls to the messageCollection add_msg method (deferred to thread).
+ Calls to the messageCollection add_msg method.
Invoked from addMessage.
"""
- d = self.messages.add_msg(message, flags=flags, date=date, uid=uid)
+ d = self.messages.add_msg(message, flags=flags, date=date)
# XXX Removing notify temporarily.
# This is interfering with imaptest results. I'm not clear if it's
# because we clutter the logging or because the set of listeners is
@@ -456,6 +446,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# XXX removing the mailbox in situ for now,
# we should postpone the removal
+
+ # XXX move to memory store??
self._soledad.delete_doc(self._get_mbox())
def _close_cb(self, result):
@@ -466,8 +458,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
Expunge and mark as closed
"""
d = self.expunge()
- d.addCallback(self._close_cb)
- return d
+ #d.addCallback(self._close_cb)
+ #return d
def _expunge_cb(self, result):
return result
@@ -479,22 +471,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
print "EXPUNGE!"
if not self.isWriteable():
raise imap4.ReadOnlyMailbox
- mstore = self._memstore
- if mstore is not None:
- deleted = mstore.all_deleted_uid_iter(self.mbox)
- print "deleted ", list(deleted)
- for uid in deleted:
- mstore.remove_message(self.mbox, uid)
-
- print "now deleting from soledad"
- d = self.messages.remove_all_deleted()
- d.addCallback(self._expunge_cb)
- d.addCallback(self.messages.reset_last_uid)
-
- # XXX DEBUG -------------------
- # FIXME !!!
- # XXX should remove the hdocset too!!!
- return d
+
+ return self._memstore.expunge(self.mbox)
+
+ # TODO we can defer this back when it's correct
+ # but we should make sure the memstore has been synced.
+
+ #d = self._memstore.expunge(self.mbox)
+ #d.addCallback(self._expunge_cb)
+ #return d
def _bound_seq(self, messages_asked):
"""
@@ -783,12 +768,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# IMessageCopier
@deferred
+ #@profile
def copy(self, messageObject):
"""
Copy the given message object into this mailbox.
"""
from twisted.internet import reactor
- uid_next = self.getUIDNext()
msg = messageObject
memstore = self._memstore
@@ -796,7 +781,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
fdoc = msg._fdoc
hdoc = msg._hdoc
if not fdoc:
- logger.debug("Tried to copy a MSG with no fdoc")
+ logger.warning("Tried to copy a MSG with no fdoc")
return
new_fdoc = copy.deepcopy(fdoc.content)
@@ -807,11 +792,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
if exist:
print "Destination message already exists!"
-
else:
print "DO COPY MESSAGE!"
+ mbox = self.mbox
+ uid_next = memstore.increment_last_soledad_uid(mbox)
new_fdoc[self.UID_KEY] = uid_next
- new_fdoc[self.MBOX_KEY] = self.mbox
+ new_fdoc[self.MBOX_KEY] = mbox
# XXX set recent!
@@ -824,9 +810,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
self._memstore.create_message(
self.mbox, uid_next,
MessageWrapper(
- new_fdoc, hdoc.content))
-
- deferLater(reactor, 1, self.notify_new)
+ new_fdoc, hdoc.content),
+ notify_on_disk=False)
# convenience fun
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index 232a2fb..60e98c7 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -19,16 +19,20 @@ In-memory transient store for a LEAPIMAPServer.
"""
import contextlib
import logging
+import threading
import weakref
from collections import defaultdict
+from copy import copy
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from twisted.python import log
from zope.interface import implements
+from leap.common.check import leap_assert_type
from leap.mail import size
+from leap.mail.decorators import deferred
from leap.mail.utils import empty
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
@@ -40,7 +44,10 @@ from leap.mail.imap.messageparts import ReferenciableDict
logger = logging.getLogger(__name__)
-SOLEDAD_WRITE_PERIOD = 20
+
+# The default period to do writebacks to the permanent
+# soledad storage, in seconds.
+SOLEDAD_WRITE_PERIOD = 10
@contextlib.contextmanager
@@ -76,16 +83,11 @@ class MemoryStore(object):
implements(interfaces.IMessageStore,
interfaces.IMessageStoreWriter)
- producer = None
-
# TODO We will want to index by chash when we transition to local-only
# UIDs.
- # TODO should store RECENT-FLAGS too
- # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass
- # TODO do use dirty flag (maybe use namedtuples for that) so we can use it
- # also as a read-cache.
WRITING_FLAG = "_writing"
+ _last_uid_lock = threading.Lock()
def __init__(self, permanent_store=None,
write_period=SOLEDAD_WRITE_PERIOD):
@@ -138,17 +140,20 @@ class MemoryStore(object):
self._rflags_store = defaultdict(
lambda: {'doc_id': None, 'set': set([])})
- # TODO ----------------- implement mailbox-level flags store too?
- # XXX maybe we don't need this anymore...
- # let's see how good does it prefetch the headers if
- # we cache them in the store.
- self._hdocset_store = {}
- # --------------------------------------------------------------
+ """
+ last-uid store keeps the count of the highest UID
+ per mailbox.
+
+ {'mbox-a': 42,
+ 'mbox-b': 23}
+ """
+ self._last_uid = {}
# New and dirty flags, to set MessageWrapper State.
self._new = set([])
self._new_deferreds = {}
self._dirty = set([])
+ self._rflags_dirty = set([])
self._dirty_deferreds = {}
# Flag for signaling we're busy writing to the disk storage.
@@ -210,14 +215,25 @@ class MemoryStore(object):
print "adding new doc to memstore %s (%s)" % (mbox, uid)
key = mbox, uid
+ self._add_message(mbox, uid, message, notify_on_disk)
+
d = defer.Deferred()
d.addCallback(lambda result: log.msg("message save: %s" % result))
-
self._new.add(key)
+
+ # We store this deferred so we can keep track of the pending
+ # operations internally.
self._new_deferreds[key] = d
- self._add_message(mbox, uid, message, notify_on_disk)
- print "create message: ", d
- return d
+
+ if notify_on_disk:
+ # Caller wants to be notified when the message is on disk
+ # so we pass the deferred that will be fired when the message
+ # has been written.
+ return d
+ else:
+ # Caller does not care, just fired and forgot, so we pass
+ # a defer that will inmediately have its callback triggered.
+ return defer.succeed('fire-and-forget:%s' % str(key))
def put_message(self, mbox, uid, message, notify_on_disk=True):
"""
@@ -238,13 +254,14 @@ class MemoryStore(object):
:rtype: Deferred
"""
key = mbox, uid
-
d = defer.Deferred()
- d.addCallback(lambda result: log.msg("message save: %s" % result))
+ d.addCallback(lambda result: log.msg("message PUT save: %s" % result))
self._dirty.add(key)
self._dirty_deferreds[key] = d
self._add_message(mbox, uid, message, notify_on_disk)
+ #print "dirty ", self._dirty
+ #print "new ", self._new
return d
def _add_message(self, mbox, uid, message, notify_on_disk=True):
@@ -315,6 +332,19 @@ class MemoryStore(object):
store.pop(key)
prune((FDOC, HDOC, CDOCS, DOCS_ID), store)
+ #print "after adding: "
+ #import pprint; pprint.pprint(self._msg_store[key])
+
+ def get_docid_for_fdoc(self, mbox, uid):
+ """
+ Get Soledad document id for the flags-doc for a given mbox and uid.
+ """
+ fdoc = self._permanent_store.get_flags_doc(mbox, uid)
+ if not fdoc:
+ return None
+ doc_id = fdoc.doc_id
+ return doc_id
+
def get_message(self, mbox, uid):
"""
Get a MessageWrapper for the given mbox and uid combination.
@@ -326,6 +356,8 @@ class MemoryStore(object):
if msg_dict:
new, dirty = self._get_new_dirty_state(key)
return MessageWrapper(from_dict=msg_dict,
+ new=new,
+ dirty=dirty,
memstore=weakref.proxy(self))
else:
return None
@@ -334,6 +366,13 @@ class MemoryStore(object):
"""
Remove a Message from this MemoryStore.
"""
+ # XXX For the moment we are only removing the flags and headers
+ # docs. The rest we leave there polluting your hard disk,
+ # until we think about a good way of deorphaning.
+
+ # XXX implement elijah's idea of using a PUT document as a
+ # token to ensure consistency in the removal.
+
try:
key = mbox, uid
self._new.discard(key)
@@ -348,18 +387,22 @@ class MemoryStore(object):
"""
Write the message documents in this MemoryStore to a different store.
"""
- # For now, we pass if the queue is not empty, to avoid duplication.
+ # For now, we pass if the queue is not empty, to avoid duplicate
+ # queuing.
# We would better use a flag to know when we've already enqueued an
# item.
+
+ # XXX this could return the deferred for all the enqueued operations
+
if not self.producer.is_queue_empty():
return
print "Writing messages to Soledad..."
with set_bool_flag(self, self.WRITING_FLAG):
- for msg_wrapper in self.all_new_dirty_msg_iter():
- self.producer.push(msg_wrapper)
for rflags_doc_wrapper in self.all_rdocs_iter():
self.producer.push(rflags_doc_wrapper)
+ for msg_wrapper in self.all_new_dirty_msg_iter():
+ self.producer.push(msg_wrapper)
# MemoryStore specific methods.
@@ -370,12 +413,61 @@ class MemoryStore(object):
all_keys = self._msg_store.keys()
return [uid for m, uid in all_keys if m == mbox]
+ # last_uid
+
def get_last_uid(self, mbox):
"""
Get the highest UID for a given mbox.
+ It will be the highest between the highest uid in the message store for
+ the mailbox, and the soledad integer cache.
"""
uids = self.get_uids(mbox)
- return uids and max(uids) or 0
+ last_mem_uid = uids and max(uids) or 0
+ last_soledad_uid = self.get_last_soledad_uid(mbox)
+ return max(last_mem_uid, last_soledad_uid)
+
+ def get_last_soledad_uid(self, mbox):
+ """
+ Get last uid for a given mbox from the soledad integer cache.
+ """
+ return self._last_uid.get(mbox, 0)
+
+ def set_last_soledad_uid(self, mbox, value):
+ """
+ Set last uid for a given mbox in the soledad integer cache.
+ SoledadMailbox should prime this value during initialization.
+ Other methods (during message adding) SHOULD call
+ `increment_last_soledad_uid` instead.
+ """
+ leap_assert_type(value, int)
+ print "setting last soledad uid for ", mbox, "to", value
+ # if we already have a vlue here, don't do anything
+ with self._last_uid_lock:
+ if not self._last_uid.get(mbox, None):
+ self._last_uid[mbox] = value
+
+ def increment_last_soledad_uid(self, mbox):
+ """
+ Increment by one the soledad integer cache for the last_uid for
+ this mbox, and fire a defer-to-thread to update the soledad value.
+ The caller should lock the call tho this method.
+ """
+ with self._last_uid_lock:
+ self._last_uid[mbox] += 1
+ value = self._last_uid[mbox]
+ self.write_last_uid(mbox, value)
+ return value
+
+ @deferred
+ def write_last_uid(self, mbox, value):
+ """
+ Increment the soledad cache,
+ """
+ leap_assert_type(value, int)
+ if self._permanent_store:
+ self._permanent_store.write_last_uid(mbox, value)
+
+ # Counting sheeps...
def count_new_mbox(self, mbox):
"""
@@ -418,14 +510,12 @@ class MemoryStore(object):
docs_dict = self._chash_fdoc_store.get(chash, None)
fdoc = docs_dict.get(mbox, None) if docs_dict else None
- print "GETTING FDOC BY CHASH:", fdoc
-
# a couple of special cases.
# 1. We might have a doc with empty content...
if empty(fdoc):
return None
- # ...Or the message could exist, but being flagged for deletion.
+ # 2. ...Or the message could exist, but being flagged for deletion.
# We want to create a new one in this case.
# Hmmm what if the deletion is un-done?? We would end with a
# duplicate...
@@ -456,15 +546,22 @@ class MemoryStore(object):
for key in sorted(self._msg_store.keys())
if key in self._new or key in self._dirty)
+ def all_msg_dict_for_mbox(self, mbox):
+ """
+ Return all the message dicts for a given mbox.
+ """
+ return [self._msg_store[(mb, uid)]
+ for mb, uid in self._msg_store if mb == mbox]
+
def all_deleted_uid_iter(self, mbox):
"""
Return generator that iterates through the UIDs for all messags
with deleted flag in a given mailbox.
"""
- all_deleted = (
- msg['fdoc']['uid'] for msg in self._msg_store.values()
+ all_deleted = [
+ msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox)
if msg.get('fdoc', None)
- and fields.DELETED_FLAG in msg['fdoc']['flags'])
+ and fields.DELETED_FLAG in msg['fdoc']['flags']]
return all_deleted
# new, dirty flags
@@ -473,6 +570,7 @@ class MemoryStore(object):
"""
Return `new` and `dirty` flags for a given message.
"""
+ # XXX should return *first* the news, and *then* the dirty...
return map(lambda _set: key in _set, (self._new, self._dirty))
def set_new(self, key):
@@ -485,7 +583,7 @@ class MemoryStore(object):
"""
Remove the key value from the `new` set.
"""
- print "Unsetting NEW for: %s" % str(key)
+ #print "Unsetting NEW for: %s" % str(key)
self._new.discard(key)
deferreds = self._new_deferreds
d = deferreds.get(key, None)
@@ -505,7 +603,7 @@ class MemoryStore(object):
"""
Remove the key value from the `dirty` set.
"""
- print "Unsetting DIRTY for: %s" % str(key)
+ #print "Unsetting DIRTY for: %s" % str(key)
self._dirty.discard(key)
deferreds = self._dirty_deferreds
d = deferreds.get(key, None)
@@ -522,6 +620,7 @@ class MemoryStore(object):
"""
Set the `Recent` flag for a given mailbox and UID.
"""
+ self._rflags_dirty.add(mbox)
self._rflags_store[mbox]['set'].add(uid)
# TODO --- nice but unused
@@ -536,6 +635,7 @@ class MemoryStore(object):
Set the value for the set of the recent flags.
Used from the property in the MessageCollection.
"""
+ self._rflags_dirty.add(mbox)
self._rflags_store[mbox]['set'] = set(value)
def load_recent_flags(self, mbox, flags_doc):
@@ -568,23 +668,81 @@ class MemoryStore(object):
:rtype: generator
"""
- rflags_store = self._rflags_store
-
# XXX use enums
DOC_ID = "doc_id"
SET = "set"
- print "LEN RFLAGS_STORE ------->", len(rflags_store)
- return (
- RecentFlagsDoc(
+ rflags_store = self._rflags_store
+
+ def get_rdoc(mbox, rdict):
+ mbox_rflag_set = rdict[SET]
+ recent_set = copy(mbox_rflag_set)
+ # zero it!
+ mbox_rflag_set.difference_update(mbox_rflag_set)
+ return RecentFlagsDoc(
doc_id=rflags_store[mbox][DOC_ID],
content={
fields.TYPE_KEY: fields.TYPE_RECENT_VAL,
fields.MBOX_KEY: mbox,
- fields.RECENTFLAGS_KEY: list(
- rflags_store[mbox][SET])
+ fields.RECENTFLAGS_KEY: list(recent_set)
})
- for mbox in rflags_store)
+
+ return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items()
+ if not empty(rdict[SET]))
+
+ # Methods that mirror the IMailbox interface
+
+ def remove_all_deleted(self, mbox):
+ """
+ Remove all messages flagged \\Deleted from this Memory Store only.
+ Called from `expunge`
+ """
+ mem_deleted = self.all_deleted_uid_iter(mbox)
+ for uid in mem_deleted:
+ self.remove_message(mbox, uid)
+ return mem_deleted
+
+ def expunge(self, mbox):
+ """
+ Remove all messages flagged \\Deleted, from the Memory Store
+ and from the permanent store also.
+ """
+ # TODO expunge should add itself as a callback to the ongoing
+ # writes.
+ soledad_store = self._permanent_store
+
+ try:
+ # 1. Stop the writing call
+ self._stop_write_loop()
+ # 2. Enqueue a last write.
+ #self.write_messages(soledad_store)
+ # 3. Should wait on the writebacks to finish ???
+ # FIXME wait for this, and add all the rest of the method
+ # as a callback!!!
+ except Exception as exc:
+ logger.exception(exc)
+
+ # Now, we...:
+
+ try:
+ # 1. Delete all messages marked as deleted in soledad.
+
+ # XXX this could be deferred for faster operation.
+ if soledad_store:
+ sol_deleted = soledad_store.remove_all_deleted(mbox)
+ else:
+ sol_deleted = []
+
+ # 2. Delete all messages marked as deleted in memory.
+ mem_deleted = self.remove_all_deleted(mbox)
+
+ all_deleted = set(mem_deleted).union(set(sol_deleted))
+ print "deleted ", all_deleted
+ except Exception as exc:
+ logger.exception(exc)
+ finally:
+ self._start_write_loop()
+ return all_deleted
# Dump-to-disk controls.
diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py
index 257d3f0..6d8631a 100644
--- a/src/leap/mail/imap/messageparts.py
+++ b/src/leap/mail/imap/messageparts.py
@@ -32,7 +32,7 @@ from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
from leap.mail.imap import interfaces
from leap.mail.imap.fields import fields
-from leap.mail.utils import first
+from leap.mail.utils import empty, first
MessagePartType = Enum("hdoc", "fdoc", "cdoc", "cdocs", "docs_id")
@@ -134,6 +134,13 @@ class MessageWrapper(object):
self._dict[self.HDOC] = ReferenciableDict(hdoc)
if cdocs is not None:
self._dict[self.CDOCS] = ReferenciableDict(cdocs)
+
+ # This will keep references to the doc_ids to be able to put
+ # messages to soledad. It will be populated during the walk() to avoid
+ # the overhead of reading from the db.
+
+ # XXX it really *only* make sense for the FDOC, the other parts
+ # should not be "dirty", just new...!!!
self._dict[self.DOCS_ID] = docs_id
# properties
@@ -201,6 +208,7 @@ class MessageWrapper(object):
else:
logger.warning("NO FDOC!!!")
content_ref = {}
+
return MessagePartDoc(new=self.new, dirty=self.dirty,
store=self._storetype,
part=MessagePartType.fdoc,
@@ -214,7 +222,6 @@ class MessageWrapper(object):
if _hdoc:
content_ref = weakref.proxy(_hdoc)
else:
- logger.warning("NO HDOC!!!!")
content_ref = {}
return MessagePartDoc(new=self.new, dirty=self.dirty,
store=self._storetype,
@@ -234,14 +241,21 @@ class MessageWrapper(object):
def walk(self):
"""
Generator that iterates through all the parts, returning
- MessagePartDoc.
+ MessagePartDoc. Used for writing to SoledadStore.
"""
- if self.fdoc is not None:
+ if self._dirty:
+ mbox = self.fdoc.content[fields.MBOX_KEY]
+ uid = self.fdoc.content[fields.UID_KEY]
+ docid_dict = self._dict[self.DOCS_ID]
+ docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc(
+ mbox, uid)
+
+ if not empty(self.fdoc.content):
yield self.fdoc
- if self.hdoc is not None:
+ if not empty(self.hdoc.content):
yield self.hdoc
for cdoc in self.cdocs.values():
- if cdoc is not None:
+ if not empty(cdoc):
content_ref = weakref.proxy(cdoc)
yield MessagePartDoc(new=self.new, dirty=self.dirty,
store=self._storetype,
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 5de638b..35c07f5 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -202,21 +202,21 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: The flags, represented as strings
:rtype: tuple
"""
- if self._uid is None:
- return []
+ #if self._uid is None:
+ #return []
uid = self._uid
- flags = []
+ flags = set([])
fdoc = self._fdoc
if fdoc:
- flags = fdoc.content.get(self.FLAGS_KEY, None)
+ flags = set(fdoc.content.get(self.FLAGS_KEY, None))
msgcol = self._collection
# We treat the recent flag specially: gotten from
# a mailbox-level document.
if msgcol and uid in msgcol.recent_flags:
- flags.append(fields.RECENT_FLAG)
+ flags.add(fields.RECENT_FLAG)
if flags:
flags = map(str, flags)
return tuple(flags)
@@ -236,7 +236,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: a SoledadDocument instance
:rtype: SoledadDocument
"""
- # XXX use memory store ...!
+ # XXX Move logic to memory store ...
leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
log.msg('setting flags: %s (%s)' % (self._uid, flags))
@@ -252,6 +252,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
if self._collection.memstore is not None:
+ print "putting message in collection"
self._collection.memstore.put_message(
self._mbox, self._uid,
MessageWrapper(fdoc=doc.content, new=False, dirty=True,
@@ -508,6 +509,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
return pmap[str(part)]
+ # XXX moved to memory store
+ # move the rest too. ------------------------------------------
def _get_flags_doc(self):
"""
Return the document that keeps the flags for this
@@ -617,57 +620,38 @@ class LeapMessage(fields, MailParser, MBoxParser):
# destructor
- @deferred
- def remove(self):
- """
- Remove all docs associated with this message.
- Currently it removes only the flags doc.
- """
- # XXX For the moment we are only removing the flags and headers
- # docs. The rest we leave there polluting your hard disk,
- # until we think about a good way of deorphaning.
- # Maybe a crawler of unreferenced docs.
-
- # XXX implement elijah's idea of using a PUT document as a
- # token to ensure consistency in the removal.
-
- uid = self._uid
-
- fd = self._get_flags_doc()
- #hd = self._get_headers_doc()
- #bd = self._get_body_doc()
- #docs = [fd, hd, bd]
-
- try:
- memstore = self._collection.memstore
- except AttributeError:
- memstore = False
-
- if memstore and hasattr(fd, "store", None) == "mem":
- key = self._mbox, self._uid
- if fd.new:
- # it's a new document, so we can remove it and it will not
- # be writen. Watch out! We need to be sure it has not been
- # just queued to write!
- memstore.remove_message(*key)
-
- if fd.dirty:
- doc_id = fd.doc_id
- doc = self._soledad.get_doc(doc_id)
- try:
- self._soledad.delete_doc(doc)
- except Exception as exc:
- logger.exception(exc)
-
- else:
+ # XXX this logic moved to remove_message in memory store...
+ #@deferred
+ #def remove(self):
+ #"""
+ #Remove all docs associated with this message.
+ #Currently it removes only the flags doc.
+ #"""
+ #fd = self._get_flags_doc()
+#
+ #if fd.new:
+ # it's a new document, so we can remove it and it will not
+ # be writen. Watch out! We need to be sure it has not been
+ # just queued to write!
+ #memstore.remove_message(*key)
+#
+ #if fd.dirty:
+ #doc_id = fd.doc_id
+ #doc = self._soledad.get_doc(doc_id)
+ #try:
+ #self._soledad.delete_doc(doc)
+ #except Exception as exc:
+ #logger.exception(exc)
+#
+ #else:
# we just got a soledad_doc
- try:
- doc_id = fd.doc_id
- latest_doc = self._soledad.get_doc(doc_id)
- self._soledad.delete_doc(latest_doc)
- except Exception as exc:
- logger.exception(exc)
- return uid
+ #try:
+ #doc_id = fd.doc_id
+ #latest_doc = self._soledad.get_doc(doc_id)
+ #self._soledad.delete_doc(latest_doc)
+ #except Exception as exc:
+ #logger.exception(exc)
+ #return uid
def does_exist(self):
"""
@@ -826,7 +810,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# ensure that we have a recent-flags and a hdocs-sec doc
self._get_or_create_rdoc()
- self._get_or_create_hdocset()
+
+ # Not for now...
+ #self._get_or_create_hdocset()
def _get_empty_doc(self, _type=FLAGS_DOC):
"""
@@ -959,7 +945,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# not deferring to thread cause this now uses deferred asa retval
#@deferred
- def add_msg(self, raw, subject=None, flags=None, date=None, uid=1):
+ #@profile
+ def add_msg(self, raw, subject=None, flags=None, date=None, uid=None,
+ notify_on_disk=False):
"""
Creates a new message document.
Here lives the magic of the leap mail. Well, in soledad, really.
@@ -994,7 +982,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# parse
msg, chash, size, multi = self._do_parse(raw)
- # check for uniqueness.
+ # check for uniqueness --------------------------------
+ # XXX profiler says that this test is costly.
+ # So we probably should just do an in-memory check and
+ # move the complete check to the soledad writer?
+ # Watch out! We're reserving a UID right after this!
if self._fdoc_already_exists(chash):
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
logger.warning("We already have that message in this mailbox.")
@@ -1003,6 +995,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# so not touch it by the moment.
return defer.succeed('already_exists')
+ uid = self.memstore.increment_last_soledad_uid(self.mbox)
+ print "ADDING MSG WITH UID: %s" % uid
+
fd = self._populate_flags(flags, uid, chash, size, multi)
hd = self._populate_headr(msg, chash, subject, date)
@@ -1039,36 +1034,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# XXX Should allow also to dump to disk directly,
# for no-memstore cases.
- # we return a deferred that, by default, will be triggered when
- # saved to disk
- d = self.memstore.create_message(self.mbox, uid, msg_container)
- print "defered-add", d
+ # we return a deferred that by default will be triggered
+ # inmediately.
+ d = self.memstore.create_message(self.mbox, uid, msg_container,
+ notify_on_disk=notify_on_disk)
print "adding message", d
return d
- def _remove_cb(self, result):
- return result
-
- def remove_all_deleted(self):
- """
- Removes all messages flagged as deleted.
- """
- delete_deferl = []
- for msg in self.get_deleted():
- delete_deferl.append(msg.remove())
- d1 = defer.gatherResults(delete_deferl, consumeErrors=True)
- d1.addCallback(self._remove_cb)
- return d1
-
- def remove(self, msg):
- """
- Remove a given msg.
- :param msg: the message to be removed
- :type msg: LeapMessage
- """
- d = msg.remove()
- d.addCallback(self._remove_cb)
- return d
+ #def remove(self, msg):
+ #"""
+ #Remove a given msg.
+ #:param msg: the message to be removed
+ #:type msg: LeapMessage
+ #"""
+ #d = msg.remove()
+ #d.addCallback(self._remove_cb)
+ #return d
#
# getters: specific queries
@@ -1175,76 +1156,76 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# XXX FIXME -------------------------------------
# This should be rewritten to use memory store.
- def _get_hdocset(self):
- """
- An accessor for the hdocs-set for this mailbox.
- """
- if not self.__hdocset:
- with self._hdocset_lock:
- hdocset_doc = self._get_hdocset_doc()
- value = set(hdocset_doc.content.get(
- fields.HDOCS_SET_KEY, []))
- self.__hdocset = value
- return self.__hdocset
-
- def _set_hdocset(self, value):
- """
- Setter for the hdocs-set for this mailbox.
- """
- with self._hdocset_lock:
- hdocset_doc = self._get_hdocset_doc()
- newv = set(value)
- self.__hdocset = newv
- hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv)
+ #def _get_hdocset(self):
+ #"""
+ #An accessor for the hdocs-set for this mailbox.
+ #"""
+ #if not self.__hdocset:
+ #with self._hdocset_lock:
+ #hdocset_doc = self._get_hdocset_doc()
+ #value = set(hdocset_doc.content.get(
+ #fields.HDOCS_SET_KEY, []))
+ #self.__hdocset = value
+ #return self.__hdocset
+#
+ #def _set_hdocset(self, value):
+ #"""
+ #Setter for the hdocs-set for this mailbox.
+ #"""
+ #with self._hdocset_lock:
+ #hdocset_doc = self._get_hdocset_doc()
+ #newv = set(value)
+ #self.__hdocset = newv
+ #hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv)
# XXX should deferLater 0 it?
- self._soledad.put_doc(hdocset_doc)
-
- _hdocset = property(
- _get_hdocset, _set_hdocset,
- doc="Set of Document-IDs for the headers docs associated "
- "with this mailbox.")
-
- def _get_hdocset_doc(self):
- """
- Get hdocs-set document for this mailbox.
- """
- curried = partial(
- self._soledad.get_from_index,
- fields.TYPE_MBOX_IDX,
- fields.TYPE_HDOCS_SET_VAL, self.mbox)
- curried.expected = "hdocset"
- hdocset_doc = try_unique_query(curried)
- return hdocset_doc
-
+ #self._soledad.put_doc(hdocset_doc)
+#
+ #_hdocset = property(
+ #_get_hdocset, _set_hdocset,
+ #doc="Set of Document-IDs for the headers docs associated "
+ #"with this mailbox.")
+#
+ #def _get_hdocset_doc(self):
+ #"""
+ #Get hdocs-set document for this mailbox.
+ #"""
+ #curried = partial(
+ #self._soledad.get_from_index,
+ #fields.TYPE_MBOX_IDX,
+ #fields.TYPE_HDOCS_SET_VAL, self.mbox)
+ #curried.expected = "hdocset"
+ #hdocset_doc = try_unique_query(curried)
+ #return hdocset_doc
+#
# Property-set modification (protected by a different
# lock to give atomicity to the read/write operation)
-
- def remove_hdocset_docids(self, docids):
- """
- Remove the given document IDs from the set of
- header-documents associated with this mailbox.
- """
- with self._hdocset_property_lock:
- self._hdocset = self._hdocset.difference(
- set(docids))
-
- def remove_hdocset_docid(self, docid):
- """
- Remove the given document ID from the set of
- header-documents associated with this mailbox.
- """
- with self._hdocset_property_lock:
- self._hdocset = self._hdocset.difference(
- set([docid]))
-
- def add_hdocset_docid(self, docid):
- """
- Add the given document ID to the set of
- header-documents associated with this mailbox.
- """
- with self._hdocset_property_lock:
- self._hdocset = self._hdocset.union(
- set([docid]))
+#
+ #def remove_hdocset_docids(self, docids):
+ #"""
+ #Remove the given document IDs from the set of
+ #header-documents associated with this mailbox.
+ #"""
+ #with self._hdocset_property_lock:
+ #self._hdocset = self._hdocset.difference(
+ #set(docids))
+#
+ #def remove_hdocset_docid(self, docid):
+ #"""
+ #Remove the given document ID from the set of
+ #header-documents associated with this mailbox.
+ #"""
+ #with self._hdocset_property_lock:
+ #self._hdocset = self._hdocset.difference(
+ #set([docid]))
+#
+ #def add_hdocset_docid(self, docid):
+ #"""
+ #Add the given document ID to the set of
+ #header-documents associated with this mailbox.
+ #"""
+ #with self._hdocset_property_lock:
+ #self._hdocset = self._hdocset.union(
+ #set([docid]))
# individual doc getters, message layer.
@@ -1378,18 +1359,20 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
return (u for u in sorted(uids))
- def reset_last_uid(self, param):
- """
- Set the last uid to the highest uid found.
- Used while expunging, passed as a callback.
- """
- try:
- self.last_uid = max(self.all_uid_iter()) + 1
- except ValueError:
+ # XXX Should be moved to memstore
+ #def reset_last_uid(self, param):
+ #"""
+ #Set the last uid to the highest uid found.
+ #Used while expunging, passed as a callback.
+ #"""
+ #try:
+ #self.last_uid = max(self.all_uid_iter()) + 1
+ #except ValueError:
# empty sequence
- pass
- return param
+ #pass
+ #return param
+ # XXX MOVE to memstore
def all_flags(self):
"""
Return a dict with all flags documents for this mailbox.
@@ -1444,7 +1427,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:rtype: int
"""
- # XXX We could cache this in memstore too until next write...
+ # XXX We should cache this in memstore too until next write...
count = self._soledad.get_count_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_FLAGS_VAL, self.mbox)
@@ -1491,6 +1474,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# recent messages
+ # XXX take it from memstore
def count_recent(self):
"""
Count all messages with the `Recent` flag.
@@ -1503,30 +1487,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
"""
return len(self.recent_flags)
- # deleted messages
-
- def deleted_iter(self):
- """
- Get an iterator for the message UIDs with `deleted` flag.
-
- :return: iterator through deleted message docs
- :rtype: iterable
- """
- return (doc.content[self.UID_KEY] for doc in
- self._soledad.get_from_index(
- fields.TYPE_MBOX_DEL_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox, '1'))
-
- def get_deleted(self):
- """
- Get all messages with the `Deleted` flag.
-
- :returns: a generator of LeapMessages
- :rtype: generator
- """
- return (LeapMessage(self._soledad, docid, self.mbox)
- for docid in self.deleted_iter())
-
def __len__(self):
"""
Returns the number of messages on this mailbox.
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
index c95a9be..3a6ac9a 100644
--- a/src/leap/mail/imap/server.py
+++ b/src/leap/mail/imap/server.py
@@ -199,6 +199,7 @@ class LeapIMAPServer(imap4.IMAP4Server):
# XXX fake a delayed operation, to debug problem with messages getting
# back to the source mailbox...
+ print "faking checkpoint..."
import time
time.sleep(2)
return None
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index ea5b36e..60576a3 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -18,18 +18,22 @@
A MessageStore that writes to Soledad.
"""
import logging
+import threading
from itertools import chain
+#from twisted.internet import defer
from u1db import errors as u1db_errors
from zope.interface import implements
+from leap.common.check import leap_assert_type
from leap.mail.imap.messageparts import MessagePartType
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.fields import fields
from leap.mail.imap.interfaces import IMessageStore
from leap.mail.messageflow import IMessageConsumer
+from leap.mail.utils import first
logger = logging.getLogger(__name__)
@@ -123,6 +127,7 @@ class SoledadStore(ContentDedup):
"""
This will create docs in the local Soledad database.
"""
+ _last_uid_lock = threading.Lock()
implements(IMessageConsumer, IMessageStore)
@@ -177,6 +182,7 @@ class SoledadStore(ContentDedup):
# IMessageConsumer
+ #@profile
def consume(self, queue):
"""
Creates a new document in soledad db.
@@ -220,6 +226,7 @@ class SoledadStore(ContentDedup):
if isinstance(doc_wrapper, MessageWrapper):
# If everything went well, we can unset the new flag
# in the source store (memory store)
+ print "unsetting new flag!"
doc_wrapper.new = False
doc_wrapper.dirty = False
empty = queue.empty()
@@ -243,13 +250,11 @@ class SoledadStore(ContentDedup):
return chain((doc_wrapper,),
self._get_calls_for_msg_parts(doc_wrapper))
elif isinstance(doc_wrapper, RecentFlagsDoc):
- print "getting calls for rflags"
return chain((doc_wrapper,),
self._get_calls_for_rflags_doc(doc_wrapper))
else:
print "********************"
print "CANNOT PROCESS ITEM!"
- print "item --------------------->", doc_wrapper
return (i for i in [])
def _try_call(self, call, item):
@@ -275,6 +280,7 @@ class SoledadStore(ContentDedup):
if msg_wrapper.new is True:
call = self._soledad.create_doc
+ print "NEW DOC ----------------------"
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
@@ -301,30 +307,22 @@ class SoledadStore(ContentDedup):
# the flags doc.
elif msg_wrapper.dirty is True:
- print "DIRTY DOC! ----------------------"
call = self._soledad.put_doc
-
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
+ # XXX FIXME Give error if dirty and not doc_id !!!
doc_id = item.doc_id # defend!
+ if not doc_id:
+ continue
doc = self._soledad.get_doc(doc_id)
- doc.content = item.content
-
+ doc.content = dict(item.content)
if item.part == MessagePartType.fdoc:
- print "Will PUT the doc: ", doc
- yield dict(doc), call
-
- # XXX also for linkage-doc
-
- # TODO should write back to the queue
- # with the results of the operation.
- # We can write there:
- # (*) MsgWriteACK --> Should remove from incoming queue.
- # (We should do this here).
- # Implement using callbacks for each operation.
+ logger.debug("PUT dirty fdoc")
+ yield doc, call
+ # XXX also for linkage-doc !!!
else:
- logger.error("Cannot delete documents yet!")
+ logger.error("Cannot delete documents yet from the queue...!")
def _get_calls_for_rflags_doc(self, rflags_wrapper):
"""
@@ -334,18 +332,91 @@ class SoledadStore(ContentDedup):
rdoc = self._soledad.get_doc(rflags_wrapper.doc_id)
payload = rflags_wrapper.content
- print "rdoc", rdoc
- print "SAVING RFLAGS TO SOLEDAD..."
- import pprint; pprint.pprint(payload)
+ logger.debug("Saving RFLAGS to Soledad...")
if payload:
rdoc.content = payload
- print
- print "YIELDING -----", rdoc
- print "AND ----------", call
yield rdoc, call
- else:
- print ">>>>>>>>>>>>>>>>>"
- print ">>>>>>>>>>>>>>>>>"
- print ">>>>>>>>>>>>>>>>>"
- print "No payload"
+
+ def _get_mbox_document(self, mbox):
+ """
+ Return mailbox document.
+
+ :return: A SoledadDocument containing this mailbox, or None if
+ the query failed.
+ :rtype: SoledadDocument or None.
+ """
+ try:
+ query = self._soledad.get_from_index(
+ fields.TYPE_MBOX_IDX,
+ fields.TYPE_MBOX_VAL, mbox)
+ if query:
+ return query.pop()
+ except Exception as exc:
+ logger.exception("Unhandled error %r" % exc)
+
+ def get_flags_doc(self, mbox, uid):
+ """
+ Return the SoledadDocument for the given mbox and uid.
+ """
+ try:
+ flag_docs = self._soledad.get_from_index(
+ fields.TYPE_MBOX_UID_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, str(uid))
+ result = first(flag_docs)
+ except Exception as exc:
+ # ugh! Something's broken down there!
+ logger.warning("ERROR while getting flags for UID: %s" % uid)
+ logger.exception(exc)
+ finally:
+ return result
+
+ def write_last_uid(self, mbox, value):
+ """
+ Write the `last_uid` integer to the proper mailbox document
+ in Soledad.
+ This is called from the deferred triggered by
+ memorystore.increment_last_soledad_uid, which is expected to
+ run in a separate thread.
+ """
+ leap_assert_type(value, int)
+ key = fields.LAST_UID_KEY
+
+ with self._last_uid_lock:
+ mbox_doc = self._get_mbox_document(mbox)
+ old_val = mbox_doc.content[key]
+ if value < old_val:
+ logger.error("%s:%s Tried to write a UID lesser than what's "
+ "stored!" % (mbox, value))
+ mbox_doc.content[key] = value
+ self._soledad.put_doc(mbox_doc)
+
+ # deleted messages
+
+ def deleted_iter(self, mbox):
+ """
+ Get an iterator for the SoledadDocuments for messages
+ with \\Deleted flag for a given mailbox.
+
+ :return: iterator through deleted message docs
+ :rtype: iterable
+ """
+ return (doc for doc in self._soledad.get_from_index(
+ fields.TYPE_MBOX_DEL_IDX,
+ fields.TYPE_FLAGS_VAL, mbox, '1'))
+
+ # TODO can deferToThread this?
+ def remove_all_deleted(self, mbox):
+ """
+ Remove from Soledad all messages flagged as deleted for a given
+ mailbox.
+ """
+ print "DELETING ALL DOCS FOR -------", mbox
+ deleted = []
+ for doc in self.deleted_iter(mbox):
+ deleted.append(doc.content[fields.UID_KEY])
+ print
+ print ">>>>>>>>>>>>>>>>>>>>"
+ print "deleting doc: ", doc.doc_id, doc.content
+ self._soledad.delete_doc(doc)
+ return deleted
diff --git a/src/leap/mail/imap/tests/leap_tests_imap.zsh b/src/leap/mail/imap/tests/leap_tests_imap.zsh
index 676d1a8..8f0df9f 100755
--- a/src/leap/mail/imap/tests/leap_tests_imap.zsh
+++ b/src/leap/mail/imap/tests/leap_tests_imap.zsh
@@ -61,7 +61,8 @@ IMAPTEST="imaptest"
# These should be kept constant across benchmarking
# runs across different machines, for comparability.
-DURATION=200
+#DURATION=200
+DURATION=60
NUM_MSG=200
@@ -76,7 +77,7 @@ imaptest_cmd() {
}
stress_imap() {
- mknod imap_pipe p
+ mkfifo imap_pipe
cat imap_pipe | tee output &
imaptest_cmd >> imap_pipe
}
@@ -99,7 +100,7 @@ print_results() {
echo "----------------------"
echo "\tavg\tstdev"
$GREP "avg" ./output | sed -e 's/^ *//g' -e 's/ *$//g' | \
- awk '
+ gawk '
function avg(data, count) {
sum=0;
for( x=0; x <= count-1; x++) {