[bug] avoid the events server to block twistd daemon
[leap_pycommon.git] / src / leap / common / events / zmq_components.py
1 # -*- coding: utf-8 -*-
2 # zmq_components.py
3 # Copyright (C) 2015, 2016 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 The server for the events mechanism.
19 """
20 import os
21 import logging
22 import txzmq
23 import re
24 import time
25
26
27 from abc import ABCMeta
28
29 # XXX some distros don't package libsodium, so we have to be prepared for
30 #     absence of zmq.auth
31 try:
32     import zmq.auth
33     from zmq.auth.thread import ThreadAuthenticator
34 except ImportError:
35     pass
36
37 from txzmq.connection import ZmqEndpoint, ZmqEndpointType
38
39 from leap.common.config import flags, get_path_prefix
40 from leap.common.zmq_utils import zmq_has_curve
41
42 from leap.common.zmq_utils import maybe_create_and_get_certificates
43 from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
44
45
logger = logging.getLogger(__name__)


# Splits an address of the form "<proto>://<host>[:<port>]" into three groups:
# protocol, host (or path) and an optional numeric port. Written as a raw
# string so that ``\d`` reaches the regex engine instead of being treated as
# an (invalid) string escape sequence.
ADDRESS_RE = re.compile(r"^([a-z]+)://([^:]+):?(\d+)?$")
50
51
class TxZmqComponent(object):
    """
    A twisted-powered zmq events component.

    Base class that creates txzmq connections, either by connecting to an
    address or by binding on one, and that configures CURVE on the underlying
    socket when ``use_curve`` is set. Concrete subclasses must define
    ``_component_type``.
    """
    # One factory is created at class-definition time and shared by every
    # component instance; it is registered for shutdown right away.
    _factory = txzmq.ZmqFactory()
    _factory.registerForShutdown()

    __metaclass__ = ABCMeta

    # Overridden by subclasses; read through the ``component_type`` property.
    _component_type = None

    def __init__(self, path_prefix=None, enable_curve=True):
        """
        Initialize the txzmq component.

        :param path_prefix: Base directory under which the ``leap/events``
                            configuration directory is located. When None,
                            ``get_path_prefix(flags.STANDALONE)`` is used.
        :type path_prefix: str or None
        :param enable_curve: Whether CURVE should be used. Even when True,
                             CURVE is only enabled if ``zmq_has_curve()``
                             says so.
        :type enable_curve: bool
        """
        if path_prefix is None:
            path_prefix = get_path_prefix(flags.STANDALONE)
        self._config_prefix = os.path.join(path_prefix, "leap", "events")
        self._connections = []
        # CURVE has to be both requested by the caller and reported as
        # available by zmq_has_curve().
        self.use_curve = zmq_has_curve() if enable_curve else False

    @property
    def component_type(self):
        """
        The type of this component, as declared by the concrete subclass.

        :return: The value of ``_component_type``.
        :raise Exception: If ``_component_type`` was left unset (falsy).
        """
        if not self._component_type:
            # The trailing space keeps the two adjacent literals from being
            # glued together into "TxZmqComponentdefine".
            raise Exception(
                "Make sure implementations of TxZmqComponent "
                "define a self._component_type!")
        return self._component_type

    def _zmq_connect(self, connClass, address):
        """
        Connect to an address.

        When ``use_curve`` is set, the socket is given this component's key
        pair and the server public key loaded from ``server.key`` inside the
        public keys directory, before the endpoint is added.

        :param connClass: The connection class to be used.
        :type connClass: txzmq.ZmqConnection
        :param address: The address to connect to.
        :type address: str

        :return: The connected connection.
        :rtype: txzmq.ZmqConnection
        """
        endpoint = ZmqEndpoint(ZmqEndpointType.connect, address)
        connection = connClass(self._factory)

        if self.use_curve:
            socket = connection.socket
            public, secret = maybe_create_and_get_certificates(
                self._config_prefix, self.component_type)
            server_public_file = os.path.join(
                self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")

            # Only the first element of the loaded certificate is used.
            server_public, _ = zmq.auth.load_certificate(server_public_file)
            socket.curve_publickey = public
            socket.curve_secretkey = secret
            socket.curve_serverkey = server_public

        connection.addEndpoints([endpoint])
        return connection

    def _zmq_bind(self, connClass, address):
        """
        Bind to an address.

        When ``use_curve`` is set, the socket is given this component's key
        pair and the thread authenticator is started before the endpoint is
        added.

        :param connClass: The connection class to be used.
        :type connClass: txzmq.ZmqConnection
        :param address: The address to bind to. It has to match ADDRESS_RE.
        :type address: str

        :return: The bound connection and the port found in ``address``.
                 The port is the text captured by ADDRESS_RE, or None when
                 the address carries no numeric port.
        :rtype: (txzmq.ZmqConnection, str or None)
        :raise AttributeError: If ``address`` does not match ADDRESS_RE.
        """
        # Only the port (third group) is needed from the parsed address.
        port = ADDRESS_RE.search(address).group(3)

        endpoint = ZmqEndpoint(ZmqEndpointType.bind, address)
        connection = connClass(self._factory)

        if self.use_curve:
            socket = connection.socket

            public, secret = maybe_create_and_get_certificates(
                self._config_prefix, self.component_type)
            socket.curve_publickey = public
            socket.curve_secretkey = secret
            self._start_thread_auth(connection.socket)

        # The endpoint is added only after the curve options are in place.
        connection.addEndpoints([endpoint])
        return connection, port

    def _start_thread_auth(self, socket):
        """
        Start the zmq curve thread authenticator.

        The authenticator is created on the shared factory's context, allows
        connections from 127.0.0.1 and reads the accepted public keys from
        the public keys directory under the configuration prefix. Finally the
        given socket is flagged as a curve server.

        Note that this call sleeps for half a second before starting the
        authenticator, blocking the calling thread for that long.

        :param socket: The socket in which to configure the authenticator.
        :type socket: zmq.Socket
        """
        # TODO re-implement without threads.
        # NOTE(review): ThreadAuthenticator is only defined if the guarded
        # zmq.auth import succeeded; presumably zmq_has_curve() is False
        # otherwise, so this method is not reached -- confirm.
        logger.debug("Starting thread authenticator...")
        authenticator = ThreadAuthenticator(self._factory.context)

        # Temporary fix until we understand what the problem is
        # See https://leap.se/code/issues/7536
        time.sleep(0.5)

        authenticator.start()
        # XXX do not hardcode this here.
        authenticator.allow('127.0.0.1')
        # tell authenticator to use the certificate in a directory
        public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX)
        authenticator.configure_curve(domain="*", location=public_keys_dir)
        socket.curve_server = True  # must come before bind
165
166
class TxZmqServerComponent(TxZmqComponent):
    """
    Server flavour of the txZMQ events component.

    Inherits all behaviour from TxZmqComponent and only declares its type.
    """

    # Value returned by the inherited ``component_type`` property.
    _component_type = "server"
173
174
class TxZmqClientComponent(TxZmqComponent):
    """
    Client flavour of the txZMQ events component.

    Inherits all behaviour from TxZmqComponent and only declares its type.
    """

    # Value returned by the inherited ``component_type`` property.
    _component_type = "client"