diff options
4 files changed, 55 insertions, 58 deletions
diff --git a/service/pixelated/adapter/mailstore/maintenance/__init__.py b/service/pixelated/adapter/mailstore/maintenance/__init__.py index edc442c2..9b6d6023 100644 --- a/service/pixelated/adapter/mailstore/maintenance/__init__.py +++ b/service/pixelated/adapter/mailstore/maintenance/__init__.py @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with Pixelated. If not, see <http://www.gnu.org/licenses/>. -from leap.keymanager.keys import KEY_TYPE_KEY, KEY_PRIVATE_KEY, KEY_ID_KEY, KEY_ADDRESS_KEY +from leap.keymanager.keys import KEY_TYPE_KEY, KEY_PRIVATE_KEY, KEY_FINGERPRINT_KEY, KEY_ADDRESS_KEY from leap.keymanager.openpgp import OpenPGPKey from twisted.internet import defer @@ -44,8 +44,8 @@ def _is_public_key(doc): return _is_key_doc(doc) and not doc.content.get(KEY_PRIVATE_KEY, False) -def _key_id(doc): - return doc.content.get(KEY_ID_KEY, None) +def _key_fingerprint(doc): + return doc.content.get(KEY_FINGERPRINT_KEY, None) def _address(doc): @@ -60,40 +60,41 @@ class SoledadMaintenance(object): def repair(self): _, docs = yield self._soledad.get_all_docs() - private_key_ids = self._key_ids_with_private_key(docs) + private_key_fingerprints = self._key_fingerprints_with_private_key(docs) for doc in docs: - if _is_key_doc(doc) and _key_id(doc) not in private_key_ids: - logger.warn('Deleting doc %s for key %s of <%s>' % (doc.doc_id, _key_id(doc), _address(doc))) + if _is_key_doc(doc) and _key_fingerprint(doc) not in private_key_fingerprints: + logger.warn('Deleting doc %s for key %s of <%s>' % (doc.doc_id, _key_fingerprint(doc), _address(doc))) yield self._soledad.delete_doc(doc) - yield self._repair_missing_active_docs(docs, private_key_ids) + yield self._repair_missing_active_docs(docs, private_key_fingerprints) @defer.inlineCallbacks - def _repair_missing_active_docs(self, docs, private_key_ids): - missing = self._missing_active_docs(docs, private_key_ids) - for key_id in missing: - emails = self._emails_for_key_id(docs, key_id) + def _repair_missing_active_docs(self, docs, private_key_fingerprints): + missing = self._missing_active_docs(docs, private_key_fingerprints) + for fingerprint in missing: + emails = self._emails_for_key_fingerprint(docs, fingerprint) for email in emails: - logger.warn('Re-creating active doc for key %s, email %s' % (key_id, email)) - yield self._soledad.create_doc_from_json(OpenPGPKey(email, key_id=key_id, private=False).get_active_json(email)) + logger.warn('Re-creating active doc for key %s, email %s' % (fingerprint, email)) + yield self._soledad.create_doc_from_json(OpenPGPKey(email, fingerprint=fingerprint, private=False).get_active_json()) - def _key_ids_with_private_key(self, docs): - return [doc.content[KEY_ID_KEY] for doc in docs if _is_private_key_doc(doc)] + def _key_fingerprints_with_private_key(self, docs): + return [doc.content[KEY_FINGERPRINT_KEY] for doc in docs if _is_private_key_doc(doc)] - def _missing_active_docs(self, docs, private_key_ids): - active_doc_ids = self._active_docs_for_key_id(docs) + def _missing_active_docs(self, docs, private_key_fingerprints): + active_doc_ids = self._active_docs_for_key_fingerprint(docs) - return set([private_key_id for private_key_id in private_key_ids if private_key_id not in active_doc_ids]) + return set([private_key_fingerprint for private_key_fingerprint in private_key_fingerprints if private_key_fingerprint not in active_doc_ids]) - def _emails_for_key_id(self, docs, key_id): + def _emails_for_key_fingerprint(self, docs, fingerprint): for doc in docs: - if _is_private_key_doc(doc) and _key_id(doc) == key_id: + if _is_private_key_doc(doc) and _key_fingerprint(doc) == fingerprint: email = _address(doc) + if email is None: + return [] if isinstance(email, list): return email - else: - return [email] + return [email] - def _active_docs_for_key_id(self, docs): - return [doc.content[KEY_ID_KEY] for doc in docs if _is_active_key_doc(doc) and _is_public_key(doc)] + def _active_docs_for_key_fingerprint(self, docs): + return [doc.content[KEY_FINGERPRINT_KEY] for doc in docs if _is_active_key_doc(doc) and _is_public_key(doc)] diff --git a/service/pixelated/resources/keys_resource.py b/service/pixelated/resources/keys_resource.py index d6f469fe..9075ab9e 100644 --- a/service/pixelated/resources/keys_resource.py +++ b/service/pixelated/resources/keys_resource.py @@ -17,7 +17,7 @@ class KeysResource(BaseResource): if key.private: respond_json_deferred(None, request, status_code=401) else: - respond_json_deferred(key.get_json(), request) + respond_json_deferred(key.get_active_json(), request) def key_not_found(_): respond_json_deferred(None, request, status_code=404) diff --git a/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py b/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py index e46d6864..be73af93 100644 --- a/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py +++ b/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py @@ -26,7 +26,7 @@ logging.getLogger('pixelated.adapter.mailstore.maintenance').addHandler(logging. SOME_EMAIL_ADDRESS = 'foo@example.tld' -SOME_KEY_ID = '4914254E384E264C' +SOME_FINGERPRINT = '4914254E384E264C' class TestSoledadMaintenance(unittest.TestCase): @@ -42,8 +42,8 @@ class TestSoledadMaintenance(unittest.TestCase): @defer.inlineCallbacks def test_repair_delete_public_key_active_docs(self): soledad = mock() - key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID) - active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS)) + key = self._public_key(SOME_EMAIL_ADDRESS, SOME_FINGERPRINT) + active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json()) when(soledad).get_all_docs().thenReturn(defer.succeed((1, [active_doc]))) yield SoledadMaintenance(soledad).repair() @@ -53,8 +53,8 @@ class TestSoledadMaintenance(unittest.TestCase): @defer.inlineCallbacks def test_repair_delete_public_key_docs(self): soledad = mock() - key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID) - active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS)) + key = self._public_key(SOME_EMAIL_ADDRESS, SOME_FINGERPRINT) + active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json()) key_doc = SoledadDocument(doc_id='some_doc', json=key.get_json()) when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, active_doc]))) @@ -66,9 +66,9 @@ class TestSoledadMaintenance(unittest.TestCase): @defer.inlineCallbacks def test_repair_keeps_active_and_key_doc_if_private_key_exists(self): soledad = mock() - key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID) - private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID) - active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS)) + key = self._public_key(SOME_EMAIL_ADDRESS, SOME_FINGERPRINT) + private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_FINGERPRINT) + active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json()) key_doc = SoledadDocument(doc_id='some_doc', json=key.get_json()) private_key_doc = SoledadDocument(doc_id='some_doc', json=private_key.get_json()) when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, active_doc, private_key_doc]))) @@ -82,8 +82,8 @@ class TestSoledadMaintenance(unittest.TestCase): @defer.inlineCallbacks def test_repair_only_deletes_key_docs(self): soledad = mock() - key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID) - key_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS)) + key = self._public_key(SOME_EMAIL_ADDRESS, SOME_FINGERPRINT) + key_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json()) other_doc = SoledadDocument(doc_id='something', json='{}') when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, other_doc]))) @@ -95,19 +95,19 @@ class TestSoledadMaintenance(unittest.TestCase): def test_repair_recreates_public_key_active_doc_if_necessary(self): soledad = mock() - private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID) - private_key_doc = SoledadDocument(doc_id='some_doc', json=private_key.get_json()) + private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_FINGERPRINT) + private_key_doc = SoledadDocument(doc_id='some_doc', json=private_key.get_active_json()) when(soledad).get_all_docs().thenReturn(defer.succeed((1, [private_key_doc]))) yield SoledadMaintenance(soledad).repair() - verify(soledad).create_doc_from_json('{"key_id": "4914254E384E264C", "tags": ["keymanager-active"], "type": "OpenPGPKey-active", "private": false, "address": "foo@example.tld"}') + verify(soledad).create_doc_from_json('{"encr_used": false, "sign_used": false, "validation": "Weak_Chain", "version": 1, "address": "foo@example.tld", "last_audited_at": 0, "fingerprint": "4914254E384E264C", "type": "OpenPGPKey-active", "private": false, "tags": ["keymanager-active"]}') - def _public_key(self, address, keyid): - return self._gpgkey(address, keyid, private=False) + def _public_key(self, address, fingerprint): + return self._gpgkey(address, fingerprint, private=False) - def _private_key(self, address, keyid): - return self._gpgkey(address, keyid, private=True) + def _private_key(self, address, fingerprint): + return self._gpgkey(address, fingerprint, private=True) - def _gpgkey(self, address, keyid, private=False): - return OpenPGPKey(address, key_id=keyid, private=private) + def _gpgkey(self, address, fingerprint, private=False): + return OpenPGPKey(address, fingerprint=fingerprint, private=private) diff --git a/service/test/unit/resources/test_keys_resources.py b/service/test/unit/resources/test_keys_resources.py index 6aa822e1..2bf53cb4 100644 --- a/service/test/unit/resources/test_keys_resources.py +++ b/service/test/unit/resources/test_keys_resources.py @@ -44,20 +44,16 @@ class TestKeysResource(unittest.TestCase): d = self.web.get(request) expected = { - "tags": ["keymanager-key"], - "fingerprint": '', - "private": False, - 'sign_used': False, - 'refreshed_at': 0, - "expiry_date": 0, - "address": 'some@key', - 'encr_used': False, - 'last_audited_at': 0, - 'key_data': '', - 'length': 0, - 'key_id': '', - 'validation': 'Weak_Chain', - 'type': 'OpenPGPKey', + u'address': u'some@key', + u'encr_used': False, + u'fingerprint': u'', + u'last_audited_at': 0, + u'private': False, + u'sign_used': False, + u'tags': [u'keymanager-active'], + u'type': u'OpenPGPKey-active', + u'validation': u'Weak_Chain', + u'version': 1, } def assert_response(_): |