[tests] adapt events tests to recent changes
[leap_pycommon.git] / src / leap / common / zmq_utils.py
1 # -*- coding: utf-8 -*-
2 # zmq.py
3 # Copyright (C) 2013, 2014 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 """
18 Utilities to handle ZMQ certificates.
19 """
20 import os
21 import logging
22 import platform
23 import stat
24 import shutil
25
26 import zmq
27
28 try:
29     import zmq.auth
30 except ImportError:
31     pass
32
33 from leap.common.files import mkdir_p
34 from leap.common.check import leap_assert
35
36 logger = logging.getLogger(__name__)
37
38
39 KEYS_PREFIX = "zmq_certificates"
40 PUBLIC_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "public_keys")
41 PRIVATE_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "private_keys")
42
43
44 def zmq_has_curve():
45     """
46     Return whether the current ZMQ has support for auth and CurveZMQ security.
47
48     :rtype: bool
49
50      Version notes:
51        `zmq.curve_keypair()` is new in version 14.0, new in version libzmq-4.0.
52             Requires libzmq (>= 4.0) to have been linked with libsodium.
53        `zmq.auth` module is new in version 14.1
54        `zmq.has()` is new in version 14.1, new in version libzmq-4.1.
55     """
56     if platform.system() == "Windows":
57         # TODO: curve is not working on windows #7919
58         return False
59
60     zmq_version = zmq.zmq_version_info()
61     pyzmq_version = zmq.pyzmq_version_info()
62
63     if pyzmq_version >= (14, 1, 0) and zmq_version >= (4, 1):
64         return zmq.has('curve')
65
66     if pyzmq_version < (14, 1, 0):
67         return False
68
69     if zmq_version < (4, 0):
70         # security is new in libzmq 4.0
71         return False
72
73     try:
74         zmq.curve_keypair()
75     except zmq.error.ZMQError:
76         # security requires libzmq to be linked against libsodium
77         return False
78
79     return True
80
81
82 def assert_zmq_has_curve():
83     leap_assert(zmq_has_curve, "CurveZMQ not supported!")
84
85
86 def maybe_create_and_get_certificates(basedir, name):
87     """
88     Generate the needed ZMQ certificates for backend/frontend communication if
89     needed.
90     """
91     assert_zmq_has_curve()
92     private_keys_dir = os.path.join(basedir, PRIVATE_KEYS_PREFIX)
93     private_key = os.path.join(
94         private_keys_dir, name + ".key_secret")
95     if not os.path.isfile(private_key):
96         mkdir_p(private_keys_dir)
97         zmq.auth.create_certificates(private_keys_dir, name)
98         # set permissions to: 0700 (U:rwx G:--- O:---)
99         os.chmod(private_key, stat.S_IRUSR | stat.S_IWUSR)
100         # move public key to public keys directory
101         public_keys_dir = os.path.join(basedir, PUBLIC_KEYS_PREFIX)
102         old_public_key = os.path.join(
103             private_keys_dir, name + ".key")
104         new_public_key = os.path.join(
105             public_keys_dir, name + ".key")
106         mkdir_p(public_keys_dir)
107         shutil.move(old_public_key, new_public_key)
108     return zmq.auth.load_certificate(private_key)