From 597cc5edd624525563e6549dc0057eca2a51c81d Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 13:30:46 -0500 Subject: upgrade to new version --- src/pipe.hpp | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 src/pipe.hpp (limited to 'src/pipe.hpp') diff --git a/src/pipe.hpp b/src/pipe.hpp new file mode 100644 index 0000000..dfb35d3 --- /dev/null +++ b/src/pipe.hpp @@ -0,0 +1,216 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PIPE_HPP_INCLUDED__ +#define __ZMQ_PIPE_HPP_INCLUDED__ + +#include "msg.hpp" +#include "ypipe_base.hpp" +#include "config.hpp" +#include "object.hpp" +#include "stdint.hpp" +#include "array.hpp" +#include "blob.hpp" + +namespace zmq +{ + + class object_t; + class pipe_t; + + // Create a pipepair for bi-directional transfer of messages. + // First HWM is for messages passed from first pipe to the second pipe. + // Second HWM is for messages passed from second pipe to the first pipe. + // Delay specifies how the pipe behaves when the peer terminates. If true + // pipe receives all the pending messages before terminating, otherwise it + // terminates straight away. + // If conflate is true, only the most recently arrived message could be + // read (older messages are discarded) + int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], + int hwms_ [2], bool conflate_ [2]); + + struct i_pipe_events + { + virtual ~i_pipe_events () {} + + virtual void read_activated (zmq::pipe_t *pipe_) = 0; + virtual void write_activated (zmq::pipe_t *pipe_) = 0; + virtual void hiccuped (zmq::pipe_t *pipe_) = 0; + virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0; + }; + + // Note that pipe can be stored in three different arrays. + // The array of inbound pipes (1), the array of outbound pipes (2) and + // the generic array of pipes to deallocate (3). + + class pipe_t : + public object_t, + public array_item_t <1>, + public array_item_t <2>, + public array_item_t <3> + { + // This allows pipepair to create pipe objects. + friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], + int hwms_ [2], bool conflate_ [2]); + + public: + + // Specifies the object to send events to. + void set_event_sink (i_pipe_events *sink_); + + // Pipe endpoint can store an opaque ID to be used by its clients. + void set_identity (const blob_t &identity_); + blob_t get_identity (); + + // Returns true if there is at least one message to read in the pipe. + bool check_read (); + + // Reads a message to the underlying pipe. + bool read (msg_t *msg_); + + // Checks whether messages can be written to the pipe. If writing + // the message would cause high watermark the function returns false. + bool check_write (); + + // Writes a message to the underlying pipe. Returns false if the + // message cannot be written because high watermark was reached. + bool write (msg_t *msg_); + + // Remove unfinished parts of the outbound message from the pipe. + void rollback (); + + // Flush the messages downsteam. + void flush (); + + // Temporaraily disconnects the inbound message stream and drops + // all the messages on the fly. Causes 'hiccuped' event to be generated + // in the peer. + void hiccup (); + + // Ensure the pipe wont block on receiving pipe_term. + void set_nodelay (); + + // Ask pipe to terminate. The termination will happen asynchronously + // and user will be notified about actual deallocation by 'terminated' + // event. If delay is true, the pending messages will be processed + // before actual shutdown. + void terminate (bool delay_); + + // set the high water marks. + void set_hwms (int inhwm_, int outhwm_); + + private: + + // Type of the underlying lock-free pipe. + typedef ypipe_base_t upipe_t; + + // Command handlers. + void process_activate_read (); + void process_activate_write (uint64_t msgs_read_); + void process_hiccup (void *pipe_); + void process_pipe_term (); + void process_pipe_term_ack (); + + // Handler for delimiter read from the pipe. + void process_delimiter (); + + // Constructor is private. Pipe can only be created using + // pipepair function. + pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, + int inhwm_, int outhwm_, bool conflate_); + + // Pipepair uses this function to let us know about + // the peer pipe object. + void set_peer (pipe_t *pipe_); + + // Destructor is private. Pipe objects destroy themselves. + ~pipe_t (); + + // Underlying pipes for both directions. + upipe_t *inpipe; + upipe_t *outpipe; + + // Can the pipe be read from / written to? + bool in_active; + bool out_active; + + // High watermark for the outbound pipe. + int hwm; + + // Low watermark for the inbound pipe. + int lwm; + + // Number of messages read and written so far. + uint64_t msgs_read; + uint64_t msgs_written; + + // Last received peer's msgs_read. The actual number in the peer + // can be higher at the moment. + uint64_t peers_msgs_read; + + // The pipe object on the other side of the pipepair. + pipe_t *peer; + + // Sink to send events to. + i_pipe_events *sink; + + // States of the pipe endpoint: + // active: common state before any termination begins, + // delimiter_received: delimiter was read from pipe before + // term command was received, + // waiting_fo_delimiter: term command was already received + // from the peer but there are still pending messages to read, + // term_ack_sent: all pending messages were already read and + // all we are waiting for is ack from the peer, + // term_req_sent1: 'terminate' was explicitly called by the user, + // term_req_sent2: user called 'terminate' and then we've got + // term command from the peer as well. + enum { + active, + delimiter_received, + waiting_for_delimiter, + term_ack_sent, + term_req_sent1, + term_req_sent2 + } state; + + // If true, we receive all the pending inbound messages before + // terminating. If false, we terminate immediately when the peer + // asks us to. + bool delay; + + // Identity of the writer. Used uniquely by the reader side. + blob_t identity; + + // Returns true if the message is delimiter; false otherwise. + static bool is_delimiter (msg_t &msg_); + + // Computes appropriate low watermark from the given high watermark. + static int compute_lwm (int hwm_); + + bool conflate; + + // Disable copying. + pipe_t (const pipe_t&); + const pipe_t &operator = (const pipe_t&); + }; + +} + +#endif -- cgit v1.2.3