summaryrefslogtreecommitdiff
path: root/src/xsub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r--src/xsub.cpp228
1 files changed, 0 insertions, 228 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp
deleted file mode 100644
index c4381a2..0000000
--- a/src/xsub.cpp
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <string.h>
-
-#include "xsub.hpp"
-#include "err.hpp"
-
-zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
- socket_base_t (parent_, tid_, sid_),
- has_message (false),
- more (false)
-{
- options.type = ZMQ_XSUB;
-
- // When socket is being closed down we don't want to wait till pending
- // subscription commands are sent to the wire.
- options.linger = 0;
-
- int rc = message.init ();
- errno_assert (rc == 0);
-}
-
-zmq::xsub_t::~xsub_t ()
-{
- int rc = message.close ();
- errno_assert (rc == 0);
-}
-
-void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
-{
- // subscribe_to_all_ is unused
- (void) subscribe_to_all_;
-
- zmq_assert (pipe_);
- fq.attach (pipe_);
- dist.attach (pipe_);
-
- // Send all the cached subscriptions to the new upstream peer.
- subscriptions.apply (send_subscription, pipe_);
- pipe_->flush ();
-}
-
-void zmq::xsub_t::xread_activated (pipe_t *pipe_)
-{
- fq.activated (pipe_);
-}
-
-void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
-{
- dist.activated (pipe_);
-}
-
-void zmq::xsub_t::xpipe_terminated (pipe_t *pipe_)
-{
- fq.pipe_terminated (pipe_);
- dist.pipe_terminated (pipe_);
-}
-
-void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
-{
- // Send all the cached subscriptions to the hiccuped pipe.
- subscriptions.apply (send_subscription, pipe_);
- pipe_->flush ();
-}
-
-int zmq::xsub_t::xsend (msg_t *msg_)
-{
- size_t size = msg_->size ();
- unsigned char *data = (unsigned char *) msg_->data ();
-
- if (size > 0 && *data == 1) {
- // Process subscribe message
- // This used to filter out duplicate subscriptions,
- // however this is alread done on the XPUB side and
- // doing it here as well breaks ZMQ_XPUB_VERBOSE
- // when there are forwarding devices involved.
- subscriptions.add (data + 1, size - 1);
- return dist.send_to_all (msg_);
- }
- else
- if (size > 0 && *data == 0) {
- // Process unsubscribe message
- if (subscriptions.rm (data + 1, size - 1))
- return dist.send_to_all (msg_);
- }
- else
- // User message sent upstream to XPUB socket
- return dist.send_to_all (msg_);
-
- int rc = msg_->close ();
- errno_assert (rc == 0);
- rc = msg_->init ();
- errno_assert (rc == 0);
-
- return 0;
-}
-
-bool zmq::xsub_t::xhas_out ()
-{
- // Subscription can be added/removed anytime.
- return true;
-}
-
-int zmq::xsub_t::xrecv (msg_t *msg_)
-{
- // If there's already a message prepared by a previous call to zmq_poll,
- // return it straight ahead.
- if (has_message) {
- int rc = msg_->move (message);
- errno_assert (rc == 0);
- has_message = false;
- more = msg_->flags () & msg_t::more ? true : false;
- return 0;
- }
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages which breaks the non-blocking recv
- // semantics.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (msg_);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0)
- return -1;
-
- // Check whether the message matches at least one subscription.
- // Non-initial parts of the message are passed
- if (more || !options.filter || match (msg_)) {
- more = msg_->flags () & msg_t::more ? true : false;
- return 0;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (msg_->flags () & msg_t::more) {
- rc = fq.recv (msg_);
- errno_assert (rc == 0);
- }
- }
-}
-
-bool zmq::xsub_t::xhas_in ()
-{
- // There are subsequent parts of the partly-read message available.
- if (more)
- return true;
-
- // If there's already a message prepared by a previous call to zmq_poll,
- // return straight ahead.
- if (has_message)
- return true;
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (&message);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0) {
- errno_assert (errno == EAGAIN);
- return false;
- }
-
- // Check whether the message matches at least one subscription.
- if (!options.filter || match (&message)) {
- has_message = true;
- return true;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (message.flags () & msg_t::more) {
- rc = fq.recv (&message);
- errno_assert (rc == 0);
- }
- }
-}
-
-bool zmq::xsub_t::match (msg_t *msg_)
-{
- return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
-}
-
-void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
- void *arg_)
-{
- pipe_t *pipe = (pipe_t*) arg_;
-
- // Create the subsctription message.
- msg_t msg;
- int rc = msg.init_size (size_ + 1);
- errno_assert (rc == 0);
- unsigned char *data = (unsigned char*) msg.data ();
- data [0] = 1;
- memcpy (data + 1, data_, size_);
-
- // Send it to the pipe.
- bool sent = pipe->write (&msg);
- // If we reached the SNDHWM, and thus cannot send the subscription, drop
- // the subscription message instead. This matches the behaviour of
- // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
- // when the SNDHWM is reached.
- if (!sent)
- msg.close ();
-}