summaryrefslogtreecommitdiff
path: root/mail/src/leap
diff options
context:
space:
mode:
Diffstat (limited to 'mail/src/leap')
-rw-r--r--mail/src/leap/mail/utils.py81
1 files changed, 80 insertions, 1 deletions
diff --git a/mail/src/leap/mail/utils.py b/mail/src/leap/mail/utils.py
index 8b75cfc..3ba4291 100644
--- a/mail/src/leap/mail/utils.py
+++ b/mail/src/leap/mail/utils.py
@@ -17,10 +17,10 @@
"""
Mail utilities.
"""
-import copy
import json
import re
import traceback
+import Queue
from leap.soledad.common.document import SoledadDocument
@@ -149,6 +149,85 @@ def phash_iter(d):
yield phash
+def accumulator(fun, lim):
+ """
+ A simple accumulator that uses a closure and a mutable
+ object to collect items.
+ When the count of items is greater than `lim`, the
+ collection is flushed after invoking a map of the function `fun`
+ over it.
+
+ The returned accumulator can also be flushed at any moment
+ by passing a boolean as a second parameter.
+
+ :param fun: the function to call over the collection
+ when its size is greater than `lim`
+ :type fun: callable
+ :param lim: the turning point for the collection
+ :type lim: int
+ :rtype: function
+
+ >>> from pprint import pprint
+ >>> acc = accumulator(pprint, 2)
+ >>> acc(1)
+ >>> acc(2)
+ [1, 2]
+ >>> acc(3)
+ >>> acc(4)
+ [3, 4]
+ >>> acc = accumulator(pprint, 5)
+ >>> acc(1)
+ >>> acc(2)
+ >>> acc(3)
+ >>> acc(None, flush=True)
+ [1,2,3]
+ """
+ KEY = "items"
+ _o = {KEY: []}
+
+ def _accumulator(item, flush=False):
+ collection = _o[KEY]
+ collection.append(item)
+ if len(collection) >= lim or flush:
+ map(fun, filter(None, collection))
+ _o[KEY] = []
+
+ return _accumulator
+
+
+def accumulator_queue(fun, lim):
+ """
+ A version of the accumulator that uses a queue.
+
+ When the count of items is greater than `lim`, the
+ queue is flushed after invoking the function `fun`
+ over its items.
+
+ The returned accumulator can also be flushed at any moment
+ by passing a boolean as a second parameter.
+
+ :param fun: the function to call over the collection
+ when its size is greater than `lim`
+ :type fun: callable
+ :param lim: the turning point for the collection
+ :type lim: int
+ :rtype: function
+ """
+ _q = Queue.Queue()
+
+ def _accumulator(item, flush=False):
+ _q.put(item)
+ if _q.qsize() >= lim or flush:
+ collection = [_q.get() for i in range(_q.qsize())]
+ map(fun, filter(None, collection))
+
+ return _accumulator
+
+
+#
+# String manipulation
+#
+
class CustomJsonScanner(object):
"""
This class is a context manager definition used to monkey patch the default