summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/__init__.py98
-rw-r--r--src/leap/soledad/backends/leap_backend.py15
-rw-r--r--src/leap/soledad/crypto.py69
-rw-r--r--src/leap/soledad/tests/__init__.py2
-rw-r--r--src/leap/soledad/tests/test_crypto.py22
-rw-r--r--src/leap/soledad/tests/test_soledad.py14
-rw-r--r--src/leap/soledad/util.py393
7 files changed, 112 insertions, 501 deletions
diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py
index a27a586d..23d81e84 100644
--- a/src/leap/soledad/__init__.py
+++ b/src/leap/soledad/__init__.py
@@ -43,7 +43,6 @@ from hashlib import sha256
from leap.common import events
-#from leap.common.keymanager.gpgwrapper import GPGWrapper
from leap.soledad.config import SoledadConfig
from leap.soledad.backends import sqlcipher
from leap.soledad.backends.leap_backend import (
@@ -108,14 +107,14 @@ class Soledad(object):
The length of the secret used for symmetric encryption.
"""
- def __init__(self, user, passphrase, config_path=None, gnupg_home=None,
+ def __init__(self, address, passphrase, config_path=None, gnupg_home=None,
secret_path=None, local_db_path=None,
shared_db_url=None, auth_token=None, bootstrap=True):
"""
Initialize configuration, cryptographic keys and dbs.
- @param user: Email address of the user (username@provider).
- @type user: str
+ @param address: User's address in the form C{user@provider}.
+ @type address: str
@param passphrase: The passphrase for locking and unlocking encryption
secrets for disk storage.
@type passphrase: str
@@ -138,7 +137,7 @@ class Soledad(object):
@type bootstrap: bool
"""
# TODO: allow for fingerprint enforcing.
- self._user = user
+ self._address = address
self._passphrase = passphrase
self._auth_token = auth_token
self._init_config(
@@ -179,7 +178,7 @@ class Soledad(object):
# TODO: log each bootstrap step.
# Stage 0 - Local environment setup
self._init_dirs()
- self._crypto = SoledadCrypto(self._config.get_gnupg_home())
+ self._crypto = SoledadCrypto(self)
if self._config.get_shared_db_url() and self._auth_token:
# TODO: eliminate need to create db here.
self._shared_db = SoledadSharedDatabase.open_database(
@@ -192,12 +191,14 @@ class Soledad(object):
if self._has_keys():
self._load_keys()
else:
- doc = self._get_keys_doc()
+ doc = self._fetch_keys_from_shared_db()
if not doc:
self._init_keys()
else:
- self._set_symkey(self.decrypt(doc.content['_symkey'],
- passphrase=self._hash_user()))
+ self._set_symkey(
+ self._crypto.decrypt_sym(
+ doc.content['_symkey'],
+ passphrase=self._address_hash()))
# Stage 2 - Keys synchronization
self._assert_server_keys()
# Stage 3 - Local database initialization
@@ -243,12 +244,13 @@ class Soledad(object):
"""
Generate (if needed) and load secret for symmetric encryption.
"""
- events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._user)
+ events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._address)
# load/generate secret
if not self._has_symkey():
self._gen_symkey()
self._load_symkey()
- events.signal(events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._address)
def _init_db(self):
"""
@@ -293,7 +295,8 @@ class Soledad(object):
raise DocumentNotEncrypted(
"File %s is not encrypted!" % self._config.get_secret_path())
# can we decrypt it?
- cyphertext = self._crypto.decrypt(content, passphrase=self._passphrase)
+ cyphertext = self._crypto.decrypt_sym(
+ content, passphrase=self._passphrase)
return bool(cyphertext)
def _load_symkey(self):
@@ -306,7 +309,8 @@ class Soledad(object):
try:
with open(self._config.get_secret_path()) as f:
self._symkey = \
- self._crypto.decrypt(f.read(), passphrase=self._passphrase)
+ self._crypto.decrypt_sym(
+ f.read(), passphrase=self._passphrase)
self._crypto.symkey = self._symkey
except IOError:
raise IOError('Failed to open secret file %s.' %
@@ -317,10 +321,11 @@ class Soledad(object):
Generate a secret for symmetric encryption and store in a local
encrypted file.
"""
- self._set_symkey(''.join(
+ symkey = ''.join(
random.choice(
string.ascii_letters +
- string.digits) for x in range(self.SECRET_LENGTH)))
+ string.digits) for x in range(self.SECRET_LENGTH))
+ self._set_symkey(symkey)
def _set_symkey(self, symkey):
"""
@@ -338,9 +343,8 @@ class Soledad(object):
self._store_symkey()
def _store_symkey(self):
- ciphertext = self._crypto.encrypt(
- self._symkey, symmetric=True,
- passphrase=self._passphrase)
+ ciphertext = self._crypto.encrypt_sym(
+ self._symkey, self._passphrase)
f = open(self._config.get_secret_path(), 'w')
f.write(str(ciphertext))
f.close()
@@ -370,7 +374,7 @@ class Soledad(object):
"""
self._gen_symkey()
- def _hash_user(self):
+ def _address_hash(self):
"""
Calculate a hash for storing/retrieving key material on shared
database, based on user's address.
@@ -379,9 +383,9 @@ class Soledad(object):
@rtype: str
"""
return b2a_base64(
- sha256('user-%s' % self._user).digest())[:-1]
+ sha256('address-%s' % self._address).digest())[:-1]
- def _get_keys_doc(self):
+ def _fetch_keys_from_shared_db(self):
"""
Retrieve the document with encrypted key material from the shared
database.
@@ -389,12 +393,14 @@ class Soledad(object):
@return: a document with encrypted key material in its contents
@rtype: LeapDocument
"""
- events.signal(events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._address)
# TODO: change below to raise appropriate exceptions
if not self._shared_db:
return None
- doc = self._shared_db.get_doc_unauth(self._hash_user())
- events.signal(events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._user)
+ doc = self._shared_db.get_doc_unauth(self._address_hash())
+ events.signal(
+ events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._address)
return doc
def _assert_server_keys(self):
@@ -404,20 +410,24 @@ class Soledad(object):
assert self._has_keys()
if not self._shared_db:
return
- doc = self._get_keys_doc()
+ doc = self._fetch_keys_from_shared_db()
if doc:
- remote_symkey = self.decrypt(doc.content['_symkey'],
- passphrase=self._hash_user())
+ remote_symkey = self.decrypt_sym(
+ doc.content['_symkey'],
+ passphrase=self._address_hash())
assert remote_symkey == self._symkey
else:
- events.signal(events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._address)
content = {
- '_symkey': self.encrypt(self._symkey),
+ '_symkey': self.encrypt_sym(self._symkey, self._passphrase),
}
- doc = LeapDocument(doc_id=self._hash_user(), crypto=self._crypto)
+ doc = LeapDocument(doc_id=self._address_hash(),
+ crypto=self._crypto)
doc.content = content
self._shared_db.put_doc(doc)
- events.signal(events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._address)
#-------------------------------------------------------------------------
# Document storage, retrieval and sync
@@ -497,7 +507,6 @@ class Soledad(object):
"""
return self._db.get_all_docs(include_deleted)
-
def create_doc(self, content, doc_id=None):
"""
Create a new document in the local encrypted database.
@@ -569,7 +578,7 @@ class Soledad(object):
"""
# TODO: create authentication scheme for sync with server.
local_gen = self._db.sync(url, creds=None, autocreate=True)
- events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._user)
+ events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._address)
return local_gen
def need_sync(self, url):
@@ -587,11 +596,11 @@ class Soledad(object):
info = target.get_sync_info(self._db._get_replica_uid())
# compare source generation with target's last known source generation
if self._db._get_generation() != info[4]:
- events.signal(events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._user)
+ events.signal(
+ events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._address)
return True
return False
-
#-------------------------------------------------------------------------
# Recovery document export and import
#-------------------------------------------------------------------------
@@ -623,13 +632,11 @@ class Soledad(object):
@rtype: str
"""
data = json.dumps({
- 'user': self._user,
+ 'address': self._address,
'symkey': self._symkey,
})
if passphrase:
- data = self._crypto.encrypt(data, None, sign=None,
- passphrase=passphrase,
- symmetric=True)
+ data = self._crypto.encrypt_sym(data, passphrase)
return data
def import_recovery_document(self, data, passphrase=None):
@@ -649,14 +656,23 @@ class Soledad(object):
raise DocumentNotEncrypted("You provided a password but the "
"recovery document is not encrypted.")
if passphrase:
- data = str(self._crypto.decrypt(data, passphrase=passphrase))
+ data = self._crypto.decrypt_sym(data, passphrase=passphrase)
data = json.loads(data)
- self._user = data['user']
+ self._address = data['address']
self._symkey = data['symkey']
self._crypto.symkey = self._symkey
self._store_symkey()
# TODO: make this work well with bootstrap.
self._load_keys()
+ #
+ # Setters/getters
+ #
+
+ def _get_address(self):
+ return self._address
+
+ address = property(_get_address, doc='The user address.')
+
__all__ = ['backends', 'util', 'server', 'shared_db']
diff --git a/src/leap/soledad/backends/leap_backend.py b/src/leap/soledad/backends/leap_backend.py
index dfec9e85..1c0d5a7d 100644
--- a/src/leap/soledad/backends/leap_backend.py
+++ b/src/leap/soledad/backends/leap_backend.py
@@ -35,6 +35,9 @@ from u1db.remote.http_database import HTTPDatabase
from u1db.errors import BrokenSyncStream
+from leap.common.keymanager import KeyManager
+
+
class NoDefaultKey(Exception):
"""
Exception to signal that there's no default OpenPGP key configured.
@@ -120,18 +123,18 @@ class LeapDocument(Document):
"""
if not self._crypto:
raise NoSoledadCryptoInstance()
- return self._crypto.encrypt_symmetric(
+ return self._crypto.encrypt_sym(
self.get_json(),
- self._crypto._hash_passphrase(self.doc_id))
+ self._crypto.passphrase_hash(self.doc_id))
def set_encrypted_content(self, cyphertext):
"""
Decrypt C{cyphertext} and set document's content.
contents.
"""
- plaintext = self._crypto.decrypt_symmetric(
+ plaintext = self._crypto.decrypt_sym(
cyphertext,
- self._crypto._hash_passphrase(self.doc_id))
+ self._crypto.passphrase_hash(self.doc_id))
self.set_json(plaintext)
self.encryption_scheme = EncryptionSchemes.NONE
@@ -299,14 +302,14 @@ class LeapSyncTarget(HTTPSyncTarget):
raise DocumentNotEncrypted(
'Incoming document\'s contents should be '
'encrypted with a symmetric key.')
- plain_json = self._crypto.decrypt_symmetric(
+ plain_json = self._crypto.decrypt_sym(
enc_json, self._crypto._symkey)
elif entry['encryption_scheme'] == EncryptionScheme.PUBKEY:
if not self._crypto.is_encrypted_asym(enc_json):
raise DocumentNotEncrypted(
'Incoming document\'s contents should be '
'encrypted to the user\'s public key.')
- plain_json = self._crypto.decrypt(enc_json)
+ plain_json = self._crypto.decrypt_asym(enc_json)
else:
raise DocumentNotEncrypted(
"Incoming document from sync is not encrypted.")
diff --git a/src/leap/soledad/crypto.py b/src/leap/soledad/crypto.py
index 471b35e2..dcc40439 100644
--- a/src/leap/soledad/crypto.py
+++ b/src/leap/soledad/crypto.py
@@ -25,8 +25,7 @@ from binascii import b2a_base64
from hashlib import sha256
-from leap.common.keymanager import KeyManager
-from leap.soledad.util import GPGWrapper
+from leap.common.keymanager import openpgp
class NoSymmetricSecret(Exception):
@@ -40,42 +39,32 @@ class SoledadCrypto(object):
General cryptographic functionality.
"""
- def __init__(self, gnupg_home, symkey=None):
+ def __init__(self, soledad):
"""
Initialize the crypto object.
- @param gnupg_home: Home of the gpg instance.
- @type gnupg_home: str
- @param symkey: A key to use for symmetric encryption.
- @type symkey: str
+ @param soledad: A Soledad instance for key lookup.
+ @type soledad: leap.soledad.Soledad
"""
- self._gpg = GPGWrapper(gnupghome=gnupg_home)
- self._symkey = symkey
+ self._soledad = soledad
+ self._pgp = openpgp.OpenPGPScheme(self._soledad)
+ self._symkey = None
- def encrypt(self, data, recipients=None, sign=None, passphrase=None,
- symmetric=False):
+ def encrypt_asym(self, data, key):
"""
Encrypt data.
@param data: the data to be encrypted
@type data: str
- @param recipients: to whom C{data} should be encrypted
- @type recipients: list or str
- @param sign: the fingerprint of key to be used for signature
- @type sign: str
- @param passphrase: the passphrase to be used for encryption
- @type passphrase: str
- @param symmetric: whether the encryption scheme should be symmetric
- @type symmetric: bool
+ @param key: the key to be used for encryption
+ @type key: str
@return: the encrypted data
@rtype: str
"""
- return str(self._gpg.encrypt(data, recipients, sign=sign,
- passphrase=passphrase,
- symmetric=symmetric))
+ return openpgp.encrypt_asym(data, key)
- def encrypt_symmetric(self, data, passphrase, sign=None):
+ def encrypt_sym(self, data, passphrase):
"""
Encrypt C{data} using a {password}.
@@ -83,18 +72,13 @@ class SoledadCrypto(object):
@type data: str
@param passphrase: the passphrase to use for encryption
@type passphrase: str
- @param data: the data to be encrypted
- @param sign: the fingerprint of key to be used for signature
- @type sign: str
@return: the encrypted data
@rtype: str
"""
- return self.encrypt(data, sign=sign,
- passphrase=passphrase,
- symmetric=True)
+ return openpgp.encrypt_sym(data, passphrase)
- def decrypt(self, data, passphrase=None):
+ def decrypt_asym(self, data):
"""
Decrypt data.
@@ -106,9 +90,10 @@ class SoledadCrypto(object):
@return: the decrypted data
@rtype: str
"""
- return str(self._gpg.decrypt(data, passphrase=passphrase))
+ key = self._pgp.get_key(self._soledad.address, private=True)
+ return openpgp.decrypt_asym(data, key)
- def decrypt_symmetric(self, data, passphrase):
+ def decrypt_sym(self, data, passphrase):
"""
Decrypt data using symmetric secret.
@@ -120,7 +105,7 @@ class SoledadCrypto(object):
@return: the decrypted data
@rtype: str
"""
- return self.decrypt(data, passphrase=passphrase)
+ return openpgp.decrypt_sym(data, passphrase)
def is_encrypted(self, data):
"""
@@ -132,7 +117,7 @@ class SoledadCrypto(object):
@return: whether the data is a cyphertext
@rtype: bool
"""
- return self._gpg.is_encrypted(data)
+ return openpgp.is_encrypted(data)
def is_encrypted_sym(self, data):
"""
@@ -141,7 +126,7 @@ class SoledadCrypto(object):
@return: whether data is encrypted to a symmetric key
@rtype: bool
"""
- return self._gpg.is_encrypted_sym(data)
+ return openpgp.is_encrypted_sym(data)
def is_encrypted_asym(self, data):
"""
@@ -151,14 +136,14 @@ class SoledadCrypto(object):
@return: whether data is encrypted to an OpenPGP private key
@rtype: bool
"""
- return self._gpg.is_encrypted_asym(data)
+ return openpgp.is_encrypted_asym(data)
- def _hash_passphrase(self, suffix):
+ def passphrase_hash(self, suffix):
"""
Generate a passphrase for symmetric encryption.
- The password is derived from C{suffix} and the secret for
- symmetric encryption previously loaded.
+ The password is derived from the secret for symmetric encryption and
+ a C{suffix} that is appended to the secret prior to hashing.
@param suffix: Will be appended to the symmetric key before hashing.
@type suffix: str
@@ -172,9 +157,9 @@ class SoledadCrypto(object):
return b2a_base64(
sha256('%s%s' % (self._symkey, suffix)).digest())[:-1]
- #
- # symkey setters/getters
- #
+ #
+ # symkey setters/getters
+ #
def _get_symkey(self):
return self._symkey
diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py
index 28114391..dac27a29 100644
--- a/src/leap/soledad/tests/__init__.py
+++ b/src/leap/soledad/tests/__init__.py
@@ -36,7 +36,7 @@ class BaseSoledadTest(BaseLeapTest):
self._soledad = self._soledad_instance(user=self.email)
self._soledad._init_dirs()
#self._soledad._gpg.import_keys(PUBLIC_KEY)
- self._soledad._crypto = SoledadCrypto(self.gnupg_home)
+ self._soledad._crypto = SoledadCrypto(self._soledad)
if not self._soledad._has_symkey():
self._soledad._gen_symkey()
self._soledad._load_symkey()
diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py
index f762437a..039d2f3c 100644
--- a/src/leap/soledad/tests/test_crypto.py
+++ b/src/leap/soledad/tests/test_crypto.py
@@ -77,7 +77,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
rd = self._soledad.export_recovery_document(None)
self.assertEqual(
{
- 'user': self._soledad._user,
+ 'address': self._soledad._address,
'symkey': self._soledad._symkey
},
json.loads(rd),
@@ -89,10 +89,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
self.assertEqual(True,
self._soledad._crypto.is_encrypted_sym(rd))
data = {
- 'user': self._soledad._user,
+ 'address': self._soledad._address,
'symkey': self._soledad._symkey,
}
- raw_data = json.loads(str(self._soledad._crypto.decrypt(
+ raw_data = json.loads(str(self._soledad._crypto.decrypt_sym(
rd,
passphrase='123456')))
self.assertEqual(
@@ -111,10 +111,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir
s = self._soledad_instance(user='anotheruser@leap.se', prefix='/2')
s._init_dirs()
- s._crypto = SoledadCrypto(gnupg_home)
+ s._crypto = SoledadCrypto(s)
s.import_recovery_document(rd, None)
- self.assertEqual(self._soledad._user,
- s._user, 'Failed setting user email.')
+ self.assertEqual(self._soledad._address,
+ s._address, 'Failed setting user email.')
self.assertEqual(self._soledad._symkey,
s._symkey,
'Failed settinng secret for symmetric encryption.')
@@ -124,10 +124,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir
s = self._soledad_instance(user='anotheruser@leap.se', prefix='3')
s._init_dirs()
- s._crypto = SoledadCrypto(gnupg_home)
+ s._crypto = SoledadCrypto(s)
s.import_recovery_document(rd, '123456')
- self.assertEqual(self._soledad._user,
- s._user, 'Failed setting user email.')
+ self.assertEqual(self._soledad._address,
+ s._address, 'Failed setting user email.')
self.assertEqual(self._soledad._symkey,
s._symkey,
'Failed settinng secret for symmetric encryption.')
@@ -138,7 +138,7 @@ class CryptoMethodsTestCase(BaseSoledadTest):
def test__gen_symkey(self):
sol = self._soledad_instance(user='user@leap.se', prefix='/3')
sol._init_dirs()
- sol._crypto = SoledadCrypto("%s/3/gnupg" % self.tempdir)
+ sol._crypto = SoledadCrypto(sol)
self.assertFalse(sol._has_symkey(), "Should not have a symkey at "
"this point")
sol._gen_symkey()
@@ -147,7 +147,7 @@ class CryptoMethodsTestCase(BaseSoledadTest):
def test__has_keys(self):
sol = self._soledad_instance(user='leap@leap.se', prefix='/5')
sol._init_dirs()
- sol._crypto = SoledadCrypto("%s/5/gnupg" % self.tempdir)
+ sol._crypto = SoledadCrypto(sol)
self.assertFalse(sol._has_keys())
sol._gen_symkey()
self.assertTrue(sol._has_keys())
diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py
index b849c310..bbe9ad4b 100644
--- a/src/leap/soledad/tests/test_soledad.py
+++ b/src/leap/soledad/tests/test_soledad.py
@@ -50,7 +50,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
def test__init_db(self):
sol = self._soledad_instance()
sol._init_dirs()
- sol._crypto = SoledadCrypto(self.tempdir+'/gnupg')
+ sol._crypto = SoledadCrypto(sol)
#self._soledad._gpg.import_keys(PUBLIC_KEY)
if not sol._has_symkey():
sol._gen_symkey()
@@ -63,7 +63,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
"""
Test if configuration defaults point to the correct place.
"""
- sol = Soledad(user='leap@leap.se', passphrase='123', bootstrap=False)
+ sol = Soledad('leap@leap.se', passphrase='123', bootstrap=False)
self.assertTrue(bool(re.match(
'.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home())))
self.assertTrue(bool(re.match(
@@ -83,7 +83,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
# we use regexp match here because HOME environment variable is
# changed by the BaseLeapTest class but BaseConfig does not capture
# that change.
- sol = Soledad(user='leap@leap.se', passphrase='123', bootstrap=False)
+ sol = Soledad('leap@leap.se', passphrase='123', bootstrap=False)
self.assertTrue(bool(re.match(
'.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home())))
self.assertTrue(bool(re.match(
@@ -114,8 +114,8 @@ class AuxMethodsTestCase(BaseSoledadTest):
f.write(json.dumps(config_values))
f.close()
sol = Soledad(
- user='leap@leap.se',
- passphrase='123',
+ 'leap@leap.se',
+ passphrase='123',
bootstrap=False,
config_path=tmpfile)
self.assertEqual('value_1', sol._config.get_gnupg_home())
@@ -131,8 +131,8 @@ class AuxMethodsTestCase(BaseSoledadTest):
# changed by the BaseLeapTest class but BaseConfig does not capture
# that change.
sol = Soledad(
- user='leap@leap.se',
- passphrase='123',
+ 'leap@leap.se',
+ passphrase='123',
bootstrap=False,
gnupg_home='value_4',
secret_path='value_3',
diff --git a/src/leap/soledad/util.py b/src/leap/soledad/util.py
deleted file mode 100644
index 90797501..00000000
--- a/src/leap/soledad/util.py
+++ /dev/null
@@ -1,393 +0,0 @@
-# -*- coding: utf-8 -*-
-# util.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-Utilities for Soledad.
-"""
-
-import os
-import gnupg
-import re
-from gnupg import (
- logger,
- _is_sequence,
- _make_binary_stream,
-)
-
-
-class ListPackets():
- """
- Handle status messages for --list-packets.
- """
-
- def __init__(self, gpg):
- """
- Initialize the packet listing handling class.
-
- @param gpg: GPG object instance.
- @type gpg: gnupg.GPG
- """
- self.gpg = gpg
- self.nodata = None
- self.key = None
- self.need_passphrase = None
- self.need_passphrase_sym = None
- self.userid_hint = None
-
- def handle_status(self, key, value):
- """
- Handle one line of the --list-packets status message.
-
- @param key: The status message key.
- @type key: str
- @param value: The status message value.
- @type value: str
- """
- # TODO: write tests for handle_status
- if key == 'NODATA':
- self.nodata = True
- if key == 'ENC_TO':
- # This will only capture keys in our keyring. In the future we
- # may want to include multiple unknown keys in this list.
- self.key, _, _ = value.split()
- if key == 'NEED_PASSPHRASE':
- self.need_passphrase = True
- if key == 'NEED_PASSPHRASE_SYM':
- self.need_passphrase_sym = True
- if key == 'USERID_HINT':
- self.userid_hint = value.strip().split()
-
-
-class GPGWrapper(gnupg.GPG):
- """
- This is a temporary class for handling GPG requests, and should be
- replaced by a more general class used throughout the project.
- """
-
- GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg"
- GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS
-
- def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME,
- verbose=False, use_agent=False, keyring=None, options=None):
- """
- Initialize a GnuPG process wrapper.
-
- @param gpgbinary: Name for GnuPG binary executable.
- @type gpgbinary: C{str}
- @param gpghome: Full pathname to directory containing the public and
- private keyrings.
- @type gpghome: C{str}
- @param keyring: Name of alternative keyring file to use. If specified,
- the default keyring is not used.
- @param verbose: Should some verbose info be output?
- @type verbose: bool
- @param use_agent: Should pass `--use-agent` to GPG binary?
- @type use_agent: bool
- @param keyring: Path for the keyring to use.
- @type keyring: str
- @options: A list of additional options to pass to the GPG binary.
- @type options: list
-
- @raise: RuntimeError with explanation message if there is a problem
- invoking gpg.
- """
- gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary,
- verbose=verbose, use_agent=use_agent,
- keyring=keyring, options=options)
- self.result_map['list-packets'] = ListPackets
-
- def find_key_by_email(self, email, secret=False):
- """
- Find user's key based on their email.
-
- @param email: Email address of key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
-
- @return: The fingerprint of the found key.
- @rtype: str
- """
- for key in self.list_keys(secret=secret):
- for uid in key['uids']:
- if re.search(email, uid):
- return key
- raise LookupError("GnuPG public key for email %s not found!" % email)
-
- def find_key_by_subkey(self, subkey, secret=False):
- """
- Find user's key based on a subkey fingerprint.
-
- @param email: Subkey fingerprint of the key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
-
- @return: The fingerprint of the found key.
- @rtype: str
- """
- for key in self.list_keys(secret=secret):
- for sub in key['subkeys']:
- if sub[0] == subkey:
- return key
- raise LookupError(
- "GnuPG public key for subkey %s not found!" % subkey)
-
- def find_key_by_keyid(self, keyid, secret=False):
- """
- Find user's key based on the key ID.
-
- @param email: The key ID of the key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
-
- @return: The fingerprint of the found key.
- @rtype: str
- """
- for key in self.list_keys(secret=secret):
- if keyid == key['keyid']:
- return key
- raise LookupError(
- "GnuPG public key for keyid %s not found!" % keyid)
-
- def find_key_by_fingerprint(self, fingerprint, secret=False):
- """
- Find user's key based on the key fingerprint.
-
- @param email: The fingerprint of the key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
-
- @return: The fingerprint of the found key.
- @rtype: str
- """
- for key in self.list_keys(secret=secret):
- if fingerprint == key['fingerprint']:
- return key
- raise LookupError(
- "GnuPG public key for fingerprint %s not found!" % fingerprint)
-
- def encrypt(self, data, recipient, sign=None, always_trust=True,
- passphrase=None, symmetric=False):
- """
- Encrypt data using GPG.
-
- @param data: The data to be encrypted.
- @type data: str
- @param recipient: The address of the public key to be used.
- @type recipient: str
- @param sign: Should the encrypted content be signed?
- @type sign: bool
- @param always_trust: Skip key validation and assume that used keys
- are always fully trusted?
- @type always_trust: bool
- @param passphrase: The passphrase to be used if symmetric encryption
- is desired.
- @type passphrase: str
- @param symmetric: Should we encrypt to a password?
- @type symmetric: bool
-
- @return: An object with encrypted result in the `data` field.
- @rtype: gnupg.Crypt
- """
- # TODO: devise a way so we don't need to "always trust".
- return gnupg.GPG.encrypt(self, data, recipient, sign=sign,
- always_trust=always_trust,
- passphrase=passphrase,
- symmetric=symmetric,
- cipher_algo='AES256')
-
- def decrypt(self, data, always_trust=True, passphrase=None):
- """
- Decrypt data using GPG.
-
- @param data: The data to be decrypted.
- @type data: str
- @param always_trust: Skip key validation and assume that used keys
- are always fully trusted?
- @type always_trust: bool
- @param passphrase: The passphrase to be used if symmetric encryption
- is desired.
- @type passphrase: str
-
- @return: An object with decrypted result in the `data` field.
- @rtype: gnupg.Crypt
- """
- # TODO: devise a way so we don't need to "always trust".
- return gnupg.GPG.decrypt(self, data, always_trust=always_trust,
- passphrase=passphrase)
-
- def send_keys(self, keyserver, *keyids):
- """
- Send keys to a keyserver
-
- @param keyserver: The keyserver to send the keys to.
- @type keyserver: str
- @param keyids: The key ids to send.
- @type keyids: list
-
- @return: A list of keys sent to server.
- @rtype: gnupg.ListKeys
- """
- # TODO: write tests for this.
- # TODO: write a SendKeys class to handle status for this.
- result = self.result_map['list'](self)
- gnupg.logger.debug('send_keys: %r', keyids)
- data = gnupg._make_binary_stream("", self.encoding)
- args = ['--keyserver', keyserver, '--send-keys']
- args.extend(keyids)
- self._handle_io(args, data, result, binary=True)
- gnupg.logger.debug('send_keys result: %r', result.__dict__)
- data.close()
- return result
-
- def encrypt_file(self, file, recipients, sign=None,
- always_trust=False, passphrase=None,
- armor=True, output=None, symmetric=False,
- cipher_algo=None):
- """
- Encrypt the message read from the file-like object 'file'.
-
- @param file: The file to be encrypted.
- @type data: file
- @param recipient: The address of the public key to be used.
- @type recipient: str
- @param sign: Should the encrypted content be signed?
- @type sign: bool
- @param always_trust: Skip key validation and assume that used keys
- are always fully trusted?
- @type always_trust: bool
- @param passphrase: The passphrase to be used if symmetric encryption
- is desired.
- @type passphrase: str
- @param armor: Create ASCII armored output?
- @type armor: bool
- @param output: Path of file to write results in.
- @type output: str
- @param symmetric: Should we encrypt to a password?
- @type symmetric: bool
- @param cipher_algo: Algorithm to use.
- @type cipher_algo: str
-
- @return: An object with encrypted result in the `data` field.
- @rtype: gnupg.Crypt
- """
- args = ['--encrypt']
- if symmetric:
- args = ['--symmetric']
- if cipher_algo:
- args.append('--cipher-algo %s' % cipher_algo)
- else:
- args = ['--encrypt']
- if not _is_sequence(recipients):
- recipients = (recipients,)
- for recipient in recipients:
- args.append('--recipient "%s"' % recipient)
- if armor: # create ascii-armored output - set to False for binary
- args.append('--armor')
- if output: # write the output to a file with the specified name
- if os.path.exists(output):
- os.remove(output) # to avoid overwrite confirmation message
- args.append('--output "%s"' % output)
- if sign:
- args.append('--sign --default-key "%s"' % sign)
- if always_trust:
- args.append("--always-trust")
- result = self.result_map['crypt'](self)
- self._handle_io(args, file, result, passphrase=passphrase, binary=True)
- logger.debug('encrypt result: %r', result.data)
- return result
-
- def list_packets(self, data):
- """
- List the sequence of packets.
-
- @param data: The data to extract packets from.
- @type data: str
-
- @return: An object with packet info.
- @rtype ListPackets
- """
- args = ["--list-packets"]
- result = self.result_map['list-packets'](self)
- self._handle_io(
- args,
- _make_binary_stream(data, self.encoding),
- result,
- )
- return result
-
- def encrypted_to(self, data):
- """
- Return the key to which data is encrypted to.
-
- @param data: The data to be examined.
- @type data: str
-
- @return: The fingerprint of the key to which data is encrypted to.
- @rtype: str
- """
- # TODO: make this support multiple keys.
- result = self.list_packets(data)
- if not result.key:
- raise LookupError(
- "Content is not encrypted to a GnuPG key!")
- try:
- return self.find_key_by_keyid(result.key)
- except:
- return self.find_key_by_subkey(result.key)
-
- def is_encrypted_sym(self, data):
- """
- Say whether some chunk of data is encrypted to a symmetric key.
-
- @param data: The data to be examined.
- @type data: str
-
- @return: Whether data is encrypted to a symmetric key.
- @rtype: bool
- """
- result = self.list_packets(data)
- return bool(result.need_passphrase_sym)
-
- def is_encrypted_asym(self, data):
- """
- Say whether some chunk of data is encrypted to a private key.
-
- @param data: The data to be examined.
- @type data: str
-
- @return: Whether data is encrypted to a private key.
- @rtype: bool
- """
- result = self.list_packets(data)
- return bool(result.key)
-
- def is_encrypted(self, data):
- """
- Say whether some chunk of data is encrypted to a key.
-
- @param data: The data to be examined.
- @type data: str
-
- @return: Whether data is encrypted to a key.
- @rtype: bool
- """
- self.is_encrypted_asym() or self.is_encrypted_sym()