1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
|
"""0MQ Socket class."""
#
# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley
#
# This file is part of pyzmq.
#
# pyzmq is free software; you can redistribute it and/or modify it under
# the terms of the Lesser GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# pyzmq is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Lesser GNU General Public License for more details.
#
# You should have received a copy of the Lesser GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#-----------------------------------------------------------------------------
# Cython Imports
#-----------------------------------------------------------------------------
# get version-independent aliases:
cdef extern from "pyversion_compat.h":
pass
from libc.errno cimport ENAMETOOLONG
from libc.string cimport memcpy
from cpython cimport PyBytes_FromStringAndSize
from cpython cimport PyBytes_AsString, PyBytes_Size
from cpython cimport Py_DECREF, Py_INCREF
from buffers cimport asbuffer_r, viewfromobject_r
from libzmq cimport *
from message cimport Frame, copy_zmq_msg_bytes
from context cimport Context
cdef extern from "Python.h":
ctypedef int Py_ssize_t
cdef extern from "ipcmaxlen.h":
int get_ipc_path_max_len()
cdef extern from "getpid_compat.h":
int getpid()
#-----------------------------------------------------------------------------
# Python Imports
#-----------------------------------------------------------------------------
import copy as copy_mod
import time
import sys
import random
import struct
import codecs
from zmq.utils import jsonapi
try:
import cPickle
pickle = cPickle
except:
cPickle = None
import pickle
import zmq
from zmq.backend.cython import constants
from zmq.backend.cython.constants import *
from zmq.backend.cython.checkrc cimport _check_rc
from zmq.error import ZMQError, ZMQBindError, _check_version
from zmq.utils.strtypes import bytes,unicode,basestring
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
# Maximum length of an ipc:// path, as reported by the ipcmaxlen.h helper.
# Socket.bind() only emits its ENAMETOOLONG-specific message when this is truthy.
IPC_PATH_MAX_LEN = get_ipc_path_max_len()
# Inline some small socket helpers as module-level cdef functions:
# true methods frequently cannot be inlined, according to the Cython docs.
cdef inline _check_closed(Socket s):
    """Raise ZMQError(ENOTSOCK) if the socket is flagged as closed.

    Does not do a deep check: only the cached ``_closed`` flag is consulted,
    libzmq itself is not queried.
    """
    if s._closed:
        raise ZMQError(ENOTSOCK)
cdef inline _check_closed_deep(Socket s):
    """Thorough check of whether the socket has been closed,
    even if by another entity (e.g. ctx.destroy).

    Only used by the `closed` property.

    Returns True if closed, False otherwise.  Any libzmq error other than
    ENOTSOCK is raised through ``_check_rc``.
    """
    cdef int rc
    cdef int stype
    cdef size_t sz = sizeof(int)

    if s._closed:
        return True

    # Probe the handle with a cheap getsockopt; ENOTSOCK means the
    # underlying socket is gone, so remember that in the cached flag.
    rc = zmq_getsockopt(s.handle, ZMQ_TYPE, <void *>&stype, &sz)
    if rc < 0 and zmq_errno() == ENOTSOCK:
        s._closed = True
        return True

    _check_rc(rc)
    return False
cdef inline Frame _recv_frame(void *handle, int flags=0, track=False):
    """Receive one message part into a fresh Frame without copying its data.

    The Frame is created with the requested ``track`` setting and filled in
    place by ``zmq_msg_recv``; a failing return code is raised by ``_check_rc``.
    """
    cdef int status
    frame_obj = zmq.Frame(track=track)
    cdef Frame frame = frame_obj

    # the blocking receive runs without the GIL
    with nogil:
        status = zmq_msg_recv(&frame.zmq_msg, handle, flags)
    _check_rc(status)
    return frame_obj
