diff options
author | drebs <drebs@leap.se> | 2013-02-19 13:04:55 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2013-02-19 13:04:55 -0300 |
commit | 8bf2cfc9ec699eceee49c04360434d9c7b0cdf45 (patch) | |
tree | 9477a5d50d3ddafabda4561579f51bdd04cba05a | |
parent | 77a29a4cda84ee7d7d4859d5ed183810a3e81693 (diff) |
Add support for verifying encryption status of data with gpg.
-rw-r--r-- | __init__.py | 29 | ||||
-rw-r--r-- | backends/leap_backend.py | 23 | ||||
-rw-r--r-- | tests/__init__.py | 2 | ||||
-rw-r--r-- | tests/test_encrypted.py | 22 | ||||
-rw-r--r-- | tests/test_leap_backend.py | 1 | ||||
-rw-r--r-- | util.py | 92 |
6 files changed, 140 insertions, 29 deletions
diff --git a/__init__.py b/__init__.py index 92c9feb5..d1518a91 100644 --- a/__init__.py +++ b/__init__.py @@ -35,7 +35,7 @@ class Soledad(object): # other configs SECRET_LENGTH = 50 - def __init__(self, user_email, gpghome=None, initialize=True, + def __init__(self, user_email, gnupghome=None, initialize=True, prefix=None, secret_path=None, local_db_path=None): """ Bootstrap Soledad, initialize cryptographic material and open @@ -47,7 +47,7 @@ class Soledad(object): self.LOCAL_DB_PATH = local_db_path or self.LOCAL_DB_PATH if not os.path.isdir(self.PREFIX): os.makedirs(self.PREFIX) - self._gpg = GPGWrapper(gpghome=(gpghome or self.GNUPG_HOME)) + self._gpg = GPGWrapper(gnupghome=(gnupghome or self.GNUPG_HOME)) if initialize: self._init_crypto() self._init_db() @@ -131,7 +131,7 @@ class Soledad(object): """ # TODO: verify if we have the corresponding private key. try: - self._gpg.find_key(self._user_email) + self._gpg.find_key_by_email(self._user_email) return True except LookupError: return False @@ -152,7 +152,8 @@ class Soledad(object): """ Find fingerprint for this user's OpenPGP keypair. """ - self._fingerprint = self._gpg.find_key(self._user_email)['fingerprint'] + self._fingerprint = self._gpg.find_key_by_email( + self._user_email)['fingerprint'] def publish_pubkey(self, keyserver): """ @@ -177,8 +178,9 @@ class Soledad(object): """ Encrypt data using symmetric secret. """ - h = hmac.new(self._secret, doc_id).hexdigest() - return self.encrypt(data, sign=sign, passphrase=h, symmetric=True) + return self.encrypt(data, sign=sign, + passphrase=self._hmac_passphrase(doc_id), + symmetric=True) def decrypt(self, data, passphrase=None, symmetric=False): """ @@ -190,8 +192,19 @@ class Soledad(object): """ Decrypt data using symmetric secret. """ - h = hmac.new(self._secret, doc_id).hexdigest() - return self.decrypt(data, passphrase=h) + return self.decrypt(data, passphrase=self._hmac_passphrase(doc_id)) + + def _hmac_passphrase(self, doc_id): + return hmac.new(self._secret, doc_id).hexdigest() + + def is_encrypted(self, data): + return self._gpg.is_encrypted(data) + + def is_encrypted_sym(self, data): + return self._gpg.is_encrypted_sym(data) + + def is_encrypted_asym(self, data): + return self._gpg.is_encrypted_asym(data) #------------------------------------------------------------------------- # Document storage, retrieval and sync diff --git a/backends/leap_backend.py b/backends/leap_backend.py index 571cd8ca..f7b1becc 100644 --- a/backends/leap_backend.py +++ b/backends/leap_backend.py @@ -31,9 +31,9 @@ class NoSoledadInstance(Exception): pass -class DocumentEncryptionFailed(Exception): +class DocumentNotEncrypted(Exception): """ - Exception to signal the failure of document encryption. + Exception to signal failures in document encryption. """ pass @@ -135,6 +135,11 @@ class LeapSyncTarget(HTTPSyncTarget): line, comma = utils.check_and_strip_comma(entry) entry = json.loads(line) # decrypt after receiving from server. + if not self._soledad: + raise NoSoledadInstance() + if not self._soledad.is_encrypted_sym(entry['content']): + raise DocumentNotEncrypted( + "Incoming document from sync is not encrypted.") doc = LeapDocument(entry['id'], entry['rev'], encrypted_json=entry['content'], soledad=self._soledad) @@ -183,16 +188,12 @@ class LeapSyncTarget(HTTPSyncTarget): for doc, gen, trans_id in docs_by_generations: if doc.syncable: # encrypt and verify before sending to server. - doc_content = doc.get_encrypted_json() - if doc_content == doc.get_json(): - raise DocumentEncryptionFailed - enc_doc = LeapDocument(doc.doc_id, doc.rev, - encrypted_json=doc_content, - soledad=self._soledad) - if doc.get_json() != enc_doc.get_json(): - raise DocumentEncryptionFailed + enc_json = doc.get_encrypted_json() + if not self._soledad.is_encrypted_sym(enc_json): + raise DocumentNotEncrypted( + "Could not encrypt document before sync.") size += prepare(id=doc.doc_id, rev=doc.rev, - content=doc_content, + content=enc_json, gen=gen, trans_id=trans_id) entries.append('\r\n]') size += len(entries[-1]) diff --git a/tests/__init__.py b/tests/__init__.py index 1a9962a7..4bdf814c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -30,7 +30,7 @@ class BaseSoledadTest(BaseLeapTest): self._db2 = u1db.open(self.db2_file, create=True, document_factory=LeapDocument) # open a soledad instance - self._soledad = Soledad(self.email, gpghome=self.gnupg_home, + self._soledad = Soledad(self.email, gnupghome=self.gnupg_home, initialize=False) self._soledad._gpg.import_keys(PUBLIC_KEY) self._soledad._gpg.import_keys(PRIVATE_KEY) diff --git a/tests/test_encrypted.py b/tests/test_encrypted.py index 18894331..4a48266e 100644 --- a/tests/test_encrypted.py +++ b/tests/test_encrypted.py @@ -1,5 +1,11 @@ from leap.soledad.backends.leap_backend import LeapDocument from leap.soledad.tests import BaseSoledadTest +from leap.soledad.tests import KEY_FINGERPRINT + +try: + import simplejson as json +except ImportError: + import json # noqa class EncryptedSyncTestCase(BaseSoledadTest): @@ -7,9 +13,9 @@ class EncryptedSyncTestCase(BaseSoledadTest): Tests that guarantee that data will always be encrypted when syncing. """ - def test_get_set_encrypted(self): + def test_get_set_encrypted_json(self): """ - Test if document encryption is working. + Test getting and setting encrypted content. """ doc1 = LeapDocument(soledad=self._soledad) doc1.content = {'key': 'val'} @@ -19,3 +25,15 @@ class EncryptedSyncTestCase(BaseSoledadTest): res1 = doc1.get_json() res2 = doc2.get_json() self.assertEqual(res1, res2, 'incorrect document encryption') + + def test_successful_symmetric_encryption(self): + """ + Test for successful symmetric encryption. + """ + doc1 = LeapDocument(soledad=self._soledad) + doc1.content = {'key': 'val'} + enc_json = json.loads(doc1.get_encrypted_json())['_encrypted_json'] + self.assertEqual( + True, + self._soledad._gpg.is_encrypted_sym(enc_json), + "could not encrypt with passphrase.") diff --git a/tests/test_leap_backend.py b/tests/test_leap_backend.py index a061533c..9056355f 100644 --- a/tests/test_leap_backend.py +++ b/tests/test_leap_backend.py @@ -134,7 +134,6 @@ class TestLeapParsingSyncStream(test_remote_sync_target.TestParsingSyncStream): self.assertRaises(u1db.errors.BrokenSyncStream, tgt._parse_sync_stream, "[\r\n{},\r\n]", None) - self.assertRaises(leap_backend.NoSoledadInstance, tgt._parse_sync_stream, '[\r\n{},\r\n{"id": "i", "rev": "r", ' @@ -5,7 +5,40 @@ Utilities for Soledad. import os import gnupg import re -from gnupg import logger +from gnupg import ( + logger, + _is_sequence, + _make_binary_stream, +) + + +class ListPackets(): + """ + Handle status messages for --list-packets. + """ + + def __init__(self, gpg): + self.gpg = gpg + self.nodata = None + self.key = None + self.need_passphrase = None + self.need_passphrase_sym = None + self.userid_hint = None + + def handle_status(self, key, value): + # TODO: write tests for handle_status + if key == 'NODATA': + self.nodata = True + if key == 'ENC_TO': + # This will only capture keys in our keyring. In the future we + # may want to include multiple unknown keys in this list. + self.key, _, _ = value.split() + if key == 'NEED_PASSPHRASE': + self.need_passphrase = True + if key == 'NEED_PASSPHRASE_SYM': + self.need_passphrase_sym = True + if key == 'USERID_HINT': + self.userid_hint = value.strip().split() class GPGWrapper(gnupg.GPG): @@ -17,11 +50,17 @@ class GPGWrapper(gnupg.GPG): GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg" GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS - def __init__(self, gpghome=GNUPG_HOME, gpgbinary=GNUPG_BINARY): - super(GPGWrapper, self).__init__(gnupghome=gpghome, - gpgbinary=gpgbinary) + def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME, + verbose=False, use_agent=False, keyring=None, options=None): + super(GPGWrapper, self).__init__(gnupghome=gnupghome, + gpgbinary=gpgbinary, + verbose=verbose, + use_agent=use_agent, + keyring=keyring, + options=options) + self.result_map['list-packets'] = ListPackets - def find_key(self, email): + def find_key_by_email(self, email): """ Find user's key based on their email. """ @@ -29,7 +68,16 @@ class GPGWrapper(gnupg.GPG): for uid in key['uids']: if re.search(email, uid): return key - raise LookupError("GnuPG public key for %s not found!" % email) + raise LookupError("GnuPG public key for email %s not found!" % email) + + def find_key_by_subkey(self, subkey): + for key in self.list_keys(): + for sub in key['subkeys']: + #print sub[0] + if sub[0] == subkey: + return key + raise LookupError( + "GnuPG public key for subkey %s not found!" % subkey) def encrypt(self, data, recipient, sign=None, always_trust=True, passphrase=None, symmetric=False): @@ -96,3 +144,35 @@ class GPGWrapper(gnupg.GPG): self._handle_io(args, file, result, passphrase=passphrase, binary=True) logger.debug('encrypt result: %r', result.data) return result + + def list_packets(self, raw_data): + args = ["--list-packets"] + result = self.result_map['list-packets'](self) + self._handle_io( + args, + _make_binary_stream(raw_data, self.encoding), + result, + ) + return result + + def encrypted_to(self, raw_data): + """ + Return the key to which raw_data is encrypted to. + """ + # TODO: make this support multiple keys. + result = self.list_packets(raw_data) + if not result.encrypted_sym: + raise LookupError( + "Content is not encrypted to a GnuPG key!") + return self.find_key_by_subkey(result.key) + + def is_encrypted_sym(self, raw_data): + result = self.list_packets(raw_data) + return bool(result.need_passphrase_sym) + + def is_encrypted_asym(self, raw_data): + result = self.list_packets(raw_data) + return bool(result.need_passphrase) + + def is_encrypted(self, raw_data): + self.is_encrypted_asym() or self.is_encrypted_sym() |