summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-02-19 13:04:55 -0300
committerdrebs <drebs@leap.se>2013-02-19 13:04:55 -0300
commit8bf2cfc9ec699eceee49c04360434d9c7b0cdf45 (patch)
tree9477a5d50d3ddafabda4561579f51bdd04cba05a
parent77a29a4cda84ee7d7d4859d5ed183810a3e81693 (diff)
Add support for verifying encryption status of data with gpg.
-rw-r--r--__init__.py29
-rw-r--r--backends/leap_backend.py23
-rw-r--r--tests/__init__.py2
-rw-r--r--tests/test_encrypted.py22
-rw-r--r--tests/test_leap_backend.py1
-rw-r--r--util.py92
6 files changed, 140 insertions, 29 deletions
diff --git a/__init__.py b/__init__.py
index 92c9feb5..d1518a91 100644
--- a/__init__.py
+++ b/__init__.py
@@ -35,7 +35,7 @@ class Soledad(object):
# other configs
SECRET_LENGTH = 50
- def __init__(self, user_email, gpghome=None, initialize=True,
+ def __init__(self, user_email, gnupghome=None, initialize=True,
prefix=None, secret_path=None, local_db_path=None):
"""
Bootstrap Soledad, initialize cryptographic material and open
@@ -47,7 +47,7 @@ class Soledad(object):
self.LOCAL_DB_PATH = local_db_path or self.LOCAL_DB_PATH
if not os.path.isdir(self.PREFIX):
os.makedirs(self.PREFIX)
- self._gpg = GPGWrapper(gpghome=(gpghome or self.GNUPG_HOME))
+ self._gpg = GPGWrapper(gnupghome=(gnupghome or self.GNUPG_HOME))
if initialize:
self._init_crypto()
self._init_db()
@@ -131,7 +131,7 @@ class Soledad(object):
"""
# TODO: verify if we have the corresponding private key.
try:
- self._gpg.find_key(self._user_email)
+ self._gpg.find_key_by_email(self._user_email)
return True
except LookupError:
return False
@@ -152,7 +152,8 @@ class Soledad(object):
"""
Find fingerprint for this user's OpenPGP keypair.
"""
- self._fingerprint = self._gpg.find_key(self._user_email)['fingerprint']
+ self._fingerprint = self._gpg.find_key_by_email(
+ self._user_email)['fingerprint']
def publish_pubkey(self, keyserver):
"""
@@ -177,8 +178,9 @@ class Soledad(object):
"""
Encrypt data using symmetric secret.
"""
- h = hmac.new(self._secret, doc_id).hexdigest()
- return self.encrypt(data, sign=sign, passphrase=h, symmetric=True)
+ return self.encrypt(data, sign=sign,
+ passphrase=self._hmac_passphrase(doc_id),
+ symmetric=True)
def decrypt(self, data, passphrase=None, symmetric=False):
"""
@@ -190,8 +192,19 @@ class Soledad(object):
"""
Decrypt data using symmetric secret.
"""
- h = hmac.new(self._secret, doc_id).hexdigest()
- return self.decrypt(data, passphrase=h)
+ return self.decrypt(data, passphrase=self._hmac_passphrase(doc_id))
+
+ def _hmac_passphrase(self, doc_id):
+ return hmac.new(self._secret, doc_id).hexdigest()
+
+ def is_encrypted(self, data):
+ return self._gpg.is_encrypted(data)
+
+ def is_encrypted_sym(self, data):
+ return self._gpg.is_encrypted_sym(data)
+
+ def is_encrypted_asym(self, data):
+ return self._gpg.is_encrypted_asym(data)
#-------------------------------------------------------------------------
# Document storage, retrieval and sync
diff --git a/backends/leap_backend.py b/backends/leap_backend.py
index 571cd8ca..f7b1becc 100644
--- a/backends/leap_backend.py
+++ b/backends/leap_backend.py
@@ -31,9 +31,9 @@ class NoSoledadInstance(Exception):
pass
-class DocumentEncryptionFailed(Exception):
+class DocumentNotEncrypted(Exception):
"""
- Exception to signal the failure of document encryption.
+ Exception to signal failures in document encryption.
"""
pass
@@ -135,6 +135,11 @@ class LeapSyncTarget(HTTPSyncTarget):
line, comma = utils.check_and_strip_comma(entry)
entry = json.loads(line)
# decrypt after receiving from server.
+ if not self._soledad:
+ raise NoSoledadInstance()
+ if not self._soledad.is_encrypted_sym(entry['content']):
+ raise DocumentNotEncrypted(
+ "Incoming document from sync is not encrypted.")
doc = LeapDocument(entry['id'], entry['rev'],
encrypted_json=entry['content'],
soledad=self._soledad)
@@ -183,16 +188,12 @@ class LeapSyncTarget(HTTPSyncTarget):
for doc, gen, trans_id in docs_by_generations:
if doc.syncable:
# encrypt and verify before sending to server.
- doc_content = doc.get_encrypted_json()
- if doc_content == doc.get_json():
- raise DocumentEncryptionFailed
- enc_doc = LeapDocument(doc.doc_id, doc.rev,
- encrypted_json=doc_content,
- soledad=self._soledad)
- if doc.get_json() != enc_doc.get_json():
- raise DocumentEncryptionFailed
+ enc_json = doc.get_encrypted_json()
+ if not self._soledad.is_encrypted_sym(enc_json):
+ raise DocumentNotEncrypted(
+ "Could not encrypt document before sync.")
size += prepare(id=doc.doc_id, rev=doc.rev,
- content=doc_content,
+ content=enc_json,
gen=gen, trans_id=trans_id)
entries.append('\r\n]')
size += len(entries[-1])
diff --git a/tests/__init__.py b/tests/__init__.py
index 1a9962a7..4bdf814c 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -30,7 +30,7 @@ class BaseSoledadTest(BaseLeapTest):
self._db2 = u1db.open(self.db2_file, create=True,
document_factory=LeapDocument)
# open a soledad instance
- self._soledad = Soledad(self.email, gpghome=self.gnupg_home,
+ self._soledad = Soledad(self.email, gnupghome=self.gnupg_home,
initialize=False)
self._soledad._gpg.import_keys(PUBLIC_KEY)
self._soledad._gpg.import_keys(PRIVATE_KEY)
diff --git a/tests/test_encrypted.py b/tests/test_encrypted.py
index 18894331..4a48266e 100644
--- a/tests/test_encrypted.py
+++ b/tests/test_encrypted.py
@@ -1,5 +1,11 @@
from leap.soledad.backends.leap_backend import LeapDocument
from leap.soledad.tests import BaseSoledadTest
+from leap.soledad.tests import KEY_FINGERPRINT
+
+try:
+ import simplejson as json
+except ImportError:
+ import json # noqa
class EncryptedSyncTestCase(BaseSoledadTest):
@@ -7,9 +13,9 @@ class EncryptedSyncTestCase(BaseSoledadTest):
Tests that guarantee that data will always be encrypted when syncing.
"""
- def test_get_set_encrypted(self):
+ def test_get_set_encrypted_json(self):
"""
- Test if document encryption is working.
+ Test getting and setting encrypted content.
"""
doc1 = LeapDocument(soledad=self._soledad)
doc1.content = {'key': 'val'}
@@ -19,3 +25,15 @@ class EncryptedSyncTestCase(BaseSoledadTest):
res1 = doc1.get_json()
res2 = doc2.get_json()
self.assertEqual(res1, res2, 'incorrect document encryption')
+
+ def test_successful_symmetric_encryption(self):
+ """
+ Test for successful symmetric encryption.
+ """
+ doc1 = LeapDocument(soledad=self._soledad)
+ doc1.content = {'key': 'val'}
+ enc_json = json.loads(doc1.get_encrypted_json())['_encrypted_json']
+ self.assertEqual(
+ True,
+ self._soledad._gpg.is_encrypted_sym(enc_json),
+ "could not encrypt with passphrase.")
diff --git a/tests/test_leap_backend.py b/tests/test_leap_backend.py
index a061533c..9056355f 100644
--- a/tests/test_leap_backend.py
+++ b/tests/test_leap_backend.py
@@ -134,7 +134,6 @@ class TestLeapParsingSyncStream(test_remote_sync_target.TestParsingSyncStream):
self.assertRaises(u1db.errors.BrokenSyncStream,
tgt._parse_sync_stream, "[\r\n{},\r\n]", None)
-
self.assertRaises(leap_backend.NoSoledadInstance,
tgt._parse_sync_stream,
'[\r\n{},\r\n{"id": "i", "rev": "r", '
diff --git a/util.py b/util.py
index 09bca6b4..2103c056 100644
--- a/util.py
+++ b/util.py
@@ -5,7 +5,40 @@ Utilities for Soledad.
import os
import gnupg
import re
-from gnupg import logger
+from gnupg import (
+ logger,
+ _is_sequence,
+ _make_binary_stream,
+)
+
+
+class ListPackets():
+ """
+ Handle status messages for --list-packets.
+ """
+
+ def __init__(self, gpg):
+ self.gpg = gpg
+ self.nodata = None
+ self.key = None
+ self.need_passphrase = None
+ self.need_passphrase_sym = None
+ self.userid_hint = None
+
+ def handle_status(self, key, value):
+ # TODO: write tests for handle_status
+ if key == 'NODATA':
+ self.nodata = True
+ if key == 'ENC_TO':
+ # This will only capture keys in our keyring. In the future we
+ # may want to include multiple unknown keys in this list.
+ self.key, _, _ = value.split()
+ if key == 'NEED_PASSPHRASE':
+ self.need_passphrase = True
+ if key == 'NEED_PASSPHRASE_SYM':
+ self.need_passphrase_sym = True
+ if key == 'USERID_HINT':
+ self.userid_hint = value.strip().split()
class GPGWrapper(gnupg.GPG):
@@ -17,11 +50,17 @@ class GPGWrapper(gnupg.GPG):
GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg"
GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS
- def __init__(self, gpghome=GNUPG_HOME, gpgbinary=GNUPG_BINARY):
- super(GPGWrapper, self).__init__(gnupghome=gpghome,
- gpgbinary=gpgbinary)
+ def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME,
+ verbose=False, use_agent=False, keyring=None, options=None):
+ super(GPGWrapper, self).__init__(gnupghome=gnupghome,
+ gpgbinary=gpgbinary,
+ verbose=verbose,
+ use_agent=use_agent,
+ keyring=keyring,
+ options=options)
+ self.result_map['list-packets'] = ListPackets
- def find_key(self, email):
+ def find_key_by_email(self, email):
"""
Find user's key based on their email.
"""
@@ -29,7 +68,16 @@ class GPGWrapper(gnupg.GPG):
for uid in key['uids']:
if re.search(email, uid):
return key
- raise LookupError("GnuPG public key for %s not found!" % email)
+ raise LookupError("GnuPG public key for email %s not found!" % email)
+
+ def find_key_by_subkey(self, subkey):
+ for key in self.list_keys():
+ for sub in key['subkeys']:
+ #print sub[0]
+ if sub[0] == subkey:
+ return key
+ raise LookupError(
+ "GnuPG public key for subkey %s not found!" % subkey)
def encrypt(self, data, recipient, sign=None, always_trust=True,
passphrase=None, symmetric=False):
@@ -96,3 +144,35 @@ class GPGWrapper(gnupg.GPG):
self._handle_io(args, file, result, passphrase=passphrase, binary=True)
logger.debug('encrypt result: %r', result.data)
return result
+
+ def list_packets(self, raw_data):
+ args = ["--list-packets"]
+ result = self.result_map['list-packets'](self)
+ self._handle_io(
+ args,
+ _make_binary_stream(raw_data, self.encoding),
+ result,
+ )
+ return result
+
+ def encrypted_to(self, raw_data):
+ """
+ Return the key to which raw_data is encrypted to.
+ """
+ # TODO: make this support multiple keys.
+ result = self.list_packets(raw_data)
+ if not result.encrypted_sym:
+ raise LookupError(
+ "Content is not encrypted to a GnuPG key!")
+ return self.find_key_by_subkey(result.key)
+
+ def is_encrypted_sym(self, raw_data):
+ result = self.list_packets(raw_data)
+ return bool(result.need_passphrase_sym)
+
+ def is_encrypted_asym(self, raw_data):
+ result = self.list_packets(raw_data)
+ return bool(result.need_passphrase)
+
+ def is_encrypted(self, raw_data):
+ self.is_encrypted_asym() or self.is_encrypted_sym()