cdef inline object _recv_copy(void *handle, int flags=0):
    """Receive a message and return a copy of its content as bytes.

    The temporary ``zmq_msg_t`` is always closed, including when the receive
    fails and ``_check_rc`` raises (e.g. EAGAIN with NOBLOCK).
    """
    cdef zmq_msg_t zmq_msg
    cdef int rc

    with nogil:
        zmq_msg_init(&zmq_msg)
        rc = zmq_msg_recv(&zmq_msg, handle, flags)
    try:
        _check_rc(rc)
        msg_bytes = copy_zmq_msg_bytes(&zmq_msg)
    finally:
        # the message was initialised above, so it must be released on
        # both the success and the failure path
        zmq_msg_close(&zmq_msg)
    return msg_bytes
cdef inline object _send_frame(void *handle, Frame msg, int flags=0):
    """Send a Frame on the given socket handle without copying its data.

    Returns ``msg.tracker``; a failing return code is raised by ``_check_rc``.
    """
    cdef int status
    # Send a fast_copy rather than the Frame itself so the original message
    # isn't garbage collected.  This is a reference, not a real data copy.
    cdef Frame shadow = msg.fast_copy()

    with nogil:
        status = zmq_msg_send(&shadow.zmq_msg, handle, flags)
    _check_rc(status)
    return msg.tracker
cdef inline object _send_copy(void *handle, object msg, int flags=0):
    """Send ``msg`` on the given socket handle by copying its bytes into a new zmq message."""
    cdef int send_rc, close_rc
    cdef zmq_msg_t frame
    cdef char *src
    cdef Py_ssize_t nbytes = 0

    # expose the Python object's buffer as a C pointer + length
    asbuffer_r(msg, <void **>&src, &nbytes)

    # Copying before the send avoids any complications with the GIL.
    # If zmq_msg_init_* fails we must not call zmq_msg_close (Bus Error),
    # so the init result is checked before anything else touches `frame`.
    send_rc = zmq_msg_init_size(&frame, nbytes)
    _check_rc(send_rc)

    with nogil:
        memcpy(zmq_msg_data(&frame), src, zmq_msg_size(&frame))
        send_rc = zmq_msg_send(&frame, handle, flags)
        close_rc = zmq_msg_close(&frame)
    # report the send failure first, then a close failure
    _check_rc(send_rc)
    _check_rc(close_rc)
