From 7d5c3dcd969161322deed6c43f8a6a3cb92c3369 Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 11:53:55 -0500 Subject: upgrade to 14.4.1 --- examples/security/strawhouse.py | 94 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 examples/security/strawhouse.py (limited to 'examples/security/strawhouse.py') diff --git a/examples/security/strawhouse.py b/examples/security/strawhouse.py new file mode 100644 index 0000000..dc75bd7 --- /dev/null +++ b/examples/security/strawhouse.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python + +''' +Allow or deny clients based on IP address. + +Strawhouse, which is plain text with filtering on IP addresses. It still +uses the NULL mechanism, but we install an authentication hook that checks +the IP address against a whitelist or blacklist and allows or denies it +accordingly. + +Author: Chris Laws +''' + +import logging +import sys + +import zmq +import zmq.auth +from zmq.auth.thread import ThreadAuthenticator + + +def run(): + '''Run strawhouse client''' + + allow_test_pass = False + deny_test_pass = False + + ctx = zmq.Context().instance() + + # Start an authenticator for this context. + auth = ThreadAuthenticator(ctx) + auth.start() + + # Part 1 - demonstrate allowing clients based on IP address + auth.allow('127.0.0.1') + + server = ctx.socket(zmq.PUSH) + server.zap_domain = b'global' # must come before bind + server.bind('tcp://*:9000') + + client_allow = ctx.socket(zmq.PULL) + client_allow.connect('tcp://127.0.0.1:9000') + + server.send(b"Hello") + + msg = client_allow.recv() + if msg == b"Hello": + allow_test_pass = True + + client_allow.close() + + # Part 2 - demonstrate denying clients based on IP address + auth.stop() + + auth = ThreadAuthenticator(ctx) + auth.start() + + auth.deny('127.0.0.1') + + client_deny = ctx.socket(zmq.PULL) + client_deny.connect('tcp://127.0.0.1:9000') + + if server.poll(50, zmq.POLLOUT): + server.send(b"Hello") + + if client_deny.poll(50): + msg = client_deny.recv() + else: + deny_test_pass = True + else: + deny_test_pass = True + + client_deny.close() + + auth.stop() # stop auth thread + + if allow_test_pass and deny_test_pass: + logging.info("Strawhouse test OK") + else: + logging.error("Strawhouse test FAIL") + + +if __name__ == '__main__': + if zmq.zmq_version_info() < (4,0): + raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) + + if '-v' in sys.argv: + level = logging.DEBUG + else: + level = logging.INFO + + logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") + + run() -- cgit v1.2.3