diff options
Diffstat (limited to 'src/xpub.cpp')
-rw-r--r-- | src/xpub.cpp | 191 |
1 files changed, 0 insertions, 191 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp deleted file mode 100644 index 99a699d..0000000 --- a/src/xpub.cpp +++ /dev/null @@ -1,191 +0,0 @@ -/* - Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <string.h> - -#include "xpub.hpp" -#include "pipe.hpp" -#include "err.hpp" -#include "msg.hpp" - -zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_), - verbose(false), - more (false) -{ - options.type = ZMQ_XPUB; -} - -zmq::xpub_t::~xpub_t () -{ -} - -void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) -{ - zmq_assert (pipe_); - dist.attach (pipe_); - - // If subscribe_to_all_ is specified, the caller would like to subscribe - // to all data on this pipe, implicitly. - if (subscribe_to_all_) - subscriptions.add (NULL, 0, pipe_); - - // The pipe is active when attached. Let's read the subscriptions from - // it, if any. - xread_activated (pipe_); -} - -void zmq::xpub_t::xread_activated (pipe_t *pipe_) -{ - // There are some subscriptions waiting. Let's process them. - msg_t sub; - while (pipe_->read (&sub)) { - // Apply the subscription to the trie - unsigned char *const data = (unsigned char *) sub.data (); - const size_t size = sub.size (); - if (size > 0 && (*data == 0 || *data == 1)) { - bool unique; - if (*data == 0) - unique = subscriptions.rm (data + 1, size - 1, pipe_); - else - unique = subscriptions.add (data + 1, size - 1, pipe_); - - // If the subscription is not a duplicate store it so that it can be - // passed to used on next recv call. (Unsubscribe is not verbose.) - if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) { - pending_data.push_back (blob_t (data, size)); - pending_flags.push_back (0); - } - } - else { - // Process user message coming upstream from xsub socket - pending_data.push_back (blob_t (data, size)); - pending_flags.push_back (sub.flags ()); - } - sub.close (); - } -} - -void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) -{ - dist.activated (pipe_); -} - -int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != ZMQ_XPUB_VERBOSE) { - errno = EINVAL; - return -1; - } - if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) { - errno = EINVAL; - return -1; - } - verbose = (*static_cast <const int*> (optval_) != 0); - return 0; -} - -void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) -{ - // Remove the pipe from the trie. If there are topics that nobody - // is interested in anymore, send corresponding unsubscriptions - // upstream. - subscriptions.rm (pipe_, send_unsubscription, this); - - dist.pipe_terminated (pipe_); -} - -void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) -{ - xpub_t *self = (xpub_t*) arg_; - self->dist.match (pipe_); -} - -int zmq::xpub_t::xsend (msg_t *msg_) -{ - bool msg_more = msg_->flags () & msg_t::more ? true : false; - - // For the first part of multi-part message, find the matching pipes. - if (!more) - subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), - mark_as_matching, this); - - // Send the message to all the pipes that were marked as matching - // in the previous step. - int rc = dist.send_to_matching (msg_); - if (rc != 0) - return rc; - - // If we are at the end of multi-part message we can mark all the pipes - // as non-matching. - if (!msg_more) - dist.unmatch (); - - more = msg_more; - - return 0; -} - -bool zmq::xpub_t::xhas_out () -{ - return dist.has_out (); -} - -int zmq::xpub_t::xrecv (msg_t *msg_) -{ - // If there is at least one - if (pending_data.empty ()) { - errno = EAGAIN; - return -1; - } - - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init_size (pending_data.front ().size ()); - errno_assert (rc == 0); - memcpy (msg_->data (), - pending_data.front ().data (), - pending_data.front ().size ()); - msg_->set_flags (pending_flags.front ()); - pending_data.pop_front (); - pending_flags.pop_front (); - return 0; -} - -bool zmq::xpub_t::xhas_in () -{ - return !pending_data.empty (); -} - -void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, - void *arg_) -{ - xpub_t *self = (xpub_t*) arg_; - - if (self->options.type != ZMQ_PUB) { - // Place the unsubscription to the queue of pending (un)sunscriptions - // to be retrived by the user later on. - blob_t unsub (size_ + 1, 0); - unsub [0] = 0; - memcpy (&unsub [1], data_, size_); - self->pending_data.push_back (unsub); - self->pending_flags.push_back (0); - } -} |