diff options
author | Kali Kaneko <kali@leap.se> | 2016-03-04 12:02:26 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2016-04-19 09:59:25 -0400 |
commit | f4503842fdc288fb6336211099ad0a8d01165df3 (patch) | |
tree | 05bb4d2a2a2be4fb02d8048df5c426f2b0e20243 /src/leap/bitmask/core/_zmq.py | |
parent | 6fd1c73db49c5e1e08cf7963017470511fef0059 (diff) |
[feature] landing of bitmask.core
Diffstat (limited to 'src/leap/bitmask/core/_zmq.py')
-rw-r--r-- | src/leap/bitmask/core/_zmq.py | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/src/leap/bitmask/core/_zmq.py b/src/leap/bitmask/core/_zmq.py new file mode 100644 index 00000000..a656fc65 --- /dev/null +++ b/src/leap/bitmask/core/_zmq.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# _zmq.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +ZMQ REQ-REP Dispatcher. +""" + +from twisted.application import service +from twisted.internet import reactor +from twisted.python import log + +from txzmq import ZmqEndpoint, ZmqEndpointType +from txzmq import ZmqFactory, ZmqREPConnection + +from leap.bitmask.core import ENDPOINT +from leap.bitmask.core.dispatcher import CommandDispatcher + + +class ZMQServerService(service.Service): + + def __init__(self, core): + self._core = core + + def startService(self): + zf = ZmqFactory() + e = ZmqEndpoint(ZmqEndpointType.bind, ENDPOINT) + + self._conn = _DispatcherREPConnection(zf, e, self._core) + reactor.callWhenRunning(self._conn.do_greet) + service.Service.startService(self) + + def stopService(self): + service.Service.stopService(self) + + +class _DispatcherREPConnection(ZmqREPConnection): + + def __init__(self, zf, e, core): + ZmqREPConnection.__init__(self, zf, e) + self.dispatcher = CommandDispatcher(core) + + def gotMessage(self, msgId, *parts): + + r = self.dispatcher.dispatch(parts) + r.addCallback(self.defer_reply, msgId) + + def defer_reply(self, response, msgId): + reactor.callLater(0, self.reply, msgId, str(response)) + + def log_err(self, failure, msgId): + log.err(failure) + self.defer_reply("ERROR: %r" % failure, msgId) + + def do_greet(self): + log.msg('starting ZMQ dispatcher') |