#!/usr/bin/env python
# -*- coding: utf-8 -*-
# bitmask_cli
# Copyright (C) 2015, 2016 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
"""
Bitmask Command Line interface: zmq client.
"""
import json
import sys
import getpass
import argparse
from colorama import init as color_init
from colorama import Fore
from twisted.internet import reactor
from txzmq import ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqFactory, ZmqREQConnection
from txzmq import ZmqRequestTimeoutError
from leap.bitmask.core import ENDPOINT
class BitmaskCLI(object):
def __init__(self):
parser = argparse.ArgumentParser(
usage='''bitmask_cli []
Controls the Bitmask application.
SERVICE COMMANDS:
user Handles Bitmask accounts
mail Bitmask Encrypted Mail
eip Encrypted Internet Proxy
keys Bitmask Keymanager
GENERAL COMMANDS:
version prints version number and exit
launch launch the Bitmask backend daemon
shutdown shutdown Bitmask backend daemon
status displays general status about the running Bitmask services
debug show some debug info about bitmask-core
''', epilog=("Use 'bitmask_cli --help' to learn more "
"about each command."))
parser.add_argument('command', help='Subcommand to run')
# parse_args defaults to [1:] for args, but you need to
# exclude the rest of the args too, or validation will fail
args = parser.parse_args(sys.argv[1:2])
self.args = args
self.subargs = None
if not hasattr(self, args.command):
print 'Unrecognized command'
parser.print_help()
exit(1)
# use dispatch pattern to invoke method with same name
getattr(self, args.command)()
def user(self):
parser = argparse.ArgumentParser(
description=('Handles Bitmask accounts: creation, authentication '
'and modification'),
prog='bitmask_cli user')
parser.add_argument('username', nargs='?',
help='username ID, in the form ')
parser.add_argument('--create', action='store_true',
help='register a new user, if possible')
parser.add_argument('--authenticate', action='store_true',
help='logs in against the provider')
parser.add_argument('--logout', action='store_true',
help='ends any active session with the provider')
parser.add_argument('--active', action='store_true',
help='shows the active user, if any')
# now that we're inside a subcommand, ignore the first
# TWO argvs, ie the command (bitmask_cli) and the subcommand (user)
args = parser.parse_args(sys.argv[2:])
self.subargs = args
def mail(self):
parser = argparse.ArgumentParser(
description='Bitmask Encrypted Mail service',
prog='bitmask_cli mail')
parser.add_argument('--start', action='store_true',
help='tries to start the mail service')
parser.add_argument('--stop', action='store_true',
help='stops the mail service if running')
parser.add_argument('--status', action='store_true',
help='displays status about the mail service')
parser.add_argument('--enable', action='store_true')
parser.add_argument('--disable', action='store_true')
parser.add_argument('--get-imap-token', action='store_true',
help='returns token for the IMAP service')
parser.add_argument('--get-smtp-token', action='store_true',
help='returns token for the SMTP service')
parser.add_argument('--get-smtp-certificate', action='store_true',
help='downloads a new smtp certificate')
parser.add_argument('--check-smtp-certificate', action='store_true',
help='downloads a new smtp certificate '
'(NOT IMPLEMENTED)')
args = parser.parse_args(sys.argv[2:])
self.subargs = args
def eip(self):
parser = argparse.ArgumentParser(
description='Encrypted Internet Proxy service',
prog='bitmask_cli eip')
parser.add_argument('--start', action='store_true',
help='Start service')
parser.add_argument('--stop', action='store_true', help='Stop service')
parser.add_argument('--status', action='store_true',
help='Display status about service')
parser.add_argument('--enable', action='store_true')
parser.add_argument('--disable', action='store_true')
args = parser.parse_args(sys.argv[2:])
self.subargs = args
def keys(self):
parser = argparse.ArgumentParser(
description='Bitmask Keymanager management service',
prog='bitmask_cli keys')
parser.add_argument('--status', action='store_true',
help='Display status about service')
parser.add_argument('--list-keys', action='store_true',
help='List all known keys')
parser.add_argument('--export-key', action='store_true',
help='Export the given key')
args = parser.parse_args(sys.argv[2:])
self.subargs = args
# Single commands
def launch(self):
pass
def shutdown(self):
pass
def status(self):
pass
def version(self):
pass
def debug(self):
pass
def get_zmq_connection():
zf = ZmqFactory()
e = ZmqEndpoint(ZmqEndpointType.connect, ENDPOINT)
return ZmqREQConnection(zf, e)
def error(msg, stop=False):
print Fore.RED + "[!] %s" % msg + Fore.RESET
if stop:
reactor.stop()
else:
sys.exit(1)
def timeout_handler(failure, stop_reactor=True):
# TODO ---- could try to launch the bitmask daemon here and retry
if failure.trap(ZmqRequestTimeoutError) == ZmqRequestTimeoutError:
print (Fore.RED + "[ERROR] Timeout contacting the bitmask daemon. "
"Is it running?" + Fore.RESET)
reactor.stop()
def do_print_result(stuff):
obj = json.loads(stuff[0])
if not obj['error']:
print Fore.GREEN + '%s' % obj['result'] + Fore.RESET
else:
print Fore.RED + 'ERROR:' + '%s' % obj['error'] + Fore.RESET
def send_command(cli):
args = cli.args
subargs = cli.subargs
cb = do_print_result
cmd = args.command
if cmd == 'launch':
# XXX careful! Should see if the process in PID is running,
# avoid launching again.
import commands
commands.getoutput('bitmaskd')
reactor.stop()
return
elif cmd == 'version':
do_print_result([json.dumps(
{'result': 'bitmask_cli: 0.0.1',
'error': None})])
data = ('version',)
elif cmd == 'status':
data = ('status',)
elif cmd == 'shutdown':
data = ('shutdown',)
elif cmd == 'debug':
data = ('stats',)
elif cmd == 'user':
username = subargs.username
if username and '@' not in username:
error("Username ID must be in the form ",
stop=True)
return
if not username:
username = ''
# TODO check that ONLY ONE FLAG is True
# TODO check that AT LEAST ONE FLAG is True
passwd = getpass.getpass()
data = ['user']
if subargs.active:
data += ['active', '', '']
elif subargs.create:
data += ['signup', username, passwd]
elif subargs.authenticate:
data += ['authenticate', username, passwd]
elif subargs.logout:
data += ['logout', username]
else:
error('Use bitmask_cli user --help to see available subcommands')
return
elif cmd == 'mail':
data = ['mail']
if subargs.status:
data += ['status']
elif subargs.enable:
data += ['enable']
elif subargs.disable:
data += ['disable']
elif subargs.get_imap_token:
data += ['get_imap_token']
elif subargs.get_smtp_token:
data += ['get_smtp_token']
elif subargs.get_smtp_certificate:
data += ['get_smtp_certificate']
else:
error('Use bitmask_cli mail --help to see available subcommands')
return
elif cmd == 'eip':
data = ['eip']
if subargs.status:
data += ['status']
elif subargs.enable:
data += ['enable']
elif subargs.disable:
data += ['disable']
elif subargs.start:
data += ['start']
elif subargs.stop:
data += ['stop']
else:
error('Use bitmask_cli eip --help to see available subcommands',
stop=True)
return
elif cmd == 'keys':
data = ['keys']
if subargs.status:
data += ['status']
elif subargs.list_keys:
data += ['list_keys']
elif subargs.export_key:
data += ['export_keys']
else:
error('Use bitmask_cli keys --help to see available subcommands',
stop=True)
return
s = get_zmq_connection()
d = s.sendMsg(*data, timeout=60)
d.addCallback(cb)
d.addCallback(lambda x: reactor.stop())
d.addErrback(timeout_handler)
def main():
color_init()
cli = BitmaskCLI()
reactor.callWhenRunning(reactor.callLater, 0, send_command, cli)
reactor.run()
if __name__ == "__main__":
main()