From cbff516a2542a9d48704774bbd2477e5a1587d6e Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 11 Mar 2013 16:54:58 -0300 Subject: Refactor and organize bootstrap sequence. --- src/leap/soledad/__init__.py | 218 ++++++++++++++++++------------- src/leap/soledad/server.py | 3 +- src/leap/soledad/shared_db.py | 4 +- src/leap/soledad/tests/__init__.py | 14 +- src/leap/soledad/tests/test_encrypted.py | 19 +-- src/leap/soledad/util.py | 17 ++- 6 files changed, 163 insertions(+), 112 deletions(-) (limited to 'src/leap') diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py index 316eeaf7..bf625bbd 100644 --- a/src/leap/soledad/__init__.py +++ b/src/leap/soledad/__init__.py @@ -70,11 +70,23 @@ class Soledad(object): def __init__(self, user_email, prefix=None, gnupg_home=None, secret_path=None, local_db_path=None, config_file=None, shared_db_url=None, auth_token=None, - initialize=True): - """ - Bootstrap Soledad, initialize cryptographic material and open - underlying U1DB database. - """ + bootstrap=True): + """ + Initialize crypto and dbs. + + :param user_email: Email address of the user (username@provider). + :param prefix: Path to use as prefix for files. + :param gnupg_home: Home directory for gnupg. + :param secret_path: Path for storing gpg-encrypted key used for + symmetric encryption. + :param local_db_path: Path for local encrypted storage db. + :param config_file: Path for configuration file. + :param shared_db_url: URL for shared Soledad DB for key storage and + unauth retrieval. + :param auth_token: Authorization token for accessing remote databases. + :param bootstrap: True/False, should bootstrap keys? + """ + # TODO: allow for fingerprint enforcing. self._user_email = user_email self._auth_token = auth_token self._init_config( @@ -89,48 +101,44 @@ class Soledad(object): if self.shared_db_url: # TODO: eliminate need to create db here. self._shared_db = SoledadSharedDatabase.open_database( - shared_db_url, + self.shared_db_url, True, token=auth_token) - if initialize: + if bootstrap: self._bootstrap() def _bootstrap(self): """ Bootstrap local Soledad instance. - There are 3 stages for Soledad Client bootstrap: - - 1. No key material has been generated, so we need to generate and - upload to the server. + Soledad Client bootstrap is the following sequence of stages: - 2. Key material has already been generated and uploaded to the - server, but has not been downloaded to this device/installation - yet. + 1. Key loading/generation: guarantees key is either loaded from + server or generated locally. + 2. Key copy verification: guarantees our copy of keys is identical + to server's copy. - 3. Key material has already been generated and uploaded, and is - also stored locally, so we just need to load it from disk. - - This method decides which bootstrap stage has to be performed and - performs it. + This method decides which bootstrap stages have already been performed + and performs the missing ones in order. """ # TODO: make sure key storage always happens (even if this method is # interrupted). # TODO: write tests for bootstrap stages. self._init_dirs() self._gpg = GPGWrapper(gnupghome=self.gnupg_home) - if not self._has_keys(): - try: - # stage 2 bootstrap - self._retrieve_keys() - except Exception: - # stage 1 bootstrap + # bootstrap stage 1 + if self._has_keys(): + self._load_keys() + else: + doc = self._get_keys_doc() + if not doc: self._init_keys() - # TODO: change key below - self._send_keys(self._secret) - # stage 3 bootstrap - self._load_keys() - self._send_keys(self._secret) + else: + self._set_privkey(self.decrypt(doc.content['_privkey'], + passphrase=self._user_hash())) + self._set_symkey(self.decrypt(doc.content['_symkey'])) + # bootstrap stage 2 + self._assert_server_keys() self._init_db() def _init_config(self, param_conf): @@ -152,7 +160,7 @@ class Soledad(object): config = configparser.ConfigParser() config.read(self.config_file) if 'soledad-client' in config: - for key in default_conf: + for key in self.DEFAULT_CONF: if key in config['soledad-client'] and not param_conf[key]: setattr(self, key, config['soledad-client'][key]) @@ -170,13 +178,13 @@ class Soledad(object): """ # TODO: write tests for methods below. # load/generate OpenPGP keypair - if not self._has_openpgp_keypair(): - self._gen_openpgp_keypair() - self._load_openpgp_keypair() + if not self._has_privkey(): + self._gen_privkey() + self._load_privkey() # load/generate secret - if not self._has_secret(): - self._gen_secret() - self._load_secret() + if not self._has_symkey(): + self._gen_symkey() + self._load_symkey() def _init_db(self): """ @@ -187,7 +195,7 @@ class Soledad(object): # one for symmetric encryption. self._db = sqlcipher.open( self.local_db_path, - self._secret, + self._symkey, create=True, document_factory=LeapDocument, soledad=self) @@ -205,7 +213,7 @@ class Soledad(object): # TODO: refactor the following methods to somewhere out of here # (SoledadCrypto, maybe?) - def _has_secret(self): + def _has_symkey(self): """ Verify if secret for symmetric encryption exists in a local encrypted file. @@ -227,36 +235,39 @@ class Soledad(object): "which we don't have." % fp) return True - def _load_secret(self): + def _load_symkey(self): """ Load secret for symmetric encryption from local encrypted file. """ - if not self._has_secret(): + if not self._has_symkey(): raise KeyDoesNotExist("Tried to load key for symmetric " "encryption but it does not exist on disk.") try: with open(self.secret_path) as f: - self._secret = str(self._gpg.decrypt(f.read())) + self._symkey = str(self._gpg.decrypt(f.read())) except IOError: raise IOError('Failed to open secret file %s.' % self.secret_path) - def _gen_secret(self): + def _gen_symkey(self): """ Generate a secret for symmetric encryption and store in a local encrypted file. """ - if self._has_secret(): - raise KeyAlreadyExists("Tried to generate secret for symmetric " - "encryption but it already exists on " - "disk.") - self._secret = ''.join( + self._set_symkey(''.join( random.choice( string.ascii_letters + - string.digits) for x in range(self.SECRET_LENGTH)) - self._store_secret() - - def _store_secret(self): - ciphertext = self._gpg.encrypt(self._secret, self._fingerprint, + string.digits) for x in range(self.SECRET_LENGTH))) + + def _set_symkey(self, symkey): + if self._has_symkey(): + raise KeyAlreadyExists("Tried to set the value of the key for " + "symmetric encryption but it already " + "exists on disk.") + self._symkey = symkey + self._store_symkey() + + def _store_symkey(self): + ciphertext = self._gpg.encrypt(self._symkey, self._fingerprint, self._fingerprint) f = open(self.secret_path, 'w') f.write(str(ciphertext)) @@ -266,21 +277,21 @@ class Soledad(object): # Management of OpenPGP keypair #------------------------------------------------------------------------- - def _has_openpgp_keypair(self): + def _has_privkey(self): """ Verify if there exists an OpenPGP keypair for this user. """ try: - self._load_openpgp_keypair() + self._load_privkey() return True except: return False - def _gen_openpgp_keypair(self): + def _gen_privkey(self): """ Generate an OpenPGP keypair for this user. """ - if self._has_openpgp_keypair(): + if self._has_privkey(): raise KeyAlreadyExists("Tried to generate OpenPGP keypair but it " "already exists on disk.") params = self._gpg.gen_key_input( @@ -289,17 +300,30 @@ class Soledad(object): name_real=self._user_email, name_email=self._user_email, name_comment='Generated by LEAP Soledad.') - self._gpg.gen_key(params) + fingerprint = self._gpg.gen_key(params).fingerprint + return self._load_privkey(fingerprint) + + def _set_privkey(self, raw_data): + if self._has_privkey(): + raise KeyAlreadyExists("Tried to generate OpenPGP keypair but it " + "already exists on disk.") + fingerprint = self._gpg.import_keys(raw_data).fingerprints[0] + return self._load_privkey(fingerprint) - def _load_openpgp_keypair(self): + def _load_privkey(self, fingerprint=None): """ Find fingerprint for this user's OpenPGP keypair. """ - # TODO: verify if we have the corresponding private key. + # TODO: guarantee encrypted storage of private keys. try: - self._fingerprint = self._gpg.find_key_by_email( - self._user_email, - secret=True)['fingerprint'] + if fingerprint: + self._fingerprint = self._gpg.find_key_by_fingerprint( + fingerprint, + secret=True)['fingerprint'] + else: + self._fingerprint = self._gpg.find_key_by_email( + self._user_email, + secret=True)['fingerprint'] return self._fingerprint except LookupError: raise KeyDoesNotExist("Tried to load OpenPGP keypair but it does " @@ -317,36 +341,52 @@ class Soledad(object): #------------------------------------------------------------------------- def _has_keys(self): - return self._has_openpgp_keypair() and self._has_secret() + return self._has_privkey() and self._has_symkey() def _load_keys(self): - self._load_openpgp_keypair() - self._load_secret() + self._load_privkey() + self._load_symkey() def _gen_keys(self): - self._gen_openpgp_keypair() - self._gen_secret() + self._gen_privkey() + self._gen_symkey() def _user_hash(self): return hmac.new(self._user_email, 'user').hexdigest() - def _retrieve_keys(self): + def _get_keys_doc(self): return self._shared_db.get_doc_unauth(self._user_hash()) - # TODO: create corresponding error on server side - - def _send_keys(self, passphrase): - # TODO: change this method's name to something more meaningful. - privkey = self._gpg.export_keys(self._fingerprint, secret=True) - content = { - '_privkey': self.encrypt(privkey, passphrase=passphrase, - symmetric=True), - '_symkey': self.encrypt(self._secret), - } - doc = self._retrieve_keys() - if not doc: + + def _assert_server_keys(self): + """ + Assert our key copies are the same as server's ones. + """ + assert self._has_keys() + doc = self._get_keys_doc() + if doc: + remote_privkey = self.decrypt(doc.content['_privkey'], + # TODO: change passphrase. + passphrase=self._user_hash()) + remote_symkey = self.decrypt(doc.content['_symkey']) + result = self._gpg.import_keys(remote_privkey) + # TODO: is the following behaviour expected in any scenario? + assert result.fingerprints[0] == self._fingerprint + assert remote_symkey == self._symkey + else: + privkey = self._gpg.export_keys(self._fingerprint, secret=True) + content = { + '_privkey': self.encrypt(privkey, + # TODO: change passphrase + passphrase=self._user_hash(), + symmetric=True), + '_symkey': self.encrypt(self._symkey), + } doc = LeapDocument(doc_id=self._user_hash(), soledad=self) - doc.content = content - self._shared_db.put_doc(doc) + doc.content = content + self._shared_db.put_doc(doc) + + def _assert_remote_keys(self): + privkey, symkey = self._retrieve_keys() #------------------------------------------------------------------------- # Data encryption and decryption @@ -368,7 +408,7 @@ class Soledad(object): passphrase=self._hmac_passphrase(doc_id), symmetric=True) - def decrypt(self, data, passphrase=None, symmetric=False): + def decrypt(self, data, passphrase=None): """ Decrypt data. """ @@ -381,7 +421,7 @@ class Soledad(object): return self.decrypt(data, passphrase=self._hmac_passphrase(doc_id)) def _hmac_passphrase(self, doc_id): - return hmac.new(self._secret, doc_id).hexdigest() + return hmac.new(self._symkey, doc_id).hexdigest() def is_encrypted(self, data): return self._gpg.is_encrypted(data) @@ -478,7 +518,7 @@ class Soledad(object): data = json.dumps({ 'user_email': self._user_email, 'privkey': self._gpg.export_keys(self._fingerprint, secret=True), - 'secret': self._secret, + 'symkey': self._symkey, }) if passphrase: data = str(self._gpg.encrypt(data, None, sign=None, @@ -498,9 +538,9 @@ class Soledad(object): data = json.loads(data) self._user_email = data['user_email'] self._gpg.import_keys(data['privkey']) - self._load_openpgp_keypair() - self._secret = data['secret'] - self._store_secret() + self._load_privkey() + self._symkey = data['symkey'] + self._store_symkey() # TODO: make this work well with bootstrap. self._load_keys() diff --git a/src/leap/soledad/server.py b/src/leap/soledad/server.py index eaa5e964..159f4768 100644 --- a/src/leap/soledad/server.py +++ b/src/leap/soledad/server.py @@ -86,7 +86,8 @@ class SoledadAuthMiddleware(object): Verify if token is valid for authenticating this action. """ # TODO: implement token verification - raise NotImplementedError(self.verify_token) + return True + #raise NotImplementedError(self.verify_token) def need_auth(self, environ): """ diff --git a/src/leap/soledad/shared_db.py b/src/leap/soledad/shared_db.py index c27bba71..27018701 100644 --- a/src/leap/soledad/shared_db.py +++ b/src/leap/soledad/shared_db.py @@ -99,6 +99,6 @@ class SoledadSharedDatabase(http_database.HTTPDatabase): """ Modified method to allow for unauth request. """ - db = http_database.HTTPDatabase(self._url, factory=self._factory, - creds=self._creds) + db = http_database.HTTPDatabase(self._url.geturl(), + document_factory=self._factory) return db.get_doc(doc_id) diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py index fdde8c78..1ed2e248 100644 --- a/src/leap/soledad/tests/__init__.py +++ b/src/leap/soledad/tests/__init__.py @@ -32,16 +32,16 @@ class BaseSoledadTest(BaseLeapTest): document_factory=LeapDocument) # initialize soledad by hand so we can control keys self._soledad = Soledad(self.email, gnupg_home=self.gnupg_home, - initialize=False, + bootstrap=False, prefix=self.tempdir) self._soledad._init_dirs() self._soledad._gpg = GPGWrapper(gnupghome=self.gnupg_home) - self._soledad._gpg.import_keys(PUBLIC_KEY) - self._soledad._gpg.import_keys(PRIVATE_KEY) - self._soledad._load_openpgp_keypair() - if not self._soledad._has_secret(): - self._soledad._gen_secret() - self._soledad._load_secret() + #self._soledad._gpg.import_keys(PUBLIC_KEY) + if not self._soledad._has_privkey(): + self._soledad._set_privkey(PRIVATE_KEY) + if not self._soledad._has_symkey(): + self._soledad._gen_symkey() + self._soledad._load_symkey() self._soledad._init_db() def tearDown(self): diff --git a/src/leap/soledad/tests/test_encrypted.py b/src/leap/soledad/tests/test_encrypted.py index eb78fbe3..b4821cf4 100644 --- a/src/leap/soledad/tests/test_encrypted.py +++ b/src/leap/soledad/tests/test_encrypted.py @@ -43,6 +43,9 @@ class EncryptedSyncTestCase(BaseSoledadTest): self._soledad._gpg.is_encrypted_sym(enc_json), "could not encrypt with passphrase.") + +class RecoveryDocumentTestCase(BaseSoledadTest): + def test_export_recovery_document_raw(self): rd = self._soledad.export_recovery_document(None) self.assertEqual( @@ -51,7 +54,7 @@ class EncryptedSyncTestCase(BaseSoledadTest): 'privkey': self._soledad._gpg.export_keys( self._soledad._fingerprint, secret=True), - 'secret': self._soledad._secret + 'symkey': self._soledad._symkey }, json.loads(rd), "Could not export raw recovery document." @@ -66,7 +69,7 @@ class EncryptedSyncTestCase(BaseSoledadTest): 'privkey': self._soledad._gpg.export_keys( self._soledad._fingerprint, secret=True), - 'secret': self._soledad._secret, + 'symkey': self._soledad._symkey, } raw_data = json.loads(str(self._soledad._gpg.decrypt( rd, @@ -86,14 +89,14 @@ class EncryptedSyncTestCase(BaseSoledadTest): rd = self._soledad.export_recovery_document(None) gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, - initialize=False, prefix=self.tempdir) + bootstrap=False, prefix=self.tempdir) s._init_dirs() s._gpg = GPGWrapper(gnupghome=gnupg_home) s.import_recovery_document(rd, None) self.assertEqual(self._soledad._user_email, s._user_email, 'Failed setting user email.') - self.assertEqual(self._soledad._secret, - s._secret, + self.assertEqual(self._soledad._symkey, + s._symkey, 'Failed settinng secret for symmetric encryption.') self.assertEqual(self._soledad._fingerprint, s._fingerprint, @@ -112,14 +115,14 @@ class EncryptedSyncTestCase(BaseSoledadTest): rd = self._soledad.export_recovery_document('123456') gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, - initialize=False, prefix=self.tempdir) + bootstrap=False, prefix=self.tempdir) s._init_dirs() s._gpg = GPGWrapper(gnupghome=gnupg_home) s.import_recovery_document(rd, '123456') self.assertEqual(self._soledad._user_email, s._user_email, 'Failed setting user email.') - self.assertEqual(self._soledad._secret, - s._secret, + self.assertEqual(self._soledad._symkey, + s._symkey, 'Failed settinng secret for symmetric encryption.') self.assertEqual(self._soledad._fingerprint, s._fingerprint, diff --git a/src/leap/soledad/util.py b/src/leap/soledad/util.py index c64d4c5f..b8ee4cf3 100644 --- a/src/leap/soledad/util.py +++ b/src/leap/soledad/util.py @@ -70,20 +70,27 @@ class GPGWrapper(gnupg.GPG): return key raise LookupError("GnuPG public key for email %s not found!" % email) - def find_key_by_subkey(self, subkey): - for key in self.list_keys(): + def find_key_by_subkey(self, subkey, secret=False): + for key in self.list_keys(secret=secret): for sub in key['subkeys']: if sub[0] == subkey: return key raise LookupError( "GnuPG public key for subkey %s not found!" % subkey) - def find_key_by_keyid(self, keyid): - for key in self.list_keys(): + def find_key_by_keyid(self, keyid, secret=False): + for key in self.list_keys(secret=secret): if keyid == key['keyid']: return key raise LookupError( - "GnuPG public key for subkey %s not found!" % subkey) + "GnuPG public key for keyid %s not found!" % keyid) + + def find_key_by_fingerprint(self, fingerprint, secret=False): + for key in self.list_keys(secret=secret): + if fingerprint == key['fingerprint']: + return key + raise LookupError( + "GnuPG public key for fingerprint %s not found!" % fingerprint) def encrypt(self, data, recipient, sign=None, always_trust=True, passphrase=None, symmetric=False): -- cgit v1.2.3 From 1b7573280c5f6a8fc95170013cb7b29845e99a9e Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 12 Mar 2013 14:04:45 -0300 Subject: Send db init to bootstrap method. --- src/leap/soledad/__init__.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) (limited to 'src/leap') diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py index bf625bbd..f79e1c31 100644 --- a/src/leap/soledad/__init__.py +++ b/src/leap/soledad/__init__.py @@ -98,12 +98,6 @@ class Soledad(object): 'shared_db_url': shared_db_url, } ) - if self.shared_db_url: - # TODO: eliminate need to create db here. - self._shared_db = SoledadSharedDatabase.open_database( - self.shared_db_url, - True, - token=auth_token) if bootstrap: self._bootstrap() @@ -113,10 +107,18 @@ class Soledad(object): Soledad Client bootstrap is the following sequence of stages: - 1. Key loading/generation: guarantees key is either loaded from - server or generated locally. - 2. Key copy verification: guarantees our copy of keys is identical - to server's copy. + Stage 0 - Local environment setup. + - directory initialization. + - gnupg wrapper initialization. + Stage 1 - Keys generation/loading: + - if keys exists locally, load them. + - else, if keys exists in server, download them. + - else, generate keys. + Stage 2 - Keys synchronization: + - if keys exist in server, confirm we have the same keys + locally. + - else, send keys to server. + Stage 3 - Database initialization. This method decides which bootstrap stages have already been performed and performs the missing ones in order. @@ -124,9 +126,11 @@ class Soledad(object): # TODO: make sure key storage always happens (even if this method is # interrupted). # TODO: write tests for bootstrap stages. + # TODO: log each bootstrap step. + # Stage 0 - Local environment setup self._init_dirs() self._gpg = GPGWrapper(gnupghome=self.gnupg_home) - # bootstrap stage 1 + # Stage 1 - Keys generation/loading if self._has_keys(): self._load_keys() else: @@ -137,9 +141,16 @@ class Soledad(object): self._set_privkey(self.decrypt(doc.content['_privkey'], passphrase=self._user_hash())) self._set_symkey(self.decrypt(doc.content['_symkey'])) - # bootstrap stage 2 + # Stage 2 - Keys synchronization self._assert_server_keys() + # Stage 3 -Database initialization self._init_db() + if self.shared_db_url: + # TODO: eliminate need to create db here. + self._shared_db = SoledadSharedDatabase.open_database( + self.shared_db_url, + True, + token=auth_token) def _init_config(self, param_conf): """ -- cgit v1.2.3 From 708803f3a8b40263ccb3061e320010e35f218474 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 12 Mar 2013 16:14:02 -0300 Subject: Add test for _init_dirs(). --- src/leap/soledad/tests/test_crypto.py | 161 +++++++++++++++++++++++++++++++ src/leap/soledad/tests/test_encrypted.py | 138 -------------------------- 2 files changed, 161 insertions(+), 138 deletions(-) create mode 100644 src/leap/soledad/tests/test_crypto.py delete mode 100644 src/leap/soledad/tests/test_encrypted.py (limited to 'src/leap') diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py new file mode 100644 index 00000000..ca7502af --- /dev/null +++ b/src/leap/soledad/tests/test_crypto.py @@ -0,0 +1,161 @@ +import os +from leap.testing.basetest import BaseLeapTest +from leap.soledad.backends.leap_backend import LeapDocument +from leap.soledad.tests import BaseSoledadTest +from leap.soledad.tests import ( + KEY_FINGERPRINT, + PRIVATE_KEY, +) +from leap.soledad import ( + Soledad, + KeyAlreadyExists, +) +from leap.soledad.util import GPGWrapper + +try: + import simplejson as json +except ImportError: + import json # noqa + + +class EncryptedSyncTestCase(BaseSoledadTest): + """ + Tests that guarantee that data will always be encrypted when syncing. + """ + + def test_get_set_encrypted_json(self): + """ + Test getting and setting encrypted content. + """ + doc1 = LeapDocument(soledad=self._soledad) + doc1.content = {'key': 'val'} + doc2 = LeapDocument(doc_id=doc1.doc_id, + encrypted_json=doc1.get_encrypted_json(), + soledad=self._soledad) + res1 = doc1.get_json() + res2 = doc2.get_json() + self.assertEqual(res1, res2, 'incorrect document encryption') + + def test_successful_symmetric_encryption(self): + """ + Test for successful symmetric encryption. + """ + doc1 = LeapDocument(soledad=self._soledad) + doc1.content = {'key': 'val'} + enc_json = json.loads(doc1.get_encrypted_json())['_encrypted_json'] + self.assertEqual( + True, + self._soledad._gpg.is_encrypted_sym(enc_json), + "could not encrypt with passphrase.") + + +class RecoveryDocumentTestCase(BaseSoledadTest): + + def test_export_recovery_document_raw(self): + rd = self._soledad.export_recovery_document(None) + self.assertEqual( + { + 'user_email': self._soledad._user_email, + 'privkey': self._soledad._gpg.export_keys( + self._soledad._fingerprint, + secret=True), + 'symkey': self._soledad._symkey + }, + json.loads(rd), + "Could not export raw recovery document." + ) + + def test_export_recovery_document_crypt(self): + rd = self._soledad.export_recovery_document('123456') + self.assertEqual(True, + self._soledad._gpg.is_encrypted_sym(rd)) + data = { + 'user_email': self._soledad._user_email, + 'privkey': self._soledad._gpg.export_keys( + self._soledad._fingerprint, + secret=True), + 'symkey': self._soledad._symkey, + } + raw_data = json.loads(str(self._soledad._gpg.decrypt( + rd, + passphrase='123456'))) + self.assertEqual( + raw_data, + data, + "Could not export raw recovery document." + ) + + def test_import_recovery_document_raises_exception(self): + rd = self._soledad.export_recovery_document(None) + self.assertRaises(KeyAlreadyExists, + self._soledad.import_recovery_document, rd, None) + + def test_import_recovery_document_raw(self): + rd = self._soledad.export_recovery_document(None) + gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir + s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, + bootstrap=False, prefix=self.tempdir) + s._init_dirs() + s._gpg = GPGWrapper(gnupghome=gnupg_home) + s.import_recovery_document(rd, None) + self.assertEqual(self._soledad._user_email, + s._user_email, 'Failed setting user email.') + self.assertEqual(self._soledad._symkey, + s._symkey, + 'Failed settinng secret for symmetric encryption.') + self.assertEqual(self._soledad._fingerprint, + s._fingerprint, + 'Failed settinng fingerprint.') + pk1 = self._soledad._gpg.export_keys( + self._soledad._fingerprint, + secret=True) + pk2 = s._gpg.export_keys(s._fingerprint, secret=True) + self.assertEqual( + pk1, + pk2, + 'Failed settinng private key.' + ) + + def test_import_recovery_document_crypt(self): + rd = self._soledad.export_recovery_document('123456') + gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir + s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, + bootstrap=False, prefix=self.tempdir) + s._init_dirs() + s._gpg = GPGWrapper(gnupghome=gnupg_home) + s.import_recovery_document(rd, '123456') + self.assertEqual(self._soledad._user_email, + s._user_email, 'Failed setting user email.') + self.assertEqual(self._soledad._symkey, + s._symkey, + 'Failed settinng secret for symmetric encryption.') + self.assertEqual(self._soledad._fingerprint, + s._fingerprint, + 'Failed settinng fingerprint.') + pk1 = self._soledad._gpg.export_keys( + self._soledad._fingerprint, + secret=True) + pk2 = s._gpg.export_keys(s._fingerprint, secret=True) + self.assertEqual( + pk1, + pk2, + 'Failed settinng private key.' + ) + + +class SoledadAuxMethods(BaseLeapTest): + + def setUp(self): + pass + + def tearDown(self): + pass + + def _soledad_instance(self): + return Soledad('leap@leap.se', bootstrap=False, + prefix=self.tempdir+'/soledad') + + def test__init_dirs(self): + sol = self._soledad_instance() + sol._init_dirs() + self.assertTrue(os.path.isdir(sol.prefix)) diff --git a/src/leap/soledad/tests/test_encrypted.py b/src/leap/soledad/tests/test_encrypted.py deleted file mode 100644 index b4821cf4..00000000 --- a/src/leap/soledad/tests/test_encrypted.py +++ /dev/null @@ -1,138 +0,0 @@ -from leap.soledad.backends.leap_backend import LeapDocument -from leap.soledad.tests import BaseSoledadTest -from leap.soledad.tests import KEY_FINGERPRINT -from leap.soledad import ( - Soledad, - KeyAlreadyExists, -) -from leap.soledad.util import GPGWrapper - -try: - import simplejson as json -except ImportError: - import json # noqa - - -class EncryptedSyncTestCase(BaseSoledadTest): - """ - Tests that guarantee that data will always be encrypted when syncing. - """ - - def test_get_set_encrypted_json(self): - """ - Test getting and setting encrypted content. - """ - doc1 = LeapDocument(soledad=self._soledad) - doc1.content = {'key': 'val'} - doc2 = LeapDocument(doc_id=doc1.doc_id, - encrypted_json=doc1.get_encrypted_json(), - soledad=self._soledad) - res1 = doc1.get_json() - res2 = doc2.get_json() - self.assertEqual(res1, res2, 'incorrect document encryption') - - def test_successful_symmetric_encryption(self): - """ - Test for successful symmetric encryption. - """ - doc1 = LeapDocument(soledad=self._soledad) - doc1.content = {'key': 'val'} - enc_json = json.loads(doc1.get_encrypted_json())['_encrypted_json'] - self.assertEqual( - True, - self._soledad._gpg.is_encrypted_sym(enc_json), - "could not encrypt with passphrase.") - - -class RecoveryDocumentTestCase(BaseSoledadTest): - - def test_export_recovery_document_raw(self): - rd = self._soledad.export_recovery_document(None) - self.assertEqual( - { - 'user_email': self._soledad._user_email, - 'privkey': self._soledad._gpg.export_keys( - self._soledad._fingerprint, - secret=True), - 'symkey': self._soledad._symkey - }, - json.loads(rd), - "Could not export raw recovery document." - ) - - def test_export_recovery_document_crypt(self): - rd = self._soledad.export_recovery_document('123456') - self.assertEqual(True, - self._soledad._gpg.is_encrypted_sym(rd)) - data = { - 'user_email': self._soledad._user_email, - 'privkey': self._soledad._gpg.export_keys( - self._soledad._fingerprint, - secret=True), - 'symkey': self._soledad._symkey, - } - raw_data = json.loads(str(self._soledad._gpg.decrypt( - rd, - passphrase='123456'))) - self.assertEqual( - raw_data, - data, - "Could not export raw recovery document." - ) - - def test_import_recovery_document_raises_exception(self): - rd = self._soledad.export_recovery_document(None) - self.assertRaises(KeyAlreadyExists, - self._soledad.import_recovery_document, rd, None) - - def test_import_recovery_document_raw(self): - rd = self._soledad.export_recovery_document(None) - gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir - s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, - bootstrap=False, prefix=self.tempdir) - s._init_dirs() - s._gpg = GPGWrapper(gnupghome=gnupg_home) - s.import_recovery_document(rd, None) - self.assertEqual(self._soledad._user_email, - s._user_email, 'Failed setting user email.') - self.assertEqual(self._soledad._symkey, - s._symkey, - 'Failed settinng secret for symmetric encryption.') - self.assertEqual(self._soledad._fingerprint, - s._fingerprint, - 'Failed settinng fingerprint.') - pk1 = self._soledad._gpg.export_keys( - self._soledad._fingerprint, - secret=True) - pk2 = s._gpg.export_keys(s._fingerprint, secret=True) - self.assertEqual( - pk1, - pk2, - 'Failed settinng private key.' - ) - - def test_import_recovery_document_crypt(self): - rd = self._soledad.export_recovery_document('123456') - gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir - s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, - bootstrap=False, prefix=self.tempdir) - s._init_dirs() - s._gpg = GPGWrapper(gnupghome=gnupg_home) - s.import_recovery_document(rd, '123456') - self.assertEqual(self._soledad._user_email, - s._user_email, 'Failed setting user email.') - self.assertEqual(self._soledad._symkey, - s._symkey, - 'Failed settinng secret for symmetric encryption.') - self.assertEqual(self._soledad._fingerprint, - s._fingerprint, - 'Failed settinng fingerprint.') - pk1 = self._soledad._gpg.export_keys( - self._soledad._fingerprint, - secret=True) - pk2 = s._gpg.export_keys(s._fingerprint, secret=True) - self.assertEqual( - pk1, - pk2, - 'Failed settinng private key.' - ) -- cgit v1.2.3 From 5dcb6625991a6cd645037ea751d7e51e0d0ffba2 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 12 Mar 2013 16:14:17 -0300 Subject: Add test for _init_db(). --- src/leap/soledad/tests/test_crypto.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'src/leap') diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index ca7502af..04ca9e42 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -159,3 +159,17 @@ class SoledadAuxMethods(BaseLeapTest): sol = self._soledad_instance() sol._init_dirs() self.assertTrue(os.path.isdir(sol.prefix)) + + def test__init_db(self): + sol = self._soledad_instance() + sol._init_dirs() + sol._gpg = GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) + #self._soledad._gpg.import_keys(PUBLIC_KEY) + if not sol._has_privkey(): + sol._set_privkey(PRIVATE_KEY) + if not sol._has_symkey(): + sol._gen_symkey() + sol._load_symkey() + sol._init_db() + from leap.soledad.backends.sqlcipher import SQLCipherDatabase + self.assertIsInstance(sol._db, SQLCipherDatabase) -- cgit v1.2.3 From 167b737235b28036a64700933c22cb1c0842fa85 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 12 Mar 2013 16:16:30 -0300 Subject: Add test for _has_symkey(). --- src/leap/soledad/tests/test_crypto.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'src/leap') diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index 04ca9e42..6be05243 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -173,3 +173,14 @@ class SoledadAuxMethods(BaseLeapTest): sol._init_db() from leap.soledad.backends.sqlcipher import SQLCipherDatabase self.assertIsInstance(sol._db, SQLCipherDatabase) + + def test__has_symkey(self): + sol = self._soledad_instance() + sol._init_dirs() + sol._gpg = GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) + #self._soledad._gpg.import_keys(PUBLIC_KEY) + if not sol._has_privkey(): + sol._set_privkey(PRIVATE_KEY) + self.assertFalse(sol._has_symkey()) + sol._gen_symkey() + self.assertTrue(sol._has_symkey()) -- cgit v1.2.3 From 779fec037c012de6cfe77dd5cd6211feecd2e894 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 12 Mar 2013 16:17:37 -0300 Subject: Add test for _has_privkey(). --- src/leap/soledad/tests/test_crypto.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'src/leap') diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index 6be05243..4936b56c 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -174,11 +174,18 @@ class SoledadAuxMethods(BaseLeapTest): from leap.soledad.backends.sqlcipher import SQLCipherDatabase self.assertIsInstance(sol._db, SQLCipherDatabase) + def test__has_privkey(self): + sol = self._soledad_instance() + sol._init_dirs() + sol._gpg = GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) + self.assertFalse(sol._has_privkey()) + sol._set_privkey(PRIVATE_KEY) + self.assertTrue(sol._has_privkey()) + def test__has_symkey(self): sol = self._soledad_instance() sol._init_dirs() sol._gpg = GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) - #self._soledad._gpg.import_keys(PUBLIC_KEY) if not sol._has_privkey(): sol._set_privkey(PRIVATE_KEY) self.assertFalse(sol._has_symkey()) -- cgit v1.2.3 From 0b0384c4985210ba2763dc31de98afa59e3936e4 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 12 Mar 2013 16:30:21 -0300 Subject: Add test for _has_keys(). --- src/leap/soledad/tests/test_crypto.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) (limited to 'src/leap') diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index 4936b56c..52cc0315 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -154,6 +154,8 @@ class SoledadAuxMethods(BaseLeapTest): def _soledad_instance(self): return Soledad('leap@leap.se', bootstrap=False, prefix=self.tempdir+'/soledad') + def _gpgwrapper_instance(self): + return GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) def test__init_dirs(self): sol = self._soledad_instance() @@ -163,7 +165,7 @@ class SoledadAuxMethods(BaseLeapTest): def test__init_db(self): sol = self._soledad_instance() sol._init_dirs() - sol._gpg = GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) + sol._gpg = self._gpgwrapper_instance() #self._soledad._gpg.import_keys(PUBLIC_KEY) if not sol._has_privkey(): sol._set_privkey(PRIVATE_KEY) @@ -177,17 +179,29 @@ class SoledadAuxMethods(BaseLeapTest): def test__has_privkey(self): sol = self._soledad_instance() sol._init_dirs() - sol._gpg = GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) + sol._gpg = GPGWrapper(gnupghome="%s/gnupg2" % self.tempdir) self.assertFalse(sol._has_privkey()) sol._set_privkey(PRIVATE_KEY) self.assertTrue(sol._has_privkey()) def test__has_symkey(self): - sol = self._soledad_instance() + sol = Soledad('leap@leap.se', bootstrap=False, + prefix=self.tempdir+'/soledad3') sol._init_dirs() - sol._gpg = GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) + sol._gpg = GPGWrapper(gnupghome="%s/gnupg3" % self.tempdir) if not sol._has_privkey(): sol._set_privkey(PRIVATE_KEY) self.assertFalse(sol._has_symkey()) sol._gen_symkey() self.assertTrue(sol._has_symkey()) + + def test__has_keys(self): + sol = self._soledad_instance() + sol._init_dirs() + sol._gpg = self._gpgwrapper_instance() + self.assertFalse(sol._has_keys()) + sol._set_privkey(PRIVATE_KEY) + self.assertFalse(sol._has_keys()) + sol._gen_symkey() + self.assertTrue(sol._has_keys()) + -- cgit v1.2.3