blob: 8630a3606874ca0db32345336bb70d6528933916 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# coding: utf-8
"""miscellaneous zmq_utils wrapping"""
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.
from ._cffi import ffi, C
from zmq.error import ZMQError, _check_rc, _check_version
def curve_keypair():
"""generate a Z85 keypair for use with zmq.CURVE security
Requires libzmq (≥ 4.0) to have been linked with libsodium.
Returns
-------
(public, secret) : two bytestrings
The public and private keypair as 40 byte z85-encoded bytestrings.
"""
_check_version((3,2), "monitor")
public = ffi.new('char[64]')
private = ffi.new('char[64]')
rc = C.zmq_curve_keypair(public, private)
_check_rc(rc)
return ffi.buffer(public)[:40], ffi.buffer(private)[:40]
class Stopwatch(object):
def __init__(self):
self.watch = ffi.NULL
def start(self):
if self.watch == ffi.NULL:
self.watch = C.zmq_stopwatch_start()
else:
raise ZMQError('Stopwatch is already runing.')
def stop(self):
if self.watch == ffi.NULL:
raise ZMQError('Must start the Stopwatch before calling stop.')
else:
time = C.zmq_stopwatch_stop(self.watch)
self.watch = ffi.NULL
return time
def sleep(self, seconds):
C.zmq_sleep(seconds)
__all__ = ['curve_keypair', 'Stopwatch']
|