author    Nick Mathewson <nickm@torproject.org>  2008-08-30 16:33:47 +0000
committer Nick Mathewson <nickm@torproject.org>  2008-08-30 16:33:47 +0000
commit    7b434c8d229b867df4523a13ec97e30ec26b6b7b (patch)
tree      99225347698179a361cab873f5f8bd4731493d12 /lib
Initial auto-updater commit: s-expression library and format spec.
git-svn-id: file:///home/or/svnrepo/updater/trunk@16692 55e972cd-5a19-0410-ae62-a4d7a52db4cd
Diffstat (limited to 'lib')
-rw-r--r-- | lib/sexp/__init__.py |   2
-rw-r--r-- | lib/sexp/access.py   | 373
-rw-r--r-- | lib/sexp/encode.py   | 223
-rw-r--r-- | lib/sexp/parse.py    | 210
-rw-r--r-- | lib/sexp/tests.py    |  31
5 files changed, 839 insertions, 0 deletions
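
The new lib/sexp package round-trips between the encoded forms described in Rivest's S-expression draft and plain Python lists of strings. A minimal usage sketch, assuming lib/ is on sys.path; the call sequence is illustrative and not part of the commit itself:

    from sexp.parse import parse
    from sexp.encode import encode_canonical

    # Tokens, quoted strings, hex, and base64 all lex to plain strings.
    parse('(a "b c" #4869# |aGk=|)')      # -> ['a', 'b c', 'Hi', 'hi']

    # The canonical form uses only length-prefixed verbatim strings.
    expr = parse('(3:abc(5:hello)1:d)')   # -> ['abc', ['hello'], 'd']
    encode_canonical(expr)                # -> '(3:abc(5:hello)1:d)'
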
diff --git a/lib/sexp/__init__.py b/lib/sexp/__init__.py
new file mode 100644
index 0000000..1ccccc3
--- /dev/null
+++ b/lib/sexp/__init__.py
@@ -0,0 +1,2 @@
+
+__all__ = [ 'parse', 'encode', 'access', 'tests' ]
diff --git a/lib/sexp/access.py b/lib/sexp/access.py
new file mode 100644
index 0000000..d9b40dc
--- /dev/null
+++ b/lib/sexp/access.py
@@ -0,0 +1,373 @@
+
+import re
+import sys
+
+def s_tag(s):
+    """Return the tag (first element) of an s-expression, or None if it
+       has none.
+
+    >>> s_tag("a string") is None
+    True
+    >>> s_tag(["a-tagged", "list"])
+    'a-tagged'
+    >>> s_tag([["untagged"], "list"]) is None
+    True
+    """
+    if len(s) and not isinstance(s, str) and isinstance(s[0],str):
+        return s[0]
+    else:
+        return None
+
+def s_child(s, tag):
+    """Return the first child of s whose tag is <tag>, or None."""
+    for child in s:
+        if s_tag(child) == tag:
+            return child
+    return None
+
+def s_children(s, tag):
+    """Return a generator yielding every child of s whose tag is <tag>."""
+    return (ch for ch in s if s_tag(ch) == tag)
+
+def s_descendants(s, tags=()):
+    """Yield every list descendant of s.  If <tags> is nonempty, yield only
+       those descendants whose tag is in <tags>."""
+    stack = [ ]
+    push = stack.append
+    pop = stack.pop
+
+    idx = 0
+    while 1:
+        while idx == len(s):
+            try:
+                s, idx = pop()
+            except IndexError:
+                return
+        if isinstance(s[idx], str):
+            idx += 1
+            continue
+        if not tags or s_tag(s[idx]) in tags:
+            yield s[idx]
+        # Descend into the list we just examined, resuming at its next
+        # sibling afterwards.
+        push((s, idx+1))
+        s = s[idx]
+        idx = 0
+
+
+class SExpr(list):
+    """A list wrapper that also makes tagged children reachable as
+       attributes."""
+    def __init__(self, stuff=()):
+        list.__init__(self, stuff)
+        self._d = None
+
+    def __getattr__(self, item):
+        if self._d is None: self._buildDict()
+        try:
+            return self[self._d[item]]
+        except KeyError:
+            raise AttributeError(item)
+
+    def __getitem__(self, idx):
+        item = list.__getitem__(self, idx)
+        if type(item) in (list, tuple): #exact match only.
+            item = self[idx] = SExpr(item)
+        return item
+
+    def _buildDict(self):
+        self._d = d = {}
+        for idx in xrange(len(self)):
+            item = list.__getitem__(self, idx)
+            t = s_tag(item)
+            if t is not None:
+                d[t] = idx
+
+def _s_lookup_all(s, path, callback):
+    """Helper: invoke <callback> on every element of s that matches <path>."""
+
+    # XXXX: Watch out; ** gets pretty heavy pretty fast.
+
+    if isinstance(path, str):
+        path = path.split(".")
+
+    if len(path) == 0:
+        callback(s)
+        return
+
+    for p_idx in xrange(len(path)):
+        p_item = path[p_idx]
+
+        if p_item == '*':
+            for ch in s:
+                if not isinstance(ch, str):
+                    _s_lookup_all(ch, path[p_idx+1:], callback)
+            return
+        elif p_item == '**':
+            for ch in s_descendants(s):
+                if not isinstance(ch, str):
+                    _s_lookup_all(ch, path[p_idx+1:], callback)
+            return
+        elif p_item.startswith('**'):
+            for ch in s_descendants(s):
+                if s_tag(ch) == p_item[2:]:
+                    _s_lookup_all(ch, path[p_idx+1:], callback)
+            return
+        else:
+            s = s_child(s, p_item)
+            if s is None:
+                return
+
+    callback(s)
+
+def s_lookup_all(s, path):
+    """Return a list of every element of s matching <path>."""
+    result = []
+    _s_lookup_all(s, path, result.append)
+    return result
+
+def s_lookup(s, path):
+    """Return the first element of s matching <path>, or None."""
+    r = s_lookup_all(s, path)
+    if len(r):
+        return r[0]
+    return None
+
+class _Singleton:
+    """Base class for schema items that match exactly one element."""
+    def isSingleton(self):
+        return True
+    def matches(self, item):
+        raise NotImplementedError()
+
+    def clear(self):
+        self._got = False
+    def matchItem(self, item):
+        if not self._got and self.matches(item):
+            self._got = True
+            return True
+        else:
+            return False
+    def isSatisfied(self):
+        return self._got
+
+class _Span:
+    """Base class for schema items that match a variable number of
+       elements."""
+    def isSingleton(self):
+        return False
+    def clear(self):
+        pass
+    def matchItem(self, item):
+        raise NotImplementedError()
+    def isSatisfied(self):
+        raise NotImplementedError()
+
+class _AnyItem(_Singleton):
+    def matches(self,item):
+        return True
+    def rep(self):
+        return "?"
+class _AnyString(_Singleton):
+    def matches(self,item):
+        return isinstance(item, str)
+    def rep(self):
+        return "."
+class _ExactString(_Singleton):
+    def __init__(self, s):
+        self._s = s
+    def matches(self, item):
+        return item == self._s
+    def rep(self):
+        return "=%s"%self._s
+class _ReString(_Singleton):
+    def __init__(self, s, regex=None):
+        if regex is None:
+            regex = re.compile(s)
+        self._re = regex
+        self._s = s
+    def matches(self, item):
+        if not isinstance(item, str):
+            return False
+        m = self._re.match(item)
+        return m and m.end() == len(item)
+    def rep(self):
+        return "%s"%self._s
+class _List(_Singleton):
+    def __init__(self, subpatterns):
+        self._pats = subpatterns
+    def clear(self):
+        _Singleton.clear(self)
+        for p in self._pats:
+            p.clear()
+    def matches(self, item):
+        if isinstance(item, str):
+            return False
+
+        i_idx = 0
+        pat_idx = 0
+        while i_idx < len(item):
+            try:
+                subpat = self._pats[pat_idx]
+            except IndexError:
+                return False # Too many items.
+
+            if subpat.isSingleton():
+                if not subpat.matches(item[i_idx]):
+                    return False
+                i_idx += 1
+                pat_idx += 1
+            else:
+                subpat.clear()
+                while i_idx < len(item) and subpat.matchItem(item[i_idx]):
+                    i_idx += 1
+                if not subpat.isSatisfied():
+                    return False
+                pat_idx += 1
+
+        # Out of items, but we have more patterns.  Make sure they all accept
+        # 0 items.
+        if pat_idx < len(self._pats):
+            for subpat in self._pats[pat_idx:]:
+                subpat.clear()
+                if not subpat.isSatisfied():
+                    return False
+        return True
+
+    def rep(self):
+        return [ p.rep() for p in self._pats ]
+
+class _AnyItems(_Span):
+    def matchItem(self, item):
+        return True
+    def isSatisfied(self):
+        return True
+    def rep(self):
+        return "*"
+
+class _NMatches(_Span):
+    def __init__(self, alternatives, lo, hi):
+        self.lo = lo
+        self.hi = hi
+        self.count = 0
+        self.alternatives = alternatives
+        for a in alternatives:
+            if not a.isSingleton():
+                raise SchemaFormatError("Nested span inside span")
+    def clear(self):
+        self.count = 0
+        for p in self.alternatives:
+            p.clear()
+    def matchItem(self, item):
+        if self.count == self.hi:
+            return False
+        for p in self.alternatives:
+            if p.matches(item):
+                self.count += 1
+                return True
+        return False
+    def isSatisfied(self):
+        return self.lo <= self.count <= self.hi
+
+    def rep(self):
+        name = { (1,1): ":oneof",
+                 (0,1): ":maybe",
+                 (0,sys.maxint): ":anyof",
+                 (1,sys.maxint): ":someof" }.get((self.lo, self.hi))
+        if name is None:
+            name = ":%d-%d"%(self.lo, self.hi)
+        result = [ name ]
+        result.extend(p.rep() for p in self.alternatives)
+        return result
+
+class _Unordered(_Span):
+    def __init__(self, alternatives):
+        self.alternatives = alternatives
+    def clear(self):
+        for p in self.alternatives:
+            p.clear()
+    def matchItem(self, item):
+        for p in self.alternatives:
+            if p.matchItem(item):
+                return True
+        return False
+    def isSatisfied(self):
+        for p in self.alternatives:
+            if not p.isSatisfied():
+                return False
+        return True
+    def rep(self):
+        result = [ ":unordered" ]
+        result.extend(p.rep() for p in self.alternatives)
+        return result
+
+class SchemaFormatError(Exception):
+    pass
+
+_RE_PAT = re.compile(r'/((?:[\\.]|[^\\/]+)*)/([ilmstx]*)', re.I)
+
+def parseSchema(s, table):
+    """Parse the schema represented by <s>, looking up named schemas in
+       <table>.  Returns a matcher object with a matches() method."""
+    if isinstance(s, str):
+        if not len(s):
+            raise SchemaFormatError("Empty string encountered")
+        if s == '*':
+            return _AnyItems()
+        elif s == '?':
+            return _AnyItem()
+        elif s == '.':
+            return _AnyString()
+        elif s.startswith('='):
+            return _ExactString(s[1:])
+        elif s.startswith('.'):
+            try:
+                return table[s[1:]]
+            except KeyError:
+                raise SchemaFormatError("Unknown reference %s"%s)
+        else:
+            m = _RE_PAT.match(s)
+            if m:
+                flags = 0
+                for char in m.group(2):
+                    flags |= { "i":re.I, "l":re.L, "m":re.M, "s":re.S,
+                               "t":re.T, "x":re.X }[char.lower()]
+                try:
+                    p = re.compile(m.group(1), flags)
+                except re.error:
+                    raise SchemaFormatError("Couldn't compile %s"%s)
+
+                return _ReString(s, p)
+
+            raise SchemaFormatError("Confusing entry %s"%s)
+    elif len(s) and isinstance(s[0], str) and s[0].startswith(':'):
+        tag = s[0]
+
+        m = re.match(r'\:(\d*)(\-\d*)?$', tag)
+        if m:
+            g = m.groups()
+            if g[0]:
+                lo = int(g[0], 10)
+            else:
+                lo = 0
+            if g[1]:
+                if len(g[1]) > 1:
+                    hi = int(g[1][1:], 10)
+                else:
+                    hi = sys.maxint
+            else:
+                hi = lo
+        else:
+            try:
+                lo,hi = { ":maybe": (0,1),
+                          ":oneof": (1,1),
+                          ":anyof": (0,sys.maxint),
+                          ":someof":(1,sys.maxint),
+                          ":unordered": (None, None) }[tag]
+            except KeyError:
+                raise SchemaFormatError("Unknown tag %s"%tag)
+
+        subitems = [ parseSchema(i, table) for i in s[1:] ]
+        if lo is not None:
+            return _NMatches(subitems, lo, hi)
+        else:
+            return _Unordered(subitems)
+    else:
+        return _List([ parseSchema(i, table) for i in s ])
+
+# Schema format:
+# "=string" matches a string itself.
+# "*" matches any number of items and can occur only at the end of a list.
+# "?" matches a single item.
+# "." matches any string.
+# "/re/" matches a regex.
+# ".name" matches a named schema.
+#
+# (i1 i2 i3) matches a list of i1 followed by i2 followed by i3.
+#
+# (:maybe i1) matches zero or one of i1.
+# (:oneof i1 i2 i3) matches any one of the items i1, i2, i3.
+# (:anyof i1 i2 i3) matches zero or more of the items i1, i2, i3.
+# (:someof i1 i2 i3) matches one or more of i1, i2, and i3.
+#
+# (:unordered i1 i2 i3) matches all of i1, i2, and i3, in any order.
+#
+# The matching algorithm is a stupid greedy algorithm.  If you need to
+# check stuff it can't handle, write a new thing.
diff --git a/lib/sexp/encode.py b/lib/sexp/encode.py
new file mode 100644
index 0000000..1df6406
--- /dev/null
+++ b/lib/sexp/encode.py
@@ -0,0 +1,223 @@
+
+
+import base64
+import binascii
+import re
+
+def _encodeHex(s):
+    """
+    Encode a string in hex format.
+
+    >>> _encodeHex("Hello world")
+    '#48656c6c6f20776f726c64#'
+    >>> _encodeHex("")
+    '##'
+    """
+    return "#%s#"%binascii.b2a_hex(s)
+
+def _encodeBase64(s):
+    """
+    Encode a string in base64 format, with embedded newlines.
+
+    >>> _encodeBase64("")
+    '||'
+    >>> _encodeBase64("Hello world")
+    '|SGVsbG8gd29ybGQ=|'
+    >>> print _encodeBase64("Hello world")
+    |SGVsbG8gd29ybGQ=|
+    >>> _encodeBase64("Good night, sweet prince! A flock of angels "
+    ...               "sing thee to thy rest")
+    '|R29vZCBuaWdodCwgc3dlZXQgcHJpbmNlISBBIGZsb2NrIG9mIGFuZ2VscyBzaW5nIHRoZWUgdG8g\\ndGh5IHJlc3Q=|'
+
+    """
+    return "|%s|"%base64.encodestring(s).strip()
+
+# Map from a character value to its representation in a quoted-string.
+_QUOTED_MAP = { '\b' : "\\b",
+                '\t' : "\\t",
+                '\v' : "\\v",
+                '\n' : "\\n",
+                '\f' : "\\f",
+                '\r' : "\\r",
+                '"'  : "\\\"",
+                '\\' : "\\\\", }
+for x in xrange(256):
+    if _QUOTED_MAP.has_key(chr(x)):
+        continue
+    if 32 <= x <= 126:
+        _QUOTED_MAP[chr(x)] = chr(x)
+    else:
+        _QUOTED_MAP[chr(x)] = "\\x%02x"%x
+del x
+
+
+# Matches the characters that need escaping in a quoted-string.
+_QUOTED_CHAR_RE = re.compile(r'[\\\"]|[^\ -\~]')
+def _replaceQuotedChar(match, _Q=_QUOTED_MAP):
+    """Helper function: replace one matched character with its escaped
+       representation."""
+    return _Q[match.group(0)]
+
+def _encodeQuoted(s, _Q=_QUOTED_MAP):
+    """
+    >>> _encodeQuoted("")
+    '""'
+    >>> _encodeQuoted("Hello world")
+    '"Hello world"'
+    >>> print _encodeQuoted("Hello \xff\b")
+    "Hello \\xff\\b"
+    """
+    # This implementation is slower for the case where lots of stuff
+    # needs quoting, but faster for the case where only some stuff
+    # needs quoting.  If more than about 1/4 of the characters need
+    # quoting, then the commented-out version below is faster.  Yes,
+    # this is a stupid overoptimization.
+    return '"%s"'%(_QUOTED_CHAR_RE.sub(_replaceQuotedChar, s))
+
+    #return '"%s"'%("".join(map(_QUOTED_MAP.__getitem__, s)))
+
+def _encodeRaw(s):
+    """
+    Encode a string in the "raw" format used for canonical encodings.
+
+    >>> _encodeRaw("")
+    '0:'
+    >>> _encodeRaw(" ")
+    '1: '
+    >>> _encodeRaw(" \\n")
+    '2: \\n'
+    """
+    return "%d:%s"%(len(s),s)
+
+_TOKEN_PAT = r"[a-zA-Z\-\.\/\_\:\*\+\=][a-zA-Z0-9\-\.\/\_\:\*\+\=]*"
+
+_TOKEN_RE = re.compile(_TOKEN_PAT)
+def _writeToken(write,s):
+    """Write a string in the token (unencoded) format.  Only works for
+       strings matching _TOKEN_RE.
+    """
+    assert _TOKEN_RE.match(s)
+    write(s)
+
+def _encodeCleanest(s, indent=0):
+    """Encode s in whatever format seems most human-readable."""
+
+    if _TOKEN_RE.match(s):
+        return s
+    n = 0
+    for ch in s:
+        if _QUOTED_MAP[ch] != ch:
+            n += 1
+    if n > 3 and n > len(s)//4:
+        if len(s) > 16:
+            return _encodeBase64(s).replace("\n", "\n"+" "*(indent+1))
+        else:
+            return _encodeHex(s)
+    else:
+        return _encodeQuoted(s)
+
+def _encodePrettyPrint(s, write, indent=0, niceWidth=80):
+    """Write s to <write> in a roughly human-readable, indented form."""
+    if isinstance(s, str):
+        write(_encodeCleanest(s, indent))
+        return
+    elif len(s) == 0:
+        write("()")
+        return
+
+    if isinstance(s[0], str):
+        write("%s(%s"%(" "*indent, _encodeCleanest(s[0], indent)))
+        rest = s[1:]
+    else:
+        write("%s("%(" "*indent))
+        rest = s
+    for item in rest:
+        write(" ")
+        _encodePrettyPrint(item, write, indent+2, niceWidth)
+    write(")")
+
+def _encodeCanonical(rep, append):
+    """Given an s-expression in <rep>, encode it in canonical format,
+       passing each part to the function "append" as it is done.
+    """
+    if isinstance(rep, str):
+        append(_encodeRaw(rep))
+        return
+
+    append("(")
+
+    stack = [ ]
+    push = stack.append
+    pop = stack.pop
+    idx = 0
+    while 1:
+        while idx == len(rep):
+            append(")")
+            try:
+                rep,idx = pop()
+            except IndexError:
+                return
+        if isinstance(rep[idx], str):
+            append(_encodeRaw(rep[idx]))
+            idx += 1
+            continue
+        push((rep,idx+1))
+        rep = rep[idx]
+        idx = 0
+        append("(")
+
+def encode_canonical(rep):
+    """Return the canonical encoding of the s-expression <rep>.
+
+    >>> encode_canonical("abc")
+    '3:abc'
+    >>> encode_canonical(["a"])
+    '(1:a)'
+    >>> encode_canonical(["a", "bc"])
+    '(1:a2:bc)'
+    >>> encode_canonical([[["X", "ab c"]], "d"])
+    '(((1:X4:ab c))1:d)'
+    """
+    parts = []
+    _encodeCanonical(rep, parts.append)
+    return "".join(parts)
+
+def hash_canonical(rep, hashobj):
+    """Given a hashlib hash object <hashobj>, add the canonical
+       encoding of the s-expression <rep> to hashobj.
+
+    >>> import hashlib
+    >>> s = hashlib.sha256()
+    >>> s.update("(3:abc(6:hello 5:world)(1:9))")
+    >>> s.hexdigest()
+    '43f7726155f2700ff0d84240f3aaa9e5a1ee2e2c9e4702f7ac3ebcd45fd2f397'
+    >>> s = hashlib.sha256()
+    >>> hash_canonical(["abc", ["hello ", "world"], ["9"] ], s)
+    >>> s.hexdigest()
+    '43f7726155f2700ff0d84240f3aaa9e5a1ee2e2c9e4702f7ac3ebcd45fd2f397'
+    """
+    _encodeCanonical(rep, hashobj.update)
+
+def _encodePretty(rep, append, indent_step=2, niceWidth=80):
+    """Encode <rep> in a multi-line indented format, passing each part
+       to <append>."""
+    stack = []
+    idx = 0
+    indent = 0
+    append("(")
+    pop = stack.pop
+    push = stack.append
+
+    while 1:
+        while idx == len(rep):
+            append(")")
+            indent -= indent_step
+            try:
+                rep,idx = pop()
+            except IndexError:
+                append("\n")
+                return
+            else:
+                if idx < len(rep):
+                    append(" ")
+        if isinstance(rep[idx], str):
+            _encodePrettyPrint(rep[idx], append, indent, niceWidth)
+            idx += 1
+            if idx < len(rep):
+                append(" ")
+            continue
+        push((rep,idx+1))
+        rep = rep[idx]
+        idx = 0
+        indent += indent_step
+        append("\n%s("%(" "*indent))
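
Since signatures in the updater format will presumably cover the canonical encoding, hashes are best computed over the structure rather than over whatever wire string happened to arrive. A sketch built on the hash_canonical doctest values above:

    import hashlib
    from sexp.encode import encode_canonical, hash_canonical

    expr = ["abc", ["hello ", "world"], ["9"]]
    wire = encode_canonical(expr)     # '(3:abc(6:hello 5:world)(1:9))'

    h = hashlib.sha256()
    hash_canonical(expr, h)           # feeds the same bytes, piece by piece
    assert h.hexdigest() == hashlib.sha256(wire).hexdigest()
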
diff --git a/lib/sexp/parse.py b/lib/sexp/parse.py
new file mode 100644
index 0000000..a33b79a
--- /dev/null
+++ b/lib/sexp/parse.py
@@ -0,0 +1,210 @@
+
+import re
+import base64
+import binascii
+
+# Partial implementation of Rivest's proposed S-Expressions standard
+# as documented at
+#      http://people.csail.mit.edu/rivest/Sexp.txt
+#
+# It's slightly optimized.
+#
+# Not implemented:
+#    [display hints]
+#    {basic transport}
+
+__all__ = [ 'FormatError', 'parse' ]
+
+class FormatError(Exception):
+    """Raised when parsing fails."""
+    pass
+
+_TOKEN_PAT = r"[a-zA-Z\-\.\/\_\:\*\+\=][a-zA-Z0-9\-\.\/\_\:\*\+\=]*"
+# Regular expression to match a single lexeme from an encoded s-expression.
+_LEXEME_START_RE = re.compile(
+    r""" \s* (?: (%s) |                       # Grp 0: A token.
+             ([0-9]*(?: [\:\|\#\{] |          # Grp 1: start of string...
+                  \"(?:[^\\\"]+|\\.)*\")) |   #   or qstring.
+             ([\(\)])                         # Grp 2: a paren of some kind.
+         )"""
+    %_TOKEN_PAT,re.X|re.M|re.S)
+
+class _P:
+    """Helper class for parenthesis tokens."""
+    def __init__(self, val):
+        self.val = val
+    def __repr__(self):
+        return "_P(%r)"%self.val
+
+_OPEN_PAREN = _P("(")
+_CLOSE_PAREN = _P(")")
+del _P
+_SPACE_RE = re.compile(r'\s+')
+
+# Matches all characters in a string that we need to unquote.
+_UNQUOTE_CHAR_RE = re.compile(r'''
+   \\ (?: [abtnvfr] | \r \n ? | \n \r ? | [xX] [A-Fa-f0-9]{2} | [0-7]{1,3} )
+   ''', re.X)
+
+# Map from the quoted representation of a character to its value.
+_UNQUOTE_CHAR_MAP = { 'a': '\a',
+                      'b': '\b',
+                      't': '\t',
+                      'n': '\n',
+                      'v': '\v',
+                      'f': '\f',
+                      'r': '\r' }
+def _unquoteChar(m, _U=_UNQUOTE_CHAR_MAP):
+    """Helper function: given a match of _UNQUOTE_CHAR_RE, return the
+       character that the matched escape stands for."""
+    ch = m.group(0)[1:]
+    try:
+        return _U[ch]
+    except KeyError:
+        pass
+    if ch[0] in "\n\r":
+        return ""
+    elif ch[0] in 'xX':
+        return chr(int(ch[1:], 16))
+    else:
+        i = int(ch, 8)
+        if i >= 256:
+            raise FormatError("Octal character format out of range.")
+        return chr(i)
+
+def _lexItems(s):
+    """Generator that iterates over the lexical items in an encoded
+       s-expression.  Yields a string for strings, or the special objects
+       _OPEN_PAREN and _CLOSE_PAREN.
+
+    >>> list(_lexItems('(4:a)b hello) (world 1:a 0: '))
+    [_P('('), 'a)b ', 'hello', _P(')'), _P('('), 'world', 'a', '']
+
+    >>> list(_lexItems('a b-c 1#20#2#2061##686877# |aGVsbG8gd29ybGQ|'))
+    ['a', 'b-c', ' ', ' a', 'hhw', 'hello world']
+
+    >>> list(_lexItems('#2 0# |aGVs\\nbG8 gd29yb GQ| '))
+    [' ', 'hello world']
+
+    >>> list(_lexItems('|YWJjZA==| x |YWJjZA| 3|Y W J j|'))
+    ['abcd', 'x', 'abcd', 'abc']
+
+    >>> list(_lexItems('("1""234""hello world" 3"abc" 4"    " )'))
+    [_P('('), '1', '234', 'hello world', 'abc', '    ', _P(')')]
+
+    """
+    s = s.strip()
+    while s:
+        m = _LEXEME_START_RE.match(s)
+        if not m:
+            raise FormatError("No pattern match at %r"%s[:30])
+        g = m.groups()
+        if g[2]:
+            if g[2] == "(":
+                yield _OPEN_PAREN
+            else:
+                yield _CLOSE_PAREN
+            s = s[m.end():]
+        elif g[0]:
+            # We have a token.  Go with that.
+            yield g[0]
+            s = s[m.end():]
+        else:
+            assert g[1]
+            lastChar = g[1][-1]
+            if lastChar == '"':
+                qidx = g[1].index('"')
+                quoted = g[1][qidx+1:-1] # All but the quotes.
+                data = _UNQUOTE_CHAR_RE.sub(_unquoteChar, quoted)
+                if qidx != 0:
+                    num = int(g[1][:qidx], 10)
+                    if num != len(data):
+                        raise FormatError("Bad length on quoted string")
+                yield data
+                s = s[m.end():]
+                continue
+
+            num = g[1][:-1]
+            if len(num):
+                num = int(num, 10)
+            else:
+                num = None
+
+            if lastChar == ':':
+                if num is None:
+                    raise FormatError("Missing length on verbatim string")
+                s = s[m.end():]
+                if len(s) < num:
+                    raise FormatError("Verbatim string too short")
+                yield s[:num]
+                s = s[num:]
+            elif lastChar == '#':
+                s = s[m.end():]
+                try:
+                    nextHash = s.index('#')
+                except ValueError:
+                    raise FormatError("Unterminated # string")
+                dataStr = _SPACE_RE.sub("", s[:nextHash])
+                try:
+                    data = binascii.a2b_hex(dataStr)
+                except TypeError:
+                    raise FormatError("Bad hex string")
+                if num is not None and len(data) != num:
+                    raise FormatError("Bad number on hex string")
+                yield data
+                s = s[nextHash+1:]
+            elif lastChar == '|':
+                s = s[m.end():]
+                try:
+                    nextBar = s.index('|')
+                except ValueError:
+                    raise FormatError("Unterminated | string")
+                dataStr = _SPACE_RE.sub("", s[:nextBar])
+                # Re-pad the base64 data if its padding was omitted.
+                mod = len(dataStr) % 4
+                if mod:
+                    dataStr += "=" * (4 - mod)
+                try:
+                    data = binascii.a2b_base64(dataStr)
+                except TypeError:
+                    raise FormatError("Bad base64 string")
+                if num is not None and len(data) != num:
+                    raise FormatError("Bad number on base64 string")
+                yield data
+                s = s[nextBar+1:]
+            else:
+                # Must be '{': basic transport is not implemented.
+                raise FormatError("Unsupported string format")
+
+def parse(s):
+    """Parse a single s-expression from <s> into nested lists of strings.
+
+    >>> parse("()")
+    []
+    >>> parse("(1:X3:abc1:d)")
+    ['X', 'abc', 'd']
+    >>> parse("(1:X((3:abc))1:d)")
+    ['X', [['abc']], 'd']
+    >>> parse("(a b (d\\ne f) (g) #ff00ff# |aGVsbG8gd29ybGQ|)")
+    ['a', 'b', ['d', 'e', 'f'], ['g'], '\\xff\\x00\\xff', 'hello world']
+
+    """
+    outermost = []
+    stack = [ ]
+    push = stack.append
+    pop = stack.pop
+    add = outermost.append
+
+    for item in _lexItems(s):
+        if item is _OPEN_PAREN:
+            next = []
+            add(next)
+            push(add)
+            add = next.append
+        elif item is _CLOSE_PAREN:
+            add = pop()
+        else:
+            # It's a string.
+            add(item)
+
+    if len(outermost) != 1:
+        raise FormatError("No enclosing parenthesis on list")
+    return outermost[0]
diff --git a/lib/sexp/tests.py b/lib/sexp/tests.py
new file mode 100644
index 0000000..7decd03
--- /dev/null
+++ b/lib/sexp/tests.py
@@ -0,0 +1,31 @@
+
+import unittest
+import doctest
+
+import sexp.parse
+import sexp.access
+import sexp.encode
+
+class EncodingTest(unittest.TestCase):
+    def testQuotedString(self):
+        # Placeholder: real encoder tests still to come.
+        self.assertEquals(1,1)
+
+
+def suite():
+    import sexp.tests
+    suite = unittest.TestSuite()
+
+    suite.addTest(doctest.DocTestSuite(sexp.encode))
+    suite.addTest(doctest.DocTestSuite(sexp.parse))
+    suite.addTest(doctest.DocTestSuite(sexp.access))
+
+    loader = unittest.TestLoader()
+    suite.addTest(loader.loadTestsFromModule(sexp.tests))
+
+    return suite
+
+
+if __name__ == '__main__':
+
+    unittest.TextTestRunner(verbosity=1).run(suite())
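
tests.py imports the package absolutely (import sexp.parse and friends), so running it assumes lib/ is on the Python path. One way to drive the combined doctest/unittest suite from an interpreter started in lib/; the invocation is a suggestion, not part of the commit:

    import unittest
    import sexp.tests

    # suite() bundles the doctests from encode, parse, and access together
    # with the unittest cases defined in tests.py itself.
    unittest.TextTestRunner(verbosity=2).run(sexp.tests.suite())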