From 597cc5edd624525563e6549dc0057eca2a51c81d Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 13:30:46 -0500 Subject: upgrade to new version --- src/xpub.cpp | 191 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 src/xpub.cpp (limited to 'src/xpub.cpp') diff --git a/src/xpub.cpp b/src/xpub.cpp new file mode 100644 index 0000000..99a699d --- /dev/null +++ b/src/xpub.cpp @@ -0,0 +1,191 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include + +#include "xpub.hpp" +#include "pipe.hpp" +#include "err.hpp" +#include "msg.hpp" + +zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_), + verbose(false), + more (false) +{ + options.type = ZMQ_XPUB; +} + +zmq::xpub_t::~xpub_t () +{ +} + +void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +{ + zmq_assert (pipe_); + dist.attach (pipe_); + + // If subscribe_to_all_ is specified, the caller would like to subscribe + // to all data on this pipe, implicitly. + if (subscribe_to_all_) + subscriptions.add (NULL, 0, pipe_); + + // The pipe is active when attached. Let's read the subscriptions from + // it, if any. + xread_activated (pipe_); +} + +void zmq::xpub_t::xread_activated (pipe_t *pipe_) +{ + // There are some subscriptions waiting. Let's process them. + msg_t sub; + while (pipe_->read (&sub)) { + // Apply the subscription to the trie + unsigned char *const data = (unsigned char *) sub.data (); + const size_t size = sub.size (); + if (size > 0 && (*data == 0 || *data == 1)) { + bool unique; + if (*data == 0) + unique = subscriptions.rm (data + 1, size - 1, pipe_); + else + unique = subscriptions.add (data + 1, size - 1, pipe_); + + // If the subscription is not a duplicate store it so that it can be + // passed to used on next recv call. (Unsubscribe is not verbose.) + if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) { + pending_data.push_back (blob_t (data, size)); + pending_flags.push_back (0); + } + } + else { + // Process user message coming upstream from xsub socket + pending_data.push_back (blob_t (data, size)); + pending_flags.push_back (sub.flags ()); + } + sub.close (); + } +} + +void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) +{ + dist.activated (pipe_); +} + +int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != ZMQ_XPUB_VERBOSE) { + errno = EINVAL; + return -1; + } + if (optvallen_ != sizeof (int) || *static_cast (optval_) < 0) { + errno = EINVAL; + return -1; + } + verbose = (*static_cast (optval_) != 0); + return 0; +} + +void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) +{ + // Remove the pipe from the trie. If there are topics that nobody + // is interested in anymore, send corresponding unsubscriptions + // upstream. + subscriptions.rm (pipe_, send_unsubscription, this); + + dist.pipe_terminated (pipe_); +} + +void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) +{ + xpub_t *self = (xpub_t*) arg_; + self->dist.match (pipe_); +} + +int zmq::xpub_t::xsend (msg_t *msg_) +{ + bool msg_more = msg_->flags () & msg_t::more ? true : false; + + // For the first part of multi-part message, find the matching pipes. + if (!more) + subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), + mark_as_matching, this); + + // Send the message to all the pipes that were marked as matching + // in the previous step. + int rc = dist.send_to_matching (msg_); + if (rc != 0) + return rc; + + // If we are at the end of multi-part message we can mark all the pipes + // as non-matching. + if (!msg_more) + dist.unmatch (); + + more = msg_more; + + return 0; +} + +bool zmq::xpub_t::xhas_out () +{ + return dist.has_out (); +} + +int zmq::xpub_t::xrecv (msg_t *msg_) +{ + // If there is at least one + if (pending_data.empty ()) { + errno = EAGAIN; + return -1; + } + + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init_size (pending_data.front ().size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), + pending_data.front ().data (), + pending_data.front ().size ()); + msg_->set_flags (pending_flags.front ()); + pending_data.pop_front (); + pending_flags.pop_front (); + return 0; +} + +bool zmq::xpub_t::xhas_in () +{ + return !pending_data.empty (); +} + +void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, + void *arg_) +{ + xpub_t *self = (xpub_t*) arg_; + + if (self->options.type != ZMQ_PUB) { + // Place the unsubscription to the queue of pending (un)sunscriptions + // to be retrived by the user later on. + blob_t unsub (size_ + 1, 0); + unsub [0] = 0; + memcpy (&unsub [1], data_, size_); + self->pending_data.push_back (unsub); + self->pending_flags.push_back (0); + } +} -- cgit v1.2.3