Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--  src/socket_base.cpp  1283
1 file changed, 1283 insertions, 0 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
new file mode 100644
index 0000000..eac2882
--- /dev/null
+++ b/src/socket_base.cpp
@@ -0,0 +1,1283 @@
+/*
+ Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <new>
+#include <string>
+#include <algorithm>
+
+#include "platform.hpp"
+
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#if defined _MSC_VER
+#if defined _WIN32_WCE
+#include <cmnintrin.h>
+#else
+#include <intrin.h>
+#endif
+#endif
+#else
+#include <unistd.h>
+#endif
+
+#include "socket_base.hpp"
+#include "tcp_listener.hpp"
+#include "ipc_listener.hpp"
+#include "tcp_connecter.hpp"
+#include "io_thread.hpp"
+#include "session_base.hpp"
+#include "config.hpp"
+#include "pipe.hpp"
+#include "err.hpp"
+#include "ctx.hpp"
+#include "likely.hpp"
+#include "msg.hpp"
+#include "address.hpp"
+#include "ipc_address.hpp"
+#include "tcp_address.hpp"
+#ifdef ZMQ_HAVE_OPENPGM
+#include "pgm_socket.hpp"
+#endif
+
+#include "pair.hpp"
+#include "pub.hpp"
+#include "sub.hpp"
+#include "req.hpp"
+#include "rep.hpp"
+#include "pull.hpp"
+#include "push.hpp"
+#include "dealer.hpp"
+#include "router.hpp"
+#include "xpub.hpp"
+#include "xsub.hpp"
+#include "stream.hpp"
+
+bool zmq::socket_base_t::check_tag ()
+{
+ return tag == 0xbaddecaf;
+}
+
+zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
+ uint32_t tid_, int sid_)
+{
+ socket_base_t *s = NULL;
+ switch (type_) {
+ case ZMQ_PAIR:
+ s = new (std::nothrow) pair_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_PUB:
+ s = new (std::nothrow) pub_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_SUB:
+ s = new (std::nothrow) sub_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_REQ:
+ s = new (std::nothrow) req_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_REP:
+ s = new (std::nothrow) rep_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_DEALER:
+ s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_ROUTER:
+ s = new (std::nothrow) router_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_PULL:
+ s = new (std::nothrow) pull_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_PUSH:
+ s = new (std::nothrow) push_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_XPUB:
+ s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_XSUB:
+ s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
+ break;
+ case ZMQ_STREAM:
+ s = new (std::nothrow) stream_t (parent_, tid_, sid_);
+ break;
+ default:
+ errno = EINVAL;
+ return NULL;
+ }
+    alloc_assert (s);
+
+    if (s->mailbox.get_fd () == retired_fd) {
+        // The mailbox failed to create its file descriptor pair. Mark the
+        // socket destroyed so the destructor's assertion holds, then fail.
+        s->destroyed = true;
+        delete s;
+        return NULL;
+    }
+
+    return s;
+}
+
+zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
+ own_t (parent_, tid_),
+ tag (0xbaddecaf),
+ ctx_terminated (false),
+ destroyed (false),
+ last_tsc (0),
+ ticks (0),
+ rcvmore (false),
+ monitor_socket (NULL),
+ monitor_events (0)
+{
+ options.socket_id = sid_;
+ options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
+}
+
+zmq::socket_base_t::~socket_base_t ()
+{
+ stop_monitor ();
+ zmq_assert (destroyed);
+}
+
+zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
+{
+ return &mailbox;
+}
+
+void zmq::socket_base_t::stop ()
+{
+ // Called by ctx when it is terminated (zmq_term).
+ // 'stop' command is sent from the threads that called zmq_term to
+ // the thread owning the socket. This way, blocking call in the
+ // owner thread can be interrupted.
+ send_stop ();
+}
+
+int zmq::socket_base_t::parse_uri (const char *uri_,
+ std::string &protocol_, std::string &address_)
+{
+ zmq_assert (uri_ != NULL);
+
+ std::string uri (uri_);
+ std::string::size_type pos = uri.find ("://");
+ if (pos == std::string::npos) {
+ errno = EINVAL;
+ return -1;
+ }
+ protocol_ = uri.substr (0, pos);
+ address_ = uri.substr (pos + 3);
+
+ if (protocol_.empty () || address_.empty ()) {
+ errno = EINVAL;
+ return -1;
+ }
+ return 0;
+}
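+
+// Illustrative example (values hypothetical): parse_uri splits the endpoint
+// string at the first "://", so "tcp://127.0.0.1:5555" yields
+// protocol_ == "tcp" and address_ == "127.0.0.1:5555"; a missing "://" or
+// an empty half fails with EINVAL.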
+
+int zmq::socket_base_t::check_protocol (const std::string &protocol_)
+{
+    // First, check whether the protocol is something we are aware of.
+ if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
+ protocol_ != "pgm" && protocol_ != "epgm") {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+
+    // If 0MQ is not compiled with OpenPGM, the pgm and epgm transports
+    // are not available.
+#if !defined ZMQ_HAVE_OPENPGM
+ if (protocol_ == "pgm" || protocol_ == "epgm") {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
+
+ // IPC transport is not available on Windows and OpenVMS.
+#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ if (protocol_ == "ipc") {
+        // IPC is a known protocol, but it is unsupported on this platform.
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
+
+ // Check whether socket type and transport protocol match.
+ // Specifically, multicast protocols can't be combined with
+ // bi-directional messaging patterns (socket types).
+ if ((protocol_ == "pgm" || protocol_ == "epgm") &&
+ options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
+ options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
+ errno = ENOCOMPATPROTO;
+ return -1;
+ }
+
+ // Protocol is available.
+ return 0;
+}
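+
+// Worked example (hypothetical combination): on a ZMQ_REQ socket,
+// check_protocol ("tcp") succeeds, while check_protocol ("pgm") fails with
+// ENOCOMPATPROTO, since multicast transports pair only with the
+// ZMQ_PUB/ZMQ_SUB/ZMQ_XPUB/ZMQ_XSUB patterns.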
+
+void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
+{
+ // First, register the pipe so that we can terminate it later on.
+ pipe_->set_event_sink (this);
+ pipes.push_back (pipe_);
+
+    // Let the derived socket type know about the new pipe.
+ xattach_pipe (pipe_, subscribe_to_all_);
+
+ // If the socket is already being closed, ask any new pipes to terminate
+ // straight away.
+ if (is_terminating ()) {
+ register_term_acks (1);
+ pipe_->terminate (false);
+ }
+}
+
+int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+    // First, check whether the specific socket type overloads the option.
+ int rc = xsetsockopt (option_, optval_, optvallen_);
+ if (rc == 0 || errno != EINVAL)
+ return rc;
+
+ // If the socket type doesn't support the option, pass it to
+ // the generic option parser.
+ return options.setsockopt (option_, optval_, optvallen_);
+}
+
+int zmq::socket_base_t::getsockopt (int option_, void *optval_,
+ size_t *optvallen_)
+{
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+ if (option_ == ZMQ_RCVMORE) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvmore ? 1 : 0;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
+ if (option_ == ZMQ_FD) {
+ if (*optvallen_ < sizeof (fd_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((fd_t*) optval_) = mailbox.get_fd ();
+ *optvallen_ = sizeof (fd_t);
+ return 0;
+ }
+
+ if (option_ == ZMQ_EVENTS) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ int rc = process_commands (0, false);
+ if (rc != 0 && (errno == EINTR || errno == ETERM))
+ return -1;
+ errno_assert (rc == 0);
+ *((int*) optval_) = 0;
+ if (has_out ())
+ *((int*) optval_) |= ZMQ_POLLOUT;
+ if (has_in ())
+ *((int*) optval_) |= ZMQ_POLLIN;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
+ if (option_ == ZMQ_LAST_ENDPOINT) {
+ if (*optvallen_ < last_endpoint.size () + 1) {
+ errno = EINVAL;
+ return -1;
+ }
+ strcpy (static_cast <char *> (optval_), last_endpoint.c_str ());
+ *optvallen_ = last_endpoint.size () + 1;
+ return 0;
+ }
+
+ return options.getsockopt (option_, optval_, optvallen_);
+}
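+
+// Caller-side sketch (illustrative, not part of this file): the ZMQ_EVENTS
+// branch above is what external event loops query through the public API:
+//
+//     int events;
+//     size_t len = sizeof (events);
+//     if (zmq_getsockopt (s, ZMQ_EVENTS, &events, &len) == 0
+//     &&  (events & ZMQ_POLLIN)) {
+//         // At least one message can be received without blocking.
+//     }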
+
+int zmq::socket_base_t::bind (const char *addr_)
+{
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+ // Process pending commands, if any.
+ int rc = process_commands (0, false);
+ if (unlikely (rc != 0))
+ return -1;
+
+ // Parse addr_ string.
+ std::string protocol;
+ std::string address;
+ rc = parse_uri (addr_, protocol, address);
+ if (rc != 0)
+ return -1;
+
+ rc = check_protocol (protocol);
+ if (rc != 0)
+ return -1;
+
+ if (protocol == "inproc") {
+ endpoint_t endpoint = {this, options};
+ int rc = register_endpoint (addr_, endpoint);
+ if (rc == 0) {
+            connect_pending (addr_, this);
+ last_endpoint.assign (addr_);
+ }
+ return rc;
+ }
+
+ if (protocol == "pgm" || protocol == "epgm") {
+        // For convenience, bind can be used interchangeably with connect
+        // for the PGM and EPGM transports.
+ return connect (addr_);
+ }
+
+    // The remaining transports require an I/O thread to run in, so at
+    // this point we choose one.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ if (!io_thread) {
+ errno = EMTHREAD;
+ return -1;
+ }
+
+ if (protocol == "tcp") {
+ tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
+ io_thread, this, options);
+ alloc_assert (listener);
+ int rc = listener->set_address (address.c_str ());
+ if (rc != 0) {
+ delete listener;
+ event_bind_failed (address, zmq_errno());
+ return -1;
+ }
+
+ // Save last endpoint URI
+ listener->get_address (last_endpoint);
+
+ add_endpoint (addr_, (own_t *) listener, NULL);
+ return 0;
+ }
+
+#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
+ if (protocol == "ipc") {
+ ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
+ io_thread, this, options);
+ alloc_assert (listener);
+ int rc = listener->set_address (address.c_str ());
+ if (rc != 0) {
+ delete listener;
+ event_bind_failed (address, zmq_errno());
+ return -1;
+ }
+
+ // Save last endpoint URI
+ listener->get_address (last_endpoint);
+
+ add_endpoint (addr_, (own_t *) listener, NULL);
+ return 0;
+ }
+#endif
+
+ zmq_assert (false);
+ return -1;
+}
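+
+// Caller-side sketch (illustrative): because the listener branches above
+// store the resolved address in last_endpoint, binding to a wildcard port
+// can be followed by ZMQ_LAST_ENDPOINT to learn the port the OS assigned:
+//
+//     char endpoint [256];
+//     size_t len = sizeof (endpoint);
+//     zmq_bind (s, "tcp://127.0.0.1:*");
+//     zmq_getsockopt (s, ZMQ_LAST_ENDPOINT, endpoint, &len);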
+
+int zmq::socket_base_t::connect (const char *addr_)
+{
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+ // Process pending commands, if any.
+ int rc = process_commands (0, false);
+ if (unlikely (rc != 0))
+ return -1;
+
+ // Parse addr_ string.
+ std::string protocol;
+ std::string address;
+ rc = parse_uri (addr_, protocol, address);
+ if (rc != 0)
+ return -1;
+
+ rc = check_protocol (protocol);
+ if (rc != 0)
+ return -1;
+
+ if (protocol == "inproc") {
+
+ // TODO: inproc connect is specific with respect to creating pipes
+        // as there's no 'reconnect' functionality implemented. Once that
+        // is in place, we should follow the generic pipe creation algorithm.
+
+ // Find the peer endpoint.
+ endpoint_t peer = find_endpoint (addr_);
+
+ // The total HWM for an inproc connection should be the sum of
+ // the binder's HWM and the connector's HWM.
+ int sndhwm = 0;
+ if (peer.socket == NULL)
+ sndhwm = options.sndhwm;
+ else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
+ sndhwm = options.sndhwm + peer.options.rcvhwm;
+ int rcvhwm = 0;
+ if (peer.socket == NULL)
+ rcvhwm = options.rcvhwm;
+ else if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
+ rcvhwm = options.rcvhwm + peer.options.sndhwm;
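+
+        // Worked example (hypothetical values): if the connector has
+        // sndhwm == 50 and the binder has rcvhwm == 100, the outbound pipe
+        // gets an effective HWM of 150; a value of 0 on either side means
+        // unlimited, and the pipe HWM then stays unlimited as well.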
+
+ // Create a bi-directional pipe to connect the peers.
+ object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
+ pipe_t *new_pipes [2] = {NULL, NULL};
+
+ bool conflate = options.conflate &&
+ (options.type == ZMQ_DEALER ||
+ options.type == ZMQ_PULL ||
+ options.type == ZMQ_PUSH ||
+ options.type == ZMQ_PUB ||
+ options.type == ZMQ_SUB);
+
+        int hwms [2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
+ bool conflates [2] = {conflate, conflate};
+ int rc = pipepair (parents, new_pipes, hwms, conflates);
+ errno_assert (rc == 0);
+
+ // Attach local end of the pipe to this socket object.
+ attach_pipe (new_pipes [0]);
+
+ if (!peer.socket) {
+ // The peer doesn't exist yet so we don't know whether
+ // to send the identity message or not. To resolve this,
+ // we always send our identity and drop it later if
+ // the peer doesn't expect it.
+ msg_t id;
+ rc = id.init_size (options.identity_size);
+ errno_assert (rc == 0);
+ memcpy (id.data (), options.identity, options.identity_size);
+ id.set_flags (msg_t::identity);
+ bool written = new_pipes [0]->write (&id);
+ zmq_assert (written);
+ new_pipes [0]->flush ();
+
+ endpoint_t endpoint = {this, options};
+ pending_connection_t pending_connection = {endpoint, new_pipes [0], new_pipes [1]};
+ pend_connection (addr_, pending_connection);
+ }
+ else
+ {
+ // If required, send the identity of the local socket to the peer.
+ if (peer.options.recv_identity) {
+
+ msg_t id;
+ rc = id.init_size (options.identity_size);
+ errno_assert (rc == 0);
+ memcpy (id.data (), options.identity, options.identity_size);
+ id.set_flags (msg_t::identity);
+ bool written = new_pipes [0]->write (&id);
+ zmq_assert (written);
+ new_pipes [0]->flush ();
+ }
+
+ // If required, send the identity of the peer to the local socket.
+ if (options.recv_identity) {
+ msg_t id;
+ rc = id.init_size (peer.options.identity_size);
+ errno_assert (rc == 0);
+ memcpy (id.data (), peer.options.identity, peer.options.identity_size);
+ id.set_flags (msg_t::identity);
+ bool written = new_pipes [1]->write (&id);
+ zmq_assert (written);
+ new_pipes [1]->flush ();
+ }
+
+        // Attach the remote end of the pipe to the peer socket. Note that
+        // the peer's seqnum was already incremented in find_endpoint, so
+        // we don't need to increment it here.
+ send_bind (peer.socket, new_pipes [1], false);
+ }
+
+ // Save last endpoint URI
+ last_endpoint.assign (addr_);
+
+        // Remember inproc connections for disconnect.
+ inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes[0]));
+
+ return 0;
+ }
+
+ // Choose the I/O thread to run the session in.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ if (!io_thread) {
+ errno = EMTHREAD;
+ return -1;
+ }
+
+ address_t *paddr = new (std::nothrow) address_t (protocol, address);
+ alloc_assert (paddr);
+
+ // Resolve address (if needed by the protocol)
+ if (protocol == "tcp") {
+ paddr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
+ alloc_assert (paddr->resolved.tcp_addr);
+ int rc = paddr->resolved.tcp_addr->resolve (
+ address.c_str (), false, options.ipv6);
+ if (rc != 0) {
+ delete paddr;
+ return -1;
+ }
+ }
+#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
+ else
+ if (protocol == "ipc") {
+ paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
+ alloc_assert (paddr->resolved.ipc_addr);
+ int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
+ if (rc != 0) {
+ delete paddr;
+ return -1;
+ }
+ }
+#endif
+#ifdef ZMQ_HAVE_OPENPGM
+    if (protocol == "pgm" || protocol == "epgm") {
+        struct pgm_addrinfo_t *res = NULL;
+        uint16_t port_number = 0;
+        int rc = pgm_socket_t::init_address (address.c_str (), &res, &port_number);
+        if (res != NULL)
+            pgm_freeaddrinfo (res);
+        if (rc != 0 || port_number == 0) {
+            // Don't leak the address object when multicast resolution fails.
+            delete paddr;
+            return -1;
+        }
+    }
+#endif
+ // Create session.
+ session_base_t *session = session_base_t::create (io_thread, true, this,
+ options, paddr);
+ errno_assert (session);
+
+ // PGM does not support subscription forwarding; ask for all data to be
+ // sent to this pipe.
+ bool subscribe_to_all = protocol == "pgm" || protocol == "epgm";
+ pipe_t *newpipe = NULL;
+
+ if (options.immediate != 1 || subscribe_to_all) {
+ // Create a bi-directional pipe.
+ object_t *parents [2] = {this, session};
+ pipe_t *new_pipes [2] = {NULL, NULL};
+
+ bool conflate = options.conflate &&
+ (options.type == ZMQ_DEALER ||
+ options.type == ZMQ_PULL ||
+ options.type == ZMQ_PUSH ||
+ options.type == ZMQ_PUB ||
+ options.type == ZMQ_SUB);
+
+        int hwms [2] = {conflate ? -1 : options.sndhwm,
+                        conflate ? -1 : options.rcvhwm};
+ bool conflates [2] = {conflate, conflate};
+ rc = pipepair (parents, new_pipes, hwms, conflates);
+ errno_assert (rc == 0);
+
+ // Attach local end of the pipe to the socket object.
+ attach_pipe (new_pipes [0], subscribe_to_all);
+ newpipe = new_pipes [0];
+
+ // Attach remote end of the pipe to the session object later on.
+ session->attach_pipe (new_pipes [1]);
+ }
+
+ // Save last endpoint URI
+ paddr->to_string (last_endpoint);
+
+ add_endpoint (addr_, (own_t *) session, newpipe);
+ return 0;
+}
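+
+// Behavioural note with a caller-side sketch (illustrative): with the
+// default ZMQ_IMMEDIATE == 0, the branch above attaches a local pipe right
+// away, so
+//
+//     zmq_connect (s, "tcp://example.host:5555");
+//     zmq_send (s, "hi", 2, 0);   // queued before the connection exists
+//
+// succeeds immediately; with ZMQ_IMMEDIATE set to 1 no pipe is attached
+// until the session actually connects, so the send blocks or fails with
+// EAGAIN instead of queueing.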
+
+void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
+{
+ // Activate the session. Make it a child of this socket.
+ launch_child (endpoint_);
+ endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t(endpoint_, pipe)));
+}
+
+int zmq::socket_base_t::term_endpoint (const char *addr_)
+{
+    // Check whether the library has already been shut down.
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+ // Check whether endpoint address passed to the function is valid.
+ if (unlikely (!addr_)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+    // Process pending commands, if any. There may be unprocessed
+    // process_own() commands (from launch_child(), for example) for the
+    // endpoint we are now asked to terminate.
+ int rc = process_commands (0, false);
+ if (unlikely (rc != 0))
+ return -1;
+
+ // Parse addr_ string.
+ std::string protocol;
+ std::string address;
+ rc = parse_uri (addr_, protocol, address);
+ if (rc != 0)
+ return -1;
+
+ rc = check_protocol (protocol);
+ if (rc != 0)
+ return -1;
+
+ // Disconnect an inproc socket
+ if (protocol == "inproc") {
+ std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
+ if (range.first == range.second) {
+ errno = ENOENT;
+ return -1;
+ }
+
+ for (inprocs_t::iterator it = range.first; it != range.second; ++it)
+ it->second->terminate(true);
+ inprocs.erase (range.first, range.second);
+ return 0;
+ }
+
+ // Find the endpoints range (if any) corresponding to the addr_ string.
+ std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
+ if (range.first == range.second) {
+ errno = ENOENT;
+ return -1;
+ }
+
+ for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
+ // If we have an associated pipe, terminate it.
+ if (it->second.second != NULL)
+ it->second.second->terminate(false);
+ term_child (it->second.first);
+ }
+ endpoints.erase (range.first, range.second);
+ return 0;
+}
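+
+// Caller-side note (sketch, addresses hypothetical): in the public API both
+// zmq_unbind and zmq_disconnect resolve to this method, e.g.
+//
+//     zmq_unbind (s, "tcp://127.0.0.1:5555");
+//     zmq_disconnect (s, "inproc://workers");
+//
+// and either call fails with ENOENT if the address was never bound or
+// connected on this socket.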
+
+int zmq::socket_base_t::send (msg_t *msg_, int flags_)
+{
+    // Check whether the library has already been shut down.
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+ // Check whether message passed to the function is valid.
+ if (unlikely (!msg_ || !msg_->check ())) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ // Process pending commands, if any.
+ int rc = process_commands (0, true);
+ if (unlikely (rc != 0))
+ return -1;
+
+ // Clear any user-visible flags that are set on the message.
+ msg_->reset_flags (msg_t::more);
+
+ // At this point we impose the flags on the message.
+ if (flags_ & ZMQ_SNDMORE)
+ msg_->set_flags (msg_t::more);
+
+ // Try to send the message.
+ rc = xsend (msg_);
+ if (rc == 0)
+ return 0;
+ if (unlikely (errno != EAGAIN))
+ return -1;
+
+ // In case of non-blocking send we'll simply propagate
+ // the error - including EAGAIN - up the stack.
+ if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0)
+ return -1;
+
+ // Compute the time when the timeout should occur.
+ // If the timeout is infinite, don't care.
+ int timeout = options.sndtimeo;
+ uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
+ // Oops, we couldn't send the message. Wait for the next
+ // command, process it and try to send the message again.
+ // If timeout is reached in the meantime, return EAGAIN.
+ while (true) {
+ if (unlikely (process_commands (timeout, false) != 0))
+ return -1;
+ rc = xsend (msg_);
+ if (rc == 0)
+ break;
+ if (unlikely (errno != EAGAIN))
+ return -1;
+ if (timeout > 0) {
+ timeout = (int) (end - clock.now_ms ());
+ if (timeout <= 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
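+
+// Caller-side sketch (illustrative): the timeout loop above is what a
+// bounded send observes through the public API:
+//
+//     int timeout = 100;  // milliseconds (hypothetical value)
+//     zmq_setsockopt (s, ZMQ_SNDTIMEO, &timeout, sizeof (timeout));
+//     if (zmq_send (s, "hi", 2, 0) == -1 && zmq_errno () == EAGAIN) {
+//         // Still no pipe able to accept the message after ~100 ms.
+//     }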
+
+int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
+{
+    // Check whether the library has already been shut down.
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+ // Check whether message passed to the function is valid.
+ if (unlikely (!msg_ || !msg_->check ())) {
+ errno = EFAULT;
+ return -1;
+ }
+
+    // Once every inbound_poll_rate messages, check for signals and process
+    // incoming commands. This happens only when we are not polling at all,
+    // i.e. when messages are available all the time. If a poll does occur,
+    // ticks is set to zero and thus we avoid this code path.
+    //
+    // Note that 'recv' uses a different command throttling algorithm (the
+    // one described above) than 'send' does. This is because counting ticks
+    // is more efficient than doing RDTSC all the time.
+ if (++ticks == inbound_poll_rate) {
+ if (unlikely (process_commands (0, false) != 0))
+ return -1;
+ ticks = 0;
+ }
+
+ // Get the message.
+ int rc = xrecv (msg_);
+ if (unlikely (rc != 0 && errno != EAGAIN))
+ return -1;
+
+ // If we have the message, return immediately.
+ if (rc == 0) {
+ extract_flags (msg_);
+ return 0;
+ }
+
+    // If the message cannot be fetched immediately, there are two scenarios.
+    // For a non-blocking recv, commands are processed in case there's an
+    // activate_reader command already waiting in the command pipe.
+    // If there is none, return EAGAIN.
+ if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
+ if (unlikely (process_commands (0, false) != 0))
+ return -1;
+ ticks = 0;
+
+ rc = xrecv (msg_);
+ if (rc < 0)
+ return rc;
+ extract_flags (msg_);
+ return 0;
+ }
+
+ // Compute the time when the timeout should occur.
+ // If the timeout is infinite, don't care.
+ int timeout = options.rcvtimeo;
+ uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
+ // In blocking scenario, commands are processed over and over again until
+ // we are able to fetch a message.
+ bool block = (ticks != 0);
+ while (true) {
+ if (unlikely (process_commands (block ? timeout : 0, false) != 0))
+ return -1;
+ rc = xrecv (msg_);
+ if (rc == 0) {
+ ticks = 0;
+ break;
+ }
+ if (unlikely (errno != EAGAIN))
+ return -1;
+ block = true;
+ if (timeout > 0) {
+ timeout = (int) (end - clock.now_ms ());
+ if (timeout <= 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ }
+ }
+
+ extract_flags (msg_);
+ return 0;
+}
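+
+// Caller-side sketch (illustrative): a non-blocking drain of a multi-part
+// message combines the ZMQ_DONTWAIT path above with the rcvmore flag
+// recorded by extract_flags:
+//
+//     zmq_msg_t part;
+//     zmq_msg_init (&part);
+//     while (zmq_recvmsg (s, &part, ZMQ_DONTWAIT) >= 0) {
+//         int more;
+//         size_t len = sizeof (more);
+//         zmq_getsockopt (s, ZMQ_RCVMORE, &more, &len);
+//         if (!more)
+//             break;
+//     }
+//     zmq_msg_close (&part);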
+
+int zmq::socket_base_t::close ()
+{
+ // Mark the socket as dead
+ tag = 0xdeadbeef;
+
+ // Transfer the ownership of the socket from this application thread
+ // to the reaper thread which will take care of the rest of shutdown
+ // process.
+ send_reap (this);
+
+ return 0;
+}
+
+bool zmq::socket_base_t::has_in ()
+{
+ return xhas_in ();
+}
+
+bool zmq::socket_base_t::has_out ()
+{
+ return xhas_out ();
+}
+
+void zmq::socket_base_t::start_reaping (poller_t *poller_)
+{
+ // Plug the socket to the reaper thread.
+ poller = poller_;
+ handle = poller->add_fd (mailbox.get_fd (), this);
+ poller->set_pollin (handle);
+
+    // Initiate the termination and check whether the socket can be
+    // deallocated immediately.
+ terminate ();
+ check_destroy ();
+}
+
+int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
+{
+ int rc;
+ command_t cmd;
+ if (timeout_ != 0) {
+
+ // If we are asked to wait, simply ask mailbox to wait.
+ rc = mailbox.recv (&cmd, timeout_);
+ }
+ else {
+
+        // If we are asked not to wait, check when we last processed
+        // commands, so that new command processing can be throttled.
+
+ // Get the CPU's tick counter. If 0, the counter is not available.
+ uint64_t tsc = zmq::clock_t::rdtsc ();
+
+ // Optimised version of command processing - it doesn't have to check
+ // for incoming commands each time. It does so only if certain time
+ // elapsed since last command processing. Command delay varies
+ // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
+ // etc. The optimisation makes sense only on platforms where getting
+ // a timestamp is a very cheap operation (tens of nanoseconds).
+ if (tsc && throttle_) {
+
+            // Check that the TSC hasn't jumped backwards (which can happen
+            // when the thread migrates between CPU cores) and whether
+            // enough time has elapsed since the last command processing.
+            // If not, do nothing.
+ if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
+ return 0;
+ last_tsc = tsc;
+ }
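+
+        // (Worked example: with max_command_delay set to 3,000,000 ticks in
+        // config.hpp, this path re-checks the mailbox at most about once per
+        // millisecond on a 3 GHz CPU.)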
+
+ // Check whether there are any commands pending for this thread.
+ rc = mailbox.recv (&cmd, 0);
+ }
+
+ // Process all available commands.
+ while (rc == 0) {
+ cmd.destination->process_command (cmd);
+ rc = mailbox.recv (&cmd, 0);
+ }
+
+ if (errno == EINTR)
+ return -1;
+
+ zmq_assert (errno == EAGAIN);
+
+ if (ctx_terminated) {
+ errno = ETERM;
+ return -1;
+ }
+
+ return 0;
+}
+
+void zmq::socket_base_t::process_stop ()
+{
+    // Here, someone has called zmq_term while the socket was still alive.
+ // We'll remember the fact so that any blocking call is interrupted and any
+ // further attempt to use the socket will return ETERM. The user is still
+ // responsible for calling zmq_close on the socket though!
+ stop_monitor ();
+ ctx_terminated = true;
+}
+
+void zmq::socket_base_t::process_bind (pipe_t *pipe_)
+{
+ attach_pipe (pipe_);
+}
+
+void zmq::socket_base_t::process_term (int linger_)
+{
+ // Unregister all inproc endpoints associated with this socket.
+    // This ensures that no new pipes from other (inproc) sockets can be
+    // initiated.
+ unregister_endpoints (this);
+
+ // Ask all attached pipes to terminate.
+ for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
+ pipes [i]->terminate (false);
+ register_term_acks ((int) pipes.size ());
+
+ // Continue the termination process immediately.
+ own_t::process_term (linger_);
+}
+
+void zmq::socket_base_t::process_destroy ()
+{
+ destroyed = true;
+}
+
+int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+bool zmq::socket_base_t::xhas_out ()
+{
+ return false;
+}
+
+int zmq::socket_base_t::xsend (msg_t *)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+bool zmq::socket_base_t::xhas_in ()
+{
+ return false;
+}
+
+int zmq::socket_base_t::xrecv (msg_t *)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+void zmq::socket_base_t::xread_activated (pipe_t *)
+{
+ zmq_assert (false);
+}
+
+void zmq::socket_base_t::xwrite_activated (pipe_t *)
+{
+ zmq_assert (false);
+}
+
+void zmq::socket_base_t::xhiccuped (pipe_t *)
+{
+ zmq_assert (false);
+}
+
+void zmq::socket_base_t::in_event ()
+{
+ // This function is invoked only once the socket is running in the context
+ // of the reaper thread. Process any commands from other threads/sockets
+ // that may be available at the moment. Ultimately, the socket will
+ // be destroyed.
+ process_commands (0, false);
+ check_destroy ();
+}
+
+void zmq::socket_base_t::out_event ()
+{
+ zmq_assert (false);
+}
+
+void zmq::socket_base_t::timer_event (int)
+{
+ zmq_assert (false);
+}
+
+void zmq::socket_base_t::check_destroy ()
+{
+ // If the object was already marked as destroyed, finish the deallocation.
+ if (destroyed) {
+
+ // Remove the socket from the reaper's poller.
+ poller->rm_fd (handle);
+
+ // Remove the socket from the context.
+ destroy_socket (this);
+
+ // Notify the reaper about the fact.
+ send_reaped ();
+
+ // Deallocate.
+ own_t::process_destroy ();
+ }
+}
+
+void zmq::socket_base_t::read_activated (pipe_t *pipe_)
+{
+ xread_activated (pipe_);
+}
+
+void zmq::socket_base_t::write_activated (pipe_t *pipe_)
+{
+ xwrite_activated (pipe_);
+}
+
+void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
+{
+ if (options.immediate == 1)
+ pipe_->terminate (false);
+ else
+ // Notify derived sockets of the hiccup
+ xhiccuped (pipe_);
+}
+
+void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
+{
+ // Notify the specific socket type about the pipe termination.
+ xpipe_terminated (pipe_);
+
+    // Remove the pipe from the inproc connections list, if present.
+ for (inprocs_t::iterator it = inprocs.begin(); it != inprocs.end(); ++it) {
+ if (it->second == pipe_) {
+ inprocs.erase(it);
+ break;
+ }
+ }
+
+ // Remove the pipe from the list of attached pipes and confirm its
+ // termination if we are already shutting down.
+ pipes.erase (pipe_);
+ if (is_terminating ())
+ unregister_term_ack ();
+}
+
+void zmq::socket_base_t::extract_flags (msg_t *msg_)
+{
+    // Test whether the IDENTITY flag is valid for this socket type.
+    if (unlikely (msg_->flags () & msg_t::identity))
+        zmq_assert (options.recv_identity);
+
+    // Extract the MORE flag.
+    rcvmore = msg_->flags () & msg_t::more ? true : false;
+}
+
+int zmq::socket_base_t::monitor (const char *addr_, int events_)
+{
+ int rc;
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+ // Support deregistering monitoring endpoints as well
+ if (addr_ == NULL) {
+ stop_monitor ();
+ return 0;
+ }
+
+ // Parse addr_ string.
+ std::string protocol;
+ std::string address;
+ rc = parse_uri (addr_, protocol, address);
+ if (rc != 0)
+ return -1;
+
+ rc = check_protocol (protocol);
+ if (rc != 0)
+ return -1;
+
+ // Event notification only supported over inproc://
+ if (protocol != "inproc") {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+
+ // Register events to monitor
+ monitor_events = events_;
+ monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
+ if (monitor_socket == NULL)
+ return -1;
+
+ // Never block context termination on pending event messages
+ int linger = 0;
+    rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
+    if (rc == -1) {
+        stop_monitor ();
+        return -1;
+    }
+
+ // Spawn the monitor socket endpoint
+ rc = zmq_bind (monitor_socket, addr_);
+ if (rc == -1)
+ stop_monitor ();
+ return rc;
+}
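+
+// Caller-side sketch (illustrative endpoint name): monitoring is enabled
+// through the public API and consumed with a PAIR socket connected to the
+// same inproc endpoint:
+//
+//     zmq_socket_monitor (s, "inproc://monitor.s", ZMQ_EVENT_ALL);
+//     void *mon = zmq_socket (ctx, ZMQ_PAIR);
+//     zmq_connect (mon, "inproc://monitor.s");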
+
+void zmq::socket_base_t::event_connected (std::string &addr_, int fd_)
+{
+ if (monitor_events & ZMQ_EVENT_CONNECTED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_CONNECTED;
+ event.value = fd_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_)
+{
+ if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_CONNECT_DELAYED;
+ event.value = err_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval_)
+{
+ if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_CONNECT_RETRIED;
+ event.value = interval_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::event_listening (std::string &addr_, int fd_)
+{
+ if (monitor_events & ZMQ_EVENT_LISTENING) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_LISTENING;
+ event.value = fd_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_)
+{
+ if (monitor_events & ZMQ_EVENT_BIND_FAILED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_BIND_FAILED;
+ event.value = err_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_)
+{
+ if (monitor_events & ZMQ_EVENT_ACCEPTED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_ACCEPTED;
+ event.value = fd_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_)
+{
+ if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_ACCEPT_FAILED;
+ event.value= err_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::event_closed (std::string &addr_, int fd_)
+{
+ if (monitor_events & ZMQ_EVENT_CLOSED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_CLOSED;
+ event.value = fd_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_)
+{
+ if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_CLOSE_FAILED;
+ event.value = err_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_)
+{
+ if (monitor_events & ZMQ_EVENT_DISCONNECTED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_DISCONNECTED;
+ event.value = fd_;
+ monitor_event (event, addr_);
+ }
+}
+
+void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_)
+{
+ if (monitor_socket) {
+ const uint16_t eid = (uint16_t)event_.event;
+ const uint32_t value = (uint32_t)event_.value;
+        // Prepare and send the first message frame,
+        // containing the event id and value.
+        zmq_msg_t msg;
+        int rc = zmq_msg_init_size (&msg, sizeof (eid) + sizeof (value));
+        errno_assert (rc == 0);
+        char *data = (char *) zmq_msg_data (&msg);
+        memcpy (data, &eid, sizeof (eid));
+        memcpy (data + sizeof (eid), &value, sizeof (value));
+        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
+
+        // Prepare and send the second message frame,
+        // containing the address (endpoint).
+        rc = zmq_msg_init_size (&msg, addr_.size ());
+        errno_assert (rc == 0);
+        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
+        zmq_sendmsg (monitor_socket, &msg, 0);
+ }
+}
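+
+// Consumer-side sketch (illustrative, assuming mon is a PAIR socket
+// connected to the monitor endpoint as above): the two frames sent by
+// monitor_event decode as a 6-byte header (uint16_t event id, uint32_t
+// value) followed by a frame holding the address as unterminated bytes:
+//
+//     zmq_msg_t frame;
+//     zmq_msg_init (&frame);
+//     zmq_recvmsg (mon, &frame, 0);
+//     uint16_t event;
+//     uint32_t value;
+//     memcpy (&event, (char *) zmq_msg_data (&frame), sizeof (event));
+//     memcpy (&value, (char *) zmq_msg_data (&frame) + sizeof (event),
+//         sizeof (value));
+//     // A second zmq_recvmsg (mon, &frame, 0) yields the endpoint address.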
+
+void zmq::socket_base_t::stop_monitor()
+{
+ if (monitor_socket) {
+ if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED) {
+ zmq_event_t event;
+ event.event = ZMQ_EVENT_MONITOR_STOPPED;
+ event.value = 0;
+ monitor_event (event, "");
+ }
+ zmq_close (monitor_socket);
+ monitor_socket = NULL;
+ monitor_events = 0;
+ }
+}