diff options
| -rw-r--r-- | __init__.py | 29 | ||||
| -rw-r--r-- | backends/leap_backend.py | 23 | ||||
| -rw-r--r-- | tests/__init__.py | 2 | ||||
| -rw-r--r-- | tests/test_encrypted.py | 22 | ||||
| -rw-r--r-- | tests/test_leap_backend.py | 1 | ||||
| -rw-r--r-- | util.py | 92 | 
6 files changed, 140 insertions, 29 deletions
diff --git a/__init__.py b/__init__.py index 92c9feb5..d1518a91 100644 --- a/__init__.py +++ b/__init__.py @@ -35,7 +35,7 @@ class Soledad(object):      # other configs      SECRET_LENGTH = 50 -    def __init__(self, user_email, gpghome=None, initialize=True, +    def __init__(self, user_email, gnupghome=None, initialize=True,                   prefix=None, secret_path=None, local_db_path=None):          """          Bootstrap Soledad, initialize cryptographic material and open @@ -47,7 +47,7 @@ class Soledad(object):          self.LOCAL_DB_PATH = local_db_path or self.LOCAL_DB_PATH          if not os.path.isdir(self.PREFIX):              os.makedirs(self.PREFIX) -        self._gpg = GPGWrapper(gpghome=(gpghome or self.GNUPG_HOME)) +        self._gpg = GPGWrapper(gnupghome=(gnupghome or self.GNUPG_HOME))          if initialize:              self._init_crypto()              self._init_db() @@ -131,7 +131,7 @@ class Soledad(object):          """          # TODO: verify if we have the corresponding private key.          try: -            self._gpg.find_key(self._user_email) +            self._gpg.find_key_by_email(self._user_email)              return True          except LookupError:              return False @@ -152,7 +152,8 @@ class Soledad(object):          """          Find fingerprint for this user's OpenPGP keypair.          """ -        self._fingerprint = self._gpg.find_key(self._user_email)['fingerprint'] +        self._fingerprint = self._gpg.find_key_by_email( +            self._user_email)['fingerprint']      def publish_pubkey(self, keyserver):          """ @@ -177,8 +178,9 @@ class Soledad(object):          """          Encrypt data using symmetric secret.          """ -        h = hmac.new(self._secret, doc_id).hexdigest() -        return self.encrypt(data, sign=sign, passphrase=h, symmetric=True) +        return self.encrypt(data, sign=sign, +                            passphrase=self._hmac_passphrase(doc_id), +                            symmetric=True)      def decrypt(self, data, passphrase=None, symmetric=False):          """ @@ -190,8 +192,19 @@ class Soledad(object):          """          Decrypt data using symmetric secret.          """ -        h = hmac.new(self._secret, doc_id).hexdigest() -        return self.decrypt(data, passphrase=h) +        return self.decrypt(data, passphrase=self._hmac_passphrase(doc_id)) + +    def _hmac_passphrase(self, doc_id): +        return hmac.new(self._secret, doc_id).hexdigest() + +    def is_encrypted(self, data): +        return self._gpg.is_encrypted(data) + +    def is_encrypted_sym(self, data): +        return self._gpg.is_encrypted_sym(data) + +    def is_encrypted_asym(self, data): +        return self._gpg.is_encrypted_asym(data)      #-------------------------------------------------------------------------      # Document storage, retrieval and sync diff --git a/backends/leap_backend.py b/backends/leap_backend.py index 571cd8ca..f7b1becc 100644 --- a/backends/leap_backend.py +++ b/backends/leap_backend.py @@ -31,9 +31,9 @@ class NoSoledadInstance(Exception):      pass -class DocumentEncryptionFailed(Exception): +class DocumentNotEncrypted(Exception):      """ -    Exception to signal the failure of document encryption. +    Exception to signal failures in document encryption.      """      pass @@ -135,6 +135,11 @@ class LeapSyncTarget(HTTPSyncTarget):                  line, comma = utils.check_and_strip_comma(entry)                  entry = json.loads(line)                  # decrypt after receiving from server. +                if not self._soledad: +                    raise NoSoledadInstance() +                if not self._soledad.is_encrypted_sym(entry['content']): +                    raise DocumentNotEncrypted( +                        "Incoming document from sync is not encrypted.")                  doc = LeapDocument(entry['id'], entry['rev'],                                     encrypted_json=entry['content'],                                     soledad=self._soledad) @@ -183,16 +188,12 @@ class LeapSyncTarget(HTTPSyncTarget):          for doc, gen, trans_id in docs_by_generations:              if doc.syncable:                  # encrypt and verify before sending to server. -                doc_content = doc.get_encrypted_json() -                if doc_content == doc.get_json(): -                    raise DocumentEncryptionFailed -                enc_doc = LeapDocument(doc.doc_id, doc.rev, -                                       encrypted_json=doc_content, -                                       soledad=self._soledad) -                if doc.get_json() != enc_doc.get_json(): -                    raise DocumentEncryptionFailed +                enc_json = doc.get_encrypted_json() +                if not self._soledad.is_encrypted_sym(enc_json): +                    raise DocumentNotEncrypted( +                        "Could not encrypt document before sync.")                  size += prepare(id=doc.doc_id, rev=doc.rev, -                                content=doc_content, +                                content=enc_json,                                  gen=gen, trans_id=trans_id)          entries.append('\r\n]')          size += len(entries[-1]) diff --git a/tests/__init__.py b/tests/__init__.py index 1a9962a7..4bdf814c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -30,7 +30,7 @@ class BaseSoledadTest(BaseLeapTest):          self._db2 = u1db.open(self.db2_file, create=True,                                document_factory=LeapDocument)          # open a soledad instance -        self._soledad = Soledad(self.email, gpghome=self.gnupg_home, +        self._soledad = Soledad(self.email, gnupghome=self.gnupg_home,                                  initialize=False)          self._soledad._gpg.import_keys(PUBLIC_KEY)          self._soledad._gpg.import_keys(PRIVATE_KEY) diff --git a/tests/test_encrypted.py b/tests/test_encrypted.py index 18894331..4a48266e 100644 --- a/tests/test_encrypted.py +++ b/tests/test_encrypted.py @@ -1,5 +1,11 @@  from leap.soledad.backends.leap_backend import LeapDocument  from leap.soledad.tests import BaseSoledadTest +from leap.soledad.tests import KEY_FINGERPRINT + +try: +    import simplejson as json +except ImportError: +    import json  # noqa  class EncryptedSyncTestCase(BaseSoledadTest): @@ -7,9 +13,9 @@ class EncryptedSyncTestCase(BaseSoledadTest):      Tests that guarantee that data will always be encrypted when syncing.      """ -    def test_get_set_encrypted(self): +    def test_get_set_encrypted_json(self):          """ -        Test if document encryption is working. +        Test getting and setting encrypted content.          """          doc1 = LeapDocument(soledad=self._soledad)          doc1.content = {'key': 'val'} @@ -19,3 +25,15 @@ class EncryptedSyncTestCase(BaseSoledadTest):          res1 = doc1.get_json()          res2 = doc2.get_json()          self.assertEqual(res1, res2, 'incorrect document encryption') + +    def test_successful_symmetric_encryption(self): +        """ +        Test for successful symmetric encryption. +        """ +        doc1 = LeapDocument(soledad=self._soledad) +        doc1.content = {'key': 'val'} +        enc_json = json.loads(doc1.get_encrypted_json())['_encrypted_json'] +        self.assertEqual( +            True, +            self._soledad._gpg.is_encrypted_sym(enc_json), +            "could not encrypt with passphrase.") diff --git a/tests/test_leap_backend.py b/tests/test_leap_backend.py index a061533c..9056355f 100644 --- a/tests/test_leap_backend.py +++ b/tests/test_leap_backend.py @@ -134,7 +134,6 @@ class TestLeapParsingSyncStream(test_remote_sync_target.TestParsingSyncStream):          self.assertRaises(u1db.errors.BrokenSyncStream,                            tgt._parse_sync_stream, "[\r\n{},\r\n]", None) -          self.assertRaises(leap_backend.NoSoledadInstance,                            tgt._parse_sync_stream,                            '[\r\n{},\r\n{"id": "i", "rev": "r", ' @@ -5,7 +5,40 @@ Utilities for Soledad.  import os  import gnupg  import re -from gnupg import logger +from gnupg import ( +    logger, +    _is_sequence, +    _make_binary_stream, +) + + +class ListPackets(): +    """ +    Handle status messages for --list-packets. +    """ + +    def __init__(self, gpg): +        self.gpg = gpg +        self.nodata = None +        self.key = None +        self.need_passphrase = None +        self.need_passphrase_sym = None +        self.userid_hint = None + +    def handle_status(self, key, value): +        # TODO: write tests for handle_status +        if key == 'NODATA': +            self.nodata = True +        if key == 'ENC_TO': +            # This will only capture keys in our keyring. In the future we +            # may want to include multiple unknown keys in this list. +            self.key, _, _ = value.split() +        if key == 'NEED_PASSPHRASE': +            self.need_passphrase = True +        if key == 'NEED_PASSPHRASE_SYM': +            self.need_passphrase_sym = True +        if key == 'USERID_HINT': +            self.userid_hint = value.strip().split()  class GPGWrapper(gnupg.GPG): @@ -17,11 +50,17 @@ class GPGWrapper(gnupg.GPG):      GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg"      GNUPG_BINARY = "/usr/bin/gpg"  # this has to be changed based on OS -    def __init__(self, gpghome=GNUPG_HOME, gpgbinary=GNUPG_BINARY): -        super(GPGWrapper, self).__init__(gnupghome=gpghome, -                                         gpgbinary=gpgbinary) +    def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME, +                 verbose=False, use_agent=False, keyring=None, options=None): +        super(GPGWrapper, self).__init__(gnupghome=gnupghome, +                                         gpgbinary=gpgbinary, +                                         verbose=verbose, +                                         use_agent=use_agent, +                                         keyring=keyring, +                                         options=options) +        self.result_map['list-packets'] = ListPackets -    def find_key(self, email): +    def find_key_by_email(self, email):          """          Find user's key based on their email.          """ @@ -29,7 +68,16 @@ class GPGWrapper(gnupg.GPG):              for uid in key['uids']:                  if re.search(email, uid):                      return key -        raise LookupError("GnuPG public key for %s not found!" % email) +        raise LookupError("GnuPG public key for email %s not found!" % email) + +    def find_key_by_subkey(self, subkey): +        for key in self.list_keys(): +            for sub in key['subkeys']: +                #print sub[0] +                if sub[0] == subkey: +                    return key +        raise LookupError( +            "GnuPG public key for subkey %s not found!" % subkey)      def encrypt(self, data, recipient, sign=None, always_trust=True,                  passphrase=None, symmetric=False): @@ -96,3 +144,35 @@ class GPGWrapper(gnupg.GPG):          self._handle_io(args, file, result, passphrase=passphrase, binary=True)          logger.debug('encrypt result: %r', result.data)          return result + +    def list_packets(self, raw_data): +        args = ["--list-packets"] +        result = self.result_map['list-packets'](self) +        self._handle_io( +            args, +            _make_binary_stream(raw_data, self.encoding), +            result, +        ) +        return result + +    def encrypted_to(self, raw_data): +        """ +        Return the key to which raw_data is encrypted to. +        """ +        # TODO: make this support multiple keys. +        result = self.list_packets(raw_data) +        if not result.encrypted_sym: +            raise LookupError( +                "Content is not encrypted to a GnuPG key!") +        return self.find_key_by_subkey(result.key) + +    def is_encrypted_sym(self, raw_data): +        result = self.list_packets(raw_data) +        return bool(result.need_passphrase_sym) + +    def is_encrypted_asym(self, raw_data): +        result = self.list_packets(raw_data) +        return bool(result.need_passphrase) + +    def is_encrypted(self, raw_data): +        self.is_encrypted_asym() or self.is_encrypted_sym()  | 
