From 597cc5edd624525563e6549dc0057eca2a51c81d Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 13:30:46 -0500 Subject: upgrade to new version --- src/stream_engine.cpp | 842 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 842 insertions(+) create mode 100644 src/stream_engine.cpp (limited to 'src/stream_engine.cpp') diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp new file mode 100644 index 0000000..4d252d8 --- /dev/null +++ b/src/stream_engine.cpp @@ -0,0 +1,842 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "platform.hpp" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#include +#include +#include +#endif + +#include +#include + +#include "stream_engine.hpp" +#include "io_thread.hpp" +#include "session_base.hpp" +#include "v1_encoder.hpp" +#include "v1_decoder.hpp" +#include "v2_encoder.hpp" +#include "v2_decoder.hpp" +#include "null_mechanism.hpp" +#include "plain_mechanism.hpp" +#include "curve_client.hpp" +#include "curve_server.hpp" +#include "raw_decoder.hpp" +#include "raw_encoder.hpp" +#include "config.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "likely.hpp" +#include "wire.hpp" + +zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, + const std::string &endpoint_) : + s (fd_), + inpos (NULL), + insize (0), + decoder (NULL), + outpos (NULL), + outsize (0), + encoder (NULL), + handshaking (true), + greeting_size (v2_greeting_size), + greeting_bytes_read (0), + session (NULL), + options (options_), + endpoint (endpoint_), + plugged (false), + read_msg (&stream_engine_t::read_identity), + write_msg (&stream_engine_t::write_identity), + io_error (false), + subscription_required (false), + mechanism (NULL), + input_stopped (false), + output_stopped (false), + socket (NULL) +{ + int rc = tx_msg.init (); + errno_assert (rc == 0); + + // Put the socket into non-blocking mode. + unblock_socket (s); + + if (!get_peer_ip_address (s, peer_address)) + peer_address = ""; + +#ifdef SO_NOSIGPIPE + // Make sure that SIGPIPE signal is not generated when writing to a + // connection that was already closed by the peer. + int set = 1; + rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); + errno_assert (rc == 0); +#endif +} + +zmq::stream_engine_t::~stream_engine_t () +{ + zmq_assert (!plugged); + + if (s != retired_fd) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = closesocket (s); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = close (s); + errno_assert (rc == 0); +#endif + s = retired_fd; + } + + int rc = tx_msg.close (); + errno_assert (rc == 0); + + delete encoder; + delete decoder; + delete mechanism; +} + +void zmq::stream_engine_t::plug (io_thread_t *io_thread_, + session_base_t *session_) +{ + zmq_assert (!plugged); + plugged = true; + + // Connect to session object. + zmq_assert (!session); + zmq_assert (session_); + session = session_; + socket = session-> get_socket (); + + // Connect to I/O threads poller object. + io_object_t::plug (io_thread_); + handle = add_fd (s); + io_error = false; + + if (options.raw_sock) { + // no handshaking for raw sock, instantiate raw encoder and decoders + encoder = new (std::nothrow) raw_encoder_t (out_batch_size); + alloc_assert (encoder); + + decoder = new (std::nothrow) raw_decoder_t (in_batch_size); + alloc_assert (decoder); + + // disable handshaking for raw socket + handshaking = false; + + read_msg = &stream_engine_t::pull_msg_from_session; + write_msg = &stream_engine_t::push_msg_to_session; + } + else { + // Send the 'length' and 'flags' fields of the identity message. + // The 'length' field is encoded in the long format. + outpos = greeting_send; + outpos [outsize++] = 0xff; + put_uint64 (&outpos [outsize], options.identity_size + 1); + outsize += 8; + outpos [outsize++] = 0x7f; + } + + set_pollin (handle); + set_pollout (handle); + // Flush all the data that may have been already received downstream. + in_event (); +} + +void zmq::stream_engine_t::unplug () +{ + zmq_assert (plugged); + plugged = false; + + // Cancel all fd subscriptions. + if (!io_error) + rm_fd (handle); + + // Disconnect from I/O threads poller object. + io_object_t::unplug (); + + session = NULL; +} + +void zmq::stream_engine_t::terminate () +{ + unplug (); + delete this; +} + +void zmq::stream_engine_t::in_event () +{ + assert (!io_error); + + // If still handshaking, receive and process the greeting message. + if (unlikely (handshaking)) + if (!handshake ()) + return; + + zmq_assert (decoder); + + // If there has been an I/O error, stop polling. + if (input_stopped) { + rm_fd (handle); + io_error = true; + return; + } + + // If there's no data to process in the buffer... + if (!insize) { + + // Retrieve the buffer and read as much data as possible. + // Note that buffer can be arbitrarily large. However, we assume + // the underlying TCP layer has fixed buffer size and thus the + // number of bytes read will be always limited. + size_t bufsize = 0; + decoder->get_buffer (&inpos, &bufsize); + + int const rc = read (inpos, bufsize); + if (rc == 0) { + error (); + return; + } + if (rc == -1) { + if (errno != EAGAIN) + error (); + return; + } + + // Adjust input size + insize = static_cast (rc); + } + + int rc = 0; + size_t processed = 0; + + while (insize > 0) { + rc = decoder->decode (inpos, insize, processed); + zmq_assert (processed <= insize); + inpos += processed; + insize -= processed; + if (rc == 0 || rc == -1) + break; + rc = (this->*write_msg) (decoder->msg ()); + if (rc == -1) + break; + } + + // Tear down the connection if we have failed to decode input data + // or the session has rejected the message. + if (rc == -1) { + if (errno != EAGAIN) { + error (); + return; + } + input_stopped = true; + reset_pollin (handle); + } + + session->flush (); +} + +void zmq::stream_engine_t::out_event () +{ + zmq_assert (!io_error); + + // If write buffer is empty, try to read new data from the encoder. + if (!outsize) { + + // Even when we stop polling as soon as there is no + // data to send, the poller may invoke out_event one + // more time due to 'speculative write' optimisation. + if (unlikely (encoder == NULL)) { + zmq_assert (handshaking); + return; + } + + outpos = NULL; + outsize = encoder->encode (&outpos, 0); + + while (outsize < out_batch_size) { + if ((this->*read_msg) (&tx_msg) == -1) + break; + encoder->load_msg (&tx_msg); + unsigned char *bufptr = outpos + outsize; + size_t n = encoder->encode (&bufptr, out_batch_size - outsize); + zmq_assert (n > 0); + if (outpos == NULL) + outpos = bufptr; + outsize += n; + } + + // If there is no data to send, stop polling for output. + if (outsize == 0) { + output_stopped = true; + reset_pollout (handle); + return; + } + } + + // If there are any data to write in write buffer, write as much as + // possible to the socket. Note that amount of data to write can be + // arbitrarily large. However, we assume that underlying TCP layer has + // limited transmission buffer and thus the actual number of bytes + // written should be reasonably modest. + int nbytes = write (outpos, outsize); + + // IO error has occurred. We stop waiting for output events. + // The engine is not terminated until we detect input error; + // this is necessary to prevent losing incoming messages. + if (nbytes == -1) { + reset_pollout (handle); + return; + } + + outpos += nbytes; + outsize -= nbytes; + + // If we are still handshaking and there are no data + // to send, stop polling for output. + if (unlikely (handshaking)) + if (outsize == 0) + reset_pollout (handle); +} + +void zmq::stream_engine_t::restart_output () +{ + if (unlikely (io_error)) + return; + + if (likely (output_stopped)) { + set_pollout (handle); + output_stopped = false; + } + + // Speculative write: The assumption is that at the moment new message + // was sent by the user the socket is probably available for writing. + // Thus we try to write the data to socket avoiding polling for POLLOUT. + // Consequently, the latency should be better in request/reply scenarios. + out_event (); +} + +void zmq::stream_engine_t::restart_input () +{ + zmq_assert (input_stopped); + zmq_assert (session != NULL); + zmq_assert (decoder != NULL); + + int rc = (this->*write_msg) (decoder->msg ()); + if (rc == -1) { + if (errno == EAGAIN) + session->flush (); + else + error (); + return; + } + + while (insize > 0) { + size_t processed = 0; + rc = decoder->decode (inpos, insize, processed); + zmq_assert (processed <= insize); + inpos += processed; + insize -= processed; + if (rc == 0 || rc == -1) + break; + rc = (this->*write_msg) (decoder->msg ()); + if (rc == -1) + break; + } + + if (rc == -1 && errno == EAGAIN) + session->flush (); + else + if (rc == -1 || io_error) + error (); + else { + input_stopped = false; + set_pollin (handle); + session->flush (); + + // Speculative read. + in_event (); + } +} + +bool zmq::stream_engine_t::handshake () +{ + zmq_assert (handshaking); + zmq_assert (greeting_bytes_read < greeting_size); + // Receive the greeting. + while (greeting_bytes_read < greeting_size) { + const int n = read (greeting_recv + greeting_bytes_read, + greeting_size - greeting_bytes_read); + if (n == 0) { + error (); + return false; + } + if (n == -1) { + if (errno != EAGAIN) + error (); + return false; + } + + greeting_bytes_read += n; + + // We have received at least one byte from the peer. + // If the first byte is not 0xff, we know that the + // peer is using unversioned protocol. + if (greeting_recv [0] != 0xff) + break; + + if (greeting_bytes_read < signature_size) + continue; + + // Inspect the right-most bit of the 10th byte (which coincides + // with the 'flags' field if a regular message was sent). + // Zero indicates this is a header of identity message + // (i.e. the peer is using the unversioned protocol). + if (!(greeting_recv [9] & 0x01)) + break; + + // The peer is using versioned protocol. + // Send the major version number. + if (outpos + outsize == greeting_send + signature_size) { + if (outsize == 0) + set_pollout (handle); + outpos [outsize++] = 3; // Major version number + } + + if (greeting_bytes_read > signature_size) { + if (outpos + outsize == greeting_send + signature_size + 1) { + if (outsize == 0) + set_pollout (handle); + + // Use ZMTP/2.0 to talk to older peers. + if (greeting_recv [10] == ZMTP_1_0 + || greeting_recv [10] == ZMTP_2_0) + outpos [outsize++] = options.type; + else { + outpos [outsize++] = 0; // Minor version number + memset (outpos + outsize, 0, 20); + + zmq_assert (options.mechanism == ZMQ_NULL + || options.mechanism == ZMQ_PLAIN + || options.mechanism == ZMQ_CURVE); + + if (options.mechanism == ZMQ_NULL) + memcpy (outpos + outsize, "NULL", 4); + else + if (options.mechanism == ZMQ_PLAIN) + memcpy (outpos + outsize, "PLAIN", 5); + else + memcpy (outpos + outsize, "CURVE", 5); + outsize += 20; + memset (outpos + outsize, 0, 32); + outsize += 32; + greeting_size = v3_greeting_size; + } + } + } + } + + // Position of the revision field in the greeting. + const size_t revision_pos = 10; + + // Is the peer using ZMTP/1.0 with no revision number? + // If so, we send and receive rest of identity message + if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) { + encoder = new (std::nothrow) v1_encoder_t (out_batch_size); + alloc_assert (encoder); + + decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize); + alloc_assert (decoder); + + // We have already sent the message header. + // Since there is no way to tell the encoder to + // skip the message header, we simply throw that + // header data away. + const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2; + unsigned char tmp [10], *bufferp = tmp; + + // Prepare the identity message and load it into encoder. + // Then consume bytes we have already sent to the peer. + const int rc = tx_msg.init_size (options.identity_size); + zmq_assert (rc == 0); + memcpy (tx_msg.data (), options.identity, options.identity_size); + encoder->load_msg (&tx_msg); + size_t buffer_size = encoder->encode (&bufferp, header_size); + zmq_assert (buffer_size == header_size); + + // Make sure the decoder sees the data we have already received. + inpos = greeting_recv; + insize = greeting_bytes_read; + + // To allow for interoperability with peers that do not forward + // their subscriptions, we inject a phantom subscription message + // message into the incoming message stream. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) + subscription_required = true; + + // We are sending our identity now and the next message + // will come from the socket. + read_msg = &stream_engine_t::pull_msg_from_session; + + // We are expecting identity message. + write_msg = &stream_engine_t::write_identity; + } + else + if (greeting_recv [revision_pos] == ZMTP_1_0) { + encoder = new (std::nothrow) v1_encoder_t ( + out_batch_size); + alloc_assert (encoder); + + decoder = new (std::nothrow) v1_decoder_t ( + in_batch_size, options.maxmsgsize); + alloc_assert (decoder); + } + else + if (greeting_recv [revision_pos] == ZMTP_2_0) { + encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + alloc_assert (encoder); + + decoder = new (std::nothrow) v2_decoder_t ( + in_batch_size, options.maxmsgsize); + alloc_assert (decoder); + } + else { + encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + alloc_assert (encoder); + + decoder = new (std::nothrow) v2_decoder_t ( + in_batch_size, options.maxmsgsize); + alloc_assert (decoder); + + if (options.mechanism == ZMQ_NULL + && memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { + mechanism = new (std::nothrow) + null_mechanism_t (session, peer_address, options); + alloc_assert (mechanism); + } + else + if (options.mechanism == ZMQ_PLAIN + && memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { + mechanism = new (std::nothrow) + plain_mechanism_t (session, peer_address, options); + alloc_assert (mechanism); + } +#ifdef HAVE_LIBSODIUM + else + if (options.mechanism == ZMQ_CURVE + && memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { + if (options.as_server) + mechanism = new (std::nothrow) + curve_server_t (session, peer_address, options); + else + mechanism = new (std::nothrow) curve_client_t (options); + alloc_assert (mechanism); + } +#endif + else { + error (); + return false; + } + read_msg = &stream_engine_t::next_handshake_command; + write_msg = &stream_engine_t::process_handshake_command; + } + + // Start polling for output if necessary. + if (outsize == 0) + set_pollout (handle); + + // Handshaking was successful. + // Switch into the normal message flow. + handshaking = false; + + return true; +} + +int zmq::stream_engine_t::read_identity (msg_t *msg_) +{ + int rc = msg_->init_size (options.identity_size); + errno_assert (rc == 0); + if (options.identity_size > 0) + memcpy (msg_->data (), options.identity, options.identity_size); + read_msg = &stream_engine_t::pull_msg_from_session; + return 0; +} + +int zmq::stream_engine_t::write_identity (msg_t *msg_) +{ + if (options.recv_identity) { + msg_->set_flags (msg_t::identity); + int rc = session->push_msg (msg_); + errno_assert (rc == 0); + } + else { + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + } + + if (subscription_required) + write_msg = &stream_engine_t::write_subscription_msg; + else + write_msg = &stream_engine_t::push_msg_to_session; + + return 0; +} + +int zmq::stream_engine_t::next_handshake_command (msg_t *msg_) +{ + zmq_assert (mechanism != NULL); + + const int rc = mechanism->next_handshake_command (msg_); + if (rc == 0) { + msg_->set_flags (msg_t::command); + if (mechanism->is_handshake_complete ()) + mechanism_ready (); + } + + return rc; +} + +int zmq::stream_engine_t::process_handshake_command (msg_t *msg_) +{ + zmq_assert (mechanism != NULL); + const int rc = mechanism->process_handshake_command (msg_); + if (rc == 0) { + if (mechanism->is_handshake_complete ()) + mechanism_ready (); + if (output_stopped) + restart_output (); + } + + return rc; +} + +void zmq::stream_engine_t::zap_msg_available () +{ + zmq_assert (mechanism != NULL); + + const int rc = mechanism->zap_msg_available (); + if (rc == -1) { + error (); + return; + } + if (input_stopped) + restart_input (); + if (output_stopped) + restart_output (); +} + +void zmq::stream_engine_t::mechanism_ready () +{ + if (options.recv_identity) { + msg_t identity; + mechanism->peer_identity (&identity); + const int rc = session->push_msg (&identity); + if (rc == -1 && errno == EAGAIN) { + // If the write is failing at this stage with + // an EAGAIN the pipe must be being shut down, + // so we can just bail out of the identity set. + return; + } + errno_assert (rc == 0); + session->flush (); + } + + read_msg = &stream_engine_t::pull_and_encode; + write_msg = &stream_engine_t::decode_and_push; +} + +int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_) +{ + return session->pull_msg (msg_); +} + +int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_) +{ + return session->push_msg (msg_); +} + +int zmq::stream_engine_t::pull_and_encode (msg_t *msg_) +{ + zmq_assert (mechanism != NULL); + + if (session->pull_msg (msg_) == -1) + return -1; + if (mechanism->encode (msg_) == -1) + return -1; + return 0; +} + +int zmq::stream_engine_t::decode_and_push (msg_t *msg_) +{ + zmq_assert (mechanism != NULL); + + if (mechanism->decode (msg_) == -1) + return -1; + if (session->push_msg (msg_) == -1) { + if (errno == EAGAIN) + write_msg = &stream_engine_t::push_one_then_decode_and_push; + return -1; + } + return 0; +} + +int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_) +{ + const int rc = session->push_msg (msg_); + if (rc == 0) + write_msg = &stream_engine_t::decode_and_push; + return rc; +} + +int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) +{ + msg_t subscription; + + // Inject the subscription message, so that also + // ZMQ 2.x peers receive published messages. + int rc = subscription.init_size (1); + errno_assert (rc == 0); + *(unsigned char*) subscription.data () = 1; + rc = session->push_msg (&subscription); + if (rc == -1) + return -1; + + write_msg = &stream_engine_t::push_msg_to_session; + return push_msg_to_session (msg_); +} + +void zmq::stream_engine_t::error () +{ + zmq_assert (session); + socket->event_disconnected (endpoint, s); + session->flush (); + session->detach (); + unplug (); + delete this; +} + +int zmq::stream_engine_t::write (const void *data_, size_t size_) +{ +#ifdef ZMQ_HAVE_WINDOWS + + int nbytes = send (s, (char*) data_, (int) size_, 0); + + // If not a single byte can be written to the socket in non-blocking mode + // we'll get an error (this may happen during the speculative write). + if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) + return 0; + + // Signalise peer failure. + if (nbytes == SOCKET_ERROR && ( + WSAGetLastError () == WSAENETDOWN || + WSAGetLastError () == WSAENETRESET || + WSAGetLastError () == WSAEHOSTUNREACH || + WSAGetLastError () == WSAECONNABORTED || + WSAGetLastError () == WSAETIMEDOUT || + WSAGetLastError () == WSAECONNRESET)) + return -1; + + wsa_assert (nbytes != SOCKET_ERROR); + return nbytes; + +#else + + ssize_t nbytes = send (s, data_, size_, 0); + + // Several errors are OK. When speculative write is being done we may not + // be able to write a single byte from the socket. Also, SIGSTOP issued + // by a debugging tool can result in EINTR error. + if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || + errno == EINTR)) + return 0; + + // Signalise peer failure. + if (nbytes == -1) { + errno_assert (errno != EACCES + && errno != EBADF + && errno != EDESTADDRREQ + && errno != EFAULT + && errno != EINVAL + && errno != EISCONN + && errno != EMSGSIZE + && errno != ENOMEM + && errno != ENOTSOCK + && errno != EOPNOTSUPP); + return -1; + } + + return static_cast (nbytes); + +#endif +} + +int zmq::stream_engine_t::read (void *data_, size_t size_) +{ +#ifdef ZMQ_HAVE_WINDOWS + + const int rc = recv (s, (char*) data_, (int) size_, 0); + + // If not a single byte can be read from the socket in non-blocking mode + // we'll get an error (this may happen during the speculative read). + if (rc == SOCKET_ERROR) { + if (WSAGetLastError () == WSAEWOULDBLOCK) + errno = EAGAIN; + else { + wsa_assert (WSAGetLastError () == WSAENETDOWN + || WSAGetLastError () == WSAENETRESET + || WSAGetLastError () == WSAECONNABORTED + || WSAGetLastError () == WSAETIMEDOUT + || WSAGetLastError () == WSAECONNRESET + || WSAGetLastError () == WSAECONNREFUSED + || WSAGetLastError () == WSAENOTCONN); + errno = wsa_error_to_errno (WSAGetLastError ()); + } + } + + return rc == SOCKET_ERROR? -1: rc; + +#else + + const ssize_t rc = recv (s, data_, size_, 0); + + // Several errors are OK. When speculative read is being done we may not + // be able to read a single byte from the socket. Also, SIGSTOP issued + // by a debugging tool can result in EINTR error. + if (rc == -1) { + errno_assert (errno != EBADF + && errno != EFAULT + && errno != EINVAL + && errno != ENOMEM + && errno != ENOTSOCK); + if (errno == EWOULDBLOCK || errno == EINTR) + errno = EAGAIN; + } + + return static_cast (rc); + +#endif +} -- cgit v1.2.3