cdef class Socket:
    """Socket(context, socket_type)

    A 0MQ socket.

    These objects will generally be constructed via the socket() method of a Context object.

    Note: 0MQ Sockets are *not* threadsafe. **DO NOT** share them across threads.

    Parameters
    ----------
    context : Context
        The 0MQ Context this Socket belongs to.
    socket_type : int
        The socket type, which can be any of the 0MQ socket types:
        REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB.

    See Also
    --------
    .Context.socket : method for creating a socket bound to a Context.
    """

    # no-op for the signature
    def __init__(self, context=None, socket_type=-1, shadow=0):
        pass

    def __cinit__(self, Context context=None, int socket_type=-1, size_t shadow=0, *args, **kwargs):
        self.handle = NULL
        self.context = context
        if shadow:
            # wrap an existing libzmq socket whose address was passed in
            self._shadow = True
            self.handle = <void *>shadow
        else:
            if context is None:
                raise TypeError("context must be specified")
            if socket_type < 0:
                raise TypeError("socket_type must be specified")
            self._shadow = False
            self.handle = zmq_socket(context.handle, socket_type)
        if self.handle == NULL:
            raise ZMQError()
        self._closed = False
        # remember the creating process; close() and __dealloc__ only act
        # on the handle when called from that same pid
        self._pid = getpid()
        if context:
            context._add_socket(self.handle)

    def __dealloc__(self):
        """remove from context's list

        But be careful that context might not exist if called during gc
        """
        if self.handle != NULL and not self._shadow and getpid() == self._pid:
            # during gc, self.context might be NULL
            if self.context and not self.context.closed:
                self.context._remove_socket(self.handle)

    @property
    def underlying(self):
        """The address of the underlying libzmq socket"""
        return <size_t> self.handle

    @property
    def closed(self):
        """Whether the socket is closed, checked against libzmq (deep check)."""
        return _check_closed_deep(self)

    def close(self, linger=None):
        """s.close(linger=None)

        Close the socket.

        If linger is specified, LINGER sockopt will be set prior to closing.

        This can be called to close the socket by hand. If this is not
        called, the socket will automatically be closed when it is
        garbage collected.
        """
        cdef int rc = 0
        cdef int linger_c
        cdef bint setlinger = False

        if linger is not None:
            linger_c = linger
            setlinger = True

        if self.handle != NULL and not self._closed and getpid() == self._pid:
            if setlinger:
                zmq_setsockopt(self.handle, ZMQ_LINGER, &linger_c, sizeof(int))
            rc = zmq_close(self.handle)
            if rc != 0 and zmq_errno() != ENOTSOCK:
                # ignore ENOTSOCK (closed by Context)
                _check_rc(rc)
            self._closed = True
            # during gc, self.context might be NULL
            if self.context:
                self.context._remove_socket(self.handle)
            self.handle = NULL

    def set(self, int option, optval):
        """s.set(option, optval)

        Set socket options.

        See the 0MQ API documentation for details on specific options.

        Parameters
        ----------
        option : int
            The option to set.  Available values will depend on your
            version of libzmq.  Examples include::

                zmq.SUBSCRIBE, UNSUBSCRIBE, IDENTITY, HWM, LINGER, FD

        optval : int or bytes
            The value of the option to set.

        Raises
        ------
        TypeError
            If `optval` is unicode, or does not match the type expected
            for `option` (bytes or int).
        ZMQError
            If the socket is closed or libzmq rejects the option.
        """
        cdef int64_t optval_int64_c
        cdef int optval_int_c
        cdef int rc
        cdef char* optval_c
        cdef Py_ssize_t sz

        _check_closed(self)
        if isinstance(optval, unicode):
            raise TypeError("unicode not allowed, use setsockopt_string")

        if option in zmq.constants.bytes_sockopts:
            if not isinstance(optval, bytes):
                raise TypeError('expected bytes, got: %r' % optval)
            optval_c = PyBytes_AsString(optval)
            sz = PyBytes_Size(optval)
            rc = zmq_setsockopt(
                self.handle, option,
                optval_c, sz
            )
        elif option in zmq.constants.int64_sockopts:
            if not isinstance(optval, int):
                raise TypeError('expected int, got: %r' % optval)
            optval_int64_c = optval
            rc = zmq_setsockopt(
                self.handle, option,
                &optval_int64_c, sizeof(int64_t)
            )
        else:
            # default is to assume int, which is what most new sockopts will be
            # this lets pyzmq work with newer libzmq which may add constants
            # pyzmq has not yet added, rather than artificially raising. Invalid
            # sockopts will still raise just the same, but it will be libzmq doing
            # the raising.
            if not isinstance(optval, int):
                raise TypeError('expected int, got: %r' % optval)
            optval_int_c = optval
            rc = zmq_setsockopt(
                self.handle, option,
                &optval_int_c, sizeof(int)
            )

        _check_rc(rc)

    def get(self, int option):
        """s.get(option)

        Get the value of a socket option.

        See the 0MQ API documentation for details on specific options.

        Parameters
        ----------
        option : int
            The option to get.  Available values will depend on your
            version of libzmq.  Examples include::

                zmq.IDENTITY, HWM, LINGER, FD, EVENTS

        Returns
        -------
        optval : int or bytes
            The value of the option as a bytestring or int.
        """
        cdef int64_t optval_int64_c
        cdef int optval_int_c
        cdef fd_t optval_fd_c
        cdef char identity_str_c [255]
        cdef size_t sz
        cdef int rc

        _check_closed(self)

        if option in zmq.constants.bytes_sockopts:
            # bytes-valued options are read into a fixed 255-byte buffer
            sz = 255
            rc = zmq_getsockopt(self.handle, option, <void *>identity_str_c, &sz)
            _check_rc(rc)
            # strip null-terminated strings *except* identity
            if option != ZMQ_IDENTITY and sz > 0 and (<char *>identity_str_c)[sz-1] == b'\0':
                sz -= 1
            result = PyBytes_FromStringAndSize(<char *>identity_str_c, sz)
        elif option in zmq.constants.int64_sockopts:
            sz = sizeof(int64_t)
            rc = zmq_getsockopt(self.handle, option, <void *>&optval_int64_c, &sz)
            _check_rc(rc)
            result = optval_int64_c
        elif option in zmq.constants.fd_sockopts:
            sz = sizeof(fd_t)
            rc = zmq_getsockopt(self.handle, option, <void *>&optval_fd_c, &sz)
            _check_rc(rc)
            result = optval_fd_c
        else:
            # default is to assume int, which is what most new sockopts will be
            # this lets pyzmq work with newer libzmq which may add constants
            # pyzmq has not yet added, rather than artificially raising. Invalid
            # sockopts will still raise just the same, but it will be libzmq doing
            # the raising.
            sz = sizeof(int)
            rc = zmq_getsockopt(self.handle, option, <void *>&optval_int_c, &sz)
            _check_rc(rc)
            result = optval_int_c

        return result

    def bind(self, addr):
        """s.bind(addr)

        Bind the socket to an address.

        This causes the socket to listen on a network port. Sockets on the
        other side of this connection will use ``Socket.connect(addr)`` to
        connect to this socket.

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported include
            tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.
        """
        cdef int rc
        cdef char* c_addr

        _check_closed(self)
        if isinstance(addr, unicode):
            addr = addr.encode('utf-8')
        if not isinstance(addr, bytes):
            raise TypeError('expected str, got: %r' % addr)
        c_addr = addr
        rc = zmq_bind(self.handle, c_addr)
        if rc != 0:
            if IPC_PATH_MAX_LEN and zmq_errno() == ENAMETOOLONG:
                # py3compat: addr is bytes, but msg wants str
                if str is unicode:
                    addr = addr.decode('utf-8', 'replace')
                path = addr.split('://', 1)[-1]
                msg = ('ipc path "{0}" is longer than {1} '
                       'characters (sizeof(sockaddr_un.sun_path)). '
                       'zmq.IPC_PATH_MAX_LEN constant can be used '
                       'to check addr length (if it is defined).'
                       .format(path, IPC_PATH_MAX_LEN))
                raise ZMQError(msg=msg)
        _check_rc(rc)

    def connect(self, addr):
        """s.connect(addr)

        Connect to a remote 0MQ socket.

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported are
            tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.
        """
        cdef int rc
        cdef char* c_addr

        _check_closed(self)
        if isinstance(addr, unicode):
            addr = addr.encode('utf-8')
        if not isinstance(addr, bytes):
            raise TypeError('expected str, got: %r' % addr)
        c_addr = addr

        rc = zmq_connect(self.handle, c_addr)
        if rc != 0:
            raise ZMQError()

    def unbind(self, addr):
        """s.unbind(addr)

        Unbind from an address (undoes a call to bind).

        .. versionadded:: libzmq-3.2
        .. versionadded:: 13.0

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported are
            tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.
        """
        cdef int rc
        cdef char* c_addr

        _check_version((3,2), "unbind")
        _check_closed(self)
        if isinstance(addr, unicode):
            addr = addr.encode('utf-8')
        if not isinstance(addr, bytes):
            raise TypeError('expected str, got: %r' % addr)
        c_addr = addr

        rc = zmq_unbind(self.handle, c_addr)
        if rc != 0:
            raise ZMQError()

    def disconnect(self, addr):
        """s.disconnect(addr)

        Disconnect from a remote 0MQ socket (undoes a call to connect).

        .. versionadded:: libzmq-3.2
        .. versionadded:: 13.0

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported are
            tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.
        """
        cdef int rc
        cdef char* c_addr

        _check_version((3,2), "disconnect")
        _check_closed(self)
        if isinstance(addr, unicode):
            addr = addr.encode('utf-8')
        if not isinstance(addr, bytes):
            raise TypeError('expected str, got: %r' % addr)
        c_addr = addr

        rc = zmq_disconnect(self.handle, c_addr)
        if rc != 0:
            raise ZMQError()

    def monitor(self, addr, int events=ZMQ_EVENT_ALL):
        """s.monitor(addr, events=zmq.EVENT_ALL)

        Start publishing socket events on inproc.
        See libzmq docs for zmq_monitor for details.

        While this function is available from libzmq 3.2,
        pyzmq cannot parse monitor messages from libzmq prior to 4.0.

        .. versionadded:: libzmq-3.2
        .. versionadded:: 14.0

        Parameters
        ----------
        addr : str
            The inproc url used for monitoring. Passing None as
            the addr will cause an existing socket monitor to be
            deregistered.
        events : int [default: zmq.EVENT_ALL]
            The zmq event bitmask for which events will be sent to the monitor.
        """
        cdef int rc, c_flags
        cdef char* c_addr = NULL

        _check_version((3,2), "monitor")
        # same closed-socket guard as every other method that uses the handle
        _check_closed(self)
        if addr is not None:
            if isinstance(addr, unicode):
                addr = addr.encode('utf-8')
            if not isinstance(addr, bytes):
                raise TypeError('expected str, got: %r' % addr)
            c_addr = addr
        # c_addr stays NULL when addr is None
        c_flags = events
        rc = zmq_socket_monitor(self.handle, c_addr, c_flags)
        _check_rc(rc)

    #-------------------------------------------------------------------------
    # Sending and receiving messages
    #-------------------------------------------------------------------------

    cpdef object send(self, object data, int flags=0, copy=True, track=False):
        """s.send(data, flags=0, copy=True, track=False)

        Send a message on this socket.

        This queues the message to be sent by the IO thread at a later time.

        Parameters
        ----------
        data : object, str, Frame
            The content of the message.
        flags : int
            Any supported flag: NOBLOCK, SNDMORE.
        copy : bool
            Should the message be sent in a copying or non-copying manner.
        track : bool
            Should the message be tracked for notification that ZMQ has
            finished with it? (ignored if copy=True)

        Returns
        -------
        None : if `copy` or not track
            None if message was sent, raises an exception otherwise.
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the send is completed.

        Raises
        ------
        TypeError
            If a unicode object is passed
        ValueError
            If `track=True`, but an untracked Frame is passed.
        ZMQError
            If the send does not succeed for any reason.
        """
        _check_closed(self)

        if isinstance(data, unicode):
            raise TypeError("unicode not allowed, use send_string")

        if copy:
            # msg.bytes never returns the input data object
            # it is always a copy, but always the same copy
            if isinstance(data, Frame):
                data = data.buffer
            return _send_copy(self.handle, data, flags)
        else:
            if isinstance(data, Frame):
                if track and not data.tracker:
                    raise ValueError('Not a tracked message')
                msg = data
            else:
                msg = Frame(data, track=track)
            return _send_frame(self.handle, msg, flags)

    cpdef object recv(self, int flags=0, copy=True, track=False):
        """s.recv(flags=0, copy=True, track=False)

        Receive a message.

        Parameters
        ----------
        flags : int
            Any supported flag: NOBLOCK. If NOBLOCK is set, this method
            will raise a ZMQError with EAGAIN if a message is not ready.
            If NOBLOCK is not set, then this method will block until a
            message arrives.
        copy : bool
            Should the message be received in a copying or non-copying manner?
            If False a Frame object is returned, if True a string copy of
            message is returned.
        track : bool
            Should the message be tracked for notification that ZMQ has
            finished with it? (ignored if copy=True)

        Returns
        -------
        msg : bytes, Frame
            The received message frame.  If `copy` is False, then it will be a Frame,
            otherwise it will be bytes.

        Raises
        ------
        ZMQError
            for any of the reasons zmq_msg_recv might fail.
        """
        _check_closed(self)

        if copy:
            return _recv_copy(self.handle, flags)
        else:
            frame = _recv_frame(self.handle, flags, track)
            # use get(), which this class defines; getsockopt is not
            # defined on this base class
            frame.more = self.get(zmq.RCVMORE)
            return frame
# Names exported by ``from <this module> import *``.
__all__ = ['Socket', 'IPC_PATH_MAX_LEN']
|