summaryrefslogtreecommitdiff
path: root/src/leap
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-04-23 18:40:03 -0300
committerdrebs <drebs@leap.se>2013-04-23 18:53:36 -0300
commit000639eeb664c476d1cea3fe7056db94caa093c0 (patch)
tree45a34ecce725d116bb25bb107c3c6480fa139578 /src/leap
parent4c74a91ad905e7d59260ccec789930c16b29a62d (diff)
Completelly switch to Key Manager for crypto.
This removes all GPG wrapper that was left and includes Key Manager to take care of all crypto stuff.
Diffstat (limited to 'src/leap')
-rw-r--r--src/leap/soledad/__init__.py98
-rw-r--r--src/leap/soledad/backends/leap_backend.py15
-rw-r--r--src/leap/soledad/crypto.py69
-rw-r--r--src/leap/soledad/tests/__init__.py2
-rw-r--r--src/leap/soledad/tests/test_crypto.py22
-rw-r--r--src/leap/soledad/tests/test_soledad.py14
-rw-r--r--src/leap/soledad/util.py393
7 files changed, 112 insertions, 501 deletions
diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py
index a27a586d..23d81e84 100644
--- a/src/leap/soledad/__init__.py
+++ b/src/leap/soledad/__init__.py
@@ -43,7 +43,6 @@ from hashlib import sha256
from leap.common import events
-#from leap.common.keymanager.gpgwrapper import GPGWrapper
from leap.soledad.config import SoledadConfig
from leap.soledad.backends import sqlcipher
from leap.soledad.backends.leap_backend import (
@@ -108,14 +107,14 @@ class Soledad(object):
The length of the secret used for symmetric encryption.
"""
- def __init__(self, user, passphrase, config_path=None, gnupg_home=None,
+ def __init__(self, address, passphrase, config_path=None, gnupg_home=None,
secret_path=None, local_db_path=None,
shared_db_url=None, auth_token=None, bootstrap=True):
"""
Initialize configuration, cryptographic keys and dbs.
- @param user: Email address of the user (username@provider).
- @type user: str
+ @param address: User's address in the form C{user@provider}.
+ @type address: str
@param passphrase: The passphrase for locking and unlocking encryption
secrets for disk storage.
@type passphrase: str
@@ -138,7 +137,7 @@ class Soledad(object):
@type bootstrap: bool
"""
# TODO: allow for fingerprint enforcing.
- self._user = user
+ self._address = address
self._passphrase = passphrase
self._auth_token = auth_token
self._init_config(
@@ -179,7 +178,7 @@ class Soledad(object):
# TODO: log each bootstrap step.
# Stage 0 - Local environment setup
self._init_dirs()
- self._crypto = SoledadCrypto(self._config.get_gnupg_home())
+ self._crypto = SoledadCrypto(self)
if self._config.get_shared_db_url() and self._auth_token:
# TODO: eliminate need to create db here.
self._shared_db = SoledadSharedDatabase.open_database(
@@ -192,12 +191,14 @@ class Soledad(object):
if self._has_keys():
self._load_keys()
else:
- doc = self._get_keys_doc()
+ doc = self._fetch_keys_from_shared_db()
if not doc:
self._init_keys()
else:
- self._set_symkey(self.decrypt(doc.content['_symkey'],
- passphrase=self._hash_user()))
+ self._set_symkey(
+ self._crypto.decrypt_sym(
+ doc.content['_symkey'],
+ passphrase=self._address_hash()))
# Stage 2 - Keys synchronization
self._assert_server_keys()
# Stage 3 - Local database initialization
@@ -243,12 +244,13 @@ class Soledad(object):
"""
Generate (if needed) and load secret for symmetric encryption.
"""
- events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._user)
+ events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._address)
# load/generate secret
if not self._has_symkey():
self._gen_symkey()
self._load_symkey()
- events.signal(events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._address)
def _init_db(self):
"""
@@ -293,7 +295,8 @@ class Soledad(object):
raise DocumentNotEncrypted(
"File %s is not encrypted!" % self._config.get_secret_path())
# can we decrypt it?
- cyphertext = self._crypto.decrypt(content, passphrase=self._passphrase)
+ cyphertext = self._crypto.decrypt_sym(
+ content, passphrase=self._passphrase)
return bool(cyphertext)
def _load_symkey(self):
@@ -306,7 +309,8 @@ class Soledad(object):
try:
with open(self._config.get_secret_path()) as f:
self._symkey = \
- self._crypto.decrypt(f.read(), passphrase=self._passphrase)
+ self._crypto.decrypt_sym(
+ f.read(), passphrase=self._passphrase)
self._crypto.symkey = self._symkey
except IOError:
raise IOError('Failed to open secret file %s.' %
@@ -317,10 +321,11 @@ class Soledad(object):
Generate a secret for symmetric encryption and store in a local
encrypted file.
"""
- self._set_symkey(''.join(
+ symkey = ''.join(
random.choice(
string.ascii_letters +
- string.digits) for x in range(self.SECRET_LENGTH)))
+ string.digits) for x in range(self.SECRET_LENGTH))
+ self._set_symkey(symkey)
def _set_symkey(self, symkey):
"""
@@ -338,9 +343,8 @@ class Soledad(object):
self._store_symkey()
def _store_symkey(self):
- ciphertext = self._crypto.encrypt(
- self._symkey, symmetric=True,
- passphrase=self._passphrase)
+ ciphertext = self._crypto.encrypt_sym(
+ self._symkey, self._passphrase)
f = open(self._config.get_secret_path(), 'w')
f.write(str(ciphertext))
f.close()
@@ -370,7 +374,7 @@ class Soledad(object):
"""
self._gen_symkey()
- def _hash_user(self):
+ def _address_hash(self):
"""
Calculate a hash for storing/retrieving key material on shared
database, based on user's address.
@@ -379,9 +383,9 @@ class Soledad(object):
@rtype: str
"""
return b2a_base64(
- sha256('user-%s' % self._user).digest())[:-1]
+ sha256('address-%s' % self._address).digest())[:-1]
- def _get_keys_doc(self):
+ def _fetch_keys_from_shared_db(self):
"""
Retrieve the document with encrypted key material from the shared
database.
@@ -389,12 +393,14 @@ class Soledad(object):
@return: a document with encrypted key material in its contents
@rtype: LeapDocument
"""
- events.signal(events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._address)
# TODO: change below to raise appropriate exceptions
if not self._shared_db:
return None
- doc = self._shared_db.get_doc_unauth(self._hash_user())
- events.signal(events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._user)
+ doc = self._shared_db.get_doc_unauth(self._address_hash())
+ events.signal(
+ events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._address)
return doc
def _assert_server_keys(self):
@@ -404,20 +410,24 @@ class Soledad(object):
assert self._has_keys()
if not self._shared_db:
return
- doc = self._get_keys_doc()
+ doc = self._fetch_keys_from_shared_db()
if doc:
- remote_symkey = self.decrypt(doc.content['_symkey'],
- passphrase=self._hash_user())
+ remote_symkey = self.decrypt_sym(
+ doc.content['_symkey'],
+ passphrase=self._address_hash())
assert remote_symkey == self._symkey
else:
- events.signal(events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._address)
content = {
- '_symkey': self.encrypt(self._symkey),
+ '_symkey': self.encrypt_sym(self._symkey, self._passphrase),
}
- doc = LeapDocument(doc_id=self._hash_user(), crypto=self._crypto)
+ doc = LeapDocument(doc_id=self._address_hash(),
+ crypto=self._crypto)
doc.content = content
self._shared_db.put_doc(doc)
- events.signal(events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._address)
#-------------------------------------------------------------------------
# Document storage, retrieval and sync
@@ -497,7 +507,6 @@ class Soledad(object):
"""
return self._db.get_all_docs(include_deleted)
-
def create_doc(self, content, doc_id=None):
"""
Create a new document in the local encrypted database.
@@ -569,7 +578,7 @@ class Soledad(object):
"""
# TODO: create authentication scheme for sync with server.
local_gen = self._db.sync(url, creds=None, autocreate=True)
- events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._user)
+ events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._address)
return local_gen
def need_sync(self, url):
@@ -587,11 +596,11 @@ class Soledad(object):
info = target.get_sync_info(self._db._get_replica_uid())
# compare source generation with target's last known source generation
if self._db._get_generation() != info[4]:
- events.signal(events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._address)
return True
return False
-
#-------------------------------------------------------------------------
# Recovery document export and import
#-------------------------------------------------------------------------
@@ -623,13 +632,11 @@ class Soledad(object):
@rtype: str
"""
data = json.dumps({
- 'user': self._user,
+ 'address': self._address,
'symkey': self._symkey,
})
if passphrase:
- data = self._crypto.encrypt(data, None, sign=None,
- passphrase=passphrase,
- symmetric=True)
+ data = self._crypto.encrypt_sym(data, passphrase)
return data
def import_recovery_document(self, data, passphrase=None):
@@ -649,14 +656,23 @@ class Soledad(object):
raise DocumentNotEncrypted("You provided a password but the "
"recovery document is not encrypted.")
if passphrase:
- data = str(self._crypto.decrypt(data, passphrase=passphrase))
+ data = self._crypto.decrypt_sym(data, passphrase=passphrase)
data = json.loads(data)
- self._user = data['user']
+ self._address = data['address']
self._symkey = data['symkey']
self._crypto.symkey = self._symkey
self._store_symkey()
# TODO: make this work well with bootstrap.
self._load_keys()
+ #
+ # Setters/getters
+ #
+
+ def _get_address(self):
+ return self._address
+
+ address = property(_get_address, doc='The user address.')
+
__all__ = ['backends', 'util', 'server', 'shared_db']
diff --git a/src/leap/soledad/backends/leap_backend.py b/src/leap/soledad/backends/leap_backend.py
index dfec9e85..1c0d5a7d 100644
--- a/src/leap/soledad/backends/leap_backend.py
+++ b/src/leap/soledad/backends/leap_backend.py
@@ -35,6 +35,9 @@ from u1db.remote.http_database import HTTPDatabase
from u1db.errors import BrokenSyncStream
+from leap.common.keymanager import KeyManager
+
+
class NoDefaultKey(Exception):
"""
Exception to signal that there's no default OpenPGP key configured.
@@ -120,18 +123,18 @@ class LeapDocument(Document):
"""
if not self._crypto:
raise NoSoledadCryptoInstance()
- return self._crypto.encrypt_symmetric(
+ return self._crypto.encrypt_sym(
self.get_json(),
- self._crypto._hash_passphrase(self.doc_id))
+ self._crypto.passphrase_hash(self.doc_id))
def set_encrypted_content(self, cyphertext):
"""
Decrypt C{cyphertext} and set document's content.
contents.
"""
- plaintext = self._crypto.decrypt_symmetric(
+ plaintext = self._crypto.decrypt_sym(
cyphertext,
- self._crypto._hash_passphrase(self.doc_id))
+ self._crypto.passphrase_hash(self.doc_id))
self.set_json(plaintext)
self.encryption_scheme = EncryptionSchemes.NONE
@@ -299,14 +302,14 @@ class LeapSyncTarget(HTTPSyncTarget):
raise DocumentNotEncrypted(
'Incoming document\'s contents should be '
'encrypted with a symmetric key.')
- plain_json = self._crypto.decrypt_symmetric(
+ plain_json = self._crypto.decrypt_sym(
enc_json, self._crypto._symkey)
elif entry['encryption_scheme'] == EncryptionScheme.PUBKEY:
if not self._crypto.is_encrypted_asym(enc_json):
raise DocumentNotEncrypted(
'Incoming document\'s contents should be '
'encrypted to the user\'s public key.')
- plain_json = self._crypto.decrypt(enc_json)
+ plain_json = self._crypto.decrypt_asym(enc_json)
else:
raise DocumentNotEncrypted(
"Incoming document from sync is not encrypted.")
diff --git a/src/leap/soledad/crypto.py b/src/leap/soledad/crypto.py
index 471b35e2..dcc40439 100644
--- a/src/leap/soledad/crypto.py
+++ b/src/leap/soledad/crypto.py
@@ -25,8 +25,7 @@ from binascii import b2a_base64
from hashlib import sha256
-from leap.common.keymanager import KeyManager
-from leap.soledad.util import GPGWrapper
+from leap.common.keymanager import openpgp
class NoSymmetricSecret(Exception):
@@ -40,42 +39,32 @@ class SoledadCrypto(object):
General cryptographic functionality.
"""
- def __init__(self, gnupg_home, symkey=None):
+ def __init__(self, soledad):
"""
Initialize the crypto object.
- @param gnupg_home: Home of the gpg instance.
- @type gnupg_home: str
- @param symkey: A key to use for symmetric encryption.
- @type symkey: str
+ @param soledad: A Soledad instance for key lookup.
+ @type soledad: leap.soledad.Soledad
"""
- self._gpg = GPGWrapper(gnupghome=gnupg_home)
- self._symkey = symkey
+ self._soledad = soledad
+ self._pgp = openpgp.OpenPGPScheme(self._soledad)
+ self._symkey = None
- def encrypt(self, data, recipients=None, sign=None, passphrase=None,
- symmetric=False):
+ def encrypt_asym(self, data, key):
"""
Encrypt data.
@param data: the data to be encrypted
@type data: str
- @param recipients: to whom C{data} should be encrypted
- @type recipients: list or str
- @param sign: the fingerprint of key to be used for signature
- @type sign: str
- @param passphrase: the passphrase to be used for encryption
- @type passphrase: str
- @param symmetric: whether the encryption scheme should be symmetric
- @type symmetric: bool
+ @param key: the key to be used for encryption
+ @type key: str
@return: the encrypted data
@rtype: str
"""
- return str(self._gpg.encrypt(data, recipients, sign=sign,
- passphrase=passphrase,
- symmetric=symmetric))
+ return openpgp.encrypt_asym(data, key)
- def encrypt_symmetric(self, data, passphrase, sign=None):
+ def encrypt_sym(self, data, passphrase):
"""
Encrypt C{data} using a {password}.
@@ -83,18 +72,13 @@ class SoledadCrypto(object):
@type data: str
@param passphrase: the passphrase to use for encryption
@type passphrase: str
- @param data: the data to be encrypted
- @param sign: the fingerprint of key to be used for signature
- @type sign: str
@return: the encrypted data
@rtype: str
"""
- return self.encrypt(data, sign=sign,
- passphrase=passphrase,
- symmetric=True)
+ return openpgp.encrypt_sym(data, passphrase)
- def decrypt(self, data, passphrase=None):
+ def decrypt_asym(self, data):
"""
Decrypt data.
@@ -106,9 +90,10 @@ class SoledadCrypto(object):
@return: the decrypted data
@rtype: str
"""
- return str(self._gpg.decrypt(data, passphrase=passphrase))
+ key = self._pgp.get_key(self._soledad.address, private=True)
+ return openpgp.decrypt_asym(data, key)
- def decrypt_symmetric(self, data, passphrase):
+ def decrypt_sym(self, data, passphrase):
"""
Decrypt data using symmetric secret.
@@ -120,7 +105,7 @@ class SoledadCrypto(object):
@return: the decrypted data
@rtype: str
"""
- return self.decrypt(data, passphrase=passphrase)
+ return openpgp.decrypt_sym(data, passphrase)
def is_encrypted(self, data):
"""
@@ -132,7 +117,7 @@ class SoledadCrypto(object):
@return: whether the data is a cyphertext
@rtype: bool
"""
- return self._gpg.is_encrypted(data)
+ return openpgp.is_encrypted(data)
def is_encrypted_sym(self, data):
"""
@@ -141,7 +126,7 @@ class SoledadCrypto(object):
@return: whether data is encrypted to a symmetric key
@rtype: bool
"""
- return self._gpg.is_encrypted_sym(data)
+ return openpgp.is_encrypted_sym(data)
def is_encrypted_asym(self, data):
"""
@@ -151,14 +136,14 @@ class SoledadCrypto(object):
@return: whether data is encrypted to an OpenPGP private key
@rtype: bool
"""
- return self._gpg.is_encrypted_asym(data)
+ return openpgp.is_encrypted_asym(data)
- def _hash_passphrase(self, suffix):
+ def passphrase_hash(self, suffix):
"""
Generate a passphrase for symmetric encryption.
- The password is derived from C{suffix} and the secret for
- symmetric encryption previously loaded.
+ The password is derived from the secret for symmetric encryption and
+ a C{suffix} that is appended to the secret prior to hashing.
@param suffix: Will be appended to the symmetric key before hashing.
@type suffix: str
@@ -172,9 +157,9 @@ class SoledadCrypto(object):
return b2a_base64(
sha256('%s%s' % (self._symkey, suffix)).digest())[:-1]
- #
- # symkey setters/getters
- #
+ #
+ # symkey setters/getters
+ #
def _get_symkey(self):
return self._symkey
diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py
index 28114391..dac27a29 100644
--- a/src/leap/soledad/tests/__init__.py
+++ b/src/leap/soledad/tests/__init__.py
@@ -36,7 +36,7 @@ class BaseSoledadTest(BaseLeapTest):
self._soledad = self._soledad_instance(user=self.email)
self._soledad._init_dirs()
#self._soledad._gpg.import_keys(PUBLIC_KEY)
- self._soledad._crypto = SoledadCrypto(self.gnupg_home)
+ self._soledad._crypto = SoledadCrypto(self._soledad)
if not self._soledad._has_symkey():
self._soledad._gen_symkey()
self._soledad._load_symkey()
diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py
index f762437a..039d2f3c 100644
--- a/src/leap/soledad/tests/test_crypto.py
+++ b/src/leap/soledad/tests/test_crypto.py
@@ -77,7 +77,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
rd = self._soledad.export_recovery_document(None)
self.assertEqual(
{
- 'user': self._soledad._user,
+ 'address': self._soledad._address,
'symkey': self._soledad._symkey
},
json.loads(rd),
@@ -89,10 +89,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
self.assertEqual(True,
self._soledad._crypto.is_encrypted_sym(rd))
data = {
- 'user': self._soledad._user,
+ 'address': self._soledad._address,
'symkey': self._soledad._symkey,
}
- raw_data = json.loads(str(self._soledad._crypto.decrypt(
+ raw_data = json.loads(str(self._soledad._crypto.decrypt_sym(
rd,
passphrase='123456')))
self.assertEqual(
@@ -111,10 +111,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir
s = self._soledad_instance(user='anotheruser@leap.se', prefix='/2')
s._init_dirs()
- s._crypto = SoledadCrypto(gnupg_home)
+ s._crypto = SoledadCrypto(s)
s.import_recovery_document(rd, None)
- self.assertEqual(self._soledad._user,
- s._user, 'Failed setting user email.')
+ self.assertEqual(self._soledad._address,
+ s._address, 'Failed setting user email.')
self.assertEqual(self._soledad._symkey,
s._symkey,
'Failed settinng secret for symmetric encryption.')
@@ -124,10 +124,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir
s = self._soledad_instance(user='anotheruser@leap.se', prefix='3')
s._init_dirs()
- s._crypto = SoledadCrypto(gnupg_home)
+ s._crypto = SoledadCrypto(s)
s.import_recovery_document(rd, '123456')
- self.assertEqual(self._soledad._user,
- s._user, 'Failed setting user email.')
+ self.assertEqual(self._soledad._address,
+ s._address, 'Failed setting user email.')
self.assertEqual(self._soledad._symkey,
s._symkey,
'Failed settinng secret for symmetric encryption.')
@@ -138,7 +138,7 @@ class CryptoMethodsTestCase(BaseSoledadTest):
def test__gen_symkey(self):
sol = self._soledad_instance(user='user@leap.se', prefix='/3')
sol._init_dirs()
- sol._crypto = SoledadCrypto("%s/3/gnupg" % self.tempdir)
+ sol._crypto = SoledadCrypto(sol)
self.assertFalse(sol._has_symkey(), "Should not have a symkey at "
"this point")
sol._gen_symkey()
@@ -147,7 +147,7 @@ class CryptoMethodsTestCase(BaseSoledadTest):
def test__has_keys(self):
sol = self._soledad_instance(user='leap@leap.se', prefix='/5')
sol._init_dirs()
- sol._crypto = SoledadCrypto("%s/5/gnupg" % self.tempdir)
+ sol._crypto = SoledadCrypto(sol)
self.assertFalse(sol._has_keys())
sol._gen_symkey()
self.assertTrue(sol._has_keys())
diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py
index b849c310..bbe9ad4b 100644
--- a/src/leap/soledad/tests/test_soledad.py
+++ b/src/leap/soledad/tests/test_soledad.py
@@ -50,7 +50,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
def test__init_db(self):
sol = self._soledad_instance()
sol._init_dirs()
- sol._crypto = SoledadCrypto(self.tempdir+'/gnupg')
+ sol._crypto = SoledadCrypto(sol)
#self._soledad._gpg.import_keys(PUBLIC_KEY)
if not sol._has_symkey():
sol._gen_symkey()
@@ -63,7 +63,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
"""
Test if configuration defaults point to the correct place.
"""
- sol = Soledad(user='leap@leap.se', passphrase='123', bootstrap=False)
+ sol = Soledad('leap@leap.se', passphrase='123', bootstrap=False)
self.assertTrue(bool(re.match(
'.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home())))
self.assertTrue(bool(re.match(
@@ -83,7 +83,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
# we use regexp match here because HOME environment variable is
# changed by the BaseLeapTest class but BaseConfig does not capture
# that change.
- sol = Soledad(user='leap@leap.se', passphrase='123', bootstrap=False)
+ sol = Soledad('leap@leap.se', passphrase='123', bootstrap=False)
self.assertTrue(bool(re.match(
'.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home())))
self.assertTrue(bool(re.match(
@@ -114,8 +114,8 @@ class AuxMethodsTestCase(BaseSoledadTest):
f.write(json.dumps(config_values))
f.close()
sol = Soledad(
- user='leap@leap.se',
- passphrase='123',
+ 'leap@leap.se',
+ passphrase='123',
bootstrap=False,
config_path=tmpfile)
self.assertEqual('value_1', sol._config.get_gnupg_home())
@@ -131,8 +131,8 @@ class AuxMethodsTestCase(BaseSoledadTest):
# changed by the BaseLeapTest class but BaseConfig does not capture
# that change.
sol = Soledad(
- user='leap@leap.se',
- passphrase='123',
+ 'leap@leap.se',
+ passphrase='123',
bootstrap=False,
gnupg_home='value_4',
secret_path='value_3',
diff --git a/src/leap/soledad/util.py b/src/leap/soledad/util.py
deleted file mode 100644
index 90797501..00000000
--- a/src/leap/soledad/util.py
+++ /dev/null
@@ -1,393 +0,0 @@
-# -*- coding: utf-8 -*-
-# util.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-Utilities for Soledad.
-"""
-
-import os
-import gnupg
-import re
-from gnupg import (
- logger,
- _is_sequence,
- _make_binary_stream,
-)
-
-
-class ListPackets():
- """
- Handle status messages for --list-packets.
- """
-
- def __init__(self, gpg):
- """
- Initialize the packet listing handling class.
-
- @param gpg: GPG object instance.
- @type gpg: gnupg.GPG
- """
- self.gpg = gpg
- self.nodata = None
- self.key = None
- self.need_passphrase = None
- self.need_passphrase_sym = None
- self.userid_hint = None
-
- def handle_status(self, key, value):
- """
- Handle one line of the --list-packets status message.
-
- @param key: The status message key.
- @type key: str
- @param value: The status message value.
- @type value: str
- """
- # TODO: write tests for handle_status
- if key == 'NODATA':
- self.nodata = True
- if key == 'ENC_TO':
- # This will only capture keys in our keyring. In the future we
- # may want to include multiple unknown keys in this list.
- self.key, _, _ = value.split()
- if key == 'NEED_PASSPHRASE':
- self.need_passphrase = True
- if key == 'NEED_PASSPHRASE_SYM':
- self.need_passphrase_sym = True
- if key == 'USERID_HINT':
- self.userid_hint = value.strip().split()
-
-
-class GPGWrapper(gnupg.GPG):
- """
- This is a temporary class for handling GPG requests, and should be
- replaced by a more general class used throughout the project.
- """
-
- GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg"
- GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS
-
- def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME,
- verbose=False, use_agent=False, keyring=None, options=None):
- """
- Initialize a GnuPG process wrapper.
-
- @param gpgbinary: Name for GnuPG binary executable.
- @type gpgbinary: C{str}
- @param gpghome: Full pathname to directory containing the public and
- private keyrings.
- @type gpghome: C{str}
- @param keyring: Name of alternative keyring file to use. If specified,
- the default keyring is not used.
- @param verbose: Should some verbose info be output?
- @type verbose: bool
- @param use_agent: Should pass `--use-agent` to GPG binary?
- @type use_agent: bool
- @param keyring: Path for the keyring to use.
- @type keyring: str
- @options: A list of additional options to pass to the GPG binary.
- @type options: list
-
- @raise: RuntimeError with explanation message if there is a problem
- invoking gpg.
- """
- gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary,
- verbose=verbose, use_agent=use_agent,
- keyring=keyring, options=options)
- self.result_map['list-packets'] = ListPackets
-
- def find_key_by_email(self, email, secret=False):
- """
- Find user's key based on their email.
-
- @param email: Email address of key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
-
- @return: The fingerprint of the found key.
- @rtype: str
- """
- for key in self.list_keys(secret=secret):
- for uid in key['uids']:
- if re.search(email, uid):
- return key
- raise LookupError("GnuPG public key for email %s not found!" % email)
-
- def find_key_by_subkey(self, subkey, secret=False):
- """
- Find user's key based on a subkey fingerprint.
-
- @param email: Subkey fingerprint of the key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
-
- @return: The fingerprint of the found key.
- @rtype: str
- """
- for key in self.list_keys(secret=secret):
- for sub in key['subkeys']:
- if sub[0] == subkey:
- return key
- raise LookupError(
- "GnuPG public key for subkey %s not found!" % subkey)
-
- def find_key_by_keyid(self, keyid, secret=False):
- """
- Find user's key based on the key ID.
-
- @param email: The key ID of the key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
-
- @return: The fingerprint of the found key.
- @rtype: str
- """
- for key in self.list_keys(secret=secret):
- if keyid == key['keyid']:
- return key
- raise LookupError(
- "GnuPG public key for keyid %s not found!" % keyid)
-
- def find_key_by_fingerprint(self, fingerprint, secret=False):
- """
- Find user's key based on the key fingerprint.
-
- @param email: The fingerprint of the key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
-
- @return: The fingerprint of the found key.
- @rtype: str
- """
- for key in self.list_keys(secret=secret):
- if fingerprint == key['fingerprint']:
- return key
- raise LookupError(
- "GnuPG public key for fingerprint %s not found!" % fingerprint)
-
- def encrypt(self, data, recipient, sign=None, always_trust=True,
- passphrase=None, symmetric=False):
- """
- Encrypt data using GPG.
-
- @param data: The data to be encrypted.
- @type data: str
- @param recipient: The address of the public key to be used.
- @type recipient: str
- @param sign: Should the encrypted content be signed?
- @type sign: bool
- @param always_trust: Skip key validation and assume that used keys
- are always fully trusted?
- @type always_trust: bool
- @param passphrase: The passphrase to be used if symmetric encryption
- is desired.
- @type passphrase: str
- @param symmetric: Should we encrypt to a password?
- @type symmetric: bool
-
- @return: An object with encrypted result in the `data` field.
- @rtype: gnupg.Crypt
- """
- # TODO: devise a way so we don't need to "always trust".
- return gnupg.GPG.encrypt(self, data, recipient, sign=sign,
- always_trust=always_trust,
- passphrase=passphrase,
- symmetric=symmetric,
- cipher_algo='AES256')
-
- def decrypt(self, data, always_trust=True, passphrase=None):
- """
- Decrypt data using GPG.
-
- @param data: The data to be decrypted.
- @type data: str
- @param always_trust: Skip key validation and assume that used keys
- are always fully trusted?
- @type always_trust: bool
- @param passphrase: The passphrase to be used if symmetric encryption
- is desired.
- @type passphrase: str
-
- @return: An object with decrypted result in the `data` field.
- @rtype: gnupg.Crypt
- """
- # TODO: devise a way so we don't need to "always trust".
- return gnupg.GPG.decrypt(self, data, always_trust=always_trust,
- passphrase=passphrase)
-
- def send_keys(self, keyserver, *keyids):
- """
- Send keys to a keyserver
-
- @param keyserver: The keyserver to send the keys to.
- @type keyserver: str
- @param keyids: The key ids to send.
- @type keyids: list
-
- @return: A list of keys sent to server.
- @rtype: gnupg.ListKeys
- """
- # TODO: write tests for this.
- # TODO: write a SendKeys class to handle status for this.
- result = self.result_map['list'](self)
- gnupg.logger.debug('send_keys: %r', keyids)
- data = gnupg._make_binary_stream("", self.encoding)
- args = ['--keyserver', keyserver, '--send-keys']
- args.extend(keyids)
- self._handle_io(args, data, result, binary=True)
- gnupg.logger.debug('send_keys result: %r', result.__dict__)
- data.close()
- return result
-
- def encrypt_file(self, file, recipients, sign=None,
- always_trust=False, passphrase=None,
- armor=True, output=None, symmetric=False,
- cipher_algo=None):
- """
- Encrypt the message read from the file-like object 'file'.
-
- @param file: The file to be encrypted.
- @type data: file
- @param recipient: The address of the public key to be used.
- @type recipient: str
- @param sign: Should the encrypted content be signed?
- @type sign: bool
- @param always_trust: Skip key validation and assume that used keys
- are always fully trusted?
- @type always_trust: bool
- @param passphrase: The passphrase to be used if symmetric encryption
- is desired.
- @type passphrase: str
- @param armor: Create ASCII armored output?
- @type armor: bool
- @param output: Path of file to write results in.
- @type output: str
- @param symmetric: Should we encrypt to a password?
- @type symmetric: bool
- @param cipher_algo: Algorithm to use.
- @type cipher_algo: str
-
- @return: An object with encrypted result in the `data` field.
- @rtype: gnupg.Crypt
- """
- args = ['--encrypt']
- if symmetric:
- args = ['--symmetric']
- if cipher_algo:
- args.append('--cipher-algo %s' % cipher_algo)
- else:
- args = ['--encrypt']
- if not _is_sequence(recipients):
- recipients = (recipients,)
- for recipient in recipients:
- args.append('--recipient "%s"' % recipient)
- if armor: # create ascii-armored output - set to False for binary
- args.append('--armor')
- if output: # write the output to a file with the specified name
- if os.path.exists(output):
- os.remove(output) # to avoid overwrite confirmation message
- args.append('--output "%s"' % output)
- if sign:
- args.append('--sign --default-key "%s"' % sign)
- if always_trust:
- args.append("--always-trust")
- result = self.result_map['crypt'](self)
- self._handle_io(args, file, result, passphrase=passphrase, binary=True)
- logger.debug('encrypt result: %r', result.data)
- return result
-
- def list_packets(self, data):
- """
- List the sequence of packets.
-
- @param data: The data to extract packets from.
- @type data: str
-
- @return: An object with packet info.
- @rtype ListPackets
- """
- args = ["--list-packets"]
- result = self.result_map['list-packets'](self)
- self._handle_io(
- args,
- _make_binary_stream(data, self.encoding),
- result,
- )
- return result
-
- def encrypted_to(self, data):
- """
- Return the key to which data is encrypted to.
-
- @param data: The data to be examined.
- @type data: str
-
- @return: The fingerprint of the key to which data is encrypted to.
- @rtype: str
- """
- # TODO: make this support multiple keys.
- result = self.list_packets(data)
- if not result.key:
- raise LookupError(
- "Content is not encrypted to a GnuPG key!")
- try:
- return self.find_key_by_keyid(result.key)
- except:
- return self.find_key_by_subkey(result.key)
-
- def is_encrypted_sym(self, data):
- """
- Say whether some chunk of data is encrypted to a symmetric key.
-
- @param data: The data to be examined.
- @type data: str
-
- @return: Whether data is encrypted to a symmetric key.
- @rtype: bool
- """
- result = self.list_packets(data)
- return bool(result.need_passphrase_sym)
-
- def is_encrypted_asym(self, data):
- """
- Say whether some chunk of data is encrypted to a private key.
-
- @param data: The data to be examined.
- @type data: str
-
- @return: Whether data is encrypted to a private key.
- @rtype: bool
- """
- result = self.list_packets(data)
- return bool(result.key)
-
- def is_encrypted(self, data):
- """
- Say whether some chunk of data is encrypted to a key.
-
- @param data: The data to be examined.
- @type data: str
-
- @return: Whether data is encrypted to a key.
- @rtype: bool
- """
- self.is_encrypted_asym() or self.is_encrypted_sym()