diff options
Diffstat (limited to 'src/leap/base/jsonschema.py')
-rw-r--r-- | src/leap/base/jsonschema.py | 791 |
1 files changed, 791 insertions, 0 deletions
diff --git a/src/leap/base/jsonschema.py b/src/leap/base/jsonschema.py new file mode 100644 index 00000000..56689b08 --- /dev/null +++ b/src/leap/base/jsonschema.py @@ -0,0 +1,791 @@ +""" +An implementation of JSON Schema for Python + +The main functionality is provided by the validator classes for each of the +supported JSON Schema versions. + +Most commonly, :func:`validate` is the quickest way to simply validate a given +instance under a schema, and will create a validator for you. + +""" + +from __future__ import division, unicode_literals + +import collections +import json +import itertools +import operator +import re +import sys + + +__version__ = "0.8.0" + +PY3 = sys.version_info[0] >= 3 + +if PY3: + from urllib import parse as urlparse + from urllib.parse import unquote + from urllib.request import urlopen + basestring = unicode = str + iteritems = operator.methodcaller("items") +else: + from itertools import izip as zip + from urllib import unquote + from urllib2 import urlopen + import urlparse + iteritems = operator.methodcaller("iteritems") + + +FLOAT_TOLERANCE = 10 ** -15 +validators = {} + + +def validates(version): + """ + Register the decorated validator for a ``version`` of the specification. + + Registered validators and their meta schemas will be considered when + parsing ``$schema`` properties' URIs. + + :argument str version: an identifier to use as the version's name + :returns: a class decorator to decorate the validator with the version + + """ + + def _validates(cls): + validators[version] = cls + return cls + return _validates + + +class UnknownType(Exception): + """ + An attempt was made to check if an instance was of an unknown type. + + """ + + +class RefResolutionError(Exception): + """ + A JSON reference failed to resolve. + + """ + + +class SchemaError(Exception): + """ + The provided schema is malformed. + + The same attributes are present as for :exc:`ValidationError`\s. + + """ + + def __init__(self, message, validator=None, path=()): + super(SchemaError, self).__init__(message, validator, path) + self.message = message + self.path = list(path) + self.validator = validator + + def __str__(self): + return self.message + + +class ValidationError(Exception): + """ + The instance didn't properly validate under the provided schema. + + Relevant attributes are: + * ``message`` : a human readable message explaining the error + * ``path`` : a list containing the path to the offending element (or [] + if the error happened globally) in *reverse* order (i.e. + deepest index first). + + """ + + def __init__(self, message, validator=None, path=()): + # Any validator that recurses (e.g. properties and items) must append + # to the ValidationError's path to properly maintain where in the + # instance the error occurred + super(ValidationError, self).__init__(message, validator, path) + self.message = message + self.path = list(path) + self.validator = validator + + def __str__(self): + return self.message + + +@validates("draft3") +class Draft3Validator(object): + """ + A validator for JSON Schema draft 3. + + """ + + DEFAULT_TYPES = { + "array": list, "boolean": bool, "integer": int, "null": type(None), + "number": (int, float), "object": dict, "string": basestring, + } + + def __init__(self, schema, types=(), resolver=None): + self._types = dict(self.DEFAULT_TYPES) + self._types.update(types) + + if resolver is None: + resolver = RefResolver.from_schema(schema) + + self.resolver = resolver + self.schema = schema + + def is_type(self, instance, type): + if type == "any": + return True + elif type not in self._types: + raise UnknownType(type) + type = self._types[type] + + # bool inherits from int, so ensure bools aren't reported as integers + if isinstance(instance, bool): + type = _flatten(type) + if int in type and bool not in type: + return False + return isinstance(instance, type) + + def is_valid(self, instance, _schema=None): + error = next(self.iter_errors(instance, _schema), None) + return error is None + + @classmethod + def check_schema(cls, schema): + for error in cls(cls.META_SCHEMA).iter_errors(schema): + raise SchemaError( + error.message, validator=error.validator, path=error.path, + ) + + def iter_errors(self, instance, _schema=None): + if _schema is None: + _schema = self.schema + + for k, v in iteritems(_schema): + validator = getattr(self, "validate_%s" % (k.lstrip("$"),), None) + + if validator is None: + continue + + errors = validator(v, instance, _schema) or () + for error in errors: + # set the validator if it wasn't already set by the called fn + if error.validator is None: + error.validator = k + yield error + + def validate(self, *args, **kwargs): + for error in self.iter_errors(*args, **kwargs): + raise error + + def validate_type(self, types, instance, schema): + types = _list(types) + + for type in types: + if self.is_type(type, "object"): + if self.is_valid(instance, type): + return + elif self.is_type(type, "string"): + if self.is_type(instance, type): + return + else: + yield ValidationError(_types_msg(instance, types)) + + def validate_properties(self, properties, instance, schema): + if not self.is_type(instance, "object"): + return + + for property, subschema in iteritems(properties): + if property in instance: + for error in self.iter_errors(instance[property], subschema): + error.path.append(property) + yield error + elif subschema.get("required", False): + yield ValidationError( + "%r is a required property" % (property,), + validator="required", + path=[property], + ) + + def validate_patternProperties(self, patternProperties, instance, schema): + if not self.is_type(instance, "object"): + return + + for pattern, subschema in iteritems(patternProperties): + for k, v in iteritems(instance): + if re.match(pattern, k): + for error in self.iter_errors(v, subschema): + yield error + + def validate_additionalProperties(self, aP, instance, schema): + if not self.is_type(instance, "object"): + return + + extras = set(_find_additional_properties(instance, schema)) + + if self.is_type(aP, "object"): + for extra in extras: + for error in self.iter_errors(instance[extra], aP): + yield error + elif not aP and extras: + error = "Additional properties are not allowed (%s %s unexpected)" + yield ValidationError(error % _extras_msg(extras)) + + def validate_dependencies(self, dependencies, instance, schema): + if not self.is_type(instance, "object"): + return + + for property, dependency in iteritems(dependencies): + if property not in instance: + continue + + if self.is_type(dependency, "object"): + for error in self.iter_errors(instance, dependency): + yield error + else: + dependencies = _list(dependency) + for dependency in dependencies: + if dependency not in instance: + yield ValidationError( + "%r is a dependency of %r" % (dependency, property) + ) + + def validate_items(self, items, instance, schema): + if not self.is_type(instance, "array"): + return + + if self.is_type(items, "object"): + for index, item in enumerate(instance): + for error in self.iter_errors(item, items): + error.path.append(index) + yield error + else: + for (index, item), subschema in zip(enumerate(instance), items): + for error in self.iter_errors(item, subschema): + error.path.append(index) + yield error + + def validate_additionalItems(self, aI, instance, schema): + if ( + not self.is_type(instance, "array") or + not self.is_type(schema.get("items"), "array") + ): + return + + if self.is_type(aI, "object"): + for item in instance[len(schema):]: + for error in self.iter_errors(item, aI): + yield error + elif not aI and len(instance) > len(schema.get("items", [])): + error = "Additional items are not allowed (%s %s unexpected)" + yield ValidationError( + error % _extras_msg(instance[len(schema.get("items", [])):]) + ) + + def validate_minimum(self, minimum, instance, schema): + if not self.is_type(instance, "number"): + return + + instance = float(instance) + if schema.get("exclusiveMinimum", False): + failed = instance <= minimum + cmp = "less than or equal to" + else: + failed = instance < minimum + cmp = "less than" + + if failed: + yield ValidationError( + "%r is %s the minimum of %r" % (instance, cmp, minimum) + ) + + def validate_maximum(self, maximum, instance, schema): + if not self.is_type(instance, "number"): + return + + instance = float(instance) + if schema.get("exclusiveMaximum", False): + failed = instance >= maximum + cmp = "greater than or equal to" + else: + failed = instance > maximum + cmp = "greater than" + + if failed: + yield ValidationError( + "%r is %s the maximum of %r" % (instance, cmp, maximum) + ) + + def validate_minItems(self, mI, instance, schema): + if self.is_type(instance, "array") and len(instance) < mI: + yield ValidationError("%r is too short" % (instance,)) + + def validate_maxItems(self, mI, instance, schema): + if self.is_type(instance, "array") and len(instance) > mI: + yield ValidationError("%r is too long" % (instance,)) + + def validate_uniqueItems(self, uI, instance, schema): + if uI and self.is_type(instance, "array") and not _uniq(instance): + yield ValidationError("%r has non-unique elements" % instance) + + def validate_pattern(self, patrn, instance, schema): + if self.is_type(instance, "string") and not re.match(patrn, instance): + yield ValidationError("%r does not match %r" % (instance, patrn)) + + def validate_minLength(self, mL, instance, schema): + if self.is_type(instance, "string") and len(instance) < mL: + yield ValidationError("%r is too short" % (instance,)) + + def validate_maxLength(self, mL, instance, schema): + if self.is_type(instance, "string") and len(instance) > mL: + yield ValidationError("%r is too long" % (instance,)) + + def validate_enum(self, enums, instance, schema): + if instance not in enums: + yield ValidationError("%r is not one of %r" % (instance, enums)) + + def validate_divisibleBy(self, dB, instance, schema): + if not self.is_type(instance, "number"): + return + + if isinstance(dB, float): + mod = instance % dB + failed = (mod > FLOAT_TOLERANCE) and (dB - mod) > FLOAT_TOLERANCE + else: + failed = instance % dB + + if failed: + yield ValidationError("%r is not divisible by %r" % (instance, dB)) + + def validate_disallow(self, disallow, instance, schema): + for disallowed in _list(disallow): + if self.is_valid(instance, {"type": [disallowed]}): + yield ValidationError( + "%r is disallowed for %r" % (disallowed, instance) + ) + + def validate_extends(self, extends, instance, schema): + if self.is_type(extends, "object"): + extends = [extends] + for subschema in extends: + for error in self.iter_errors(instance, subschema): + yield error + + def validate_ref(self, ref, instance, schema): + resolved = self.resolver.resolve(ref) + for error in self.iter_errors(instance, resolved): + yield error + + +Draft3Validator.META_SCHEMA = { + "$schema": "http://json-schema.org/draft-03/schema#", + "id": "http://json-schema.org/draft-03/schema#", + "type": "object", + + "properties": { + "type": { + "type": ["string", "array"], + "items": {"type": ["string", {"$ref": "#"}]}, + "uniqueItems": True, + "default": "any" + }, + "properties": { + "type": "object", + "additionalProperties": {"$ref": "#", "type": "object"}, + "default": {} + }, + "patternProperties": { + "type": "object", + "additionalProperties": {"$ref": "#"}, + "default": {} + }, + "additionalProperties": { + "type": [{"$ref": "#"}, "boolean"], "default": {} + }, + "items": { + "type": [{"$ref": "#"}, "array"], + "items": {"$ref": "#"}, + "default": {} + }, + "additionalItems": { + "type": [{"$ref": "#"}, "boolean"], "default": {} + }, + "required": {"type": "boolean", "default": False}, + "dependencies": { + "type": ["string", "array", "object"], + "additionalProperties": { + "type": ["string", "array", {"$ref": "#"}], + "items": {"type": "string"} + }, + "default": {} + }, + "minimum": {"type": "number"}, + "maximum": {"type": "number"}, + "exclusiveMinimum": {"type": "boolean", "default": False}, + "exclusiveMaximum": {"type": "boolean", "default": False}, + "minItems": {"type": "integer", "minimum": 0, "default": 0}, + "maxItems": {"type": "integer", "minimum": 0}, + "uniqueItems": {"type": "boolean", "default": False}, + "pattern": {"type": "string", "format": "regex"}, + "minLength": {"type": "integer", "minimum": 0, "default": 0}, + "maxLength": {"type": "integer"}, + "enum": {"type": "array", "minItems": 1, "uniqueItems": True}, + "default": {"type": "any"}, + "title": {"type": "string"}, + "description": {"type": "string"}, + "format": {"type": "string"}, + "maxDecimal": {"type": "number", "minimum": 0}, + "divisibleBy": { + "type": "number", + "minimum": 0, + "exclusiveMinimum": True, + "default": 1 + }, + "disallow": { + "type": ["string", "array"], + "items": {"type": ["string", {"$ref": "#"}]}, + "uniqueItems": True + }, + "extends": { + "type": [{"$ref": "#"}, "array"], + "items": {"$ref": "#"}, + "default": {} + }, + "id": {"type": "string", "format": "uri"}, + "$ref": {"type": "string", "format": "uri"}, + "$schema": {"type": "string", "format": "uri"}, + }, + "dependencies": { + "exclusiveMinimum": "minimum", "exclusiveMaximum": "maximum" + }, +} + + +class RefResolver(object): + """ + Resolve JSON References. + + :argument str base_uri: URI of the referring document + :argument referrer: the actual referring document + :argument dict store: a mapping from URIs to documents to cache + + """ + + def __init__(self, base_uri, referrer, store=()): + self.base_uri = base_uri + self.referrer = referrer + self.store = dict(store, **_meta_schemas()) + + @classmethod + def from_schema(cls, schema, *args, **kwargs): + """ + Construct a resolver from a JSON schema object. + + :argument schema schema: the referring schema + :rtype: :class:`RefResolver` + + """ + + return cls(schema.get("id", ""), schema, *args, **kwargs) + + def resolve(self, ref): + """ + Resolve a JSON ``ref``. + + :argument str ref: reference to resolve + :returns: the referrant document + + """ + + base_uri = self.base_uri + uri, fragment = urlparse.urldefrag(urlparse.urljoin(base_uri, ref)) + + if uri in self.store: + document = self.store[uri] + elif not uri or uri == self.base_uri: + document = self.referrer + else: + document = self.resolve_remote(uri) + + return self.resolve_fragment(document, fragment.lstrip("/")) + + def resolve_fragment(self, document, fragment): + """ + Resolve a ``fragment`` within the referenced ``document``. + + :argument document: the referrant document + :argument str fragment: a URI fragment to resolve within it + + """ + + parts = unquote(fragment).split("/") if fragment else [] + + for part in parts: + part = part.replace("~1", "/").replace("~0", "~") + + if part not in document: + raise RefResolutionError( + "Unresolvable JSON pointer: %r" % fragment + ) + + document = document[part] + + return document + + def resolve_remote(self, uri): + """ + Resolve a remote ``uri``. + + Does not check the store first. + + :argument str uri: the URI to resolve + :returns: the retrieved document + + """ + + return json.load(urlopen(uri)) + + +class ErrorTree(object): + """ + ErrorTrees make it easier to check which validations failed. + + """ + + def __init__(self, errors=()): + self.errors = {} + self._contents = collections.defaultdict(self.__class__) + + for error in errors: + container = self + for element in reversed(error.path): + container = container[element] + container.errors[error.validator] = error + + def __contains__(self, k): + return k in self._contents + + def __getitem__(self, k): + """ + Retrieve the child tree with key ``k``. + + """ + + return self._contents[k] + + def __setitem__(self, k, v): + self._contents[k] = v + + def __iter__(self): + return iter(self._contents) + + def __len__(self): + return self.total_errors + + def __repr__(self): + return "<%s (%s total errors)>" % (self.__class__.__name__, len(self)) + + @property + def total_errors(self): + """ + The total number of errors in the entire tree, including children. + + """ + + child_errors = sum(len(tree) for _, tree in iteritems(self._contents)) + return len(self.errors) + child_errors + + +def _meta_schemas(): + """ + Collect the urls and meta schemas from each known validator. + + """ + + meta_schemas = (v.META_SCHEMA for v in validators.values()) + return dict((urlparse.urldefrag(m["id"])[0], m) for m in meta_schemas) + + +def _find_additional_properties(instance, schema): + """ + Return the set of additional properties for the given ``instance``. + + Weeds out properties that should have been validated by ``properties`` and + / or ``patternProperties``. + + Assumes ``instance`` is dict-like already. + + """ + + properties = schema.get("properties", {}) + patterns = "|".join(schema.get("patternProperties", {})) + for property in instance: + if property not in properties: + if patterns and re.search(patterns, property): + continue + yield property + + +def _extras_msg(extras): + """ + Create an error message for extra items or properties. + + """ + + if len(extras) == 1: + verb = "was" + else: + verb = "were" + return ", ".join(repr(extra) for extra in extras), verb + + +def _types_msg(instance, types): + """ + Create an error message for a failure to match the given types. + + If the ``instance`` is an object and contains a ``name`` property, it will + be considered to be a description of that object and used as its type. + + Otherwise the message is simply the reprs of the given ``types``. + + """ + + reprs = [] + for type in types: + try: + reprs.append(repr(type["name"])) + except Exception: + reprs.append(repr(type)) + return "%r is not of type %s" % (instance, ", ".join(reprs)) + + +def _flatten(suitable_for_isinstance): + """ + isinstance() can accept a bunch of really annoying different types: + * a single type + * a tuple of types + * an arbitrary nested tree of tuples + + Return a flattened tuple of the given argument. + + """ + + types = set() + + if not isinstance(suitable_for_isinstance, tuple): + suitable_for_isinstance = (suitable_for_isinstance,) + for thing in suitable_for_isinstance: + if isinstance(thing, tuple): + types.update(_flatten(thing)) + else: + types.add(thing) + return tuple(types) + + +def _list(thing): + """ + Wrap ``thing`` in a list if it's a single str. + + Otherwise, return it unchanged. + + """ + + if isinstance(thing, basestring): + return [thing] + return thing + + +def _delist(thing): + """ + Unwrap ``thing`` to a single element if its a single str in a list. + + Otherwise, return it unchanged. + + """ + + if ( + isinstance(thing, list) and + len(thing) == 1 + and isinstance(thing[0], basestring) + ): + return thing[0] + return thing + + +def _unbool(element, true=object(), false=object()): + """ + A hack to make True and 1 and False and 0 unique for _uniq. + + """ + + if element is True: + return true + elif element is False: + return false + return element + + +def _uniq(container): + """ + Check if all of a container's elements are unique. + + Successively tries first to rely that the elements are hashable, then + falls back on them being sortable, and finally falls back on brute + force. + + """ + + try: + return len(set(_unbool(i) for i in container)) == len(container) + except TypeError: + try: + sort = sorted(_unbool(i) for i in container) + sliced = itertools.islice(sort, 1, None) + for i, j in zip(sort, sliced): + if i == j: + return False + except (NotImplementedError, TypeError): + seen = [] + for e in container: + e = _unbool(e) + if e in seen: + return False + seen.append(e) + return True + + +def validate(instance, schema, cls=Draft3Validator, *args, **kwargs): + """ + Validate an ``instance`` under the given ``schema``. + + >>> validate([2, 3, 4], {"maxItems" : 2}) + Traceback (most recent call last): + ... + ValidationError: [2, 3, 4] is too long + + :func:`validate` will first verify that the provided schema is itself + valid, since not doing so can lead to less obvious error messages and fail + in less obvious or consistent ways. If you know you have a valid schema + already or don't care, you might prefer using the ``validate`` method + directly on a specific validator (e.g. :meth:`Draft3Validator.validate`). + + ``cls`` is a validator class that will be used to validate the instance. + By default this is a draft 3 validator. Any other provided positional and + keyword arguments will be provided to this class when constructing a + validator. + + :raises: + :exc:`ValidationError` if the instance is invalid + + :exc:`SchemaError` if the schema itself is invalid + + """ + + cls.check_schema(schema) + cls(schema, *args, **kwargs).validate(instance) |