From 597cc5edd624525563e6549dc0057eca2a51c81d Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 13:30:46 -0500 Subject: upgrade to new version --- tests/test_monitor.cpp | 272 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 tests/test_monitor.cpp (limited to 'tests/test_monitor.cpp') diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp new file mode 100644 index 0000000..8437619 --- /dev/null +++ b/tests/test_monitor.cpp @@ -0,0 +1,272 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +// REQ socket events handled +static int req_socket_events; +// 2nd REQ socket events handled +static int req2_socket_events; +// REP socket events handled +static int rep_socket_events; + +std::string addr ; + +static bool read_msg(void* s, zmq_event_t& event, std::string& ep) +{ + int rc ; + zmq_msg_t msg1; // binary part + zmq_msg_init (&msg1); + zmq_msg_t msg2; // address part + zmq_msg_init (&msg2); + rc = zmq_msg_recv (&msg1, s, 0); + if (rc == -1 && zmq_errno() == ETERM) + return true ; + + assert (rc != -1); + assert (zmq_msg_more(&msg1) != 0); + rc = zmq_msg_recv (&msg2, s, 0); + if (rc == -1 && zmq_errno() == ETERM) + return true; + + assert (rc != -1); + assert (zmq_msg_more(&msg2) == 0); + // copy binary data to event struct + const char* data = (char*)zmq_msg_data(&msg1); + memcpy(&event.event, data, sizeof(event.event)); + memcpy(&event.value, data+sizeof(event.event), sizeof(event.value)); + // copy address part + ep = std::string((char*)zmq_msg_data(&msg2), zmq_msg_size(&msg2)); + + if (event.event == ZMQ_EVENT_MONITOR_STOPPED) + return true; + + return false; +} + + +// REQ socket monitor thread +static void req_socket_monitor (void *ctx) +{ + zmq_event_t event; + std::string ep ; + int rc; + + void *s = zmq_socket (ctx, ZMQ_PAIR); + assert (s); + + rc = zmq_connect (s, "inproc://monitor.req"); + assert (rc == 0); + while (!read_msg(s, event, ep)) { + assert (ep == addr); + switch (event.event) { + case ZMQ_EVENT_CONNECTED: + assert (event.value > 0); + req_socket_events |= ZMQ_EVENT_CONNECTED; + req2_socket_events |= ZMQ_EVENT_CONNECTED; + break; + case ZMQ_EVENT_CONNECT_DELAYED: + assert (event.value != 0); + req_socket_events |= ZMQ_EVENT_CONNECT_DELAYED; + break; + case ZMQ_EVENT_CLOSE_FAILED: + assert (event.value != 0); + req_socket_events |= ZMQ_EVENT_CLOSE_FAILED; + break; + case ZMQ_EVENT_CLOSED: + assert (event.value != 0); + req_socket_events |= ZMQ_EVENT_CLOSED; + break; + case ZMQ_EVENT_DISCONNECTED: + assert (event.value != 0); + req_socket_events |= ZMQ_EVENT_DISCONNECTED; + break; + } + } + zmq_close (s); +} + +// 2nd REQ socket monitor thread +static void req2_socket_monitor (void *ctx) +{ + zmq_event_t event; + std::string ep ; + int rc; + + void *s = zmq_socket (ctx, ZMQ_PAIR); + assert (s); + + rc = zmq_connect (s, "inproc://monitor.req2"); + assert (rc == 0); + while (!read_msg(s, event, ep)) { + assert (ep == addr); + switch (event.event) { + case ZMQ_EVENT_CONNECTED: + assert (event.value > 0); + req2_socket_events |= ZMQ_EVENT_CONNECTED; + break; + case ZMQ_EVENT_CLOSED: + assert (event.value != 0); + req2_socket_events |= ZMQ_EVENT_CLOSED; + break; + } + } + zmq_close (s); +} + +// REP socket monitor thread +static void rep_socket_monitor (void *ctx) +{ + zmq_event_t event; + std::string ep ; + int rc; + + void *s = zmq_socket (ctx, ZMQ_PAIR); + assert (s); + + rc = zmq_connect (s, "inproc://monitor.rep"); + assert (rc == 0); + while (!read_msg(s, event, ep)) { + assert (ep == addr); + switch (event.event) { + case ZMQ_EVENT_LISTENING: + assert (event.value > 0); + rep_socket_events |= ZMQ_EVENT_LISTENING; + break; + case ZMQ_EVENT_ACCEPTED: + assert (event.value > 0); + rep_socket_events |= ZMQ_EVENT_ACCEPTED; + break; + case ZMQ_EVENT_CLOSE_FAILED: + assert (event.value != 0); + rep_socket_events |= ZMQ_EVENT_CLOSE_FAILED; + break; + case ZMQ_EVENT_CLOSED: + assert (event.value != 0); + rep_socket_events |= ZMQ_EVENT_CLOSED; + break; + case ZMQ_EVENT_DISCONNECTED: + assert (event.value != 0); + rep_socket_events |= ZMQ_EVENT_DISCONNECTED; + break; + } + } + zmq_close (s); +} + +int main (void) +{ + setup_test_environment(); + int rc; + void *req; + void *req2; + void *rep; + void* threads [3]; + + addr = "tcp://127.0.0.1:5560"; + + // Create the infrastructure + void *ctx = zmq_ctx_new (); + assert (ctx); + + // REP socket + rep = zmq_socket (ctx, ZMQ_REP); + assert (rep); + + // Assert supported protocols + rc = zmq_socket_monitor (rep, addr.c_str(), 0); + assert (rc == -1); + assert (zmq_errno() == EPROTONOSUPPORT); + + // Deregister monitor + rc = zmq_socket_monitor (rep, NULL, 0); + assert (rc == 0); + + // REP socket monitor, all events + rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); + assert (rc == 0); + threads [0] = zmq_threadstart(&rep_socket_monitor, ctx); + + // REQ socket + req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + + // REQ socket monitor, all events + rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); + assert (rc == 0); + threads [1] = zmq_threadstart(&req_socket_monitor, ctx); + msleep (SETTLE_TIME); + + // Bind REQ and REP + rc = zmq_bind (rep, addr.c_str()); + assert (rc == 0); + + rc = zmq_connect (req, addr.c_str()); + assert (rc == 0); + + bounce (rep, req); + + // 2nd REQ socket + req2 = zmq_socket (ctx, ZMQ_REQ); + assert (req2); + + // 2nd REQ socket monitor, connected event only + rc = zmq_socket_monitor (req2, "inproc://monitor.req2", ZMQ_EVENT_CONNECTED); + assert (rc == 0); + threads [2] = zmq_threadstart(&req2_socket_monitor, ctx); + + rc = zmq_connect (req2, addr.c_str()); + assert (rc == 0); + + // Close the REP socket + rc = zmq_close (rep); + assert (rc == 0); + + // Allow enough time for detecting error states + msleep (250); + + // Close the REQ socket + rc = zmq_close (req); + assert (rc == 0); + + // Close the 2nd REQ socket + rc = zmq_close (req2); + assert (rc == 0); + + zmq_ctx_term (ctx); + + // Expected REP socket events + assert (rep_socket_events & ZMQ_EVENT_LISTENING); + assert (rep_socket_events & ZMQ_EVENT_ACCEPTED); + assert (rep_socket_events & ZMQ_EVENT_CLOSED); + + // Expected REQ socket events + assert (req_socket_events & ZMQ_EVENT_CONNECTED); + assert (req_socket_events & ZMQ_EVENT_DISCONNECTED); + assert (req_socket_events & ZMQ_EVENT_CLOSED); + + // Expected 2nd REQ socket events + assert (req2_socket_events & ZMQ_EVENT_CONNECTED); + assert (!(req2_socket_events & ZMQ_EVENT_CLOSED)); + + for (unsigned int i = 0; i < 3; ++i) + zmq_threadclose(threads [i]); + + return 0 ; +} -- cgit v1.2.3