summaryrefslogtreecommitdiff
path: root/src/leap/mx/vendor/pgpy/decorators.py
blob: d2b9926f48cf3c17dae70b1aef120886d48f34b1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
""" decorators.py
"""
import contextlib
import functools
import six
import warnings

try:
    from singledispatch import singledispatch

except ImportError:  # pragma: no cover
    from functools import singledispatch


from .errors import PGPError

__all__ = ['classproperty',
           'sdmethod',
           'sdproperty',
           'KeyAction']


def classproperty(fget):
    """
    Turn ``fget`` into a read-only attribute that is computed from the owning class.

    ``fget`` is called with the class the attribute is looked up on (never with an instance), whether the
    lookup goes through the class itself or through one of its instances. The returned descriptor copies
    the docstring of ``fget``.
    """
    class ClassProperty(object):
        # descriptor that hands the *owner class* to the wrapped getter

        def __init__(self, fget):
            self.fget = fget
            self.__doc__ = fget.__doc__

        def __get__(self, instance, owner):
            # the instance (None for class-level access) is deliberately ignored
            getter = self.fget
            return getter(owner)

        def __set__(self, instance, value):  # pragma: no cover
            raise AttributeError("Read-only attribute")

        def __delete__(self, instance):  # pragma: no cover
            raise AttributeError("Read-only attribute")

    descriptor = ClassProperty(fget)
    return descriptor


def sdmethod(meth):
    """
    Adapt ``singledispatch`` so that it can be used on instance methods.

    ``singledispatch`` picks an implementation from the type of the first argument, which for a method is
    the instance itself. The wrapper returned here instead dispatches on the class of the first positional
    argument *after* the instance, and exposes the usual ``register``, ``dispatch``, ``registry`` and
    ``_clear_cache`` attributes of the underlying dispatcher.

    The wrapped method must be called with at least one positional argument besides the instance;
    otherwise the lookup of that argument raises ``IndexError``.
    """
    dispatcher = singledispatch(meth)

    def wrapper(obj, *args, **kwargs):
        # choose the implementation from the first real argument, not from ``obj``
        implementation = dispatcher.dispatch(args[0].__class__)
        return implementation(obj, *args, **kwargs)

    # re-export the dispatcher's API on the wrapper so callers can register implementations on it
    for name in ('register', 'dispatch', 'registry', '_clear_cache'):
        setattr(wrapper, name, getattr(dispatcher, name))

    functools.update_wrapper(wrapper, meth)
    return wrapper


def sdproperty(fget):
    """
    Build a property whose setter is chosen by the type of the assigned value.

    ``fget`` becomes the getter. The setter is an ``sdmethod`` dispatcher whose initial default
    implementation rejects every value with ``TypeError``. Implementations for specific types are added
    with ``register``; ``setter`` registers an implementation for ``object``.
    """
    def defset(obj, val):  # pragma: no cover
        # default setter: nothing has been registered that accepts this type of value
        raise TypeError(str(val.__class__))

    class SDProperty(property):
        # property subclass whose ``fset`` is the dispatcher created below

        def register(self, cls=None, fset=None):
            # forward straight to the dispatching setter's own ``register``
            dispatcher = self.fset
            return dispatcher.register(cls, fset)

        def setter(self, fset):
            # register ``fset`` for ``object``, then hand back a fresh property of the same type
            # that shares this one's getter, dispatching setter, deleter and docstring
            self.register(object, fset)
            property_type = type(self)
            return property_type(self.fget, self.fset, self.fdel, self.__doc__)

    dispatching_setter = sdmethod(defset)
    return SDProperty(fget, dispatching_setter)


class KeyAction(object):
    """
    Decorator that guards an operation performed with a key.

    Positional arguments are usage flags, of which the acting key must carry at least one. Keyword
    arguments map attribute names to the value the key is required to report for them.
    """

    def __init__(self, *usage, **conditions):
        super(KeyAction, self).__init__()
        self.conditions = conditions
        self.flags = set(usage)

    @contextlib.contextmanager
    def usage(self, key, user):
        """
        Context manager that yields the key which should carry out the action.

        When no flags were requested, ``key`` itself is yielded. Otherwise ``key`` is tried first and then
        each of its subkeys in turn; the first one whose flags for ``user`` share a flag with the requested
        set is yielded. A warning is emitted when a subkey is used in place of ``key``.

        Raises ``PGPError`` if flags were requested and neither ``key`` nor any subkey carries one.
        """
        details = {
            'keyid': key.fingerprint.keyid,
            'flags': ', '.join(flag.name for flag in self.flags),
        }

        chosen = key
        if self.flags:
            def _primary_then_subkeys(primary, subkeys):
                yield primary
                for subkey in subkeys:
                    yield subkey

            # first candidate that has at least one of the requested flags, or None if there is none
            chosen = next((candidate
                           for candidate in _primary_then_subkeys(key, key.subkeys.values())
                           if self.flags & set(candidate._get_key_flags(user))),
                          None)

            if chosen is None:  # pragma: no cover
                raise PGPError("Key {keyid:s} does not have the required usage flag {flags:s}".format(**details))

        if chosen is not key:
            details['subkeyid'] = chosen.fingerprint.keyid
            message = "Key {keyid:s} does not have the required usage flag {flags:s}; using subkey {subkeyid:s}"
            # NOTE(review): stacklevel is counted from this frame; 4 presumably attributes the warning to
            # the caller of the decorated method - confirm before moving this call
            warnings.warn(message.format(**details), stacklevel=4)

        yield chosen

    def check_attributes(self, key):
        """
        Verify that ``key`` reports the expected value for every configured condition.

        Raises ``PGPError`` naming the first attribute whose value differs from the expected one.
        """
        for attr, expected in self.conditions.items():
            actual = getattr(key, attr)
            if actual != expected:
                raise PGPError("Expected: {attr:s} == {eval:s}. Got: {got:s}"
                               "".format(attr=attr, eval=str(expected), got=str(actual)))

    def __call__(self, action):
        """
        Wrap ``action`` so that it only runs once the key has passed all checks.

        The wrapper raises ``PGPError`` when the key holds no key material, or when it is a primary key
        without any User ID (unless the action is the key's own ``certify``). It then selects the acting
        key through ``usage``, validates the configured conditions against the original key, and finally
        calls ``action`` with the selected key in place of the original one.
        """
        # NOTE(review): six.wraps is used here; the check below reads ``__wrapped__`` from the decorated
        # ``certify``, which six.wraps is documented to set - confirm before switching to functools.wraps
        @six.wraps(action)
        def _checked_action(key, *args, **kwargs):
            if key._key is None:
                raise PGPError("No key!")

            # if a key is in the process of being created, it needs to be allowed to certify its own user id
            lacks_user_id = len(key._uids) == 0 and key.is_primary
            if lacks_user_id and action is not key.certify.__wrapped__:
                raise PGPError("Key is not complete - please add a User ID!")

            requested_user = kwargs.get('user', None)
            with self.usage(key, requested_user) as acting_key:
                # conditions are checked on the key the method was called on, not on the selected subkey
                self.check_attributes(key)

                # do the thing
                return action(acting_key, *args, **kwargs)

        return _checked_action