diff options
Diffstat (limited to 'scripts/docker/files/bin/client_side_db.py')
| -rw-r--r-- | scripts/docker/files/bin/client_side_db.py | 321 | 
1 files changed, 0 insertions, 321 deletions
| diff --git a/scripts/docker/files/bin/client_side_db.py b/scripts/docker/files/bin/client_side_db.py deleted file mode 100644 index 80da7392..00000000 --- a/scripts/docker/files/bin/client_side_db.py +++ /dev/null @@ -1,321 +0,0 @@ -#!/usr/bin/python - -import os -import argparse -import tempfile -import getpass -import requests -import srp._pysrp as srp -import binascii -import logging -import json -import time - -from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks - -from leap.soledad.client import Soledad -from leap.keymanager import KeyManager -from leap.keymanager.openpgp import OpenPGPKey - -from leap.common.events import server -server.ensure_server() - -from util import ValidateUserHandle - - -""" -Script to give access to client-side Soledad database. - -This is mainly used for tests, but can also be used to recover data from a -Soledad database (public/private keys, export documents, etc). - -To speed up testing/debugging, this script can dump the auth data after -logging in. Use the --export-auth-data option to export auth data to a file. -The contents of the file is a json dictionary containing the uuid, server_url, -cert_file and token, which is enough info to instantiate a soledad client -without having to interact with the webapp again. Use the --use-auth-data -option to use the auth data stored in a file. - -Use the --help option to see available options. -""" - - -# create a logger -logger = logging.getLogger(__name__) -LOG_FORMAT = '%(asctime)s %(message)s' -logging.basicConfig(format=LOG_FORMAT, level=logging.DEBUG) - - -safe_unhexlify = lambda x: binascii.unhexlify(x) if ( -    len(x) % 2 == 0) else binascii.unhexlify('0' + x) - - -def _fail(reason): -    logger.error('Fail: ' + reason) -    exit(2) - - -def _get_api_info(provider): -    info = requests.get( -        'https://' + provider + '/provider.json', verify=False).json() -    return info['api_uri'], info['api_version'] - - -def _login(username, passphrase, provider, api_uri, api_version): -    usr = srp.User(username, passphrase, srp.SHA256, srp.NG_1024) -    auth = None -    try: -        auth = _authenticate(api_uri, api_version, usr).json() -    except requests.exceptions.ConnectionError: -        _fail('Could not connect to server.') -    if 'errors' in auth: -        _fail(str(auth['errors'])) -    return api_uri, api_version, auth - - -def _authenticate(api_uri, api_version, usr): -    api_url = "%s/%s" % (api_uri, api_version) -    session = requests.session() -    uname, A = usr.start_authentication() -    params = {'login': uname, 'A': binascii.hexlify(A)} -    init = session.post( -        api_url + '/sessions', data=params, verify=False).json() -    if 'errors' in init: -        _fail('test user not found') -    M = usr.process_challenge( -        safe_unhexlify(init['salt']), safe_unhexlify(init['B'])) -    return session.put(api_url + '/sessions/' + uname, verify=False, -                       data={'client_auth': binascii.hexlify(M)}) - - -def _get_soledad_info(username, provider, passphrase, basedir): -    api_uri, api_version = _get_api_info(provider) -    auth = _login(username, passphrase, provider, api_uri, api_version) -    # get soledad server url -    service_url = '%s/%s/config/soledad-service.json' % \ -                  (api_uri, api_version) -    soledad_hosts = requests.get(service_url, verify=False).json()['hosts'] -    hostnames = soledad_hosts.keys() -    # allow for choosing the host -    host = hostnames[0] -    if len(hostnames) > 1: -        i = 1 -        print "There are many available hosts:" -        for h in hostnames: -            print "  (%d) %s.%s" % (i, h, provider) -            i += 1 -        choice = raw_input("Choose a host to use (default: 1): ") -        if choice != '': -            host = hostnames[int(choice) - 1] -    server_url = 'https://%s:%d/user-%s' % \ -        (soledad_hosts[host]['hostname'], soledad_hosts[host]['port'], -         auth[2]['id']) -    # get provider ca certificate -    ca_cert = requests.get('https://%s/ca.crt' % provider, verify=False).text -    cert_file = os.path.join(basedir, 'ca.crt') -    with open(cert_file, 'w') as f: -        f.write(ca_cert) -    return auth[2]['id'], server_url, cert_file, auth[2]['token'] - - -def _get_soledad_instance(uuid, passphrase, basedir, server_url, cert_file, -                          token): -    # setup soledad info -    logger.info('UUID is %s' % uuid) -    logger.info('Server URL is %s' % server_url) -    secrets_path = os.path.join( -        basedir, '%s.secret' % uuid) -    local_db_path = os.path.join( -        basedir, '%s.db' % uuid) -    # instantiate soledad -    return Soledad( -        uuid, -        unicode(passphrase), -        secrets_path=secrets_path, -        local_db_path=local_db_path, -        server_url=server_url, -        cert_file=cert_file, -        auth_token=token) - - -def _get_keymanager_instance(username, provider, soledad, token, -                             ca_cert_path=None, api_uri=None, api_version=None, -                             uid=None, gpgbinary=None): -    return KeyManager( -        "{username}@{provider}".format(username=username, provider=provider), -        "http://uri", -        soledad, -        token=token, -        ca_cert_path=ca_cert_path, -        api_uri=api_uri, -        api_version=api_version, -        uid=uid, -        gpgbinary=gpgbinary) - - -def _parse_args(): -    # parse command line -    parser = argparse.ArgumentParser() -    parser.add_argument( -        'user@provider', action=ValidateUserHandle, help='the user handle') -    parser.add_argument( -        '--basedir', '-b', default=None, -        help='soledad base directory') -    parser.add_argument( -        '--passphrase', '-p', default=None, -        help='the user passphrase') -    parser.add_argument( -        '--get-all-docs', '-a', action='store_true', -        help='get all documents from the local database') -    parser.add_argument( -        '--create-docs', '-c', default=0, type=int, -        help='create a number of documents') -    parser.add_argument( -        '--sync', '-s', action='store_true', -        help='synchronize with the server replica') -    parser.add_argument( -        '--repeat-sync', '-r', action='store_true', -        help='repeat synchronization until no new data is received') -    parser.add_argument( -        '--export-public-key', help="export the public key to a file") -    parser.add_argument( -        '--export-private-key', help="export the private key to a file") -    parser.add_argument( -        '--export-incoming-messages', -        help="export incoming messages to a directory") -    parser.add_argument( -        '--export-auth-data', -        help="export authentication data to a file") -    parser.add_argument( -        '--use-auth-data', -        help="use authentication data from a file") -    return parser.parse_args() - - -def _get_passphrase(args): -    passphrase = args.passphrase -    if passphrase is None: -        passphrase = getpass.getpass( -            'Password for %s@%s: ' % (args.username, args.provider)) -    return passphrase - - -def _get_basedir(args): -    basedir = args.basedir -    if basedir is None: -        basedir = tempfile.mkdtemp() -    elif not os.path.isdir(basedir): -        os.mkdir(basedir) -    logger.info('Using %s as base directory.' % basedir) -    return basedir - - -@inlineCallbacks -def _export_key(args, km, fname, private=False): -    address = args.username + "@" + args.provider -    pkey = yield km.get_key( -        address, OpenPGPKey, private=private, fetch_remote=False) -    with open(args.export_private_key, "w") as f: -        f.write(pkey.key_data) - - -@inlineCallbacks -def _export_incoming_messages(soledad, directory): -    yield soledad.create_index("by-incoming", "bool(incoming)") -    docs = yield soledad.get_from_index("by-incoming", '1') -    i = 1 -    for doc in docs: -        with open(os.path.join(directory, "message_%d.gpg" % i), "w") as f: -            f.write(doc.content["_enc_json"]) -        i += 1 - - -@inlineCallbacks -def _get_all_docs(soledad): -    _, docs = yield soledad.get_all_docs() -    for doc in docs: -        print json.dumps(doc.content, indent=4) - - -# main program - -@inlineCallbacks -def _main(soledad, km, args): -    try: -        if args.create_docs: -            for i in xrange(args.create_docs): -                t = time.time() -                logger.debug( -                    "Creating doc %d/%d..." % (i + 1, args.create_docs)) -                content = { -                    'datetime': time.strftime( -                        "%Y-%m-%d %H:%M:%S", time.gmtime(t)), -                    'timestamp': t, -                    'index': i, -                    'total': args.create_docs, -                } -                yield soledad.create_doc(content) -        if args.sync: -            yield soledad.sync() -        if args.repeat_sync: -            old_gen = 0 -            new_gen = yield soledad.sync() -            while old_gen != new_gen: -                old_gen = new_gen -                new_gen = yield soledad.sync() -        if args.get_all_docs: -            yield _get_all_docs(soledad) -        if args.export_private_key: -            yield _export_key(args, km, args.export_private_key, private=True) -        if args.export_public_key: -            yield _export_key(args, km, args.expoert_public_key, private=False) -        if args.export_incoming_messages: -            yield _export_incoming_messages( -                soledad, args.export_incoming_messages) -    except Exception as e: -        logger.error(e) -    finally: -        soledad.close() -        reactor.callWhenRunning(reactor.stop) - - -if __name__ == '__main__': -    args = _parse_args() -    passphrase = _get_passphrase(args) -    basedir = _get_basedir(args) - -    if not args.use_auth_data: -        # get auth data from server -        uuid, server_url, cert_file, token = \ -            _get_soledad_info( -                args.username, args.provider, passphrase, basedir) -    else: -        # load auth data from file -        with open(args.use_auth_data) as f: -            auth_data = json.loads(f.read()) -            uuid = auth_data['uuid'] -            server_url = auth_data['server_url'] -            cert_file = auth_data['cert_file'] -            token = auth_data['token'] - -    # export auth data to a file -    if args.export_auth_data: -        with open(args.export_auth_data, "w") as f: -            f.write(json.dumps({ -                'uuid': uuid, -                'server_url': server_url, -                'cert_file': cert_file, -                'token': token, -            })) - -    soledad = _get_soledad_instance( -        uuid, passphrase, basedir, server_url, cert_file, token) -    km = _get_keymanager_instance( -        args.username, -        args.provider, -        soledad, -        token, -        uid=uuid) -    _main(soledad, km, args) -    reactor.run() | 
