From c9662e6aff3a3f893b6a0f84d0e68c7f239ab117 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Tue, 16 Sep 2014 10:51:09 -0500 Subject: Add invalidation timeout to cache --- src/leap/common/decorators.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/decorators.py b/src/leap/common/decorators.py index 2ef6711..99c3653 100644 --- a/src/leap/common/decorators.py +++ b/src/leap/common/decorators.py @@ -144,17 +144,20 @@ class _memoized(object): return functools.partial(self.__call__, obj) -def memoized_method(function=None, ignore_kwargs=None): +def memoized_method(function=None, ignore_kwargs=None, + invalidation=_memoized.CACHE_INVALIDATION_DELTA): """ Wrap _memoized to allow for deferred calling :type function: callable, or None. :type ignore_kwargs: None, True or tuple. + :type invalidation: int seconds. """ if function: - return _memoized(function, is_method=True) + return _memoized(function, is_method=True, invalidation=invalidation) else: def wrapper(function): return _memoized( - function, ignore_kwargs=ignore_kwargs, is_method=True) + function, ignore_kwargs=ignore_kwargs, is_method=True, + invalidation=invalidation) return wrapper -- cgit v1.2.3 From 97831bcee1df96aa34a57f0a15f1d57006e96d46 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Wed, 3 Dec 2014 09:42:45 -0600 Subject: Extract the environment set up and tear down for tests Using trial there is no setUpClass neither tearDownClass, the setting up of the environment needs to be in an external class to be able to call it from inhereted classes. --- src/leap/common/testing/basetest.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'src/leap') diff --git a/src/leap/common/testing/basetest.py b/src/leap/common/testing/basetest.py index 54826d5..3c6fc29 100644 --- a/src/leap/common/testing/basetest.py +++ b/src/leap/common/testing/basetest.py @@ -40,6 +40,14 @@ class BaseLeapTest(unittest.TestCase): @classmethod def setUpClass(cls): + cls.setUpEnv() + + @classmethod + def tearDownClass(cls): + cls.tearDownEnv() + + @classmethod + def setUpEnv(cls): """ Sets up common facilities for testing this TestCase: - custom PATH and HOME environmental variables @@ -57,7 +65,7 @@ class BaseLeapTest(unittest.TestCase): os.environ["HOME"] = cls.tempdir @classmethod - def tearDownClass(cls): + def tearDownEnv(cls): """ Cleanup common facilities used for testing this TestCase: - restores the default PATH and HOME variables -- cgit v1.2.3 From 7664fc93ebd5797f16b15d9f6f42acba3fd111dd Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Wed, 3 Dec 2014 10:40:38 -0600 Subject: Add support for deferreds to memoize_method decorator --- src/leap/common/decorators.py | 92 +++++++++++++++++++++++++++++++------------ 1 file changed, 66 insertions(+), 26 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/decorators.py b/src/leap/common/decorators.py index 99c3653..4b07ea9 100644 --- a/src/leap/common/decorators.py +++ b/src/leap/common/decorators.py @@ -22,6 +22,13 @@ import datetime import functools import logging +try: + from twisted.internet import defer +except ImportError: + class defer: + class Deferred: + pass + logger = logging.getLogger(__name__) @@ -59,6 +66,7 @@ class _memoized(object): # consume a huge amount of memory. self.cache = {} self.cache_ts = {} + self.is_deferred = {} def __call__(self, *args, **kwargs): """ @@ -67,28 +75,7 @@ class _memoized(object): :tyoe args: tuple :type kwargs: dict """ - def ret_or_raise(value): - """ - Returns the value except if it is an exception, - in which case it's raised. - """ - if isinstance(value, Exception): - raise value - return value - - if self.is_method: - # forget about `self` as key - key_args = args[1:] - else: - key_args = args - - if self.ignore_kwargs is True: - key = key_args - else: - key = (key_args, frozenset( - [(k, v) for k, v in kwargs.items() - if k not in self.ignore_kwargs])) - + key = self._build_key(*args, **kwargs) if not isinstance(key, collections.Hashable): # uncacheable. a list, for instance. # better to not cache than blow up. @@ -97,9 +84,9 @@ class _memoized(object): if key in self.cache: if self._is_cache_still_valid(key): - value = self.cache[key] logger.debug("Got value from cache...") - return ret_or_raise(value) + value = self._get_value(key) + return self._ret_or_raise(value) else: logger.debug("Cache is invalid, evaluating again...") @@ -109,9 +96,52 @@ class _memoized(object): except Exception as exc: logger.error("Exception while calling function: %r" % (exc,)) value = exc - self.cache[key] = value + + if isinstance(value, defer.Deferred): + value.addBoth(self._store_deferred_value(key)) + else: + self.cache[key] = value + self.is_deferred[key] = False self.cache_ts[key] = datetime.datetime.now() - return ret_or_raise(value) + return self._ret_or_raise(value) + + def _build_key(self, *args, **kwargs): + """ + Build key from the function arguments + """ + if self.is_method: + # forget about `self` as key + key_args = args[1:] + else: + key_args = args + + if self.ignore_kwargs is True: + key = key_args + else: + key = (key_args, frozenset( + [(k, v) for k, v in kwargs.items() + if k not in self.ignore_kwargs])) + return key + + def _get_value(self, key): + """ + Get a value form cache for a key + """ + if self.is_deferred[key]: + value = self.cache[key] + # it produces an errback if value is Failure + return defer.succeed(value) + else: + return self.cache[key] + + def _ret_or_raise(self, value): + """ + Returns the value except if it is an exception, + in which case it's raised. + """ + if isinstance(value, Exception): + raise value + return value def _is_cache_still_valid(self, key, now=datetime.datetime.now): """ @@ -131,6 +161,16 @@ class _memoized(object): delta = datetime.timedelta(seconds=self.CACHE_INVALIDATION_DELTA) return (now() - cached_ts) < delta + def _store_deferred_value(self, key): + """ + Returns a callback to store values from deferreds + """ + def callback(value): + self.cache[key] = value + self.is_deferred[key] = True + return value + return callback + def __repr__(self): """ Return the function's docstring. -- cgit v1.2.3 From 6ef20e16abb80ea0f265a35b7ecdbb38fb29298c Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Mon, 12 Jan 2015 17:13:05 -0300 Subject: Consider different possibilities for tmpdir. In some systems the used tmp dir is like '/tmp/leap_tests-asdf' and in others is like '/tmp/username/leap_tests-asdf'. With this fix we protect the home dir and consider different possible temp directories. --- src/leap/common/testing/basetest.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/leap') diff --git a/src/leap/common/testing/basetest.py b/src/leap/common/testing/basetest.py index 3c6fc29..efaedc3 100644 --- a/src/leap/common/testing/basetest.py +++ b/src/leap/common/testing/basetest.py @@ -77,6 +77,9 @@ class BaseLeapTest(unittest.TestCase): # XXX needs to adapt to non-linuces leap_assert( cls.tempdir.startswith('/tmp/leap_tests-') or + (cls.tempdir.startswith('/tmp/') and + cls.tempdir.startswith(tempfile.gettempdir()) and + 'leap_tests-' in cls.tempdir) or cls.tempdir.startswith('/var/folder'), "beware! tried to remove a dir which does not " "live in temporal folder!") -- cgit v1.2.3 From 00bfb22368c788bfc5de458e9159ee73cf3e66c7 Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Wed, 20 May 2015 18:39:10 -0300 Subject: [bug] get certificate times as UTC, add tests The certificate validity times were converted to local time and later on compared with UTC time, which caused the certificate not being updated at the right times. Add tests to be sure this is not happenning again. Add a joint pem file for the existing cert and key files to ease test. - Resolves: #6994 --- src/leap/common/certs.py | 17 +-- .../common/testing/leaptest_combined_keycert.pem | 127 +++++++++++++++++++++ src/leap/common/tests/test_certs.py | 97 ++++++++++++++++ 3 files changed, 233 insertions(+), 8 deletions(-) create mode 100644 src/leap/common/testing/leaptest_combined_keycert.pem create mode 100644 src/leap/common/tests/test_certs.py (limited to 'src/leap') diff --git a/src/leap/common/certs.py b/src/leap/common/certs.py index 4fe563b..db513f6 100644 --- a/src/leap/common/certs.py +++ b/src/leap/common/certs.py @@ -128,22 +128,23 @@ def is_valid_pemfile(cert): return can_load_cert_and_pkey(cert) -def get_cert_time_boundaries(certfile): +def get_cert_time_boundaries(certdata): """ - Returns the time boundaries for the certificate saved in certfile + Return the time boundaries for the given certificate. + The returned values are UTC/GMT time.struct_time objects - :param certfile: path to certificate - :type certfile: str + :param certdata: the certificate contents + :type certdata: str :rtype: tuple (from, to) """ - cert = get_cert_from_string(certfile) + cert = get_cert_from_string(certdata) leap_assert(cert, 'There was a problem loading the certificate') fromts, tots = (cert.get_notBefore(), cert.get_notAfter()) - from_, to_ = map( - lambda ts: time.gmtime(time.mktime(dateparse(ts).timetuple())), - (fromts, tots)) + from_ = dateparse(fromts).timetuple() + to_ = dateparse(tots).timetuple() + return from_, to_ diff --git a/src/leap/common/testing/leaptest_combined_keycert.pem b/src/leap/common/testing/leaptest_combined_keycert.pem new file mode 100644 index 0000000..f1e0fb2 --- /dev/null +++ b/src/leap/common/testing/leaptest_combined_keycert.pem @@ -0,0 +1,127 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAT67c8y4ORa6o +/gor3gAouJI6W6R7YlPzzh4uaba4YhqgAEZzcZ7S589CqON8Eo8G2tQvh+ZSaNmP +idFDjFxfpb1sBMb9Nu720s0V8UMxzCDh7oyePZGdKP+F04+MkB0eHWSObBrzsjsj +KVsD/5fUeLbd1SdK1C2cKzsoRQ+GVEyMJeiISFlV05fxdAZSybPcwH+3aiVX+7cM +9ek8bufIeqrAfZVdvO/2Fuv6WfR2MEe32XO/9gj02XSGWd6sPXMn/lflKvNNKEWC +UWdqTArGi0CqQ4D+Q73ruUIthCJr+7LWK+7QKG980a1RR7GZJvs3skhUDh3eua2u +ICPiqQLlAgMBAAECggEBAKJr6j0UagaF1dFG1eJc2neKA36kXdQTpOIaaKU8haVO +vjv6X4YrJT/tpsAfEhp9Ni1M7r7CIcXiZjVz6bkKOA5UVhqAImxEVClEuw/YN688 +P11yc3NGftBkiwNFPk0yflUr7/zV0yGVm5rD1+oVme9KkO/kkg4CDA+E9664PTdu +S4NVvL6OgHUG2nucqIz0kzUBapo7okIkvggfldeDYF47rn/e0VikNpKkMzoCzWs+ +3ryzldfigwg18rE2nltQYk/Mi+H30iNTNEonLjs9YbiDPn8iy5SZ7xY4lAO1Urev +skdY04hmkOzh9JEETHeTj1LUIw7tfRdS3X6B+19gkCECgYEA+41UOLBb0ZHsGjsA +aZBtI4g8mvGTOFe/Az7Jm0404z6e8AIe4NeSJoJ6oV6WG2BmKvqTViYT2QnmauHA +WB1xJ5fH3PK7/LjICI2QE247E294+g5p1KYysALc2fChfJNpdfETzWJAUSSBFqxb +amCDZ4Gc8S/V44fEkJKPLuAvxL0CgYEAw7YyGsEX+lDGzumFQY4wz20+Cyvilnx0 +xoFGQ85RSprZttU64PID/u1HD9/Nv4lGcBDcTTtrmOEk9x51YzttrsPF9sn+Wj0y +eahuR32Qc1652Y7fAKgN2vIZRFBcGpAsemcoqmYFRMImX60G0areKaclooimTbJA +Si52P4IKnUkCgYAI+07ah1F/9hncBedJ3aJH9oFTdvSuulNTplZEeVJiGsZKA4le +tdO+FEKUqG/rolGDj1bbaJik0zmq70yS2NpFc6HrPa+Aoohh5cwTJYhudTh4lTMq +KJT+u9tu3KynagwF7gmq96scOpVxXc4VykRm2bXk1rRoX1yhXNpH7jFGcQKBgCn/ +v2DebzbYftGIa4BV80OQPfBHyqhgrO6sb1e9vtQzxuTlfW0ogpMCeG1/qbegzeze +sWghiEWWi0g80RQqfK80dBcx4dObrmlNK91LpOQdP+TgNBr/9Xk22xU96YYJyoG6 +AZAPtLG8uF9v0jbMZECsDfeDO60Qw5snvViDn6OBAoGACAbbQCbuh0cduDVwn0uz +8XDyRTKDhOsJ3p6UB1TjwvbLeoPI93Dv1gpEeoqELtCu+ZXr3+/i/IbwrxucrFNn +L/s+4zR2mlEAxBdtCFsH/Qf7+iYpBFSo4YReXz3xR+EVTmswzodJCbtF74oQSr6o +7d56Rd/5k2UkFeXnlGOLyAc= +-----END PRIVATE KEY----- +Certificate: + Data: + Version: 3 (0x2) + Serial Number: 4 (0x4) + Signature Algorithm: sha1WithRSAEncryption + Issuer: C=US, ST=CA, L=Cyberspace, O=LEAP Encryption Access Project, OU=cyberspace, CN=tests-leap.se/name=tests-leap/emailAddress=tests@leap.se + Validity + Not Before: Sep 3 17:52:16 2013 GMT + Not After : Sep 1 17:52:16 2023 GMT + Subject: C=US, ST=CA, L=Cyberspace, O=LEAP Encryption Access Project, OU=cyberspace, CN=localhost/name=tests-leap/emailAddress=tests@leap.se + Subject Public Key Info: + Public Key Algorithm: rsaEncryption + Public-Key: (2048 bit) + Modulus: + 00:c0:4f:ae:dc:f3:2e:0e:45:ae:a8:fe:0a:2b:de: + 00:28:b8:92:3a:5b:a4:7b:62:53:f3:ce:1e:2e:69: + b6:b8:62:1a:a0:00:46:73:71:9e:d2:e7:cf:42:a8: + e3:7c:12:8f:06:da:d4:2f:87:e6:52:68:d9:8f:89: + d1:43:8c:5c:5f:a5:bd:6c:04:c6:fd:36:ee:f6:d2: + cd:15:f1:43:31:cc:20:e1:ee:8c:9e:3d:91:9d:28: + ff:85:d3:8f:8c:90:1d:1e:1d:64:8e:6c:1a:f3:b2: + 3b:23:29:5b:03:ff:97:d4:78:b6:dd:d5:27:4a:d4: + 2d:9c:2b:3b:28:45:0f:86:54:4c:8c:25:e8:88:48: + 59:55:d3:97:f1:74:06:52:c9:b3:dc:c0:7f:b7:6a: + 25:57:fb:b7:0c:f5:e9:3c:6e:e7:c8:7a:aa:c0:7d: + 95:5d:bc:ef:f6:16:eb:fa:59:f4:76:30:47:b7:d9: + 73:bf:f6:08:f4:d9:74:86:59:de:ac:3d:73:27:fe: + 57:e5:2a:f3:4d:28:45:82:51:67:6a:4c:0a:c6:8b: + 40:aa:43:80:fe:43:bd:eb:b9:42:2d:84:22:6b:fb: + b2:d6:2b:ee:d0:28:6f:7c:d1:ad:51:47:b1:99:26: + fb:37:b2:48:54:0e:1d:de:b9:ad:ae:20:23:e2:a9: + 02:e5 + Exponent: 65537 (0x10001) + X509v3 extensions: + X509v3 Basic Constraints: + CA:FALSE + Netscape Cert Type: + SSL Server + Netscape Comment: + Easy-RSA Generated Server Certificate + X509v3 Subject Key Identifier: + 51:92:B6:A3:D7:D6:EC:8F:FC:16:C5:D4:0F:87:CC:EA:5C:7C:17:81 + X509v3 Authority Key Identifier: + keyid:36:19:70:96:A9:5C:FE:A3:82:0F:79:95:31:52:2B:4A:41:BD:81:CB + DirName:/C=US/ST=CA/L=Cyberspace/O=LEAP Encryption Access Project/OU=cyberspace/CN=tests-leap.se/name=tests-leap/emailAddress=tests@leap.se + serial:8B:BF:41:63:1E:10:6A:EC + + X509v3 Extended Key Usage: + TLS Web Server Authentication + X509v3 Key Usage: + Digital Signature, Key Encipherment + Signature Algorithm: sha1WithRSAEncryption + 88:d9:35:e0:d9:fa:fd:6b:57:e2:4d:f6:ef:91:6f:56:a6:2b: + 1a:1e:ec:8f:b0:18:e3:ec:ca:c9:1e:78:07:1d:0f:cf:fe:09: + 21:84:25:c4:27:ea:22:d7:48:53:73:ed:78:0f:42:5c:c7:f7: + 38:04:84:df:67:99:fd:75:6f:e8:dc:3b:91:ab:fa:c7:32:e5: + fd:3b:ce:de:7c:6a:df:39:46:1e:46:3a:4d:e1:e1:60:f3:bf: + aa:b2:0b:5d:ee:f2:0c:ee:82:7f:b5:02:75:04:47:d5:2b:c8: + e0:6d:6a:10:4a:ca:e0:c3:4f:ee:ff:15:71:37:f5:4d:95:38: + fe:a3:da:84:46:90:04:c2:61:86:a0:7f:e2:7d:62:46:6d:f6: + f8:90:51:88:c3:f2:8c:ca:b3:89:40:9f:6b:8b:33:65:e1:fd: + 0f:8b:d7:6a:93:dc:de:be:85:07:c7:d1:1d:b5:db:70:54:9f: + 95:d8:fb:11:f7:a7:e6:90:ba:9b:28:0e:3d:47:7a:63:6d:60: + 44:f6:96:aa:b6:a2:bc:0a:e5:25:c8:a2:74:91:54:95:bb:e2: + 09:01:56:73:6e:56:e8:6f:d6:a5:d8:18:96:c1:82:ef:2c:9e: + e2:4c:94:bc:00:71:5e:16:49:6b:e4:94:7a:d1:0c:2e:f4:19: + b2:2a:c2:b8 +-----BEGIN CERTIFICATE----- +MIIFdDCCBFygAwIBAgIBBDANBgkqhkiG9w0BAQUFADCBuDELMAkGA1UEBhMCVVMx +CzAJBgNVBAgTAkNBMRMwEQYDVQQHEwpDeWJlcnNwYWNlMScwJQYDVQQKEx5MRUFQ +IEVuY3J5cHRpb24gQWNjZXNzIFByb2plY3QxEzARBgNVBAsTCmN5YmVyc3BhY2Ux +FjAUBgNVBAMTDXRlc3RzLWxlYXAuc2UxEzARBgNVBCkTCnRlc3RzLWxlYXAxHDAa +BgkqhkiG9w0BCQEWDXRlc3RzQGxlYXAuc2UwHhcNMTMwOTAzMTc1MjE2WhcNMjMw +OTAxMTc1MjE2WjCBtDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAkNBMRMwEQYDVQQH +EwpDeWJlcnNwYWNlMScwJQYDVQQKEx5MRUFQIEVuY3J5cHRpb24gQWNjZXNzIFBy +b2plY3QxEzARBgNVBAsTCmN5YmVyc3BhY2UxEjAQBgNVBAMTCWxvY2FsaG9zdDET +MBEGA1UEKRMKdGVzdHMtbGVhcDEcMBoGCSqGSIb3DQEJARYNdGVzdHNAbGVhcC5z +ZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMBPrtzzLg5Frqj+Cive +ACi4kjpbpHtiU/POHi5ptrhiGqAARnNxntLnz0Ko43wSjwba1C+H5lJo2Y+J0UOM +XF+lvWwExv027vbSzRXxQzHMIOHujJ49kZ0o/4XTj4yQHR4dZI5sGvOyOyMpWwP/ +l9R4tt3VJ0rULZwrOyhFD4ZUTIwl6IhIWVXTl/F0BlLJs9zAf7dqJVf7twz16Txu +58h6qsB9lV287/YW6/pZ9HYwR7fZc7/2CPTZdIZZ3qw9cyf+V+Uq800oRYJRZ2pM +CsaLQKpDgP5Dveu5Qi2EImv7stYr7tAob3zRrVFHsZkm+zeySFQOHd65ra4gI+Kp +AuUCAwEAAaOCAYkwggGFMAkGA1UdEwQCMAAwEQYJYIZIAYb4QgEBBAQDAgZAMDQG +CWCGSAGG+EIBDQQnFiVFYXN5LVJTQSBHZW5lcmF0ZWQgU2VydmVyIENlcnRpZmlj +YXRlMB0GA1UdDgQWBBRRkraj19bsj/wWxdQPh8zqXHwXgTCB7QYDVR0jBIHlMIHi +gBQ2GXCWqVz+o4IPeZUxUitKQb2By6GBvqSBuzCBuDELMAkGA1UEBhMCVVMxCzAJ +BgNVBAgTAkNBMRMwEQYDVQQHEwpDeWJlcnNwYWNlMScwJQYDVQQKEx5MRUFQIEVu +Y3J5cHRpb24gQWNjZXNzIFByb2plY3QxEzARBgNVBAsTCmN5YmVyc3BhY2UxFjAU +BgNVBAMTDXRlc3RzLWxlYXAuc2UxEzARBgNVBCkTCnRlc3RzLWxlYXAxHDAaBgkq +hkiG9w0BCQEWDXRlc3RzQGxlYXAuc2WCCQCLv0FjHhBq7DATBgNVHSUEDDAKBggr +BgEFBQcDATALBgNVHQ8EBAMCBaAwDQYJKoZIhvcNAQEFBQADggEBAIjZNeDZ+v1r +V+JN9u+Rb1amKxoe7I+wGOPsyskeeAcdD8/+CSGEJcQn6iLXSFNz7XgPQlzH9zgE +hN9nmf11b+jcO5Gr+scy5f07zt58at85Rh5GOk3h4WDzv6qyC13u8gzugn+1AnUE +R9UryOBtahBKyuDDT+7/FXE39U2VOP6j2oRGkATCYYagf+J9YkZt9viQUYjD8ozK +s4lAn2uLM2Xh/Q+L12qT3N6+hQfH0R2123BUn5XY+xH3p+aQupsoDj1HemNtYET2 +lqq2orwK5SXIonSRVJW74gkBVnNuVuhv1qXYGJbBgu8snuJMlLwAcV4WSWvklHrR +DC70GbIqwrg= +-----END CERTIFICATE----- diff --git a/src/leap/common/tests/test_certs.py b/src/leap/common/tests/test_certs.py new file mode 100644 index 0000000..999071f --- /dev/null +++ b/src/leap/common/tests/test_certs.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# test_certs.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for: + * leap/common/certs.py +""" +import os +import time + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common import certs +from leap.common.testing.basetest import BaseLeapTest + +TEST_CERT_PEM = os.path.join( + os.path.split(__file__)[0], + '..', 'testing', "leaptest_combined_keycert.pem") + +# Values from the test cert file: +# Not Before: Sep 3 17:52:16 2013 GMT +# Not After : Sep 1 17:52:16 2023 GMT +CERT_NOT_BEFORE = (2013, 9, 3, 17, 52, 16, 1, 246, 0) +CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0) + + +class CertsTest(BaseLeapTest): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_should_redownload_if_no_cert(self): + self.assertTrue(certs.should_redownload(certfile="")) + + def test_should_redownload_if_invalid_pem(self): + cert_path = self.get_tempfile('test_pem_file.pem') + + with open(cert_path, 'w') as f: + f.write('this is some invalid data for the pem file') + + self.assertTrue(certs.should_redownload(cert_path)) + + def test_should_redownload_if_before(self): + new_now = lambda: time.struct_time(CERT_NOT_BEFORE) + self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + + def test_should_redownload_if_after(self): + new_now = lambda: time.struct_time(CERT_NOT_AFTER) + self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + + def test_not_should_redownload(self): + self.assertFalse(certs.should_redownload(TEST_CERT_PEM)) + + def test_is_valid_pemfile(self): + with open(TEST_CERT_PEM) as f: + cert_data = f.read() + + self.assertTrue(certs.is_valid_pemfile(cert_data)) + + def test_not_is_valid_pemfile(self): + cert_data = 'this is some invalid data for the pem file' + + self.assertFalse(certs.is_valid_pemfile(cert_data)) + + def test_get_cert_time_boundaries(self): + """ + This test ensures us that the returned values are returned in UTC/GMT. + """ + with open(TEST_CERT_PEM) as f: + cert_data = f.read() + + valid_from, valid_to = certs.get_cert_time_boundaries(cert_data) + self.assertEqual(tuple(valid_from), CERT_NOT_BEFORE) + self.assertEqual(tuple(valid_to), CERT_NOT_AFTER) + + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3 From 71c750ef9c3e53ef416d1de6e85458f16ca48d74 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Tue, 26 May 2015 22:06:20 +0200 Subject: [refactor] move http twisted code from soledad Implements an HTTP client the twisted way, with a focus on pinning the SSL certs. * Related: #6506 --- src/leap/common/http.py | 162 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 src/leap/common/http.py (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py new file mode 100644 index 0000000..39f01ba --- /dev/null +++ b/src/leap/common/http.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +# http.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Twisted HTTP/HTTPS client. +""" + +import os + +from zope.interface import implements + +from OpenSSL.crypto import load_certificate +from OpenSSL.crypto import FILETYPE_PEM + +from twisted.internet import reactor +from twisted.internet.ssl import ClientContextFactory +from twisted.internet.ssl import CertificateOptions +from twisted.internet.defer import succeed + +from twisted.web.client import Agent +from twisted.web.client import HTTPConnectionPool +from twisted.web.client import readBody +from twisted.web.client import BrowserLikePolicyForHTTPS +from twisted.web.http_headers import Headers +from twisted.web.iweb import IBodyProducer + + +class HTTPClient(object): + """ + HTTP client done the twisted way, with a main focus on pinning the SSL + certificate. + """ + + def __init__(self, cert_file=None): + """ + Init the HTTP client + + :param cert_file: The path to the certificate file, if None given the + system's CAs will be used. + :type cert_file: str + """ + self._pool = HTTPConnectionPool(reactor, persistent=True) + self._pool.maxPersistentPerHost = 10 + + if cert_file: + cert = self._load_cert(cert_file) + self._agent = Agent( + reactor, + HTTPClient.ClientContextFactory(cert), + pool=self._pool) + else: + # trust the system's CAs + self._agent = Agent( + reactor, + BrowserLikePolicyForHTTPS(), + pool=self._pool) + + def _load_cert(self, cert_file): + """ + Load a X509 certificate from a file. + + :param cert_file: The path to the certificate file. + :type cert_file: str + + :return: The X509 certificate. + :rtype: OpenSSL.crypto.X509 + """ + if os.path.exists(cert_file): + with open(cert_file) as f: + data = f.read() + return load_certificate(FILETYPE_PEM, data) + + def request(self, url, method='GET', body=None, headers={}): + """ + Perform an HTTP request. + + :param url: The URL for the request. + :type url: str + :param method: The HTTP method of the request. + :type method: str + :param body: The body of the request, if any. + :type body: str + :param headers: The headers of the request. + :type headers: dict + + :return: A deferred that fires with the body of the request. + :rtype: twisted.internet.defer.Deferred + """ + if body: + body = HTTPClient.StringBodyProducer(body) + d = self._agent.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallback(readBody) + return d + + class ClientContextFactory(ClientContextFactory): + """ + A context factory that will verify the server's certificate against a + given CA certificate. + """ + + def __init__(self, cacert): + """ + Initialize the context factory. + + :param cacert: The CA certificate. + :type cacert: OpenSSL.crypto.X509 + """ + self._cacert = cacert + + def getContext(self, hostname, port): + opts = CertificateOptions(verify=True, caCerts=[self._cacert]) + return opts.getContext() + + class StringBodyProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, body): + """ + Initialize the string produer. + + :param body: The body of the request. + :type body: str + """ + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A successful deferred. + :rtype: twisted.internet.defer.Deferred + """ + consumer.write(self.body) + return succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass -- cgit v1.2.3 From 514c1434a016b09d93e8dfc5578b14825d14005a Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Feb 2015 15:04:10 -0200 Subject: [feat] refactor events to use ZMQ Before this commit, protobuf and protobuf.socketrpc were used to serialize and transmit messages between events clients. This change implements a simpler ZMQ client/server events mechanism that uses ZMQ sockets for transmitting messages from clients to server and to redistribute such messages to subscribed clients. Closes: #6359 --- src/leap/common/events/Makefile | 31 -- src/leap/common/events/README.rst | 117 +++-- src/leap/common/events/__init__.py | 315 +++++++------- src/leap/common/events/catalog.py | 88 ++++ src/leap/common/events/client.py | 712 ++++++++++++++++++++----------- src/leap/common/events/daemon.py | 208 --------- src/leap/common/events/errors.py | 23 + src/leap/common/events/events.proto | 145 ------- src/leap/common/events/events_pb2.py | 635 --------------------------- src/leap/common/events/mac_auth.py | 31 -- src/leap/common/events/server.py | 242 +++-------- src/leap/common/events/txclient.py | 185 ++++++++ src/leap/common/events/zmq_components.py | 179 ++++++++ src/leap/common/tests/test_events.py | 468 +++++--------------- src/leap/common/zmq_utils.py | 105 +++++ 15 files changed, 1438 insertions(+), 2046 deletions(-) delete mode 100644 src/leap/common/events/Makefile create mode 100644 src/leap/common/events/catalog.py delete mode 100644 src/leap/common/events/daemon.py create mode 100644 src/leap/common/events/errors.py delete mode 100644 src/leap/common/events/events.proto delete mode 100644 src/leap/common/events/events_pb2.py delete mode 100644 src/leap/common/events/mac_auth.py create mode 100644 src/leap/common/events/txclient.py create mode 100644 src/leap/common/events/zmq_components.py create mode 100644 src/leap/common/zmq_utils.py (limited to 'src/leap') diff --git a/src/leap/common/events/Makefile b/src/leap/common/events/Makefile deleted file mode 100644 index 5b7e60d..0000000 --- a/src/leap/common/events/Makefile +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -# Makefile -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -# This file is used to generate protobuf python files that are used for IPC: -# -# https://developers.google.com/protocol-buffers/docs/pythontutorial - -PROTOC = protoc - -all: events_pb2.py - -%_pb2.py: %.proto - $(PROTOC) --python_out=./ $< -# autopep8 --in-place --aggressive $@ - -clean: - rm -f *_pb2.py diff --git a/src/leap/common/events/README.rst b/src/leap/common/events/README.rst index 2e7f254..f455cc8 100644 --- a/src/leap/common/events/README.rst +++ b/src/leap/common/events/README.rst @@ -1,19 +1,83 @@ Events mechanism ================ -The events mechanism allows for clients to send signal events to each -other by means of a centralized server. Clients can register with the -server to receive signals of certain types, and they can also send signals to -the server that will then redistribute these signals to registered clients. +The events mechanism allows for clients to send events to each other by means +of a centralized server. Clients can register with the server to receive +events of certain types, and they can also send events to the server that will +then redistribute these events to registered clients. -Listening daemons ------------------ +ZMQ connections and events redistribution +----------------------------------------- -Both clients and the server listen for incoming messages by using a -listening daemon that runs in its own thread. The server daemon has to be -started explicitly, while clients daemon will be started whenever a -client registers with the server to receive messages. +Clients and server use ZMQ connection patterns to communicate. Clients can +push events to the server, and may subscribe to events published by the +server. The server in turn pulls events from clients and publishes them to +subscribed clients. + +Clients connect to the server's zmq pub socket, and register to specific +events indicating which callbacks should be executed when that event is +received: + + + EventsServer + .------------. + |PULL PUB| + '------------' + ^^ + || + reg(1, cbk1) |'--------------. reg(2, cbk2) + | | + | | + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + + +A client that wants to send an event connects to the server's zmq pull socket +and pushes the event to the server. The server then redistributes it to all +clients subscribed to that event. + + + EventsServer + .------------. + |PULL---->PUB| + '------------' + ^ |. + | |. +sig(1, 'foo') .----------------' |'............... + | | . + | v . + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + | + v + cbk1(1, 'foo') + + +Any client may emit or subscribe to an event. ZMQ will manage sockets and +reuse the connection whenever it can. + + + EventsServer + .------------. + |PULL---->PUB| + '------------' + ^ .| + | .| +sig(2, 'bar') .-----------------' .'--------------. + | . | + | . v + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + | + v + cbk2(2, 'bar') How to use it @@ -22,32 +86,27 @@ How to use it To start the events server: >>> from leap.common.events import server ->>> server.ensure_server(port=8090) +>>> server.ensure_server( + emit_addr="tcp://127.0.0.1:9000", + reg_addr="tcp://127.0.0.1:9001") -To register a callback to be called when a given signal is raised: +To register a callback to be called when a given event is raised: ->>> from leap.common.events import ( ->>> register, ->>> events_pb2 as proto, ->>> ) +>>> from leap.common.events import register +>>> from leap.common.events import catalog >>> ->>> def mycallback(sigreq): ->>> print str(sigreq) +>>> def mycbk(event, *content): +>>> print "%s, %s" (str(event), str(content)) >>> ->>> events.register(signal=proto.CLIENT_UID, callback=mycallback) +>>> register(catalog.CLIENT_UID, callback=mycbk) -To signal an event: +To emit an event: ->>> from leap.common.events import ( ->>> signal, ->>> events_pb2 as proto, ->>> ) ->>> signal(proto.CLIENT_UID) +>>> from leap.common.events import emit +>>> from leap.common.events import catalog +>>> emit(catalog.CLIENT_UID) Adding events ------------- -* Add the new event under enum ``Event`` in ``events.proto`` -* Compile the new protocolbuffers file:: - - make +To add a new event, just add it to ``catalog.py``. diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index 0cc6573..9269b9a 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -15,188 +15,199 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . + """ -This is an events mechanism that uses a server to allow for signaling of -events between clients. +This is an events mechanism that uses a server to allow for emitting events +between clients. Application components should use the interface available in this file to -register callbacks to be executed upon receiving specific signals, and to send -signals to other components. +register callbacks to be executed upon receiving specific events, and to send +events to other components. -To register a callback to be executed when a specific event is signaled, use +To register a callback to be executed when a specific event is emitted, use leap.common.events.register(): >>> from leap.common.events import register ->>> from leap.common.events import events_pb2 as proto ->>> register(proto.CLIENT_UID, lambda req: do_something(req)) - -To signal an event, use leap.common.events.signal(): - ->>> from leap.common.events import signal ->>> from leap.common.events import events_pb2 as proto ->>> signal(proto.CLIENT_UID) - - -NOTE ABOUT SYNC/ASYNC REQUESTS: - -Clients always communicate with the server, and never between themselves. -When a client registers a callback for an event, the callback is saved locally -in the client and the server stores the client socket port in a list -associated with that event. When a client signals an event, the server -forwards the signal to all registered client ports, and then each client -executes its callbacks associated with that event locally. - -Each RPC call from a client to the server is followed by a response from the -server to the client. Calls to register() and signal() (and all other RPC -calls) can be synchronous or asynchronous meaning if they will or not wait for -the server's response before returning. - -This mechanism was built on top of protobuf.socketrpc, and because of this RPC -calls are made synchronous or asynchronous in the following way: - - * If RPC calls receive a parameter called `reqcbk`, then the call is made - asynchronous. That means that: +>>> from leap.common.events import catalog +>>> register(catalog.CLIENT_UID, lambda sig, content: do_something(content)) - - an eventual `timeout` parameter is not used, - - the call returns immediatelly with value None, and - - the `reqcbk` callback is executed asynchronously upon the arrival of - a response from the server. +To emit an event, use leap.common.events.emit(): - * Otherwise, if the `reqcbk` parameter is None, then the call is made in a - synchronous manner: - - - if a response from server arrives within `timeout` milliseconds, the - RPC call returns it; - - otherwise, the call returns None. +>>> from leap.common.events import emit +>>> from leap.common.events import catalog +>>> emit(catalog.CLIENT_UID) """ + import logging -import socket +import argparse + +from leap.common.events import client +from leap.common.events import server +from leap.common.events import catalog -from leap.common.events import ( - events_pb2 as proto, - server, - client, - daemon, -) +__all__ = [ + "register", + "unregister", + "emit", + "catalog", +] logger = logging.getLogger(__name__) -def register(signal, callback, uid=None, replace=False, reqcbk=None, - timeout=1000): +def register(event, callback, uid=None, replace=False): """ - Register a callback to be called when the given signal is received. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param callback: the callback to be called when the signal is received - :type callback: function - :param uid: a unique id for the callback - :type uid: int - :param replace: should an existent callback with same uid be replaced? + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. :type replace: bool - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(leap.common.events.events_pb2.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - return client.register(signal, callback, uid, replace, reqcbk, timeout) + :return: The callback uid. + :rtype: str -def unregister(signal, uid=None, reqcbk=None, timeout=1000): - """ - Unregister a callback. - - If C{uid} is specified, unregisters only the callback identified by that - unique id. Otherwise, unregisters all callbacks registered for C{signal}. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param uid: a unique id for the callback - :type uid: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None + :raises CallbackAlreadyRegistered: when there's already a callback + identified by the given uid and replace is False. """ - return client.unregister(signal, uid, reqcbk, timeout) + return client.register(event, callback, uid, replace) -def signal(signal, content="", mac_method="", mac="", reqcbk=None, - timeout=1000): +def unregister(event, uid=None): """ - Send `signal` event to events server. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param content: the contents of the event signal - :type content: str - :param mac_method: the method used to auth mac - :type mac_method: str - :param mac: the content of the auth mac - :type mac: str - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.SignalRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - return client.signal(signal, content, mac_method, mac, reqcbk, timeout) + Unregister callbacks for an event. -def ping_client(port, reqcbk=None, timeout=1000): - """ - Ping a client running in C{port}. - - :param port: the port in which the client should be listening - :type port: int - :param reqcbk: a callback to be called when a response from client is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str """ - return client.ping(port, reqcbk=reqcbk, timeout=timeout) + return client.unregister(event, uid) -def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000): +def emit(event, *content): """ - Ping the server. - - :param port: the port in which server should be listening - :type port: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list """ - return server.ping(port, reqcbk=reqcbk, timeout=timeout) + return client.emit(event, *content) + + +if __name__ == "__main__": + + def _echo(event, *content): + print "Received event: (%s, %s)" % (event, content) + + def _parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--debug", "-d", action="store_true", + help="print debug information") + + subparsers = parser.add_subparsers(dest="command") + + # server options + server_parser = subparsers.add_parser( + "server", help="Run an events server.") + server_parser.add_argument( + "--emit-addr", + help="The address in which to listen for events", + default=server.EMIT_ADDR) + server_parser.add_argument( + "--reg-addr", + help="The address in which to listen for registration for events.", + default=server.REG_ADDR) + + # client options + client_parser = subparsers.add_parser( + "client", help="Run an events client.") + client_parser.add_argument( + "--emit-addr", + help="The address in which to emit events.", + default=server.EMIT_ADDR) + client_parser.add_argument( + "--reg-addr", + help="The address in which to register for events.", + default=server.REG_ADDR) + group = client_parser.add_mutually_exclusive_group(required=True) + group.add_argument('--reg', help="register an event") + group.add_argument('--emit', help="send an event") + client_parser.add_argument( + '--content', help="the content of the event", default=None) + + # txclient options + txclient_parser = subparsers.add_parser( + "txclient", help="Run an events twisted client.") + txclient_parser.add_argument( + "--emit-addr", + help="The address in which to emit events.", + default=server.EMIT_ADDR) + txclient_parser.add_argument( + "--reg-addr", + help="The address in which to register for events.", + default=server.REG_ADDR) + group = txclient_parser.add_mutually_exclusive_group(required=True) + group.add_argument('--reg', help="register an event") + group.add_argument('--emit', help="send an event") + txclient_parser.add_argument( + '--content', help="the content of the event", default=None) + + return parser.parse_args() + + args = _parse_args() + + if args.debug: + logging.basicConfig(level=logging.DEBUG) + + if args.command == "server": + # run server + server.ensure_server(emit_addr=args.emit_addr, reg_addr=args.reg_addr) + from twisted.internet import reactor + reactor.run() + elif args.command == "client": + if args.reg: + event = getattr(catalog, args.reg) + # run client and register to a signal + register(event, _echo) + # make sure we stop on CTRL+C + import signal + signal.signal( + signal.SIGINT, lambda sig, frame: client.shutdown()) + # wait until client thread dies + import time + while client.EventsClientThread.instance().is_alive(): + time.sleep(0.1) + if args.emit: + # run client and emit a signal + event = getattr(catalog, args.emit) + emit(event, args.content) + client.shutdown() + elif args.command == "txclient": + from leap.common.events import txclient + register = txclient.register + emit = txclient.emit + if args.reg: + event = getattr(catalog, args.reg) + # run client and register to a signal + register(event, _echo) + from twisted.internet import reactor + reactor.run() + if args.emit: + # run client and emit a signal + event = getattr(catalog, args.emit) + emit(event, args.content) diff --git a/src/leap/common/events/catalog.py b/src/leap/common/events/catalog.py new file mode 100644 index 0000000..8bddd2c --- /dev/null +++ b/src/leap/common/events/catalog.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# catalog.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful", +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Events catalog. +""" + + +EVENTS = [ + "CLIENT_SESSION_ID", + "CLIENT_UID", + "IMAP_CLIENT_LOGIN", + "IMAP_SERVICE_FAILED_TO_START", + "IMAP_SERVICE_STARTED", + "IMAP_UNHANDLED_ERROR", + "KEYMANAGER_DONE_UPLOADING_KEYS", + "KEYMANAGER_FINISHED_KEY_GENERATION", + "KEYMANAGER_KEY_FOUND", + "KEYMANAGER_KEY_NOT_FOUND", + "KEYMANAGER_LOOKING_FOR_KEY", + "KEYMANAGER_STARTED_KEY_GENERATION", + "MAIL_FETCHED_INCOMING", + "MAIL_MSG_DECRYPTED", + "MAIL_MSG_DELETED_INCOMING", + "MAIL_MSG_PROCESSING", + "MAIL_MSG_SAVED_LOCALLY", + "MAIL_UNREAD_MESSAGES", + "RAISE_WINDOW", + "SMTP_CONNECTION_LOST", + "SMTP_END_ENCRYPT_AND_SIGN", + "SMTP_END_SIGN", + "SMTP_RECIPIENT_ACCEPTED_ENCRYPTED", + "SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED", + "SMTP_RECIPIENT_REJECTED", + "SMTP_SEND_MESSAGE_ERROR", + "SMTP_SEND_MESSAGE_START", + "SMTP_SEND_MESSAGE_SUCCESS", + "SMTP_SERVICE_FAILED_TO_START", + "SMTP_SERVICE_STARTED", + "SMTP_START_ENCRYPT_AND_SIGN", + "SMTP_START_SIGN", + "SOLEDAD_CREATING_KEYS", + "SOLEDAD_DONE_CREATING_KEYS", + "SOLEDAD_DONE_DATA_SYNC", + "SOLEDAD_DONE_DOWNLOADING_KEYS", + "SOLEDAD_DONE_UPLOADING_KEYS", + "SOLEDAD_DOWNLOADING_KEYS", + "SOLEDAD_INVALID_AUTH_TOKEN", + "SOLEDAD_NEW_DATA_TO_SYNC", + "SOLEDAD_SYNC_RECEIVE_STATUS", + "SOLEDAD_SYNC_SEND_STATUS", + "SOLEDAD_UPLOADING_KEYS", + "UPDATER_DONE_UPDATING", + "UPDATER_NEW_UPDATES", +] + + +class Event(object): + + def __init__(self, label): + self.label = label + + def __repr__(self): + return '' % self.label + + def __str__(self): + return self.label + + +# create local variables based on the event list above +lcl = locals() +for event in EVENTS: + lcl[event] = Event(event) diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 83f18e0..6b234a1 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # client.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014, 2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,6 +14,8 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . + + """ The client end point of the events mechanism. @@ -21,309 +23,501 @@ Clients are the communicating parties of the events mechanism. They communicate by sending messages to a server, which in turn redistributes messages to other clients. -When a client registers a callback for a given signal, it also tells the -server that it wants to be notified whenever signals of that type are sent by +When a client registers a callback for a given event, it also tells the +server that it wants to be notified whenever events of that type are sent by some other client. """ -import logging +import logging +import collections +import uuid +import threading +import time +import pickle +import os + +from abc import ABCMeta +from abc import abstractmethod + +import zmq +from zmq.eventloop import zmqstream +from zmq.eventloop import ioloop + +# XXX some distros don't package libsodium, so we have to be prepared for +# absence of zmq.auth +try: + import zmq.auth +except ImportError: + pass -from protobuf.socketrpc import RpcService +from leap.common.config import get_path_prefix +from leap.common.zmq_utils import zmq_has_curve +from leap.common.zmq_utils import maybe_create_and_get_certificates +from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX -from leap.common.events import events_pb2 as proto -from leap.common.events import server -from leap.common.events import daemon -from leap.common.events import mac_auth +from leap.common.events.errors import CallbackAlreadyRegisteredError +from leap.common.events.server import EMIT_ADDR +from leap.common.events.server import REG_ADDR +from leap.common.events import catalog logger = logging.getLogger(__name__) -# the `registered_callbacks` dictionary below should have the following -# format: -# -# { event_signal: [ (uid, callback), ... ], ... } -# -registered_callbacks = {} +_emit_addr = EMIT_ADDR +_reg_addr = REG_ADDR + +def configure_client(emit_addr, reg_addr): + global _emit_addr, _reg_addr + logger.debug("Configuring client with addresses: (%s, %s)" % + (emit_addr, reg_addr)) + _emit_addr = emit_addr + _reg_addr = reg_addr -class CallbackAlreadyRegistered(Exception): + +class EventsClient(object): """ - Raised when trying to register an already registered callback. + A singleton client for the events mechanism. """ - pass + __metaclass__ = ABCMeta -def ensure_client_daemon(): - """ - Ensure the client daemon is running and listening for incoming - messages. + _instance = None + _instance_lock = threading.Lock() - :return: the daemon instance - :rtype: EventsClientDaemon - """ - import time - daemon = EventsClientDaemon.ensure(0) - logger.debug('ensure client daemon') + def __init__(self, emit_addr, reg_addr): + """ + Initialize the events client. + """ + logger.debug("Creating client instance.") + self._callbacks = collections.defaultdict(dict) + self._emit_addr = emit_addr + self._reg_addr = reg_addr + + @property + def callbacks(self): + return self._callbacks - # Because we use a random port we want to wait until a port is assigned to - # local client daemon. + @classmethod + def instance(cls): + """ + Return a singleton EventsClient instance. + """ + with cls._instance_lock: + if cls._instance is None: + cls._instance = cls(_emit_addr, _reg_addr) + return cls._instance - while not (EventsClientDaemon.get_instance() and - EventsClientDaemon.get_instance().get_port()): - time.sleep(0.1) - return daemon + def register(self, event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: Event + :param callback: The callback to be executed. + :type callback: callable(event, *content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + logger.debug("Subscribing to event: %s" % event) + if not uid: + uid = uuid.uuid4() + elif uid in self._callbacks[event] and not replace: + raise CallbackAlreadyRegisteredError() + self._callbacks[event][uid] = callback + self._subscribe(str(event)) + return uid + + def unregister(self, event, uid=None): + """ + Unregister callbacks for an event. + If uid is not None, then only the callback identified by the given uid + is removed. Otherwise, all callbacks for the event are removed. -def register(signal, callback, uid=None, replace=False, reqcbk=None, - timeout=1000): - """ - Registers a callback to be called when a specific signal event is - received. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param callback: the callback to be called when the signal is received - :type callback: function(leap.common.events.events_pb2.SignalRequest) - :param uid: a unique id for the callback - :type uid: int - :param replace: should an existent callback with same uid be replaced? - :type replace: bool - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.RegisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - Might raise a CallbackAlreadyRegistered exception if there's already a - callback identified by the given uid and replace is False. - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - ensure_client_daemon() # so we can receive registered signals - # register callback locally - if signal not in registered_callbacks: - registered_callbacks[signal] = [] - cbklist = registered_callbacks[signal] - - # TODO should check that the callback has the right - # number of arguments. - - if uid and filter(lambda (x, y): x == uid, cbklist): - if not replace: - raise CallbackAlreadyRegistered() + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str + """ + if not uid: + logger.debug( + "Unregistering all callbacks from event %s." % event) + self._callbacks[event] = {} else: - registered_callbacks[signal] = filter(lambda(x, y): x != uid, - cbklist) - registered_callbacks[signal].append((uid, callback)) - # register callback on server - request = proto.RegisterRequest() - request.event = signal - request.port = EventsClientDaemon.get_instance().get_port() - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(proto.EventsServerService_Stub, - server.SERVER_PORT, 'localhost') - logger.debug( - "Sending registration request to server on port %s: %s", - server.SERVER_PORT, - str(request)[:40]) - return service.register(request, callback=reqcbk, timeout=timeout) - - -def unregister(signal, uid=None, reqcbk=None, timeout=1000): - """ - Unregister a callback. - - If C{uid} is specified, unregisters only the callback identified by that - unique id. Otherwise, unregisters all callbacks - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param uid: a unique id for the callback - :type uid: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls or None if no callback is registered for that signal or - uid. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - if signal not in registered_callbacks or not registered_callbacks[signal]: - logger.warning("No callback registered for signal %d." % signal) - return None - # unregister callback locally - cbklist = registered_callbacks[signal] - if uid is not None: - if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []: - logger.warning("No callback registered for uid %d." % st) - return None - registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist) - else: - # exclude all callbacks for given signal - registered_callbacks[signal] = [] - # unregister port in server if there are no more callbacks for this signal - if not registered_callbacks[signal]: - request = proto.UnregisterRequest() - request.event = signal - request.port = EventsClientDaemon.get_instance().get_port() - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(proto.EventsServerService_Stub, - server.SERVER_PORT, 'localhost') - logger.info( - "Sending unregistration request to server on port %s: %s", - server.SERVER_PORT, - str(request)[:40]) - return service.unregister(request, callback=reqcbk, timeout=timeout) - - -def signal(signal, content="", mac_method="", mac="", reqcbk=None, - timeout=1000): - """ - Send `signal` event to events server. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param content: the contents of the event signal - :type content: str - :param mac_method: the method used for auth mac - :type mac_method: str - :param mac: the content of the auth mac - :type mac: str - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.SignalRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - request = proto.SignalRequest() - request.event = signal - request.content = content - request.mac_method = mac_method - request.mac = mac - service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT, - 'localhost') - logger.debug("Sending signal to server: %s", str(request)[:40]) - return service.signal(request, callback=reqcbk, timeout=timeout) - - -def ping(port, reqcbk=None, timeout=1000): + logger.debug( + "Unregistering callback %s from event %s." % (uid, event)) + if uid in self._callbacks[event]: + del self._callbacks[event][uid] + if not self._callbacks[event]: + del self._callbacks[event] + self._unsubscribe(str(event)) + + def emit(self, event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + logger.debug("Sending event: (%s, %s)" % (event, content)) + self._send(str(event) + b'\0' + pickle.dumps(content)) + + def _handle_event(self, event, content): + """ + Handle an incoming event. + + :param msg: The incoming message. + :type msg: list(str) + """ + logger.debug("Handling event %s..." % event) + for uid in self._callbacks[event]: + callback = self._callbacks[event][uid] + logger.debug("Executing callback %s." % uid) + callback(event, *content) + + @abstractmethod + def _subscribe(self, tag): + """ + Subscribe to a tag on the zmq SUB socket. + + :param tag: The tag to be subscribed. + :type tag: str + """ + pass + + @abstractmethod + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + pass + + @abstractmethod + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + pass + + def shutdown(self): + self.__class__.reset() + + @classmethod + def reset(cls): + with cls._instance_lock: + cls._instance = None + + +class EventsIOLoop(ioloop.ZMQIOLoop): """ - Ping a client running in C{port}. - - :param port: the port in which the client should be listening - :type port: int - :param reqcbk: a callback to be called when a response from client is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from client for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None + An extension of zmq's ioloop that can wait until there are no callbacks + in the queue before stopping. """ - request = proto.PingRequest() - service = RpcService( - proto.EventsClientService_Stub, - port, - 'localhost') - logger.debug("Pinging a client in port %d..." % port) - return service.ping(request, callback=reqcbk, timeout=timeout) + def stop(self, wait=False): + """ + Stop the I/O loop. -class EventsClientService(proto.EventsClientService): + :param wait: Whether we should wait for callbacks in queue to finish + before stopping. + :type wait: bool + """ + if wait: + # prevent new callbacks from being added + with self._callback_lock: + self._closing = True + # wait until all callbacks have been executed + while self._callbacks: + time.sleep(0.1) + ioloop.ZMQIOLoop.stop(self) + + +class EventsClientThread(threading.Thread, EventsClient): """ - Service for receiving signal events in clients. + A threaded version of the events client. """ - def __init__(self): - proto.EventsClientService.__init__(self) + def __init__(self, emit_addr, reg_addr): + """ + Initialize the events client. + """ + threading.Thread.__init__(self) + EventsClient.__init__(self, emit_addr, reg_addr) + self._lock = threading.Lock() + self._initialized = threading.Event() + self._config_prefix = os.path.join( + get_path_prefix(), "leap", "events") + self._loop = None + self._context = None + self._push = None + self._sub = None + + def _init_zmq(self): + """ + Initialize ZMQ connections. + """ + self._loop = EventsIOLoop() + self._context = zmq.Context() + # connect SUB first, otherwise we might miss some event sent from this + # same client + self._sub = self._zmq_connect_sub() + self._push = self._zmq_connect_push() + + def _zmq_connect(self, socktype, address): + """ + Connect to an address using with a zmq socktype. + + :param socktype: The ZMQ socket type. + :type socktype: int + :param address: The address to connect to. + :type address: str - def signal(self, controller, request, done): + :return: A ZMQ connection stream. + :rtype: ZMQStream """ - Receive a signal and run callbacks registered for that signal. + logger.debug("Connecting %s to %s." % (socktype, address)) + socket = self._context.socket(socktype) + # configure curve authentication + if zmq_has_curve(): + public, private = maybe_create_and_get_certificates( + self._config_prefix, "client") + server_public_file = os.path.join( + self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + socket.curve_publickey = public + socket.curve_secretkey = private + socket.curve_serverkey = server_public + stream = zmqstream.ZMQStream(socket, self._loop) + socket.connect(address) + return stream + + def _zmq_connect_push(self): + """ + Initialize the client's PUSH connection. - This method is called whenever a signal request is received from - server. + :return: A ZMQ connection stream. + :rtype: ZMQStream + """ + return self._zmq_connect(zmq.PUSH, self._emit_addr) - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.SignalRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback + def _zmq_connect_sub(self): """ - logger.debug('Received signal from server: %s...' % str(request)[:40]) + Initialize the client's SUB connection. - # run registered callbacks - # TODO: verify authentication using mac in incoming message - if request.event in registered_callbacks: - for (_, cbk) in registered_callbacks[request.event]: - # callbacks should be prepared to receive a - # events_pb2.SignalRequest. - cbk(request) + :return: A ZMQ connection stream. + :rtype: ZMQStream + """ + stream = self._zmq_connect(zmq.SUB, self._reg_addr) + stream.on_recv(self._on_recv) + return stream - # send response back to server - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) + def _on_recv(self, msg): + """ + Handle an incoming message in the SUB socket. - def ping(self, controller, request, done): + :param msg: The received message. + :type msg: str """ - Reply to a ping request. + ev_str, content_pickle = msg[0].split(b'\0', 1) # undo txzmq tagging + event = getattr(catalog, ev_str) + content = pickle.loads(content_pickle) + self._handle_event(event, content) - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback + def _subscribe(self, tag): """ - logger.debug("Received ping request, sending response.") - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) + Subscribe from a tag on the zmq SUB socket. + :param tag: The tag to be subscribed. + :type tag: str + """ + self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag) -class EventsClientDaemon(daemon.EventsSingletonDaemon): - """ - A daemon that listens for incoming events from server. - """ - @classmethod - def ensure(cls, port): + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag) + + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + logger.debug("Sending data: %s" % data) + # add send() as a callback for ioloop so it works between threads + self._loop.add_callback(lambda: self._push.send(data)) + + def register(self, event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: Event + :param callback: The callback to be executed. + :type callback: callable(event, *content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a + callback identified by the given uid and replace is False. """ - Make sure the daemon is running on the given port. + self.ensure_client() + return EventsClient.register(self, event, callback, uid=uid, replace=replace) - :param port: the port in which the daemon should listen - :type port: int + def unregister(self, event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid + is removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str + """ + self.ensure_client() + EventsClient.unregister(self, event, uid=uid) + + def emit(self, event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + self.ensure_client() + EventsClient.emit(self, event, *content) + + def run(self): + """ + Run the events client. + """ + logger.debug("Starting ioloop.") + self._init_zmq() + self._initialized.set() + self._loop.start() + self._loop.close() + logger.debug("Ioloop finished.") + + def ensure_client(self): + """ + Make sure the events client thread is started. + """ + with self._lock: + if not self.is_alive(): + self.daemon = True + self.start() + self._initialized.wait() - :return: a daemon instance - :rtype: EventsClientDaemon + def shutdown(self): """ - return cls.ensure_service(port, EventsClientService()) + Shutdown the events client thread. + """ + logger.debug("Shutting down client...") + with self._lock: + if self.is_alive(): + self._loop.stop(wait=True) + EventsClient.shutdown(self) + + +def shutdown(): + """ + Shutdown the events client thread. + """ + EventsClientThread.instance().shutdown() + + +def register(event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + return EventsClientThread.instance().register( + event, callback, uid=uid, replace=replace) + + +def unregister(event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: str + :param uid: The callback uid. + :type uid: str + """ + return EventsClientThread.instance().unregister(event, uid=uid) + + +def emit(event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: str + :param content: The content of the event. + :type content: list + """ + return EventsClientThread.instance().emit(event, *content) + + +def instance(): + """ + Return an instance of the events client. + + :return: An instance of the events client. + :rtype: EventsClientThread + """ + return EventsClientThread.instance() diff --git a/src/leap/common/events/daemon.py b/src/leap/common/events/daemon.py deleted file mode 100644 index c4a4189..0000000 --- a/src/leap/common/events/daemon.py +++ /dev/null @@ -1,208 +0,0 @@ -# -*- coding: utf-8 -*- -# daemon.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -""" -A singleton daemon for running RPC services using protobuf.socketrpc. -""" - - -import logging -import threading - - -from protobuf.socketrpc.server import ( - SocketRpcServer, - ThreadedTCPServer, - SocketHandler, -) - - -logger = logging.getLogger(__name__) - - -class ServiceAlreadyRunningException(Exception): - """ - Raised whenever a service is already running in this process but someone - attemped to start it in a different port. - """ - - -class EventsRpcServer(SocketRpcServer): - """ - RPC server used in server and client interfaces to receive messages. - """ - - def __init__(self, port, host='localhost'): - """ - Initialize a RPC server. - - :param port: the port in which to listen for incoming messages - :type port: int - :param host: the address to bind to - :type host: str - """ - SocketRpcServer.__init__(self, port, host) - self._server = None - - def run(self): - """ - Run the server. - """ - logger.info('Running server on port %d.' % self.port) - # parent implementation does not hold the server instance, so we do it - # here. - self._server = ThreadedTCPServer((self.host, self.port), - SocketHandler, self) - # if we chose to use a random port, fetch the port number info. - if self.port is 0: - self.port = self._server.socket.getsockname()[1] - self._server.serve_forever() - - def stop(self): - """ - Stop the server. - """ - self._server.shutdown() - - -class EventsSingletonDaemon(threading.Thread): - """ - Singleton class for for launching and terminating a daemon. - - This class is used so every part of the mechanism that needs to listen for - messages can launch its own daemon (thread) to do the job. - """ - - # Singleton instance - __instance = None - - def __new__(cls, *args, **kwargs): - """ - Return a singleton instance if it exists or create and initialize one. - """ - if len(args) is not 2: - raise TypeError("__init__() takes exactly 2 arguments (%d given)" - % len(args)) - if cls.__instance is None: - cls.__instance = object.__new__( - EventsSingletonDaemon) - cls.__initialize(cls.__instance, args[0], args[1]) - return cls.__instance - - @staticmethod - def __initialize(self, port, service): - """ - Initialize a singleton daemon. - - This is a static method disguised as instance method that actually - does the initialization of the daemon instance. - - :param port: the port in which to listen for incoming messages - :type port: int - :param service: the service to provide in this daemon - :type service: google.protobuf.service.Service - """ - threading.Thread.__init__(self) - self._port = port - self._service = service - self._server = EventsRpcServer(self._port) - self._server.registerService(self._service) - self.daemon = True - - def __init__(self): - """ - Singleton placeholder initialization method. - - Initialization is made in __new__ so we can always return the same - instance upon object creation. - """ - pass - - @classmethod - def ensure(cls, port): - """ - Make sure the daemon instance is running. - - Each implementation of this method should call `self.ensure_service` - with the appropriate service from the `events.proto` definitions, and - return the daemon instance. - - :param port: the port in which the daemon should be listening - :type port: int - - :return: a daemon instance - :rtype: EventsSingletonDaemon - """ - raise NotImplementedError(self.ensure) - - @classmethod - def ensure_service(cls, port, service): - """ - Start the singleton instance if not already running. - - Might return ServiceAlreadyRunningException - - :param port: the port in which the daemon should be listening - :type port: int - - :return: a daemon instance - :rtype: EventsSingletonDaemon - """ - daemon = cls(port, service) - if not daemon.is_alive(): - daemon.start() - elif port and port != cls.__instance._port: - # service is running in this process but someone is trying to - # start it in another port - raise ServiceAlreadyRunningException( - "Service is already running in this process on port %d." - % self.__instance._port) - return daemon - - @classmethod - def get_instance(cls): - """ - Retrieve singleton instance of this daemon. - - :return: a daemon instance - :rtype: EventsSingletonDaemon - """ - return cls.__instance - - def run(self): - """ - Run the server. - """ - self._server.run() - - def stop(self): - """ - Stop the daemon. - """ - self._server.stop() - - def get_port(self): - """ - Retrieve the value of the port to which the service running in this - daemon is binded to. - - :return: the port to which the daemon is binded to - :rtype: int - """ - if self._port is 0: - self._port = self._server.port - return self._port diff --git a/src/leap/common/events/errors.py b/src/leap/common/events/errors.py new file mode 100644 index 0000000..58e0014 --- /dev/null +++ b/src/leap/common/events/errors.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# errors.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +class CallbackAlreadyRegisteredError(Exception): + """ + Raised when trying to register an already registered callback. + """ + pass diff --git a/src/leap/common/events/events.proto b/src/leap/common/events/events.proto deleted file mode 100644 index 2371b2a..0000000 --- a/src/leap/common/events/events.proto +++ /dev/null @@ -1,145 +0,0 @@ -// signal.proto -// Copyright (C) 2013 LEAP -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package leap.common.events; -option py_generic_services = true; - - -// These are the events that can be signaled using the events mechanism. - -enum Event { - CLIENT_SESSION_ID = 1; - CLIENT_UID = 2; - SOLEDAD_CREATING_KEYS = 3; - SOLEDAD_DONE_CREATING_KEYS = 4; - SOLEDAD_UPLOADING_KEYS = 5; - SOLEDAD_DONE_UPLOADING_KEYS = 6; - SOLEDAD_DOWNLOADING_KEYS = 7; - SOLEDAD_DONE_DOWNLOADING_KEYS = 8; - SOLEDAD_NEW_DATA_TO_SYNC = 9; - SOLEDAD_DONE_DATA_SYNC = 10; - UPDATER_NEW_UPDATES = 11; - UPDATER_DONE_UPDATING = 12; - RAISE_WINDOW = 13; - SMTP_SERVICE_STARTED = 14; - SMTP_SERVICE_FAILED_TO_START = 15; - SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16; - SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17; - SMTP_RECIPIENT_REJECTED = 18; - SMTP_START_ENCRYPT_AND_SIGN = 19; - SMTP_END_ENCRYPT_AND_SIGN = 20; - SMTP_START_SIGN = 21; - SMTP_END_SIGN = 22; - SMTP_SEND_MESSAGE_START = 23; - SMTP_SEND_MESSAGE_SUCCESS = 24; - SMTP_SEND_MESSAGE_ERROR = 25; - SMTP_CONNECTION_LOST = 26; - IMAP_SERVICE_STARTED = 30; - IMAP_SERVICE_FAILED_TO_START = 31; - IMAP_CLIENT_LOGIN = 32; - IMAP_FETCHED_INCOMING = 33; - IMAP_MSG_PROCESSING = 34; - IMAP_MSG_DECRYPTED = 35; - IMAP_MSG_SAVED_LOCALLY = 36; - IMAP_MSG_DELETED_INCOMING = 37; - IMAP_UNHANDLED_ERROR = 38; - IMAP_UNREAD_MAIL = 39; - KEYMANAGER_LOOKING_FOR_KEY = 40; - KEYMANAGER_KEY_FOUND = 41; - KEYMANAGER_KEY_NOT_FOUND = 42; - KEYMANAGER_STARTED_KEY_GENERATION = 43; - KEYMANAGER_FINISHED_KEY_GENERATION = 44; - KEYMANAGER_DONE_UPLOADING_KEYS = 45; - SOLEDAD_INVALID_AUTH_TOKEN = 46; - SOLEDAD_SYNC_SEND_STATUS = 47; - SOLEDAD_SYNC_RECEIVE_STATUS = 48; -} - - -// A SignalRequest is the type of the message sent from one component to request -// that a signal be sent to every registered component. - -message SignalRequest { - required Event event = 1; - required string content = 2; - required string mac_method = 3; - required bytes mac = 4; - optional string enc_method = 5; - optional bool error_occurred = 6; -} - - -// A RegisterRequest message tells the server that a component wants to -// be signaled whenever a specific event occurs. - -message RegisterRequest { - required Event event = 1; - required int32 port = 2; - required string mac_method = 3; - required bytes mac = 4; -} - - -// An UnregisterRequest message tells the server that a component does not -// want to be signaled when a specific event occurs. - -message UnregisterRequest { - required Event event = 1; - required int32 port = 2; - required string mac_method = 3; - required bytes mac = 4; -} - - -// A PingRequest message is used to find out if a server or component is -// alive. - -message PingRequest { -} - - -// The EventResponse is the message sent back by server and components after -// they receive other kinds of requests. - -message EventResponse { - - enum Status { - OK = 1; - UNAUTH = 2; - ERROR = 3; - } - - required Status status = 1; - optional string result = 2; -} - - -// The EventsServerService is the service provided by the server. - -service EventsServerService { - rpc ping(PingRequest) returns (EventResponse); - rpc register(RegisterRequest) returns (EventResponse); - rpc unregister(UnregisterRequest) returns (EventResponse); - rpc signal(SignalRequest) returns (EventResponse); -} - - -// EventsComponentService is the service provided by components (clients). - -service EventsClientService { - rpc ping(PingRequest) returns (EventResponse); - rpc signal(SignalRequest) returns (EventResponse); -} diff --git a/src/leap/common/events/events_pb2.py b/src/leap/common/events/events_pb2.py deleted file mode 100644 index 9692ea1..0000000 --- a/src/leap/common/events/events_pb2.py +++ /dev/null @@ -1,635 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! - -from google.protobuf import descriptor -from google.protobuf import message -from google.protobuf import reflection -from google.protobuf import service -from google.protobuf import service_reflection -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - - - -DESCRIPTOR = descriptor.FileDescriptor( - name='events.proto', - package='leap.common.events', - serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"l\n\x11UnregisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\r\n\x0bPingRequest\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\x9f\n\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x12\x10\n\x0cRAISE_WINDOW\x10\r\x12\x18\n\x14SMTP_SERVICE_STARTED\x10\x0e\x12 \n\x1cSMTP_SERVICE_FAILED_TO_START\x10\x0f\x12%\n!SMTP_RECIPIENT_ACCEPTED_ENCRYPTED\x10\x10\x12\'\n#SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED\x10\x11\x12\x1b\n\x17SMTP_RECIPIENT_REJECTED\x10\x12\x12\x1f\n\x1bSMTP_START_ENCRYPT_AND_SIGN\x10\x13\x12\x1d\n\x19SMTP_END_ENCRYPT_AND_SIGN\x10\x14\x12\x13\n\x0fSMTP_START_SIGN\x10\x15\x12\x11\n\rSMTP_END_SIGN\x10\x16\x12\x1b\n\x17SMTP_SEND_MESSAGE_START\x10\x17\x12\x1d\n\x19SMTP_SEND_MESSAGE_SUCCESS\x10\x18\x12\x1b\n\x17SMTP_SEND_MESSAGE_ERROR\x10\x19\x12\x18\n\x14SMTP_CONNECTION_LOST\x10\x1a\x12\x18\n\x14IMAP_SERVICE_STARTED\x10\x1e\x12 \n\x1cIMAP_SERVICE_FAILED_TO_START\x10\x1f\x12\x15\n\x11IMAP_CLIENT_LOGIN\x10 \x12\x19\n\x15IMAP_FETCHED_INCOMING\x10!\x12\x17\n\x13IMAP_MSG_PROCESSING\x10\"\x12\x16\n\x12IMAP_MSG_DECRYPTED\x10#\x12\x1a\n\x16IMAP_MSG_SAVED_LOCALLY\x10$\x12\x1d\n\x19IMAP_MSG_DELETED_INCOMING\x10%\x12\x18\n\x14IMAP_UNHANDLED_ERROR\x10&\x12\x14\n\x10IMAP_UNREAD_MAIL\x10\'\x12\x1e\n\x1aKEYMANAGER_LOOKING_FOR_KEY\x10(\x12\x18\n\x14KEYMANAGER_KEY_FOUND\x10)\x12\x1c\n\x18KEYMANAGER_KEY_NOT_FOUND\x10*\x12%\n!KEYMANAGER_STARTED_KEY_GENERATION\x10+\x12&\n\"KEYMANAGER_FINISHED_KEY_GENERATION\x10,\x12\"\n\x1eKEYMANAGER_DONE_UPLOADING_KEYS\x10-\x12\x1e\n\x1aSOLEDAD_INVALID_AUTH_TOKEN\x10.\x12\x1c\n\x18SOLEDAD_SYNC_SEND_STATUS\x10/\x12\x1f\n\x1bSOLEDAD_SYNC_RECEIVE_STATUS\x10\x30\x32\xdd\x02\n\x13\x45ventsServerService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12V\n\nunregister\x12%.leap.common.events.UnregisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2\xb1\x01\n\x13\x45ventsClientService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponseB\x03\x90\x01\x01') - -_EVENT = descriptor.EnumDescriptor( - name='Event', - full_name='leap.common.events.Event', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='CLIENT_SESSION_ID', index=0, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='CLIENT_UID', index=1, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_CREATING_KEYS', index=2, number=3, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_CREATING_KEYS', index=3, number=4, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_UPLOADING_KEYS', index=4, number=5, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_UPLOADING_KEYS', index=5, number=6, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DOWNLOADING_KEYS', index=6, number=7, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_DOWNLOADING_KEYS', index=7, number=8, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_NEW_DATA_TO_SYNC', index=8, number=9, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_DATA_SYNC', index=9, number=10, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UPDATER_NEW_UPDATES', index=10, number=11, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UPDATER_DONE_UPDATING', index=11, number=12, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='RAISE_WINDOW', index=12, number=13, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SERVICE_STARTED', index=13, number=14, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SERVICE_FAILED_TO_START', index=14, number=15, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_RECIPIENT_ACCEPTED_ENCRYPTED', index=15, number=16, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED', index=16, number=17, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_RECIPIENT_REJECTED', index=17, number=18, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_START_ENCRYPT_AND_SIGN', index=18, number=19, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_END_ENCRYPT_AND_SIGN', index=19, number=20, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_START_SIGN', index=20, number=21, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_END_SIGN', index=21, number=22, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SEND_MESSAGE_START', index=22, number=23, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SEND_MESSAGE_SUCCESS', index=23, number=24, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SEND_MESSAGE_ERROR', index=24, number=25, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_CONNECTION_LOST', index=25, number=26, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_SERVICE_STARTED', index=26, number=30, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_SERVICE_FAILED_TO_START', index=27, number=31, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_CLIENT_LOGIN', index=28, number=32, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_FETCHED_INCOMING', index=29, number=33, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_PROCESSING', index=30, number=34, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_DECRYPTED', index=31, number=35, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_SAVED_LOCALLY', index=32, number=36, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_DELETED_INCOMING', index=33, number=37, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_UNHANDLED_ERROR', index=34, number=38, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_UNREAD_MAIL', index=35, number=39, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_LOOKING_FOR_KEY', index=36, number=40, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_KEY_FOUND', index=37, number=41, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_KEY_NOT_FOUND', index=38, number=42, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_STARTED_KEY_GENERATION', index=39, number=43, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_FINISHED_KEY_GENERATION', index=40, number=44, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_DONE_UPLOADING_KEYS', index=41, number=45, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_INVALID_AUTH_TOKEN', index=42, number=46, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_SYNC_SEND_STATUS', index=43, number=47, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_SYNC_RECEIVE_STATUS', index=44, number=48, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=557, - serialized_end=1868, -) - - -CLIENT_SESSION_ID = 1 -CLIENT_UID = 2 -SOLEDAD_CREATING_KEYS = 3 -SOLEDAD_DONE_CREATING_KEYS = 4 -SOLEDAD_UPLOADING_KEYS = 5 -SOLEDAD_DONE_UPLOADING_KEYS = 6 -SOLEDAD_DOWNLOADING_KEYS = 7 -SOLEDAD_DONE_DOWNLOADING_KEYS = 8 -SOLEDAD_NEW_DATA_TO_SYNC = 9 -SOLEDAD_DONE_DATA_SYNC = 10 -UPDATER_NEW_UPDATES = 11 -UPDATER_DONE_UPDATING = 12 -RAISE_WINDOW = 13 -SMTP_SERVICE_STARTED = 14 -SMTP_SERVICE_FAILED_TO_START = 15 -SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16 -SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17 -SMTP_RECIPIENT_REJECTED = 18 -SMTP_START_ENCRYPT_AND_SIGN = 19 -SMTP_END_ENCRYPT_AND_SIGN = 20 -SMTP_START_SIGN = 21 -SMTP_END_SIGN = 22 -SMTP_SEND_MESSAGE_START = 23 -SMTP_SEND_MESSAGE_SUCCESS = 24 -SMTP_SEND_MESSAGE_ERROR = 25 -SMTP_CONNECTION_LOST = 26 -IMAP_SERVICE_STARTED = 30 -IMAP_SERVICE_FAILED_TO_START = 31 -IMAP_CLIENT_LOGIN = 32 -IMAP_FETCHED_INCOMING = 33 -IMAP_MSG_PROCESSING = 34 -IMAP_MSG_DECRYPTED = 35 -IMAP_MSG_SAVED_LOCALLY = 36 -IMAP_MSG_DELETED_INCOMING = 37 -IMAP_UNHANDLED_ERROR = 38 -IMAP_UNREAD_MAIL = 39 -KEYMANAGER_LOOKING_FOR_KEY = 40 -KEYMANAGER_KEY_FOUND = 41 -KEYMANAGER_KEY_NOT_FOUND = 42 -KEYMANAGER_STARTED_KEY_GENERATION = 43 -KEYMANAGER_FINISHED_KEY_GENERATION = 44 -KEYMANAGER_DONE_UPLOADING_KEYS = 45 -SOLEDAD_INVALID_AUTH_TOKEN = 46 -SOLEDAD_SYNC_SEND_STATUS = 47 -SOLEDAD_SYNC_RECEIVE_STATUS = 48 - - -_EVENTRESPONSE_STATUS = descriptor.EnumDescriptor( - name='Status', - full_name='leap.common.events.EventResponse.Status', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='OK', index=0, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UNAUTH', index=1, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='ERROR', index=2, number=3, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=515, - serialized_end=554, -) - - -_SIGNALREQUEST = descriptor.Descriptor( - name='SignalRequest', - full_name='leap.common.events.SignalRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='event', full_name='leap.common.events.SignalRequest.event', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='content', full_name='leap.common.events.SignalRequest.content', index=1, - number=2, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.SignalRequest.mac_method', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.SignalRequest.mac', index=3, - number=4, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='enc_method', full_name='leap.common.events.SignalRequest.enc_method', index=4, - number=5, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='error_occurred', full_name='leap.common.events.SignalRequest.error_occurred', index=5, - number=6, type=8, cpp_type=7, label=1, - has_default_value=False, default_value=False, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=37, - serialized_end=188, -) - - -_REGISTERREQUEST = descriptor.Descriptor( - name='RegisterRequest', - full_name='leap.common.events.RegisterRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='event', full_name='leap.common.events.RegisterRequest.event', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='port', full_name='leap.common.events.RegisterRequest.port', index=1, - number=2, type=5, cpp_type=1, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.RegisterRequest.mac_method', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.RegisterRequest.mac', index=3, - number=4, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=190, - serialized_end=296, -) - - -_UNREGISTERREQUEST = descriptor.Descriptor( - name='UnregisterRequest', - full_name='leap.common.events.UnregisterRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='event', full_name='leap.common.events.UnregisterRequest.event', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='port', full_name='leap.common.events.UnregisterRequest.port', index=1, - number=2, type=5, cpp_type=1, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.UnregisterRequest.mac_method', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.UnregisterRequest.mac', index=3, - number=4, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=298, - serialized_end=406, -) - - -_PINGREQUEST = descriptor.Descriptor( - name='PingRequest', - full_name='leap.common.events.PingRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=408, - serialized_end=421, -) - - -_EVENTRESPONSE = descriptor.Descriptor( - name='EventResponse', - full_name='leap.common.events.EventResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='status', full_name='leap.common.events.EventResponse.status', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='result', full_name='leap.common.events.EventResponse.result', index=1, - number=2, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - _EVENTRESPONSE_STATUS, - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=424, - serialized_end=554, -) - -_SIGNALREQUEST.fields_by_name['event'].enum_type = _EVENT -_REGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT -_UNREGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT -_EVENTRESPONSE.fields_by_name['status'].enum_type = _EVENTRESPONSE_STATUS -_EVENTRESPONSE_STATUS.containing_type = _EVENTRESPONSE; -DESCRIPTOR.message_types_by_name['SignalRequest'] = _SIGNALREQUEST -DESCRIPTOR.message_types_by_name['RegisterRequest'] = _REGISTERREQUEST -DESCRIPTOR.message_types_by_name['UnregisterRequest'] = _UNREGISTERREQUEST -DESCRIPTOR.message_types_by_name['PingRequest'] = _PINGREQUEST -DESCRIPTOR.message_types_by_name['EventResponse'] = _EVENTRESPONSE - -class SignalRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _SIGNALREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.SignalRequest) - -class RegisterRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _REGISTERREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.RegisterRequest) - -class UnregisterRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _UNREGISTERREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.UnregisterRequest) - -class PingRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _PINGREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.PingRequest) - -class EventResponse(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _EVENTRESPONSE - - # @@protoc_insertion_point(class_scope:leap.common.events.EventResponse) - - -_EVENTSSERVERSERVICE = descriptor.ServiceDescriptor( - name='EventsServerService', - full_name='leap.common.events.EventsServerService', - file=DESCRIPTOR, - index=0, - options=None, - serialized_start=1871, - serialized_end=2220, - methods=[ - descriptor.MethodDescriptor( - name='ping', - full_name='leap.common.events.EventsServerService.ping', - index=0, - containing_service=None, - input_type=_PINGREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='register', - full_name='leap.common.events.EventsServerService.register', - index=1, - containing_service=None, - input_type=_REGISTERREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='unregister', - full_name='leap.common.events.EventsServerService.unregister', - index=2, - containing_service=None, - input_type=_UNREGISTERREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='signal', - full_name='leap.common.events.EventsServerService.signal', - index=3, - containing_service=None, - input_type=_SIGNALREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), -]) - -class EventsServerService(service.Service): - __metaclass__ = service_reflection.GeneratedServiceType - DESCRIPTOR = _EVENTSSERVERSERVICE -class EventsServerService_Stub(EventsServerService): - __metaclass__ = service_reflection.GeneratedServiceStubType - DESCRIPTOR = _EVENTSSERVERSERVICE - - -_EVENTSCLIENTSERVICE = descriptor.ServiceDescriptor( - name='EventsClientService', - full_name='leap.common.events.EventsClientService', - file=DESCRIPTOR, - index=1, - options=None, - serialized_start=2223, - serialized_end=2400, - methods=[ - descriptor.MethodDescriptor( - name='ping', - full_name='leap.common.events.EventsClientService.ping', - index=0, - containing_service=None, - input_type=_PINGREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='signal', - full_name='leap.common.events.EventsClientService.signal', - index=1, - containing_service=None, - input_type=_SIGNALREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), -]) - -class EventsClientService(service.Service): - __metaclass__ = service_reflection.GeneratedServiceType - DESCRIPTOR = _EVENTSCLIENTSERVICE -class EventsClientService_Stub(EventsClientService): - __metaclass__ = service_reflection.GeneratedServiceStubType - DESCRIPTOR = _EVENTSCLIENTSERVICE - -# @@protoc_insertion_point(module_scope) diff --git a/src/leap/common/events/mac_auth.py b/src/leap/common/events/mac_auth.py deleted file mode 100644 index 49d48f7..0000000 --- a/src/leap/common/events/mac_auth.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -# mac_auth.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -""" -Authentication system for events. - -This is not implemented yet. -""" - - -class MacMethod(object): - """ - Representation of possible MAC authentication methods. - """ - - MAC_NONE = 'none' - MAC_HMAC = 'hmac' diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index 41aede3..a69202e 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -14,223 +14,79 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -""" -A server for the events mechanism. - -A server can receive different kinds of requests from clients: - 1. Registration request: store client port number to be notified when - a specific signal arrives. - 2. Signal request: redistribute the signal to registered clients. """ -import logging -import socket +The server for the events mechanism. +""" -from protobuf.socketrpc import RpcService -from leap.common.events import ( - events_pb2 as proto, - daemon, -) +import logging +import txzmq +from leap.common.zmq_utils import zmq_has_curve -logger = logging.getLogger(__name__) +from leap.common.events.zmq_components import TxZmqServerComponent -SERVER_PORT = 8090 +if zmq_has_curve(): + EMIT_ADDR = "tcp://127.0.0.1:9000" + REG_ADDR = "tcp://127.0.0.1:9001" +else: + EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0" + REG_ADDR = "ipc:///tmp/leap.common.events.socket.1" -# the `registered_clients` dictionary below should have the following -# format: -# -# { event_signal: [ port, ... ], ... } -# -registered_clients = {} - -class PortAlreadyTaken(Exception): - """ - Raised when trying to open a server in a port that is already taken. - """ - pass +logger = logging.getLogger(__name__) -def ensure_server(port=SERVER_PORT): +def ensure_server(emit_addr=EMIT_ADDR, reg_addr=REG_ADDR): """ - Make sure the server is running on the given port. + Make sure the server is running in the given addresses. - Attempt to connect to given local port. Upon success, assume that the - events server has already been started. Upon failure, start events server. + :param emit_addr: The address in which to receive events from clients. + :type emit_addr: str + :param reg_addr: The address to which publish events to clients. + :type reg_addr: str - :param port: the port in which server should be listening - :type port: int - - :return: the daemon instance or nothing - :rtype: EventsServerDaemon or None - - :raise PortAlreadyTaken: Raised if C{port} is already taken by something - that is not an events server. - """ - try: - # check if port is available - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', port)) - s.close() - # port is taken, check if there's a server running there - response = ping(port=port, timeout=1000) - if response is not None and response.status == proto.EventResponse.OK: - logger.info('A server is already running on port %d.', port) - return - # port is taken, and not by an events server - logger.warning( - 'Port %d is taken by something not an events server.', port) - raise PortAlreadyTaken(port) - except socket.error: - # port is available, run a server - logger.info('Launching server on port %d.', port) - return EventsServerDaemon.ensure(port) - - -def ping(port=SERVER_PORT, reqcbk=None, timeout=1000): + :return: an events server instance + :rtype: EventsServer """ - Ping the server. - - :param port: the port in which server should be listening - :type port: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - request = proto.PingRequest() - service = RpcService( - proto.EventsServerService_Stub, - port, - 'localhost') - logger.debug("Pinging server in port %d..." % port) - return service.ping(request, callback=reqcbk, timeout=timeout) + _server = EventsServer(emit_addr, reg_addr) + return _server -class EventsServerService(proto.EventsServerService): +class EventsServer(TxZmqServerComponent): """ - Service for receiving events in clients. + An events server that listens for events in one address and publishes those + events in another address. """ - def register(self, controller, request, done): - """ - Register a client port to be signaled when specific events come in. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.info("Received registration request: %s..." % str(request)[:40]) - # add client port to signal list - if request.event not in registered_clients: - registered_clients[request.event] = set([]) - registered_clients[request.event].add(request.port) - # send response back to client - - logger.debug('sending response back') - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - - def unregister(self, controller, request, done): - """ - Unregister a client port so it will not be signaled when specific - events come in. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.info( - "Received unregistration request: %s..." % str(request)[:40]) - # remove client port from signal list - response = proto.EventResponse() - if request.event in registered_clients: - try: - registered_clients[request.event].remove(request.port) - response.status = proto.EventResponse.OK - except KeyError: - response.status = proto.EventsResponse.ERROR - response.result = 'Port %d not registered.' % request.port - # send response back to client - logger.debug('sending response back') - done.run(response) - - def signal(self, controller, request, done): - """ - Perform an RPC call to signal all clients registered to receive a - specific signal. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.SignalRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.debug('Received signal from client: %s...', str(request)[:40]) - # send signal to all registered clients - # TODO: verify signal auth - if request.event in registered_clients: - for port in registered_clients[request.event]: - - def callback(req, resp): - logger.debug("Signal received by " + str(port)) - - service = RpcService(proto.EventsClientService_Stub, - port, 'localhost') - service.signal(request, callback=callback) - # send response back to client - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - - def ping(self, controller, request, done): + def __init__(self, emit_addr, reg_addr): """ - Reply to a ping request. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.debug("Received ping request, sending response.") - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - + Initialize the events server. -class EventsServerDaemon(daemon.EventsSingletonDaemon): - """ - Singleton class for starting an events server daemon. - """ - - @classmethod - def ensure(cls, port): + :param emit_addr: The address in which to receive events from clients. + :type emit_addr: str + :param reg_addr: The address to which publish events to clients. + :type reg_addr: str """ - Make sure the daemon is running on the given port. - - :param port: the port in which the daemon should listen - :type port: int + TxZmqServerComponent.__init__(self) + # bind PULL and PUB sockets + self._pull, self.pull_port = self._zmq_bind( + txzmq.ZmqPullConnection, emit_addr) + self._pub, self.pub_port = self._zmq_bind( + txzmq.ZmqPubConnection, reg_addr) + # set a handler for arriving messages + self._pull.onPull = self._onPull + + def _onPull(self, message): + """ + Callback executed when a message is pulled from a client. - :return: a daemon instance - :rtype: EventsServerDaemon + :param message: The message sent by the client. + :type message: str """ - return cls.ensure_service(port, EventsServerService()) + event, content = message[0].split(b"\0", 1) + logger.debug("Publishing event: %s" % event) + self._pub.publish(content, tag=event) diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py new file mode 100644 index 0000000..8206ed5 --- /dev/null +++ b/src/leap/common/events/txclient.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +# txclient.py +# Copyright (C) 2013, 2014, 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +The client end point of the events mechanism, implemented using txzmq. + +Clients are the communicating parties of the events mechanism. They +communicate by sending messages to a server, which in turn redistributes +messages to other clients. + +When a client registers a callback for a given event, it also tells the +server that it wants to be notified whenever events of that type are sent by +some other client. +""" + + +import logging +import pickle + +import txzmq + +from leap.common.events.zmq_components import TxZmqClientComponent +from leap.common.events.client import EventsClient +from leap.common.events.client import configure_client +from leap.common.events.server import EMIT_ADDR +from leap.common.events.server import REG_ADDR +from leap.common.events import catalog + + +logger = logging.getLogger(__name__) + + +__all__ = [ + "configure_client", + "EventsTxClient", + "register", + "unregister", + "emit", + "shutdown", +] + + +class EventsTxClient(TxZmqClientComponent, EventsClient): + """ + A twisted events client that listens for events in one address and + publishes those events to another address. + """ + + def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR, + path_prefix=None): + """ + Initialize the events server. + """ + TxZmqClientComponent.__init__(self, path_prefix=path_prefix) + EventsClient.__init__(self, emit_addr, reg_addr) + # connect SUB first, otherwise we might miss some event sent from this + # same client + self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr) + self._sub.gotMessage = self._gotMessage + self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr) + + def _gotMessage(self, msg, tag): + """ + Handle an incoming event. + + :param msg: The incoming message. + :type msg: list(str) + """ + event = getattr(catalog, tag) + content = pickle.loads(msg) + self._handle_event(event, content) + + def _subscribe(self, tag): + """ + Subscribe to a tag on the zmq SUB socket. + + :param tag: The tag to be subscribed. + :type tag: str + """ + self._sub.subscribe(tag) + + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + self._sub.unsubscribe(tag) + + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + self._push.send(data) + + def shutdown(self): + TxZmqClientComponent.shutdown(self) + EventsClient.shutdown(self) + + +def register(event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + return EventsTxClient.instance().register( + event, callback, uid=uid, replace=replace) + + +def unregister(event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: str + :param uid: The callback uid. + :type uid: str + """ + return EventsTxClient.instance().unregister(event, uid=uid) + + +def emit(event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: str + :param content: The content of the event. + :type content: list + """ + return EventsTxClient.instance().emit(event, *content) + + +def shutdown(): + """ + Shutdown the events client. + """ + EventsTxClient.instance().shutdown() + + +def instance(): + """ + Return an instance of the events client. + + :return: An instance of the events client. + :rtype: EventsClientThread + """ + return EventsTxClient.instance() diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py new file mode 100644 index 0000000..4fb95d3 --- /dev/null +++ b/src/leap/common/events/zmq_components.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +# zmq.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +The server for the events mechanism. +""" + + +import os +import logging +import txzmq +import re + +from abc import ABCMeta + +# XXX some distros don't package libsodium, so we have to be prepared for +# absence of zmq.auth +try: + import zmq.auth + from zmq.auth.thread import ThreadAuthenticator +except ImportError: + pass + +from leap.common.config import get_path_prefix +from leap.common.zmq_utils import zmq_has_curve +from leap.common.zmq_utils import maybe_create_and_get_certificates +from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX + + +logger = logging.getLogger(__name__) + + +ADDRESS_RE = re.compile("(.+)://(.+):([0-9]+)") + + +class TxZmqComponent(object): + """ + A twisted-powered zmq events component. + """ + + __metaclass__ = ABCMeta + + _component_type = None + + def __init__(self, path_prefix=None): + """ + Initialize the txzmq component. + """ + self._factory = txzmq.ZmqFactory() + self._factory.registerForShutdown() + if path_prefix == None: + path_prefix = get_path_prefix() + self._config_prefix = os.path.join(path_prefix, "leap", "events") + self._connections = [] + + @property + def component_type(self): + if not self._component_type: + raise Exception( + "Make sure implementations of TxZmqComponent" + "define a self._component_type!") + return self._component_type + + def _zmq_connect(self, connClass, address): + """ + Connect to an address. + + :param connClass: The connection class to be used. + :type connClass: txzmq.ZmqConnection + :param address: The address to connect to. + :type address: str + + :return: The binded connection. + :rtype: txzmq.ZmqConnection + """ + connection = connClass(self._factory) + # create and configure socket + socket = connection.socket + if zmq_has_curve(): + public, secret = maybe_create_and_get_certificates( + self._config_prefix, self.component_type) + server_public_file = os.path.join( + self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + socket.curve_publickey = public + socket.curve_secretkey = secret + socket.curve_serverkey = server_public + socket.connect(address) + logger.debug("Connected %s to %s." % (connClass, address)) + self._connections.append(connection) + return connection + + def _zmq_bind(self, connClass, address): + """ + Bind to an address. + + :param connClass: The connection class to be used. + :type connClass: txzmq.ZmqConnection + :param address: The address to bind to. + :type address: str + + :return: The binded connection and port. + :rtype: (txzmq.ZmqConnection, int) + """ + connection = connClass(self._factory) + socket = connection.socket + if zmq_has_curve(): + public, secret = maybe_create_and_get_certificates( + self._config_prefix, self.component_type) + socket.curve_publickey = public + socket.curve_secretkey = secret + self._start_thread_auth(connection.socket) + # check if port was given + protocol, addr, port = ADDRESS_RE.match(address).groups() + if port == "0": + port = socket.bind_to_random_port("%s://%s" % (protocol, addr)) + else: + socket.bind(address) + port = int(port) + logger.debug("Binded %s to %s://%s:%d." + % (connClass, protocol, addr, port)) + self._connections.append(connection) + return connection, port + + def _start_thread_auth(self, socket): + """ + Start the zmq curve thread authenticator. + + :param socket: The socket in which to configure the authenticator. + :type socket: zmq.Socket + """ + authenticator = ThreadAuthenticator(self._factory.context) + authenticator.start() + # XXX do not hardcode this here. + authenticator.allow('127.0.0.1') + # tell authenticator to use the certificate in a directory + public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) + authenticator.configure_curve(domain="*", location=public_keys_dir) + socket.curve_server = True # must come before bind + + def shutdown(self): + """ + Shutdown the component. + """ + logger.debug("Shutting down component %s." % str(self)) + for conn in self._connections: + conn.shutdown() + self._factory.shutdown() + + +class TxZmqServerComponent(TxZmqComponent): + """ + A txZMQ server component. + """ + + _component_type = "server" + + +class TxZmqClientComponent(TxZmqComponent): + """ + A txZMQ client component. + """ + + _component_type = "client" diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 0779b2e..7ef3e1b 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -15,414 +15,156 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import unittest -import sets -import time -import socket -import threading -import random - - -from mock import Mock -from protobuf.socketrpc import RpcService -from leap.common import events -from leap.common.events import ( - server, - client, - mac_auth, -) -from leap.common.events.events_pb2 import ( - EventsServerService, - EventsServerService_Stub, - EventsClientService_Stub, - EventResponse, - SignalRequest, - RegisterRequest, - PingRequest, - SOLEDAD_CREATING_KEYS, - CLIENT_UID, -) +import os +import logging +import time -port = 8090 +from twisted.trial import unittest +from twisted.internet import defer -received = False +from leap.common.events import server +from leap.common.events import client +from leap.common.events import txclient +from leap.common.events import catalog +from leap.common.events.errors import CallbackAlreadyRegisteredError -class EventsTestCase(unittest.TestCase): +if 'DEBUG' in os.environ: + logging.basicConfig(level=logging.DEBUG) - @classmethod - def setUpClass(cls): - server.EventsServerDaemon.ensure(8090) - cls.callbacks = events.client.registered_callbacks - @classmethod - def tearDownClass(cls): - # give some time for requests to be processed. - time.sleep(1) +class EventsGenericClientTestCase(object): def setUp(self): - super(EventsTestCase, self).setUp() + self._server = server.ensure_server( + emit_addr="tcp://127.0.0.1:0", + reg_addr="tcp://127.0.0.1:0") + self._client.configure_client( + emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, + reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port) def tearDown(self): - #events.client.registered_callbacks = {} - server.registered_clients = {} - super(EventsTestCase, self).tearDown() - - def test_service_singleton(self): - """ - Ensure that there's always just one instance of the server daemon - running. - """ - service1 = server.EventsServerDaemon.ensure(8090) - service2 = server.EventsServerDaemon.ensure(8090) - self.assertEqual(service1, service2, - "Can't get singleton class for service.") + self._client.shutdown() + self._server.shutdown() + # wait a bit for sockets to close properly + time.sleep(0.1) def test_client_register(self): """ Ensure clients can register callbacks. """ - self.assertTrue(1 not in self.callbacks, - 'There should should be no callback for this signal.') - events.register(1, lambda x: True) - self.assertTrue(1 in self.callbacks, - 'Could not register signal in local client.') - events.register(2, lambda x: True) - self.assertTrue(1 in self.callbacks, - 'Could not register signal in local client.') - self.assertTrue(2 in self.callbacks, - 'Could not register signal in local client.') + callbacks = self._client.instance().callbacks + self.assertTrue(len(callbacks) == 0, + 'There should be no callback for this event.') + # register one event + event1 = catalog.CLIENT_UID + cbk1 = lambda event, _: True + uid1 = self._client.register(event1, cbk1) + # assert for correct registration + self.assertTrue(len(callbacks) == 1) + self.assertTrue(callbacks[event1][uid1] == cbk1, + 'Could not register event in local client.') + # register another event + event2 = catalog.CLIENT_SESSION_ID + cbk2 = lambda event, _: True + uid2 = self._client.register(event2, cbk2) + # assert for correct registration + self.assertTrue(len(callbacks) == 2) + self.assertTrue(callbacks[event2][uid2] == cbk2, + 'Could not register event in local client.') def test_register_signal_replace(self): """ Make sure clients can replace already registered callbacks. """ - sig = 3 - cbk = lambda x: True - events.register(sig, cbk, uid=1) - self.assertRaises(Exception, events.register, sig, lambda x: True, - uid=1) - events.register(sig, lambda x: True, uid=1, replace=True) - self.assertTrue(sig in self.callbacks, 'Could not register signal.') - self.assertEqual(1, len(self.callbacks[sig]), - 'Wrong number of registered callbacks.') - - def test_signal_response_status(self): - """ - Ensure there's an appropriate response from server when signaling. - """ - sig = 4 - request = SignalRequest() - request.event = sig - request.content = 'my signal contents' - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - # test synch - response = service.signal(request, timeout=1000) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') + event = catalog.CLIENT_UID + d = defer.Deferred() + cbk_fail = lambda event, _: d.errback(event) + cbk_succeed = lambda event, _: d.callback(event) + self._client.register(event, cbk_fail, uid=1) + self._client.register(event, cbk_succeed, uid=1, replace=True) + self._client.emit(event, None) + return d - def test_signal_executes_callback(self): - """ - Ensure callback is executed upon receiving signal. + def test_register_signal_replace_fails_when_replace_is_false(self): """ - sig = CLIENT_UID - request = SignalRequest() - request.event = sig - request.content = 'my signal contents' - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - - # register a callback - flag = Mock() - events.register(sig, lambda req: flag(req.event)) - # signal - response = service.signal(request) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - time.sleep(1) # wait for signal to arrive - flag.assert_called_once_with(sig) - - def test_events_server_service_register(self): + Make sure clients trying to replace already registered callbacks fail + when replace=False """ - Ensure the server can register clients to be signaled. - """ - sig = 5 - request = RegisterRequest() - request.event = sig - request.port = 8091 - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - complist = server.registered_clients - self.assertEqual({}, complist, - 'There should be no registered_ports when ' - 'server has just been created.') - response = service.register(request, timeout=1000) - self.assertTrue(sig in complist, "Signal not registered succesfully.") - self.assertTrue(8091 in complist[sig], - 'Failed registering client port.') + event = catalog.CLIENT_UID + self._client.register(event, lambda event, _: None, uid=1) + self.assertRaises( + CallbackAlreadyRegisteredError, + self._client.register, + event, lambda event, _: None, uid=1, replace=False) - def test_client_request_register(self): + def test_register_more_than_one_callback_works(self): """ - Ensure clients can register themselves with server. + Make sure clients can replace already registered callbacks. """ - sig = 6 - complist = server.registered_clients - self.assertTrue(sig not in complist, - 'There should be no registered clients for this ' - 'signal.') - events.register(sig, lambda x: True) - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertTrue(sig in complist, 'Failed registering client.') - self.assertTrue(port in complist[sig], - 'Failed registering client port.') + event = catalog.CLIENT_UID + d1 = defer.Deferred() + cbk1 = lambda event, _: d1.callback(event) + d2 = defer.Deferred() + cbk2 = lambda event, _: d2.callback(event) + self._client.register(event, cbk1) + self._client.register(event, cbk2) + self._client.emit(event, None) + d = defer.gatherResults([d1, d2]) + return d def test_client_receives_signal(self): """ Ensure clients can receive signals. """ - sig = 7 - flag = Mock() - - events.register(sig, lambda req: flag(req.event)) - request = SignalRequest() - request.event = sig - request.content = "" - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - response = service.signal(request, timeout=1000) - self.assertTrue(response is not None, 'Did not receive response.') - time.sleep(0.5) - flag.assert_called_once_with(sig) - - def test_client_send_signal(self): - """ - Ensure clients can send signals. - """ - sig = 8 - response = events.signal(sig) - self.assertTrue(response.status == response.OK, - 'Received wrong response status when signaling.') + event = catalog.CLIENT_UID + d = defer.Deferred() + def cbk(events, _): + d.callback(event) + self._client.register(event, cbk) + self._client.emit(event, None) + return d def test_client_unregister_all(self): """ Test that the client can unregister all events for one signal. """ - sig = CLIENT_UID - complist = server.registered_clients - events.register(sig, lambda x: True) - events.register(sig, lambda x: True) - time.sleep(0.1) - events.unregister(sig) - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertFalse(bool(complist[sig])) - self.assertTrue(port not in complist[sig]) + event1 = catalog.CLIENT_UID + d = defer.Deferred() + # register more than one callback for the same event + self._client.register(event1, lambda ev, _: d.errback(None)) + self._client.register(event1, lambda ev, _: d.errback(None)) + # unregister and emit the event + self._client.unregister(event1) + self._client.emit(event1, None) + # register and emit another event so the deferred can succeed + event2 = catalog.CLIENT_SESSION_ID + self._client.register(event2, lambda ev, _: d.callback(None)) + self._client.emit(event2, None) + return d def test_client_unregister_by_uid(self): """ Test that the client can unregister an event by uid. """ - sig = CLIENT_UID - complist = server.registered_clients - events.register(sig, lambda x: True, uid='cbkuid') - events.register(sig, lambda x: True, uid='cbkuid2') - time.sleep(0.1) - events.unregister(sig, uid='cbkuid') - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertTrue(sig in complist) - self.assertTrue(len(complist[sig]) == 1) - self.assertTrue( - client.registered_callbacks[sig].pop()[0] == 'cbkuid2') - self.assertTrue(port in complist[sig]) - - def test_server_replies_ping(self): - """ - Ensure server replies to a ping. - """ - request = PingRequest() - service = RpcService(EventsServerService_Stub, port, 'localhost') - response = service.ping(request, timeout=1000) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_client_replies_ping(self): - """ - Ensure clients reply to a ping. - """ - daemon = client.ensure_client_daemon() - port = daemon.get_port() - request = PingRequest() - service = RpcService(EventsClientService_Stub, port, 'localhost') - response = service.ping(request, timeout=1000) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_server_ping(self): - """ - Ensure the function from server module pings correctly. - """ - response = server.ping() - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_client_ping(self): - """ - Ensure the function from client module pings correctly. - """ - daemon = client.ensure_client_daemon() - response = client.ping(daemon.get_port()) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_module_ping_server(self): - """ - Ensure the function from main module pings server correctly. - """ - response = events.ping_server() - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_module_ping_client(self): - """ - Ensure the function from main module pings clients correctly. - """ - daemon = client.ensure_client_daemon() - response = events.ping_client(daemon.get_port()) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_ensure_server_raises_if_port_taken(self): - """ - Verify that server raises an exception if port is already taken. - """ - # get a random free port - while True: - port = random.randint(1024, 65535) - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', port)) - s.close() - except: - break - - class PortBlocker(threading.Thread): - - def run(self): - conns = 0 - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind(('localhost', port)) - s.setblocking(1) - s.listen(1) - while conns < 2: # blocks until rece - conns += 1 - s.accept() - s.close() - - # block the port - taker = PortBlocker() - taker.start() - time.sleep(1) # wait for thread to start. - self.assertRaises( - server.PortAlreadyTaken, server.ensure_server, port) - - def test_async_register(self): - """ - Test asynchronous registering of callbacks. - """ - flag = Mock() + event = catalog.CLIENT_UID + d = defer.Deferred() + # register one callback that would fail + uid = self._client.register(event, lambda ev, _: d.errback(None)) + # register one callback that will succeed + self._client.register(event, lambda ev, _: d.callback(None)) + # unregister by uid and emit the event + self._client.unregister(event, uid=uid) + self._client.emit(event, None) + return d - # executed after async register, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - # callback registered by application - def callback(request): - pass +class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase): - # passing a callback as reqcbk param makes the call asynchronous - result = events.register(CLIENT_UID, callback, reqcbk=reqcbk) - self.assertIsNone(result) - events.signal(CLIENT_UID) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) + _client = txclient - def test_async_signal(self): - """ - Test asynchronous registering of callbacks. - """ - flag = Mock() - - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - - # passing a callback as reqcbk param makes the call asynchronous - result = events.signal(CLIENT_UID, reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - - def test_async_unregister(self): - """ - Test asynchronous unregistering of callbacks. - """ - flag = Mock() - - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - - # callback registered by application - def callback(request): - pass - - # passing a callback as reqcbk param makes the call asynchronous - events.register(CLIENT_UID, callback) - result = events.unregister(CLIENT_UID, reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - - def test_async_ping_server(self): - """ - Test asynchronous pinging of server. - """ - flag = Mock() - - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(response.status) - - result = events.ping_server(reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for response to arrive from server. - flag.assert_called_once_with(EventResponse.OK) - - def test_async_ping_client(self): - """ - Test asynchronous pinging of client. - """ - flag = Mock() - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(response.status) +class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase): - daemon = client.ensure_client_daemon() - result = events.ping_client(daemon.get_port(), reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for response to arrive from server. - flag.assert_called_once_with(EventResponse.OK) + _client = client diff --git a/src/leap/common/zmq_utils.py b/src/leap/common/zmq_utils.py new file mode 100644 index 0000000..19625b9 --- /dev/null +++ b/src/leap/common/zmq_utils.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# zmq.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Utilities to handle ZMQ certificates. +""" +import os +import logging +import stat +import shutil + +import zmq + +try: + import zmq.auth +except ImportError: + pass + +from leap.common.files import mkdir_p +from leap.common.check import leap_assert + +logger = logging.getLogger(__name__) + + +KEYS_PREFIX = "zmq_certificates" +PUBLIC_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "public_keys") +PRIVATE_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "private_keys") + + +def zmq_has_curve(): + """ + Return whether the current ZMQ has support for auth and CurveZMQ security. + + :rtype: bool + + Version notes: + `zmq.curve_keypair()` is new in version 14.0, new in version libzmq-4.0. + Requires libzmq (>= 4.0) to have been linked with libsodium. + `zmq.auth` module is new in version 14.1 + `zmq.has()` is new in version 14.1, new in version libzmq-4.1. + """ + zmq_version = zmq.zmq_version_info() + pyzmq_version = zmq.pyzmq_version_info() + + if pyzmq_version >= (14, 1, 0) and zmq_version >= (4, 1): + return zmq.has('curve') + + if pyzmq_version < (14, 1, 0): + return False + + if zmq_version < (4, 0): + # security is new in libzmq 4.0 + return False + + try: + zmq.curve_keypair() + except zmq.error.ZMQError: + # security requires libzmq to be linked against libsodium + return False + + return True + + +def assert_zmq_has_curve(): + leap_assert(zmq_has_curve, "CurveZMQ not supported!") + + +def maybe_create_and_get_certificates(basedir, name): + """ + Generate the needed ZMQ certificates for backend/frontend communication if + needed. + """ + assert_zmq_has_curve() + private_keys_dir = os.path.join(basedir, PRIVATE_KEYS_PREFIX) + private_key = os.path.join( + private_keys_dir, name + ".key_secret") + if not os.path.isfile(private_key): + mkdir_p(private_keys_dir) + zmq.auth.create_certificates(private_keys_dir, name) + # set permissions to: 0700 (U:rwx G:--- O:---) + os.chmod(private_key, stat.S_IRUSR | stat.S_IWUSR) + # move public key to public keys directory + public_keys_dir = os.path.join(basedir, PUBLIC_KEYS_PREFIX) + old_public_key = os.path.join( + private_keys_dir, name + ".key") + new_public_key = os.path.join( + public_keys_dir, name + ".key") + mkdir_p(public_keys_dir) + shutil.move(old_public_key, new_public_key) + return zmq.auth.load_certificate(private_key) + + -- cgit v1.2.3 From aacf74601f35511b91a9ac111729c9c2cbf99674 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Tue, 19 May 2015 00:31:05 +0200 Subject: [feat] add initialization of events to BaseLeapTest This is needed to get the tests working on environments where zmq was not initalizated. The environment variable XDG_CONFIG_HOME is set to get the leap configuration in the temp folder. --- src/leap/common/testing/basetest.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'src/leap') diff --git a/src/leap/common/testing/basetest.py b/src/leap/common/testing/basetest.py index efaedc3..3fbcf33 100644 --- a/src/leap/common/testing/basetest.py +++ b/src/leap/common/testing/basetest.py @@ -28,6 +28,8 @@ except ImportError: import unittest from leap.common.check import leap_assert +from leap.common.events import server as events_server +from leap.common.events import client as events_client from leap.common.files import mkdir_p, check_and_fix_urw_only @@ -56,6 +58,9 @@ class BaseLeapTest(unittest.TestCase): """ cls.old_path = os.environ['PATH'] cls.old_home = os.environ['HOME'] + cls.old_xdg_config = None + if "XDG_CONFIG_HOME" in os.environ: + cls.old_xdg_config = os.environ["XDG_CONFIG_HOME"] cls.tempdir = tempfile.mkdtemp(prefix="leap_tests-") cls.home = cls.tempdir bin_tdir = os.path.join( @@ -63,6 +68,17 @@ class BaseLeapTest(unittest.TestCase): 'bin') os.environ["PATH"] = bin_tdir os.environ["HOME"] = cls.tempdir + os.environ["XDG_CONFIG_HOME"] = os.path.join(cls.tempdir, ".config") + cls._init_events() + + @classmethod + def _init_events(cls): + cls._server = events_server.ensure_server( + emit_addr="tcp://127.0.0.1:0", + reg_addr="tcp://127.0.0.1:0") + events_client.configure_client( + emit_addr="tcp://127.0.0.1:%d" % cls._server.pull_port, + reg_addr="tcp://127.0.0.1:%d" % cls._server.pub_port) @classmethod def tearDownEnv(cls): @@ -71,8 +87,13 @@ class BaseLeapTest(unittest.TestCase): - restores the default PATH and HOME variables - removes the temporal folder """ + events_client.shutdown() + cls._server.shutdown() + os.environ["PATH"] = cls.old_path os.environ["HOME"] = cls.old_home + if cls.old_xdg_config is not None: + os.environ["XDG_CONFIG_HOME"] = cls.old_xdg_config # safety check! please do not wipe my home... # XXX needs to adapt to non-linuces leap_assert( -- cgit v1.2.3 From 7826a96e526a450380917f9b89e3714576ca50b7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 27 May 2015 18:51:44 -0300 Subject: [bug] use dict keys to iterate We ran into a RuntimeError yesterday on this line, the callback can modify this dict and make the iteration raise an error. Using keys method to iterate is safer. See http://stackoverflow.com/questions/11941817/python-runtimeerror-dictionary-changed-size-during-iteration-how-to-avoid-th/11941855#11941855 --- src/leap/common/events/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 6b234a1..0706fe3 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -184,7 +184,7 @@ class EventsClient(object): :type msg: list(str) """ logger.debug("Handling event %s..." % event) - for uid in self._callbacks[event]: + for uid in self._callbacks[event].keys(): callback = self._callbacks[event][uid] logger.debug("Executing callback %s." % uid) callback(event, *content) @@ -352,7 +352,6 @@ class EventsClientThread(threading.Thread, EventsClient): """ self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag) - def _unsubscribe(self, tag): """ Unsubscribe from a tag on the zmq SUB socket. -- cgit v1.2.3 From c071c69e1b5a0d897674a1f7adc6ff32f19400ff Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 27 May 2015 12:49:44 -0300 Subject: [bug] Use BrowserLikePolicyForHTTPS for checking While testing the way that its implemented now, I found out that no check is being made on certificate attributes against the host. I found this simple way of creating a BrowserLikePolicyForHTTPS using a self signed cert and it worked on my test. I used test_https from Soledad for checking this (which we are fixing on another branch). Also, we don't want to depend on twisted for other things than leap.common.http. --- src/leap/common/certs.py | 17 ++++++++++++ src/leap/common/http.py | 70 +++++++++++++----------------------------------- 2 files changed, 35 insertions(+), 52 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/certs.py b/src/leap/common/certs.py index db513f6..c8e0743 100644 --- a/src/leap/common/certs.py +++ b/src/leap/common/certs.py @@ -178,3 +178,20 @@ def should_redownload(certfile, now=time.gmtime): return True return False + + +def get_compatible_ssl_context_factory(cert_path=None): + import twisted + cert = None + if twisted.version.base() > '14.0.1': + from twisted.web.client import BrowserLikePolicyForHTTPS + from twisted.internet import ssl + if cert_path: + cert = ssl.Certificate.loadPEM(open(cert_path).read()) + policy = BrowserLikePolicyForHTTPS(cert) + return policy + else: + raise Exception((""" + Twisted 14.0.2 is needed in order to have secure Client Web SSL Contexts, not %s + See: http://twistedmatrix.com/trac/ticket/7647 + """) % (twisted.version.base())) diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 39f01ba..1dc5642 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -18,22 +18,28 @@ Twisted HTTP/HTTPS client. """ -import os +try: + import twisted +except ImportError: + print "*******" + print "Twisted is needed to use leap.common.http module" + print "" + print "Install the extra requirement of the package:" + print "$ pip install leap.common[Twisted]" + import sys + sys.exit(1) -from zope.interface import implements -from OpenSSL.crypto import load_certificate -from OpenSSL.crypto import FILETYPE_PEM +from leap.common.certs import get_compatible_ssl_context_factory + +from zope.interface import implements from twisted.internet import reactor -from twisted.internet.ssl import ClientContextFactory -from twisted.internet.ssl import CertificateOptions from twisted.internet.defer import succeed from twisted.web.client import Agent from twisted.web.client import HTTPConnectionPool from twisted.web.client import readBody -from twisted.web.client import BrowserLikePolicyForHTTPS from twisted.web.http_headers import Headers from twisted.web.iweb import IBodyProducer @@ -55,33 +61,12 @@ class HTTPClient(object): self._pool = HTTPConnectionPool(reactor, persistent=True) self._pool.maxPersistentPerHost = 10 - if cert_file: - cert = self._load_cert(cert_file) - self._agent = Agent( - reactor, - HTTPClient.ClientContextFactory(cert), - pool=self._pool) - else: - # trust the system's CAs - self._agent = Agent( - reactor, - BrowserLikePolicyForHTTPS(), - pool=self._pool) - - def _load_cert(self, cert_file): - """ - Load a X509 certificate from a file. - - :param cert_file: The path to the certificate file. - :type cert_file: str + policy = get_compatible_ssl_context_factory(cert_file) - :return: The X509 certificate. - :rtype: OpenSSL.crypto.X509 - """ - if os.path.exists(cert_file): - with open(cert_file) as f: - data = f.read() - return load_certificate(FILETYPE_PEM, data) + self._agent = Agent( + reactor, + policy, + pool=self._pool) def request(self, url, method='GET', body=None, headers={}): """ @@ -106,25 +91,6 @@ class HTTPClient(object): d.addCallback(readBody) return d - class ClientContextFactory(ClientContextFactory): - """ - A context factory that will verify the server's certificate against a - given CA certificate. - """ - - def __init__(self, cacert): - """ - Initialize the context factory. - - :param cacert: The CA certificate. - :type cacert: OpenSSL.crypto.X509 - """ - self._cacert = cacert - - def getContext(self, hostname, port): - opts = CertificateOptions(verify=True, caCerts=[self._cacert]) - return opts.getContext() - class StringBodyProducer(object): """ A producer that writes the body of a request to a consumer. -- cgit v1.2.3 From bf18c2bc6e3f533187281a3b31febd37ef22f8c0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 27 May 2015 17:19:13 -0300 Subject: [feat] Make it optional to have a dedicated pool As @meskio pointed out, some cases could need a dedicated pool with different parameters. This is a suggested implementation where the pool is reused by default, creating a dedicated one just if needed/asked. This way we ensure that resources are under control and special cases are still handled. --- src/leap/common/http.py | 14 ++++++--- src/leap/common/tests/test_http.py | 62 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) create mode 100644 src/leap/common/tests/test_http.py (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 1dc5642..1e384e5 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -44,13 +44,21 @@ from twisted.web.http_headers import Headers from twisted.web.iweb import IBodyProducer +def createPool(maxPersistentPerHost=10, persistent=True): + pool = HTTPConnectionPool(reactor, persistent) + pool.maxPersistentPerHost = maxPersistentPerHost + return pool + +_pool = createPool() + + class HTTPClient(object): """ HTTP client done the twisted way, with a main focus on pinning the SSL certificate. """ - def __init__(self, cert_file=None): + def __init__(self, cert_file=None, pool=_pool): """ Init the HTTP client @@ -58,15 +66,13 @@ class HTTPClient(object): system's CAs will be used. :type cert_file: str """ - self._pool = HTTPConnectionPool(reactor, persistent=True) - self._pool.maxPersistentPerHost = 10 policy = get_compatible_ssl_context_factory(cert_file) self._agent = Agent( reactor, policy, - pool=self._pool) + pool=pool) def request(self, url, method='GET', body=None, headers={}): """ diff --git a/src/leap/common/tests/test_http.py b/src/leap/common/tests/test_http.py new file mode 100644 index 0000000..e240ca3 --- /dev/null +++ b/src/leap/common/tests/test_http.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# test_http.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for: + * leap/common/http.py +""" +import os +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common import http +from leap.common.testing.basetest import BaseLeapTest + +TEST_CERT_PEM = os.path.join( + os.path.split(__file__)[0], + '..', 'testing', "leaptest_combined_keycert.pem") + + +class HTTPClientTest(BaseLeapTest): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_agents_sharing_pool_by_default(self): + client = http.HTTPClient() + client2 = http.HTTPClient(TEST_CERT_PEM) + self.assertNotEquals(client._agent, client2._agent, "Expected dedicated agents") + self.assertEquals(client._agent._pool, client2._agent._pool, "Pool was not reused by default") + + def test_agent_can_have_dedicated_custom_pool(self): + custom_pool = http.createPool(maxPersistentPerHost=42, persistent=False) + self.assertEquals(custom_pool.maxPersistentPerHost, 42, + "Custom persistent connections limit is not being respected") + self.assertFalse(custom_pool.persistent, + "Custom persistence is not being respected") + default_client = http.HTTPClient() + custom_client = http.HTTPClient(pool=custom_pool) + + self.assertNotEquals(default_client._agent, custom_client._agent, "No agent reuse is expected") + self.assertEquals(custom_pool, custom_client._agent._pool, "Custom pool usage was not respected") + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3 From 76436726663971ebd58bf2c758b52abb10f7c242 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 2 Jun 2015 17:27:42 -0400 Subject: [bug] allow ipc socket types previous regex wasn't capturing addresses of type ipc:// Closes: #7089 --- src/leap/common/events/tests/__init__.py | 0 .../common/events/tests/test_zmq_components.py | 51 ++++++++++++++++++++++ src/leap/common/events/zmq_components.py | 24 +++++----- 3 files changed, 65 insertions(+), 10 deletions(-) create mode 100644 src/leap/common/events/tests/__init__.py create mode 100644 src/leap/common/events/tests/test_zmq_components.py (limited to 'src/leap') diff --git a/src/leap/common/events/tests/__init__.py b/src/leap/common/events/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/leap/common/events/tests/test_zmq_components.py b/src/leap/common/events/tests/test_zmq_components.py new file mode 100644 index 0000000..c51e37e --- /dev/null +++ b/src/leap/common/events/tests/test_zmq_components.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# test_zmq_components.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for the zmq_components module. +""" +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common.events import zmq_components + + +class AddrParseTestCase(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_addr_parsing(self): + addr_re = zmq_components.ADDRESS_RE + + self.assertEqual( + addr_re.search("ipc:///tmp/foo/bar/baaz-2/foo.0").groups(), + ("ipc", "/tmp/foo/bar/baaz-2/foo.0", None)) + self.assertEqual( + addr_re.search("tcp://localhost:9000").groups(), + ("tcp", "localhost", "9000")) + self.assertEqual( + addr_re.search("tcp://127.0.0.1:9000").groups(), + ("tcp", "127.0.0.1", "9000")) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 4fb95d3..3b88862 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -45,7 +45,7 @@ from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX logger = logging.getLogger(__name__) -ADDRESS_RE = re.compile("(.+)://(.+):([0-9]+)") +ADDRESS_RE = re.compile("^([a-z]+)://([^:]+):?(\d+)?$") class TxZmqComponent(object): @@ -63,7 +63,7 @@ class TxZmqComponent(object): """ self._factory = txzmq.ZmqFactory() self._factory.registerForShutdown() - if path_prefix == None: + if path_prefix is None: path_prefix = get_path_prefix() self._config_prefix = os.path.join(path_prefix, "leap", "events") self._connections = [] @@ -125,15 +125,19 @@ class TxZmqComponent(object): socket.curve_publickey = public socket.curve_secretkey = secret self._start_thread_auth(connection.socket) - # check if port was given - protocol, addr, port = ADDRESS_RE.match(address).groups() - if port == "0": - port = socket.bind_to_random_port("%s://%s" % (protocol, addr)) + + proto, addr, port = ADDRESS_RE.search(address).groups() + + if port is None: + params = proto, addr + port = socket.bind("%s://%s" % params) + # XXX this log doesn't appear + logger.debug("Binded %s to %s://%s." % ((connClass,) + params)) else: - socket.bind(address) - port = int(port) - logger.debug("Binded %s to %s://%s:%d." - % (connClass, protocol, addr, port)) + params = proto, addr, int(port) + socket.bind("%s://%s:%d" % params) + # XXX this log doesn't appear + logger.debug("Binded %s to %s://%s:%d." % ((connClass,) + params)) self._connections.append(connection) return connection, port -- cgit v1.2.3 From b67648aa666345b0800b48f6c203538b21c9a201 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 8 Jun 2015 15:18:00 -0300 Subject: [bug] Makes request method respect a hard limit Altough we specify maxPersistentPerHost, Twisted won't stop opening connections after that. This limit is used just to keep the size of persistent connections pool under control. Additional connections will be made as non persistent. So, if we ask 10000 requests, it will open 10000 connections immediately and leave 10 open after all finished. For checking this behavior, see getConnection from Twisted source: http://twistedmatrix.com/trac/browser/tags/releases/twisted-15.2.1/twisted/web/client.py#L1203 I tested this by using http_target from soledad without a local database to download all encrypted docs from one account with 1700 of them. The program just hangs and crashes with 1000+ connections and "Too many files open" warnings. With this fix, it was able to download normally, respecting the maxPersistentPerHost as a limiter. :) --- src/leap/common/http.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 1e384e5..d4a214c 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -35,6 +35,7 @@ from leap.common.certs import get_compatible_ssl_context_factory from zope.interface import implements from twisted.internet import reactor +from twisted.internet import defer from twisted.internet.defer import succeed from twisted.web.client import Agent @@ -56,6 +57,13 @@ class HTTPClient(object): """ HTTP client done the twisted way, with a main focus on pinning the SSL certificate. + + By default, it uses a shared connection pool. If you want a dedicated + one, create and pass on __init__ pool parameter. + Please note that this client will limit the maximum amount of connections + by using a DeferredSemaphore. + This limit is equal to the maxPersistentPerHost used on pool and is needed + in order to avoid resource abuse on huge requests batches. """ def __init__(self, cert_file=None, pool=_pool): @@ -65,6 +73,9 @@ class HTTPClient(object): :param cert_file: The path to the certificate file, if None given the system's CAs will be used. :type cert_file: str + :param pool: An optional dedicated connection pool to override the + default shared one. + :type pool: HTTPConnectionPool """ policy = get_compatible_ssl_context_factory(cert_file) @@ -73,6 +84,7 @@ class HTTPClient(object): reactor, policy, pool=pool) + self._semaphore = defer.DeferredSemaphore(pool.maxPersistentPerHost) def request(self, url, method='GET', body=None, headers={}): """ @@ -92,8 +104,9 @@ class HTTPClient(object): """ if body: body = HTTPClient.StringBodyProducer(body) - d = self._agent.request( - method, url, headers=Headers(headers), bodyProducer=body) + d = self._semaphore.run(self._agent.request, + method, url, headers=Headers(headers), + bodyProducer=body) d.addCallback(readBody) return d -- cgit v1.2.3 From b33f0ef3485311f87990409ef8ba2bcb8b26dc5d Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 11 Jun 2015 11:54:10 -0300 Subject: [bug] remove extraneous data from events logs The emission of an event was being logged twice, and the second time was logging the pickled content of the event. This pickled content contained line breaks and other things that caused strange output on the client log. This commit removes the second loggin of the event pickled content. Closes #7130. --- src/leap/common/events/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 0706fe3..4852b5a 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -173,7 +173,7 @@ class EventsClient(object): :param content: The content of the event. :type content: list """ - logger.debug("Sending event: (%s, %s)" % (event, content)) + logger.debug("Emitting event: (%s, %s)" % (event, content)) self._send(str(event) + b'\0' + pickle.dumps(content)) def _handle_event(self, event, content): @@ -368,7 +368,6 @@ class EventsClientThread(threading.Thread, EventsClient): :param data: The data to be sent. :type event: str """ - logger.debug("Sending data: %s" % data) # add send() as a callback for ioloop so it works between threads self._loop.add_callback(lambda: self._push.send(data)) -- cgit v1.2.3 From e91c6b2daf15d849de5a5fd72436f9f88505250e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 5 Jun 2015 13:11:56 -0400 Subject: [feature] utility collect_plugins to be used in post-sync hooks. Related: #6996 Releases: 0.4.1 --- src/leap/common/plugins.py | 75 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 src/leap/common/plugins.py (limited to 'src/leap') diff --git a/src/leap/common/plugins.py b/src/leap/common/plugins.py new file mode 100644 index 0000000..bf0cd48 --- /dev/null +++ b/src/leap/common/plugins.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# plugins.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Twisted plugins leap utilities. +""" +import os.path + +from twisted.plugin import getPlugins + +from leap.common.config import get_path_prefix + +# A whitelist of modules from where to collect plugins dynamically. +# For the moment restricted to leap namespace, but the idea is that we can pass +# other "trusted" modules as options to the initialization of soledad. + +# TODO discover all the namespace automagically + +PLUGGABLE_LEAP_MODULES = ('mail', 'keymanager') + +_preffix = get_path_prefix() +rc_file = os.path.join(_preffix, "leap", "leap.cfg") + + +def _get_extra_pluggable_modules(): + import ConfigParser + config = ConfigParser.RawConfigParser() + config.read(rc_file) + try: + modules = eval( + config.get('plugins', 'extra_pluggable_modules'), {}, {}) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + modules = [] + return modules + +if os.path.isfile(rc_file): + # TODO in the case of being called from the standalone client, + # we should pass the flag in some other way. + EXTRA_PLUGGABLE_MODULES = _get_extra_pluggable_modules() +else: + EXTRA_PLUGGABLE_MODULES = [] + + +def collect_plugins(interface): + """ + Traverse a whitelist of modules and collect all the plugins that implement + the passed interface. + """ + plugins = [] + for namespace in PLUGGABLE_LEAP_MODULES: + try: + module = __import__('leap.%s.plugins' % namespace, fromlist='.') + plugins = plugins + list(getPlugins(interface, module)) + except ImportError: + pass + for namespace in EXTRA_PLUGGABLE_MODULES: + try: + module = __import__('%s.plugins' % namespace, fromlist='.') + plugins = plugins + list(getPlugins(interface, module)) + except ImportError: + pass + return plugins -- cgit v1.2.3 From c9f07ff0041fac8b5124a15a8053bb9cc9f03bb2 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 10 Jun 2015 14:52:11 -0400 Subject: [bug] catch missing section header error --- src/leap/common/plugins.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/leap') diff --git a/src/leap/common/plugins.py b/src/leap/common/plugins.py index bf0cd48..04152f9 100644 --- a/src/leap/common/plugins.py +++ b/src/leap/common/plugins.py @@ -42,7 +42,8 @@ def _get_extra_pluggable_modules(): try: modules = eval( config.get('plugins', 'extra_pluggable_modules'), {}, {}) - except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError, + ConfigParser.MissingSectionHeaderError): modules = [] return modules -- cgit v1.2.3 From c6107b88ba1eaf7e7ca97d0444e7444634aa98c2 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 26 Jun 2015 14:44:37 -0300 Subject: [bug] allow passing ':0' as port in events address --- src/leap/common/events/zmq_components.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 3b88862..04f71e0 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -128,9 +128,9 @@ class TxZmqComponent(object): proto, addr, port = ADDRESS_RE.search(address).groups() - if port is None: + if port is None or port is '0': params = proto, addr - port = socket.bind("%s://%s" % params) + port = socket.bind_to_random_port("%s://%s" % params) # XXX this log doesn't appear logger.debug("Binded %s to %s://%s." % ((connClass,) + params)) else: -- cgit v1.2.3 From 10d7ebdf5226b74c54850d205a807cc650843efe Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 26 Jun 2015 14:45:19 -0300 Subject: [bug] run callback from thread in events client --- src/leap/common/events/client.py | 36 +++++++++++++++++++++++++++++++++--- src/leap/common/events/txclient.py | 13 +++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 4852b5a..1a026ea 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -180,14 +180,30 @@ class EventsClient(object): """ Handle an incoming event. - :param msg: The incoming message. - :type msg: list(str) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list """ logger.debug("Handling event %s..." % event) for uid in self._callbacks[event].keys(): callback = self._callbacks[event][uid] logger.debug("Executing callback %s." % uid) - callback(event, *content) + self._run_callback(callback, event, content) + + @abstractmethod + def _run_callback(self, callback, event, content): + """ + Run a callback. + + :param callback: The callback to be run. + :type callback: callable(event, *content) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + pass @abstractmethod def _subscribe(self, tag): @@ -371,6 +387,20 @@ class EventsClientThread(threading.Thread, EventsClient): # add send() as a callback for ioloop so it works between threads self._loop.add_callback(lambda: self._push.send(data)) + def _run_callback(self, callback, event, content): + """ + Run a callback. + + :param callback: The callback to be run. + :type callback: callable(event, *content) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + from twisted.internet import reactor + reactor.callFromThread(callback, event, *content) + def register(self, event, callback, uid=None, replace=False): """ Register a callback to be executed when an event is received. diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py index 8206ed5..0dcfc08 100644 --- a/src/leap/common/events/txclient.py +++ b/src/leap/common/events/txclient.py @@ -112,6 +112,19 @@ class EventsTxClient(TxZmqClientComponent, EventsClient): """ self._push.send(data) + def _run_callback(self, callback, event, content): + """ + Run a callback. + + :param callback: The callback to be run. + :type callback: callable(event, *content) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + callback(event, *content) + def shutdown(self): TxZmqClientComponent.shutdown(self) EventsClient.shutdown(self) -- cgit v1.2.3 From f508d3629686d1ad4574e5a7ec67f8b3283f7c5f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 20 Jul 2015 15:18:57 -0400 Subject: [refactor] remove use of reactor in threaded version of events client the idea is that we'll be able to use the threaded version of the client, which makes use of the tornado ioloop, in a non-twisted module, like the main graphical client probably will be in the near future. --- src/leap/common/events/client.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 1a026ea..5ee617e 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -174,7 +174,8 @@ class EventsClient(object): :type content: list """ logger.debug("Emitting event: (%s, %s)" % (event, content)) - self._send(str(event) + b'\0' + pickle.dumps(content)) + payload = str(event) + b'\0' + pickle.dumps(content) + self._send(payload) def _handle_event(self, event, content): """ @@ -186,7 +187,7 @@ class EventsClient(object): :type content: list """ logger.debug("Handling event %s..." % event) - for uid in self._callbacks[event].keys(): + for uid in self._callbacks[event]: callback = self._callbacks[event][uid] logger.debug("Executing callback %s." % uid) self._run_callback(callback, event, content) @@ -398,8 +399,7 @@ class EventsClientThread(threading.Thread, EventsClient): :param content: The content of the event. :type content: list """ - from twisted.internet import reactor - reactor.callFromThread(callback, event, *content) + self._loop.add_callback(lambda: callback(event, *content)) def register(self, event, callback, uid=None, replace=False): """ @@ -422,7 +422,8 @@ class EventsClientThread(threading.Thread, EventsClient): callback identified by the given uid and replace is False. """ self.ensure_client() - return EventsClient.register(self, event, callback, uid=uid, replace=replace) + return EventsClient.register( + self, event, callback, uid=uid, replace=replace) def unregister(self, event, uid=None): """ -- cgit v1.2.3 From 87339921eac261954e39901e3563851830309cc5 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 20 Jul 2015 19:18:03 -0400 Subject: [bug] do not add a port string to non-tcp addresses this, together with the events server registration, was breaking the events signalling on the client whenever it used ipc:// sockets. --- src/leap/common/events/zmq_components.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 04f71e0..f99c754 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -128,16 +128,21 @@ class TxZmqComponent(object): proto, addr, port = ADDRESS_RE.search(address).groups() - if port is None or port is '0': - params = proto, addr - port = socket.bind_to_random_port("%s://%s" % params) - # XXX this log doesn't appear - logger.debug("Binded %s to %s://%s." % ((connClass,) + params)) + if proto == "tcp": + if port is None or port is '0': + params = proto, addr + port = socket.bind_to_random_port("%s://%s" % params) + logger.debug("Binded %s to %s://%s." % ((connClass,) + params)) + else: + params = proto, addr, int(port) + socket.bind("%s://%s:%d" % params) + logger.debug( + "Binded %s to %s://%s:%d." % ((connClass,) + params)) else: - params = proto, addr, int(port) - socket.bind("%s://%s:%d" % params) - # XXX this log doesn't appear - logger.debug("Binded %s to %s://%s:%d." % ((connClass,) + params)) + params = proto, addr + socket.bind("%s://%s" % params) + logger.debug( + "Binded %s to %s://%s" % ((connClass,) + params)) self._connections.append(connection) return connection, port -- cgit v1.2.3 From 467b14fa2e29ecd6f41d4834b00593d8c86cddc5 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 13 Jul 2015 11:48:47 -0400 Subject: [feature] add global flag for disabling the events framework this will be used to allow the unittests to disable the events framework. this way, emit() will become a passthrough. note that, until now, the basetest class is making use of the threaded version of the client, which launches a zmq tornado-based ioloop. this is wrong, and will have to be addressed in a future commit. we'll have to make use of the global EVENTS_ENABLED flag in the txclient version when those changes are made. Related: #7259 Relases: 0.4.2 --- src/leap/common/events/__init__.py | 7 +++---- src/leap/common/events/client.py | 12 +++++------- src/leap/common/events/flags.py | 28 ++++++++++++++++++++++++++++ src/leap/common/events/txclient.py | 6 +----- src/leap/common/testing/basetest.py | 21 +++++++++++++-------- 5 files changed, 50 insertions(+), 24 deletions(-) create mode 100644 src/leap/common/events/flags.py (limited to 'src/leap') diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index 9269b9a..87ed8ae 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - - """ This is an events mechanism that uses a server to allow for emitting events between clients. @@ -37,13 +35,13 @@ To emit an event, use leap.common.events.emit(): >>> from leap.common.events import catalog >>> emit(catalog.CLIENT_UID) """ - - import logging import argparse from leap.common.events import client from leap.common.events import server +from leap.common.events.flags import set_events_enabled + from leap.common.events import catalog @@ -52,6 +50,7 @@ __all__ = [ "unregister", "emit", "catalog", + "set_events_enabled" ] diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 5ee617e..1744341 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - - """ The client end point of the events mechanism. @@ -27,8 +25,6 @@ When a client registers a callback for a given event, it also tells the server that it wants to be notified whenever events of that type are sent by some other client. """ - - import logging import collections import uuid @@ -59,6 +55,7 @@ from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX from leap.common.events.errors import CallbackAlreadyRegisteredError from leap.common.events.server import EMIT_ADDR from leap.common.events.server import REG_ADDR +from leap.common.events import flags from leap.common.events import catalog @@ -173,9 +170,10 @@ class EventsClient(object): :param content: The content of the event. :type content: list """ - logger.debug("Emitting event: (%s, %s)" % (event, content)) - payload = str(event) + b'\0' + pickle.dumps(content) - self._send(payload) + if flags.EVENTS_ENABLED: + logger.debug("Emitting event: (%s, %s)" % (event, content)) + payload = str(event) + b'\0' + pickle.dumps(content) + self._send(payload) def _handle_event(self, event, content): """ diff --git a/src/leap/common/events/flags.py b/src/leap/common/events/flags.py new file mode 100644 index 0000000..137f663 --- /dev/null +++ b/src/leap/common/events/flags.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Flags for the events framework. +""" +from leap.common.check import leap_assert + +EVENTS_ENABLED = True + + +def set_events_enabled(flag): + leap_assert(isinstance(flag, bool)) + global EVENTS_ENABLED + EVENTS_ENABLED = flag diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py index 0dcfc08..dfd0533 100644 --- a/src/leap/common/events/txclient.py +++ b/src/leap/common/events/txclient.py @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - - """ The client end point of the events mechanism, implemented using txzmq. @@ -27,8 +25,6 @@ When a client registers a callback for a given event, it also tells the server that it wants to be notified whenever events of that type are sent by some other client. """ - - import logging import pickle @@ -62,7 +58,7 @@ class EventsTxClient(TxZmqClientComponent, EventsClient): """ def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR, - path_prefix=None): + path_prefix=None): """ Initialize the events server. """ diff --git a/src/leap/common/testing/basetest.py b/src/leap/common/testing/basetest.py index 3fbcf33..3d3cee0 100644 --- a/src/leap/common/testing/basetest.py +++ b/src/leap/common/testing/basetest.py @@ -30,8 +30,11 @@ except ImportError: from leap.common.check import leap_assert from leap.common.events import server as events_server from leap.common.events import client as events_client +from leap.common.events import flags, set_events_enabled from leap.common.files import mkdir_p, check_and_fix_urw_only +set_events_enabled(False) + class BaseLeapTest(unittest.TestCase): """ @@ -73,12 +76,13 @@ class BaseLeapTest(unittest.TestCase): @classmethod def _init_events(cls): - cls._server = events_server.ensure_server( - emit_addr="tcp://127.0.0.1:0", - reg_addr="tcp://127.0.0.1:0") - events_client.configure_client( - emit_addr="tcp://127.0.0.1:%d" % cls._server.pull_port, - reg_addr="tcp://127.0.0.1:%d" % cls._server.pub_port) + if flags.EVENTS_ENABLED: + cls._server = events_server.ensure_server( + emit_addr="tcp://127.0.0.1:0", + reg_addr="tcp://127.0.0.1:0") + events_client.configure_client( + emit_addr="tcp://127.0.0.1:%d" % cls._server.pull_port, + reg_addr="tcp://127.0.0.1:%d" % cls._server.pub_port) @classmethod def tearDownEnv(cls): @@ -87,8 +91,9 @@ class BaseLeapTest(unittest.TestCase): - restores the default PATH and HOME variables - removes the temporal folder """ - events_client.shutdown() - cls._server.shutdown() + if flags.EVENTS_ENABLED: + events_client.shutdown() + cls._server.shutdown() os.environ["PATH"] = cls.old_path os.environ["HOME"] = cls.old_home -- cgit v1.2.3 From 46870f1e26ef159bf6fe4744aba3b00a5e81cc3b Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 8 Jul 2015 19:14:00 -0300 Subject: [feat] add close method for http agent The ability to close cached connections is needed in order to have a clean reactor when the program ends. --- src/leap/common/http.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py index d4a214c..8d22f2c 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -80,6 +80,7 @@ class HTTPClient(object): policy = get_compatible_ssl_context_factory(cert_file) + self._pool = pool self._agent = Agent( reactor, policy, @@ -110,6 +111,12 @@ class HTTPClient(object): d.addCallback(readBody) return d + def close(self): + """ + Close any cached connections. + """ + self._pool.closeCachedConnections() + class StringBodyProducer(object): """ A producer that writes the body of a request to a consumer. -- cgit v1.2.3 From 34f6b07e08540fd9b2ec473d1e4e5a15be4feacc Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 8 Jul 2015 19:15:56 -0300 Subject: [bug] add http request timeout The connectTimeout parameter of twisted.web.client.Agent only acts on the connection setup, and the Agent will wait forever for incoming data after the connection has been established. This commit adds a timeout for the connection, and will cancel the deferred if the result has not been received after a certain number of seconds. --- src/leap/common/http.py | 260 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 214 insertions(+), 46 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 8d22f2c..c93e65b 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -36,21 +36,24 @@ from zope.interface import implements from twisted.internet import reactor from twisted.internet import defer -from twisted.internet.defer import succeed +from twisted.python import failure from twisted.web.client import Agent from twisted.web.client import HTTPConnectionPool +from twisted.web.client import _HTTP11ClientFactory as HTTP11ClientFactory from twisted.web.client import readBody from twisted.web.http_headers import Headers from twisted.web.iweb import IBodyProducer +from twisted.web._newclient import HTTP11ClientProtocol -def createPool(maxPersistentPerHost=10, persistent=True): - pool = HTTPConnectionPool(reactor, persistent) - pool.maxPersistentPerHost = maxPersistentPerHost - return pool +__all__ = ["HTTPClient"] -_pool = createPool() + +# A default HTTP timeout is used for 2 distinct purposes: +# 1. as HTTP connection timeout, prior to connection estabilshment. +# 2. as data reception timeout, after the connection has been established. +DEFAULT_HTTP_TIMEOUT = 30 # seconds class HTTPClient(object): @@ -66,28 +69,36 @@ class HTTPClient(object): in order to avoid resource abuse on huge requests batches. """ - def __init__(self, cert_file=None, pool=_pool): + def __init__(self, cert_file=None, timeout=DEFAULT_HTTP_TIMEOUT): """ Init the HTTP client :param cert_file: The path to the certificate file, if None given the system's CAs will be used. :type cert_file: str - :param pool: An optional dedicated connection pool to override the - default shared one. - :type pool: HTTPConnectionPool + :param timeout: The amount of time that this Agent will wait for the + peer to accept a connection and for each request to be + finished. If a pool is passed, then this argument is + ignored. + :type timeout: float """ - policy = get_compatible_ssl_context_factory(cert_file) - - self._pool = pool + self._timeout = timeout + self._pool = self._createPool() self._agent = Agent( reactor, - policy, - pool=pool) - self._semaphore = defer.DeferredSemaphore(pool.maxPersistentPerHost) + get_compatible_ssl_context_factory(cert_file), + pool=self._pool, + connectTimeout=self._timeout) + self._semaphore = defer.DeferredSemaphore( + self._pool.maxPersistentPerHost) - def request(self, url, method='GET', body=None, headers={}): + def _createPool(self, maxPersistentPerHost=10, persistent=True): + pool = _HTTPConnectionPool(reactor, persistent, self._timeout) + pool.maxPersistentPerHost = maxPersistentPerHost + return pool + + def _request(self, url, method, body, headers): """ Perform an HTTP request. @@ -104,51 +115,208 @@ class HTTPClient(object): :rtype: twisted.internet.defer.Deferred """ if body: - body = HTTPClient.StringBodyProducer(body) - d = self._semaphore.run(self._agent.request, - method, url, headers=Headers(headers), - bodyProducer=body) + body = _StringBodyProducer(body) + d = self._agent.request( + method, url, headers=Headers(headers), bodyProducer=body) d.addCallback(readBody) return d + def request(self, url, method='GET', body=None, headers={}): + """ + Perform an HTTP request, but limit the maximum amount of concurrent + connections. + + :param url: The URL for the request. + :type url: str + :param method: The HTTP method of the request. + :type method: str + :param body: The body of the request, if any. + :type body: str + :param headers: The headers of the request. + :type headers: dict + + :return: A deferred that fires with the body of the request. + :rtype: twisted.internet.defer.Deferred + """ + return self._semaphore.run(self._request, url, method, body, headers) + def close(self): """ Close any cached connections. """ self._pool.closeCachedConnections() - class StringBodyProducer(object): +# +# An IBodyProducer to write the body of an HTTP request as a string. +# + +class _StringBodyProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, body): + """ + Initialize the string produer. + + :param body: The body of the request. + :type body: str + """ + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A successful deferred. + :rtype: twisted.internet.defer.Deferred + """ + consumer.write(self.body) + return defer.succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + + +# +# Patched twisted.web classes +# + +class _HTTP11ClientProtocol(HTTP11ClientProtocol): + """ + A timeout-able HTTP 1.1 client protocol, that is instantiated by the + _HTTP11ClientFactory below. + """ + + def __init__(self, quiescentCallback, timeout): + """ + Initialize the protocol. + + :param quiescentCallback: + :type quiescentCallback: callable + :param timeout: A timeout, in seconds, for requests made by this + protocol. + :type timeout: float + """ + HTTP11ClientProtocol.__init__(self, quiescentCallback) + self._timeout = timeout + self._timeoutCall = None + + def request(self, request): + """ + Issue request over self.transport and return a Deferred which + will fire with a Response instance or an error. + + :param request: The object defining the parameters of the request to + issue. + :type request: twisted.web._newclient.Request + + :return: A deferred which fires after the request has finished. + :rtype: Deferred + """ + d = HTTP11ClientProtocol.request(self, request) + if self._timeout: + self._last_buffer_len = 0 + timeoutCall = reactor.callLater( + self._timeout, self._doTimeout, request) + self._timeoutCall = timeoutCall + return d + + def _doTimeout(self, request): """ - A producer that writes the body of a request to a consumer. + Give up the request because of a timeout. + + :param request: The object defining the parameters of the request to + issue. + :type request: twisted.web._newclient.Request + """ + self._giveUp( + failure.Failure( + defer.TimeoutError( + "Getting %s took longer than %s seconds." + % (request.absoluteURI, self._timeout)))) + + def _cancelTimeout(self): """ + Cancel the request timeout, when it's finished. + """ + if self._timeoutCall.active(): + self._timeoutCall.cancel() + self._timeoutCall = None + + def _finishResponse_WAITING(self, rest): + """ + Cancel the timeout when finished receiving the response. + """ + self._cancelTimeout() + HTTP11ClientProtocol._finishResponse_WAITING(self, rest) - implements(IBodyProducer) + def _finishResponse_TRANSMITTING(self, rest): + """ + Cancel the timeout when finished receiving the response. + """ + self._cancelTimeout() + HTTP11ClientProtocol._finishResponse_TRANSMITTING(self, rest) - def __init__(self, body): - """ - Initialize the string produer. + def dataReceived(self, bytes): + """ + Receive some data and extend the timeout period of this request. + + :param bytes: A string of indeterminate length. + :type bytes: str + """ + HTTP11ClientProtocol.dataReceived(self, bytes) + if self._timeoutCall and self._timeoutCall.active(): + self._timeoutCall.reset(self._timeout) - :param body: The body of the request. - :type body: str - """ - self.body = body - self.length = len(body) - def startProducing(self, consumer): - """ - Write the body to the consumer. +class _HTTP11ClientFactory(HTTP11ClientFactory): + """ + A timeout-able HTTP 1.1 client protocol factory. + """ - :param consumer: Any IConsumer provider. - :type consumer: twisted.internet.interfaces.IConsumer + def __init__(self, quiescentCallback, timeout): + """ + :param quiescentCallback: The quiescent callback to be passed to + protocol instances, used to return them to + the connection pool. + :type quiescentCallback: callable(Protocol) + :param timeout: The timeout, in seconds, for requests made by + protocols created by this factory. + :type timeout: float + """ + HTTP11ClientFactory.__init__(self, quiescentCallback) + self._timeout = timeout + + def buildProtocol(self, _): + """ + Build the HTTP 1.1 client protocol. + """ + return _HTTP11ClientProtocol(self._quiescentCallback, self._timeout) + + +class _HTTPConnectionPool(HTTPConnectionPool): + """ + A timeout-able HTTP connection pool. + """ - :return: A successful deferred. - :rtype: twisted.internet.defer.Deferred - """ - consumer.write(self.body) - return succeed(None) + _factory = _HTTP11ClientFactory - def pauseProducing(self): - pass + def __init__(self, reactor, persistent, timeout): + HTTPConnectionPool.__init__(self, reactor, persistent=persistent) + self._timeout = timeout - def stopProducing(self): - pass + def _newConnection(self, key, endpoint): + def quiescentCallback(protocol): + self._putConnection(key, protocol) + factory = self._factory(quiescentCallback, timeout=self._timeout) + return endpoint.connect(factory) -- cgit v1.2.3 From 07d421a32f3bb45932668f4951233166ada4e770 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 23 Jul 2015 17:03:50 -0300 Subject: [bug] fix events ioloop client tests The events ioloop client is not completelly thread safe, so we have to use reactor.callFromThread whenever we need it to be thread-safe. Examples of this behaviour are the events ioloop client tests that depend on the firing of deferreds, which are not thread safe. This commit fixes tese tests. --- src/leap/common/tests/test_events.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 7ef3e1b..611781c 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -20,6 +20,7 @@ import os import logging import time +from twisted.internet.reactor import callFromThread from twisted.trial import unittest from twisted.internet import defer @@ -80,8 +81,8 @@ class EventsGenericClientTestCase(object): """ event = catalog.CLIENT_UID d = defer.Deferred() - cbk_fail = lambda event, _: d.errback(event) - cbk_succeed = lambda event, _: d.callback(event) + cbk_fail = lambda event, _: callFromThread(d.errback, event) + cbk_succeed = lambda event, _: callFromThread(d.callback, event) self._client.register(event, cbk_fail, uid=1) self._client.register(event, cbk_succeed, uid=1, replace=True) self._client.emit(event, None) @@ -105,9 +106,9 @@ class EventsGenericClientTestCase(object): """ event = catalog.CLIENT_UID d1 = defer.Deferred() - cbk1 = lambda event, _: d1.callback(event) + cbk1 = lambda event, _: callFromThread(d1.callback, event) d2 = defer.Deferred() - cbk2 = lambda event, _: d2.callback(event) + cbk2 = lambda event, _: callFromThread(d2.callback, event) self._client.register(event, cbk1) self._client.register(event, cbk2) self._client.emit(event, None) @@ -121,7 +122,7 @@ class EventsGenericClientTestCase(object): event = catalog.CLIENT_UID d = defer.Deferred() def cbk(events, _): - d.callback(event) + callFromThread(d.callback, event) self._client.register(event, cbk) self._client.emit(event, None) return d @@ -133,14 +134,17 @@ class EventsGenericClientTestCase(object): event1 = catalog.CLIENT_UID d = defer.Deferred() # register more than one callback for the same event - self._client.register(event1, lambda ev, _: d.errback(None)) - self._client.register(event1, lambda ev, _: d.errback(None)) + self._client.register( + event1, lambda ev, _: callFromThread(d.errback, None)) + self._client.register( + event1, lambda ev, _: callFromThread(d.errback, None)) # unregister and emit the event self._client.unregister(event1) self._client.emit(event1, None) # register and emit another event so the deferred can succeed event2 = catalog.CLIENT_SESSION_ID - self._client.register(event2, lambda ev, _: d.callback(None)) + self._client.register( + event2, lambda ev, _: callFromThread(d.callback, None)) self._client.emit(event2, None) return d @@ -151,9 +155,11 @@ class EventsGenericClientTestCase(object): event = catalog.CLIENT_UID d = defer.Deferred() # register one callback that would fail - uid = self._client.register(event, lambda ev, _: d.errback(None)) + uid = self._client.register( + event, lambda ev, _: callFromThread(d.errback, None)) # register one callback that will succeed - self._client.register(event, lambda ev, _: d.callback(None)) + self._client.register( + event, lambda ev, _: callFromThread(d.callback, None)) # unregister by uid and emit the event self._client.unregister(event, uid=uid) self._client.emit(event, None) -- cgit v1.2.3 From 25d3b7c80f85cd94159b574274108061a94f1bc9 Mon Sep 17 00:00:00 2001 From: Bruno Wagner Date: Wed, 22 Jul 2015 15:38:56 -0300 Subject: [style] Fixed pep8 warnings --- src/leap/common/__init__.py | 4 +-- src/leap/common/_version.py | 40 ++++++++++++------------- src/leap/common/ca_bundle.py | 1 + src/leap/common/config/pluggableconfig.py | 27 ++++++++++------- src/leap/common/config/tests/test_baseconfig.py | 30 +++++++++---------- src/leap/common/http.py | 21 ++++++------- src/leap/common/tests/test_certs.py | 6 ++-- src/leap/common/tests/test_events.py | 33 +++++++++++++++----- src/leap/common/zmq_utils.py | 2 -- 9 files changed, 96 insertions(+), 68 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/__init__.py b/src/leap/common/__init__.py index 5619900..383e198 100644 --- a/src/leap/common/__init__.py +++ b/src/leap/common/__init__.py @@ -4,6 +4,7 @@ from leap.common import certs from leap.common import check from leap.common import files from leap.common import events +from ._version import get_versions logger = logging.getLogger(__name__) @@ -11,11 +12,10 @@ try: import pygeoip HAS_GEOIP = True except ImportError: - #logger.debug('PyGeoIP not found. Disabled Geo support.') + # logger.debug('PyGeoIP not found. Disabled Geo support.') HAS_GEOIP = False __all__ = ["certs", "check", "files", "events"] -from ._version import get_versions __version__ = get_versions()['version'] del get_versions diff --git a/src/leap/common/_version.py b/src/leap/common/_version.py index 597e2e4..de94ba8 100644 --- a/src/leap/common/_version.py +++ b/src/leap/common/_version.py @@ -1,5 +1,3 @@ - -IN_LONG_VERSION_PY = True # This file helps to compute a version number in source trees obtained from # git-archive tarball (such as those provided by githubs download-from-tag # feature). Distribution tarballs (build by setup.py sdist) and build @@ -10,12 +8,16 @@ IN_LONG_VERSION_PY = True # versioneer-0.7+ (https://github.com/warner/python-versioneer) # these strings will be replaced by git during git-archive -git_refnames = "$Format:%d$" -git_full = "$Format:%H$" - import subprocess import sys +import re +import os.path + +IN_LONG_VERSION_PY = True +git_refnames = "$Format:%d$" +git_full = "$Format:%H$" + def run_command(args, cwd=None, verbose=False): try: @@ -37,10 +39,6 @@ def run_command(args, cwd=None, verbose=False): return stdout -import sys -import re -import os.path - def get_expanded_variables(versionfile_source): # the code embedded in _version.py can just fetch the value of these # variables. When used from setup.py, we don't want to import @@ -48,7 +46,7 @@ def get_expanded_variables(versionfile_source): # used from _version.py. variables = {} try: - f = open(versionfile_source,"r") + f = open(versionfile_source, "r") for line in f.readlines(): if line.strip().startswith("git_refnames ="): mo = re.search(r'=\s*"(.*)"', line) @@ -63,12 +61,13 @@ def get_expanded_variables(versionfile_source): pass return variables + def versions_from_expanded_variables(variables, tag_prefix, verbose=False): refnames = variables["refnames"].strip() if refnames.startswith("$Format"): if verbose: print("variables are unexpanded, not using") - return {} # unexpanded, so not in an unpacked git-archive tarball + return {} # unexpanded, so not in an unpacked git-archive tarball refs = set([r.strip() for r in refnames.strip("()").split(",")]) # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of # just "foo-1.0". If we see a "tag: " prefix, prefer those. @@ -84,7 +83,7 @@ def versions_from_expanded_variables(variables, tag_prefix, verbose=False): # "stabilization", as well as "HEAD" and "master". tags = set([r for r in refs if re.search(r'\d', r)]) if verbose: - print("discarding '%s', no digits" % ",".join(refs-tags)) + print("discarding '%s', no digits" % ",".join(refs - tags)) if verbose: print("likely tags: %s" % ",".join(sorted(tags))) for ref in sorted(tags): @@ -93,13 +92,14 @@ def versions_from_expanded_variables(variables, tag_prefix, verbose=False): r = ref[len(tag_prefix):] if verbose: print("picking %s" % r) - return { "version": r, - "full": variables["full"].strip() } + return {"version": r, + "full": variables["full"].strip()} # no suitable tags, so we use the full revision id if verbose: print("no suitable tags, using full revision id") - return { "version": variables["full"].strip(), - "full": variables["full"].strip() } + return {"version": variables["full"].strip(), + "full": variables["full"].strip()} + def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): # this runs 'git' from the root of the source tree. That either means @@ -116,7 +116,7 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): here = os.path.abspath(__file__) except NameError: # some py2exe/bbfreeze/non-CPython implementations don't do __file__ - return {} # not always correct + return {} # not always correct # versionfile_source is the relative path from the top of the source tree # (where the .git directory might live) to this file. Invert this to find @@ -163,7 +163,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False) here = os.path.abspath(__file__) except NameError: # py2exe/bbfreeze/non-CPython don't have __file__ - return {} # without __file__, we have no hope + return {} # without __file__, we have no hope # versionfile_source is the relative path from the top of the source # tree to _version.py. Invert this to find the root from __file__. root = here @@ -189,8 +189,9 @@ tag_prefix = "" parentdir_prefix = "leap.common-" versionfile_source = "src/leap/common/_version.py" + def get_versions(default={"version": "unknown", "full": ""}, verbose=False): - variables = { "refnames": git_refnames, "full": git_full } + variables = {"refnames": git_refnames, "full": git_full} ver = versions_from_expanded_variables(variables, tag_prefix, verbose) if not ver: ver = versions_from_vcs(tag_prefix, versionfile_source, verbose) @@ -200,4 +201,3 @@ def get_versions(default={"version": "unknown", "full": ""}, verbose=False): if not ver: ver = default return ver - diff --git a/src/leap/common/ca_bundle.py b/src/leap/common/ca_bundle.py index d8c72a6..2c41d18 100644 --- a/src/leap/common/ca_bundle.py +++ b/src/leap/common/ca_bundle.py @@ -28,6 +28,7 @@ _system = platform.system() IS_MAC = _system == "Darwin" + def where(): """ Return the preferred certificate bundle. diff --git a/src/leap/common/config/pluggableconfig.py b/src/leap/common/config/pluggableconfig.py index 8535fa6..1a98427 100644 --- a/src/leap/common/config/pluggableconfig.py +++ b/src/leap/common/config/pluggableconfig.py @@ -27,7 +27,7 @@ import urlparse import jsonschema -#from leap.base.util.translations import LEAPTranslatable +# from leap.base.util.translations import LEAPTranslatable from leap.common.check import leap_assert @@ -163,8 +163,8 @@ class TranslatableType(object): return data # LEAPTranslatable(data) # needed? we already have an extended dict... - #def get_prep_value(self, data): - #return dict(data) + # def get_prep_value(self, data): + # return dict(data) class URIType(object): @@ -283,9 +283,13 @@ class PluggableConfig(object): except BaseException, e: raise TypeCastException( "Could not coerce %s, %s, " - "to format %s: %s" % (key, value, - _ftype.__class__.__name__, - e)) + "to format %s: %s" % ( + key, + value, + _ftype.__class__.__name__, + e + ) + ) return config @@ -303,9 +307,12 @@ class PluggableConfig(object): except BaseException, e: raise TypeCastException( "Could not serialize %s, %s, " - "by format %s: %s" % (key, value, - _ftype.__class__.__name__, - e)) + "by format %s: %s" % ( + key, + value, + _ftype.__class__.__name__, + e) + ) else: config[key] = value return config @@ -435,7 +442,7 @@ class PluggableConfig(object): content = self.deserialize(string) if not string and fromfile is not None: - #import ipdb;ipdb.set_trace() + # import ipdb;ipdb.set_trace() content = self.deserialize(fromfile=fromfile) if not content: diff --git a/src/leap/common/config/tests/test_baseconfig.py b/src/leap/common/config/tests/test_baseconfig.py index 8bdf4d0..e17e82d 100644 --- a/src/leap/common/config/tests/test_baseconfig.py +++ b/src/leap/common/config/tests/test_baseconfig.py @@ -29,21 +29,21 @@ from mock import Mock # reduced eipconfig sample config sample_config = { "gateways": [ - { - "capabilities": { - "adblock": False, - "transport": ["openvpn"], - "user_ips": False - }, - "host": "host.dev.example.org", - }, { - "capabilities": { - "adblock": False, - "transport": ["openvpn"], - "user_ips": False - }, - "host": "host2.dev.example.org", - } + { + "capabilities": { + "adblock": False, + "transport": ["openvpn"], + "user_ips": False + }, + "host": "host.dev.example.org", + }, { + "capabilities": { + "adblock": False, + "transport": ["openvpn"], + "user_ips": False + }, + "host": "host2.dev.example.org", + } ], "default_language": "en", "languages": [ diff --git a/src/leap/common/http.py b/src/leap/common/http.py index c93e65b..56938b4 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -150,6 +150,7 @@ class HTTPClient(object): # An IBodyProducer to write the body of an HTTP request as a string. # + class _StringBodyProducer(object): """ A producer that writes the body of a request to a consumer. @@ -254,18 +255,18 @@ class _HTTP11ClientProtocol(HTTP11ClientProtocol): self._timeoutCall = None def _finishResponse_WAITING(self, rest): - """ - Cancel the timeout when finished receiving the response. - """ - self._cancelTimeout() - HTTP11ClientProtocol._finishResponse_WAITING(self, rest) + """ + Cancel the timeout when finished receiving the response. + """ + self._cancelTimeout() + HTTP11ClientProtocol._finishResponse_WAITING(self, rest) def _finishResponse_TRANSMITTING(self, rest): - """ - Cancel the timeout when finished receiving the response. - """ - self._cancelTimeout() - HTTP11ClientProtocol._finishResponse_TRANSMITTING(self, rest) + """ + Cancel the timeout when finished receiving the response. + """ + self._cancelTimeout() + HTTP11ClientProtocol._finishResponse_TRANSMITTING(self, rest) def dataReceived(self, bytes): """ diff --git a/src/leap/common/tests/test_certs.py b/src/leap/common/tests/test_certs.py index 999071f..209e051 100644 --- a/src/leap/common/tests/test_certs.py +++ b/src/leap/common/tests/test_certs.py @@ -60,11 +60,13 @@ class CertsTest(BaseLeapTest): self.assertTrue(certs.should_redownload(cert_path)) def test_should_redownload_if_before(self): - new_now = lambda: time.struct_time(CERT_NOT_BEFORE) + def new_now(): + time.struct_time(CERT_NOT_BEFORE) self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) def test_should_redownload_if_after(self): - new_now = lambda: time.struct_time(CERT_NOT_AFTER) + def new_now(): + time.struct_time(CERT_NOT_AFTER) self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) def test_not_should_redownload(self): diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 611781c..e2a918f 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -1,4 +1,4 @@ -## -*- coding: utf-8 -*- +# -*- coding: utf-8 -*- # test_events.py # Copyright (C) 2013 LEAP # @@ -60,7 +60,10 @@ class EventsGenericClientTestCase(object): 'There should be no callback for this event.') # register one event event1 = catalog.CLIENT_UID - cbk1 = lambda event, _: True + + def cbk1(event, _): + return True + uid1 = self._client.register(event1, cbk1) # assert for correct registration self.assertTrue(len(callbacks) == 1) @@ -68,7 +71,10 @@ class EventsGenericClientTestCase(object): 'Could not register event in local client.') # register another event event2 = catalog.CLIENT_SESSION_ID - cbk2 = lambda event, _: True + + def cbk2(event, _): + return True + uid2 = self._client.register(event2, cbk2) # assert for correct registration self.assertTrue(len(callbacks) == 2) @@ -81,8 +87,13 @@ class EventsGenericClientTestCase(object): """ event = catalog.CLIENT_UID d = defer.Deferred() - cbk_fail = lambda event, _: callFromThread(d.errback, event) - cbk_succeed = lambda event, _: callFromThread(d.callback, event) + + def cbk_fail(event, _): + return callFromThread(d.errback, event) + + def cbk_succeed(event, _): + return callFromThread(d.callback, event) + self._client.register(event, cbk_fail, uid=1) self._client.register(event, cbk_succeed, uid=1, replace=True) self._client.emit(event, None) @@ -106,9 +117,15 @@ class EventsGenericClientTestCase(object): """ event = catalog.CLIENT_UID d1 = defer.Deferred() - cbk1 = lambda event, _: callFromThread(d1.callback, event) + + def cbk1(event, _): + return callFromThread(d1.callback, event) + d2 = defer.Deferred() - cbk2 = lambda event, _: callFromThread(d2.callback, event) + + def cbk2(event, _): + return d2.callback(event) + self._client.register(event, cbk1) self._client.register(event, cbk2) self._client.emit(event, None) @@ -121,8 +138,10 @@ class EventsGenericClientTestCase(object): """ event = catalog.CLIENT_UID d = defer.Deferred() + def cbk(events, _): callFromThread(d.callback, event) + self._client.register(event, cbk) self._client.emit(event, None) return d diff --git a/src/leap/common/zmq_utils.py b/src/leap/common/zmq_utils.py index 19625b9..0a781de 100644 --- a/src/leap/common/zmq_utils.py +++ b/src/leap/common/zmq_utils.py @@ -101,5 +101,3 @@ def maybe_create_and_get_certificates(basedir, name): mkdir_p(public_keys_dir) shutil.move(old_public_key, new_public_key) return zmq.auth.load_certificate(private_key) - - -- cgit v1.2.3 From 301be892a67f88b8b9479531003403a77b36a5f2 Mon Sep 17 00:00:00 2001 From: Bruno Wagner Date: Wed, 22 Jul 2015 19:17:03 -0300 Subject: [tests] set environment for certs test --- src/leap/common/tests/test_certs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/tests/test_certs.py b/src/leap/common/tests/test_certs.py index 209e051..8ebc0f4 100644 --- a/src/leap/common/tests/test_certs.py +++ b/src/leap/common/tests/test_certs.py @@ -43,10 +43,10 @@ CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0) class CertsTest(BaseLeapTest): def setUp(self): - pass + self.setUpEnv() def tearDown(self): - pass + self.tearDownEnv() def test_should_redownload_if_no_cert(self): self.assertTrue(certs.should_redownload(certfile="")) -- cgit v1.2.3 From 5502318835626527d9818c360c7fd2b1a4b01145 Mon Sep 17 00:00:00 2001 From: Bruno Wagner Date: Wed, 22 Jul 2015 23:55:39 -0300 Subject: [tests] implemented http feature according to test Two test cases were broken and were implemented here: The first was that HTTPClient should share the connection between clients if a pool was not passed explicitly. If you initialize an HTTPClient without a pool, it will reuse a pool created on the class. The second was that you should be able to pass to the HTTPCLient a pool on initialization. Added that possibility and fixed the tests accordingly --- src/leap/common/http.py | 98 +++++++++++++++++++++----------------- src/leap/common/tests/test_http.py | 7 ++- 2 files changed, 59 insertions(+), 46 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 56938b4..f67507d 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -56,6 +56,50 @@ __all__ = ["HTTPClient"] DEFAULT_HTTP_TIMEOUT = 30 # seconds +class _HTTP11ClientFactory(HTTP11ClientFactory): + """ + A timeout-able HTTP 1.1 client protocol factory. + """ + + def __init__(self, quiescentCallback, timeout): + """ + :param quiescentCallback: The quiescent callback to be passed to + protocol instances, used to return them to + the connection pool. + :type quiescentCallback: callable(Protocol) + :param timeout: The timeout, in seconds, for requests made by + protocols created by this factory. + :type timeout: float + """ + HTTP11ClientFactory.__init__(self, quiescentCallback) + self._timeout = timeout + + def buildProtocol(self, _): + """ + Build the HTTP 1.1 client protocol. + """ + return _HTTP11ClientProtocol(self._quiescentCallback, self._timeout) + + +class _HTTPConnectionPool(HTTPConnectionPool): + """ + A timeout-able HTTP connection pool. + """ + + _factory = _HTTP11ClientFactory + + def __init__(self, reactor, persistent, timeout, maxPersistentPerHost=10): + HTTPConnectionPool.__init__(self, reactor, persistent=persistent) + self.maxPersistentPerHost = maxPersistentPerHost + self._timeout = timeout + + def _newConnection(self, key, endpoint): + def quiescentCallback(protocol): + self._putConnection(key, protocol) + factory = self._factory(quiescentCallback, timeout=self._timeout) + return endpoint.connect(factory) + + class HTTPClient(object): """ HTTP client done the twisted way, with a main focus on pinning the SSL @@ -69,7 +113,14 @@ class HTTPClient(object): in order to avoid resource abuse on huge requests batches. """ - def __init__(self, cert_file=None, timeout=DEFAULT_HTTP_TIMEOUT): + _pool = _HTTPConnectionPool( + reactor, + persistent=True, + timeout = DEFAULT_HTTP_TIMEOUT, + maxPersistentPerHost=10 + ) + + def __init__(self, cert_file=None, timeout=DEFAULT_HTTP_TIMEOUT, pool=None): """ Init the HTTP client @@ -84,7 +135,7 @@ class HTTPClient(object): """ self._timeout = timeout - self._pool = self._createPool() + self._pool = pool if pool is not None else self._pool self._agent = Agent( reactor, get_compatible_ssl_context_factory(cert_file), @@ -278,46 +329,3 @@ class _HTTP11ClientProtocol(HTTP11ClientProtocol): HTTP11ClientProtocol.dataReceived(self, bytes) if self._timeoutCall and self._timeoutCall.active(): self._timeoutCall.reset(self._timeout) - - -class _HTTP11ClientFactory(HTTP11ClientFactory): - """ - A timeout-able HTTP 1.1 client protocol factory. - """ - - def __init__(self, quiescentCallback, timeout): - """ - :param quiescentCallback: The quiescent callback to be passed to - protocol instances, used to return them to - the connection pool. - :type quiescentCallback: callable(Protocol) - :param timeout: The timeout, in seconds, for requests made by - protocols created by this factory. - :type timeout: float - """ - HTTP11ClientFactory.__init__(self, quiescentCallback) - self._timeout = timeout - - def buildProtocol(self, _): - """ - Build the HTTP 1.1 client protocol. - """ - return _HTTP11ClientProtocol(self._quiescentCallback, self._timeout) - - -class _HTTPConnectionPool(HTTPConnectionPool): - """ - A timeout-able HTTP connection pool. - """ - - _factory = _HTTP11ClientFactory - - def __init__(self, reactor, persistent, timeout): - HTTPConnectionPool.__init__(self, reactor, persistent=persistent) - self._timeout = timeout - - def _newConnection(self, key, endpoint): - def quiescentCallback(protocol): - self._putConnection(key, protocol) - factory = self._factory(quiescentCallback, timeout=self._timeout) - return endpoint.connect(factory) diff --git a/src/leap/common/tests/test_http.py b/src/leap/common/tests/test_http.py index e240ca3..a586fd1 100644 --- a/src/leap/common/tests/test_http.py +++ b/src/leap/common/tests/test_http.py @@ -47,7 +47,12 @@ class HTTPClientTest(BaseLeapTest): self.assertEquals(client._agent._pool, client2._agent._pool, "Pool was not reused by default") def test_agent_can_have_dedicated_custom_pool(self): - custom_pool = http.createPool(maxPersistentPerHost=42, persistent=False) + custom_pool = http._HTTPConnectionPool( + None, + timeout=10, + maxPersistentPerHost=42, + persistent=False + ) self.assertEquals(custom_pool.maxPersistentPerHost, 42, "Custom persistent connections limit is not being respected") self.assertFalse(custom_pool.persistent, -- cgit v1.2.3 From 486da2654c63262f0dbc2d603125f0c8c5c4ea74 Mon Sep 17 00:00:00 2001 From: Bruno Wagner Date: Thu, 23 Jul 2015 00:00:39 -0300 Subject: [tests] fixed events tests The events tests check for register and emit signals, but because the flag set_events_enabled was False by default in the tests, no signals were being emitted. I added the flag to the setUp and tearDown of the tests, they are still very slow but at least they are passing now --- src/leap/common/tests/test_events.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/leap') diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index e2a918f..8f08eeb 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -26,6 +26,7 @@ from twisted.internet import defer from leap.common.events import server from leap.common.events import client +from leap.common.events import flags from leap.common.events import txclient from leap.common.events import catalog from leap.common.events.errors import CallbackAlreadyRegisteredError @@ -44,10 +45,12 @@ class EventsGenericClientTestCase(object): self._client.configure_client( emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port) + flags.set_events_enabled(True) def tearDown(self): self._client.shutdown() self._server.shutdown() + flags.set_events_enabled(False) # wait a bit for sockets to close properly time.sleep(0.1) -- cgit v1.2.3 From e8d54bd3dc4b5f0327a30c0a6848dd832beb7da0 Mon Sep 17 00:00:00 2001 From: Bruno Wagner Date: Thu, 23 Jul 2015 00:09:03 -0300 Subject: [style] fixed pep8 warnings on http and test events --- src/leap/common/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py index f67507d..8e8d3d9 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -116,7 +116,7 @@ class HTTPClient(object): _pool = _HTTPConnectionPool( reactor, persistent=True, - timeout = DEFAULT_HTTP_TIMEOUT, + timeout=DEFAULT_HTTP_TIMEOUT, maxPersistentPerHost=10 ) -- cgit v1.2.3 From 16242a1ebbac7ee0b38d3199b2a1317297d4506b Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 23 Jul 2015 16:36:12 -0400 Subject: [tests] fix initialization of basetest case --- src/leap/common/testing/test_basetest.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/testing/test_basetest.py b/src/leap/common/testing/test_basetest.py index cf0962d..ec42a62 100644 --- a/src/leap/common/testing/test_basetest.py +++ b/src/leap/common/testing/test_basetest.py @@ -83,12 +83,10 @@ class TestInitBaseLeapTest(BaseLeapTest): """ def setUp(self): - """nuke it""" - pass + self.setUpEnv() def tearDown(self): - """nuke it""" - pass + self.tearDownEnv() def test_path_is_changed(self): """tests whether we have changed the PATH env var""" -- cgit v1.2.3 From a119dd4fa2fc4a14577fd2d6e32dff950d934193 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 28 Jul 2015 11:41:32 -0400 Subject: [style] more pep8 cleanup --- src/leap/common/certs.py | 3 ++- src/leap/common/http.py | 3 ++- src/leap/common/tests/test_http.py | 18 +++++++++++++----- 3 files changed, 17 insertions(+), 7 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/certs.py b/src/leap/common/certs.py index c8e0743..37ede8e 100644 --- a/src/leap/common/certs.py +++ b/src/leap/common/certs.py @@ -192,6 +192,7 @@ def get_compatible_ssl_context_factory(cert_path=None): return policy else: raise Exception((""" - Twisted 14.0.2 is needed in order to have secure Client Web SSL Contexts, not %s + Twisted 14.0.2 is needed in order to have secure + Client Web SSL Contexts, not %s See: http://twistedmatrix.com/trac/ticket/7647 """) % (twisted.version.base())) diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 8e8d3d9..1e7ded7 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -120,7 +120,8 @@ class HTTPClient(object): maxPersistentPerHost=10 ) - def __init__(self, cert_file=None, timeout=DEFAULT_HTTP_TIMEOUT, pool=None): + def __init__(self, cert_file=None, + timeout=DEFAULT_HTTP_TIMEOUT, pool=None): """ Init the HTTP client diff --git a/src/leap/common/tests/test_http.py b/src/leap/common/tests/test_http.py index a586fd1..f44550f 100644 --- a/src/leap/common/tests/test_http.py +++ b/src/leap/common/tests/test_http.py @@ -43,8 +43,11 @@ class HTTPClientTest(BaseLeapTest): def test_agents_sharing_pool_by_default(self): client = http.HTTPClient() client2 = http.HTTPClient(TEST_CERT_PEM) - self.assertNotEquals(client._agent, client2._agent, "Expected dedicated agents") - self.assertEquals(client._agent._pool, client2._agent._pool, "Pool was not reused by default") + self.assertNotEquals( + client._agent, client2._agent, "Expected dedicated agents") + self.assertEquals( + client._agent._pool, client2._agent._pool, + "Pool was not reused by default") def test_agent_can_have_dedicated_custom_pool(self): custom_pool = http._HTTPConnectionPool( @@ -54,14 +57,19 @@ class HTTPClientTest(BaseLeapTest): persistent=False ) self.assertEquals(custom_pool.maxPersistentPerHost, 42, - "Custom persistent connections limit is not being respected") + "Custom persistent connections " + "limit is not being respected") self.assertFalse(custom_pool.persistent, "Custom persistence is not being respected") default_client = http.HTTPClient() custom_client = http.HTTPClient(pool=custom_pool) - self.assertNotEquals(default_client._agent, custom_client._agent, "No agent reuse is expected") - self.assertEquals(custom_pool, custom_client._agent._pool, "Custom pool usage was not respected") + self.assertNotEquals( + default_client._agent, custom_client._agent, + "No agent reuse is expected") + self.assertEquals( + custom_pool, custom_client._agent._pool, + "Custom pool usage was not respected") if __name__ == "__main__": unittest.main() -- cgit v1.2.3 From 08779379337ff77729c9d2ebdb6f4aaf486d38a7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 28 Jul 2015 16:26:14 -0300 Subject: [bug] Consider events flag when ensuring client Change EventsClientThread behavior so it won't start anymore if the events flag is set to False --- src/leap/common/events/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/leap') diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 1744341..8d8d522 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -466,7 +466,7 @@ class EventsClientThread(threading.Thread, EventsClient): Make sure the events client thread is started. """ with self._lock: - if not self.is_alive(): + if flags.EVENTS_ENABLED and not self.is_alive(): self.daemon = True self.start() self._initialized.wait() -- cgit v1.2.3 From e47f4bcc0eb9c3d08c649adcb62b0325f439113e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 28 Jul 2015 16:48:04 -0300 Subject: [test] set flag before starting client Test client will only start with flag set to True. Change EventsGenericClientTestCase to set the flag on the first line of setUp. --- src/leap/common/tests/test_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/leap') diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 8f08eeb..2ad097e 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -39,13 +39,13 @@ if 'DEBUG' in os.environ: class EventsGenericClientTestCase(object): def setUp(self): + flags.set_events_enabled(True) self._server = server.ensure_server( emit_addr="tcp://127.0.0.1:0", reg_addr="tcp://127.0.0.1:0") self._client.configure_client( emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port) - flags.set_events_enabled(True) def tearDown(self): self._client.shutdown() -- cgit v1.2.3 From 2e911bd0c949b7f42824ed87c467b1ac0919224a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 29 Jul 2015 13:30:41 -0300 Subject: [refactor] Extract flags check to caller Checking was done inside of emit method. Doing on emit function at a module level makes it cleaner with less lines inside of check. --- src/leap/common/events/client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 8d8d522..4790fc3 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -170,10 +170,9 @@ class EventsClient(object): :param content: The content of the event. :type content: list """ - if flags.EVENTS_ENABLED: - logger.debug("Emitting event: (%s, %s)" % (event, content)) - payload = str(event) + b'\0' + pickle.dumps(content) - self._send(payload) + logger.debug("Emitting event: (%s, %s)" % (event, content)) + payload = str(event) + b'\0' + pickle.dumps(content) + self._send(payload) def _handle_event(self, event, content): """ @@ -537,7 +536,8 @@ def emit(event, *content): :param content: The content of the event. :type content: list """ - return EventsClientThread.instance().emit(event, *content) + if flags.EVENTS_ENABLED: + return EventsClientThread.instance().emit(event, *content) def instance(): -- cgit v1.2.3 From 2d9ce114daeaf6c4c193079b018576ad1f4f8f28 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 29 Jul 2015 13:34:08 -0300 Subject: [bug] register and unregister controlled by flag Since register and unregister cant be used without full zmq initialization, it should make sense to also check flag for them. --- src/leap/common/events/client.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 4790fc3..e085f5b 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -508,8 +508,9 @@ def register(event, callback, uid=None, replace=False): :raises CallbackAlreadyRegisteredError: when there's already a callback identified by the given uid and replace is False. """ - return EventsClientThread.instance().register( - event, callback, uid=uid, replace=replace) + if flags.EVENTS_ENABLED: + return EventsClientThread.instance().register( + event, callback, uid=uid, replace=replace) def unregister(event, uid=None): @@ -524,7 +525,8 @@ def unregister(event, uid=None): :param uid: The callback uid. :type uid: str """ - return EventsClientThread.instance().unregister(event, uid=uid) + if flags.EVENTS_ENABLED: + return EventsClientThread.instance().unregister(event, uid=uid) def emit(event, *content): -- cgit v1.2.3 From 0a8f455965fff778d993912f1dc11a77db264a93 Mon Sep 17 00:00:00 2001 From: Bruno Wagner Date: Tue, 4 Aug 2015 18:44:21 -0300 Subject: [bug] HTTP timeout was not being cleared on abort In case the http client loses connection, it has to clear it's timeout or the reactor will be left in a dirty state Fixing this solves a problem with some of the tests in Soledad that were trying to run on a dirty reactor --- src/leap/common/http.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 1e7ded7..cda8ee8 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -302,23 +302,16 @@ class _HTTP11ClientProtocol(HTTP11ClientProtocol): """ Cancel the request timeout, when it's finished. """ - if self._timeoutCall.active(): + if self._timeoutCall and self._timeoutCall.active(): self._timeoutCall.cancel() self._timeoutCall = None - def _finishResponse_WAITING(self, rest): - """ - Cancel the timeout when finished receiving the response. - """ - self._cancelTimeout() - HTTP11ClientProtocol._finishResponse_WAITING(self, rest) - - def _finishResponse_TRANSMITTING(self, rest): + def _finishResponse(self, rest): """ Cancel the timeout when finished receiving the response. """ self._cancelTimeout() - HTTP11ClientProtocol._finishResponse_TRANSMITTING(self, rest) + HTTP11ClientProtocol._finishResponse(self, rest) def dataReceived(self, bytes): """ @@ -330,3 +323,7 @@ class _HTTP11ClientProtocol(HTTP11ClientProtocol): HTTP11ClientProtocol.dataReceived(self, bytes) if self._timeoutCall and self._timeoutCall.active(): self._timeoutCall.reset(self._timeout) + + def connectionLost(self, reason): + self._cancelTimeout() + return HTTP11ClientProtocol.connectionLost(self, reason) -- cgit v1.2.3 From 8965be42773e3e6c7e154f2c1a27f4e34031ae91 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 11 Aug 2015 11:44:42 -0300 Subject: [feature] allow passing callback to http client --- src/leap/common/http.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py index cda8ee8..6fc10b4 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -31,6 +31,7 @@ except ImportError: from leap.common.certs import get_compatible_ssl_context_factory +from leap.common.check import leap_assert from zope.interface import implements @@ -150,7 +151,7 @@ class HTTPClient(object): pool.maxPersistentPerHost = maxPersistentPerHost return pool - def _request(self, url, method, body, headers): + def _request(self, url, method, body, headers, callback): """ Perform an HTTP request. @@ -162,6 +163,9 @@ class HTTPClient(object): :type body: str :param headers: The headers of the request. :type headers: dict + :param callback: A callback to be added to the request's deferred + callback chain. + :type callback: callable :return: A deferred that fires with the body of the request. :rtype: twisted.internet.defer.Deferred @@ -170,14 +174,21 @@ class HTTPClient(object): body = _StringBodyProducer(body) d = self._agent.request( method, url, headers=Headers(headers), bodyProducer=body) - d.addCallback(readBody) + d.addCallback(callback) return d - def request(self, url, method='GET', body=None, headers={}): + def request(self, url, method='GET', body=None, headers={}, + callback=readBody): """ Perform an HTTP request, but limit the maximum amount of concurrent connections. + May be passed a callback to be added to the request's deferred + callback chain. The callback is expected to receive the response of + the request and may do whatever it wants with the response. By + default, if no callback is passed, we will use a simple body reader + which returns a deferred that is fired with the body of the response. + :param url: The URL for the request. :type url: str :param method: The HTTP method of the request. @@ -186,11 +197,18 @@ class HTTPClient(object): :type body: str :param headers: The headers of the request. :type headers: dict + :param callback: A callback to be added to the request's deferred + callback chain. + :type callback: callable :return: A deferred that fires with the body of the request. :rtype: twisted.internet.defer.Deferred """ - return self._semaphore.run(self._request, url, method, body, headers) + leap_assert( + callable(callback), + message="The callback parameter should be a callable!") + return self._semaphore.run(self._request, url, method, body, headers, + callback) def close(self): """ -- cgit v1.2.3 From 97096fa2c22e96ec238c38ad3befdd052b5028cf Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 17 Aug 2015 19:15:05 -0400 Subject: [style] pep8 fix --- src/leap/common/http.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/leap') diff --git a/src/leap/common/http.py b/src/leap/common/http.py index 6fc10b4..0dee3a2 100644 --- a/src/leap/common/http.py +++ b/src/leap/common/http.py @@ -20,6 +20,7 @@ Twisted HTTP/HTTPS client. try: import twisted + assert twisted except ImportError: print "*******" print "Twisted is needed to use leap.common.http module" @@ -178,7 +179,7 @@ class HTTPClient(object): return d def request(self, url, method='GET', body=None, headers={}, - callback=readBody): + callback=readBody): """ Perform an HTTP request, but limit the maximum amount of concurrent connections. @@ -208,7 +209,7 @@ class HTTPClient(object): callable(callback), message="The callback parameter should be a callable!") return self._semaphore.run(self._request, url, method, body, headers, - callback) + callback) def close(self): """ -- cgit v1.2.3