From ce55f761a55f78cb122296e91686fa6fde8959b8 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 7 Feb 2014 07:00:47 -0400 Subject: two versions of accumulator util --- src/leap/mail/utils.py | 81 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/leap/mail/utils.py b/src/leap/mail/utils.py index 8b75cfc..3ba4291 100644 --- a/src/leap/mail/utils.py +++ b/src/leap/mail/utils.py @@ -17,10 +17,10 @@ """ Mail utilities. """ -import copy import json import re import traceback +import Queue from leap.soledad.common.document import SoledadDocument @@ -149,6 +149,85 @@ def phash_iter(d): yield phash +def accumulator(fun, lim): + """ + A simple accumulator that uses a closure and a mutable + object to collect items. + When the count of items is greater than `lim`, the + collection is flushed after invoking a map of the function `fun` + over it. + + The returned accumulator can also be flushed at any moment + by passing a boolean as a second parameter. + + :param fun: the function to call over the collection + when its size is greater than `lim` + :type fun: callable + :param lim: the turning point for the collection + :type lim: int + :rtype: function + + >>> from pprint import pprint + >>> acc = accumulator(pprint, 2) + >>> acc(1) + >>> acc(2) + [1, 2] + >>> acc(3) + >>> acc(4) + [3, 4] + >>> acc = accumulator(pprint, 5) + >>> acc(1) + >>> acc(2) + >>> acc(3) + >>> acc(None, flush=True) + [1,2,3] + """ + KEY = "items" + _o = {KEY: []} + + def _accumulator(item, flush=False): + collection = _o[KEY] + collection.append(item) + if len(collection) >= lim or flush: + map(fun, filter(None, collection)) + _o[KEY] = [] + + return _accumulator + + +def accumulator_queue(fun, lim): + """ + A version of the accumulator that uses a queue. + + When the count of items is greater than `lim`, the + queue is flushed after invoking the function `fun` + over its items. + + The returned accumulator can also be flushed at any moment + by passing a boolean as a second parameter. + + :param fun: the function to call over the collection + when its size is greater than `lim` + :type fun: callable + :param lim: the turning point for the collection + :type lim: int + :rtype: function + """ + _q = Queue.Queue() + + def _accumulator(item, flush=False): + _q.put(item) + if _q.qsize() >= lim or flush: + collection = [_q.get() for i in range(_q.qsize())] + map(fun, filter(None, collection)) + + return _accumulator + + +# +# String manipulation +# + class CustomJsonScanner(object): """ This class is a context manager definition used to monkey patch the default -- cgit v1.2.3