diff options
| -rw-r--r-- | keymanager/changes/next-changelog.txt | 1 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/openpgp.py | 35 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_keymanager.py | 22 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_openpgp.py | 67 | 
4 files changed, 68 insertions, 57 deletions
| diff --git a/keymanager/changes/next-changelog.txt b/keymanager/changes/next-changelog.txt index 91ca8bd..a2ab4fb 100644 --- a/keymanager/changes/next-changelog.txt +++ b/keymanager/changes/next-changelog.txt @@ -12,6 +12,7 @@ Features  ~~~~~~~~  - `#8031 <https://leap.se/code/issues/8031>`_: Remove support for multiple key types.  - `#8068 <https://leap.se/code/issues/8068>`_: make get_all_keys aware of active addresses. +- `#6658 <https://leap.se/code/issues/6658>`_: Improve duplicated active documents fixup.  - `#1234 <https://leap.se/code/issues/1234>`_: Description of the new feature corresponding with issue #1234.  - New feature without related issue number. diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 8658e9c..98ce464 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -47,8 +47,8 @@ from leap.keymanager.documents import (      TYPE_ADDRESS_PRIVATE_INDEX,      KEY_UIDS_KEY,      KEY_FINGERPRINT_KEY, +    KEY_PRIVATE_KEY,      KEY_REFRESHED_AT_KEY, -    KEY_LAST_AUDITED_AT_KEY,      KEY_SIGN_USED_KEY,      KEY_ENCR_USED_KEY,      KEY_ADDRESS_KEY, @@ -268,8 +268,8 @@ class OpenPGPScheme(object):              '1' if private else '0')          keys = [] +        fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY]          for active in active_docs: -            fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY]              fp_keys = filter(lambda k: fp(k) == fp(active), key_docs)              if len(fp_keys) == 0: @@ -770,6 +770,7 @@ class OpenPGPScheme(object):          return self._repair_docs(doclist, cmp_key, log_key_doc) +    @defer.inlineCallbacks      def _repair_active_docs(self, doclist):          """          If there is more than one active doc for an address try to self-repair @@ -779,23 +780,39 @@ class OpenPGPScheme(object):                   all the deletions are completed          :rtype: Deferred          """ +        keys = {} +        for doc in doclist: +            fp = doc.content[KEY_FINGERPRINT_KEY] +            private = doc.content[KEY_PRIVATE_KEY] +            try: +                key = yield self._get_key_doc_from_fingerprint(fp, private) +                keys[fp] = key +            except Exception: +                pass +          def log_active_doc(doc):              logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],                                         doc.content[KEY_FINGERPRINT_KEY]))          def cmp_active(d1, d2): -            res = cmp(d1.content[KEY_LAST_AUDITED_AT_KEY], -                      d2.content[KEY_LAST_AUDITED_AT_KEY]) -            if res != 0: -                return res - +            # XXX: for private keys it will be nice to check which key is known +            #      by the nicknym server and keep this one. But this needs a +            #      refactor that might not be worth it.              used1 = (d1.content[KEY_SIGN_USED_KEY] +                       d1.content[KEY_ENCR_USED_KEY])              used2 = (d2.content[KEY_SIGN_USED_KEY] +                       d2.content[KEY_ENCR_USED_KEY]) -            return cmp(used1, used2) +            res = cmp(used1, used2) +            if res != 0: +                return res + +            key1 = keys[d1.content[KEY_FINGERPRINT_KEY]] +            key2 = keys[d2.content[KEY_FINGERPRINT_KEY]] +            return cmp(key1.content[KEY_REFRESHED_AT_KEY], +                       key2.content[KEY_REFRESHED_AT_KEY]) -        return self._repair_docs(doclist, cmp_active, log_active_doc) +        doc = yield self._repair_docs(doclist, cmp_active, log_active_doc) +        defer.returnValue(doc)      def _repair_docs(self, doclist, cmp_func, log_func):          logger.error("BUG ---------------------------------------------------") diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 9deaff1..c4d3fda 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -237,7 +237,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          def verify_the_call(_):              used_kwargs = km._async_client_pinned.request.call_args[1] -            km._async_client_pinned.request.assert_called_once_with(expected_url, 'GET', **used_kwargs) +            km._async_client_pinned.request.assert_called_once_with( +                expected_url, 'GET', **used_kwargs)          d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)          d.addCallback(verify_the_call) @@ -245,11 +246,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      def test_key_not_found_is_raised_if_key_search_responds_404(self):          """ -        Test if key search request comes back with a 404 response then KeyNotFound is raised, with corresponding error message. +        Test if key search request comes back with a 404 response then +        KeyNotFound is raised, with corresponding error message.          """          km = self._key_manager(url=NICKSERVER_URI)          client.readBody = Mock(return_value=defer.succeed(None)) -        km._async_client_pinned.request = Mock(return_value=defer.succeed(None)) +        km._async_client_pinned.request = Mock( +            return_value=defer.succeed(None))          url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS          d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS) @@ -259,7 +262,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):              check_404_callback = used_kwargs['callback']              fake_response = Mock()              fake_response.code = NOT_FOUND -            with self.assertRaisesRegexp(errors.KeyNotFound, '404: %s key not found.' % INVALID_MAIL_ADDRESS): +            with self.assertRaisesRegexp( +                    errors.KeyNotFound, +                    '404: %s key not found.' % INVALID_MAIL_ADDRESS):                  check_404_callback(fake_response)          d.addCallback(check_key_not_found_is_raised_if_404) @@ -267,11 +272,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      def test_non_existing_key_from_nicknym_is_relayed(self):          """ -        Test if key search requests throws KeyNotFound, the same error is raised. +        Test if key search requests throws KeyNotFound, the same error is +        raised.          """          km = self._key_manager(url=NICKSERVER_URI)          key_not_found_exception = errors.KeyNotFound('some message') -        km._async_client_pinned.request = Mock(side_effect=key_not_found_exception) +        km._async_client_pinned.request = Mock( +            side_effect=key_not_found_exception)          def assert_key_not_found_raised(error):              self.assertEqual(error.value, key_not_found_exception) @@ -312,7 +319,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          client.readBody = Mock(return_value=defer.succeed(data))          # mock the fetcher so it returns the key for ADDRESS_2 -        km._async_client_pinned.request = Mock(return_value=defer.succeed(None)) +        km._async_client_pinned.request = Mock( +            return_value=defer.succeed(None))          km.ca_cert_path = 'cacertpath'          # try to key get without fetching from server          d_fail = km.get_key(address, fetch_remote=False) diff --git a/keymanager/src/leap/keymanager/tests/test_openpgp.py b/keymanager/src/leap/keymanager/tests/test_openpgp.py index 4c01676..17a6b79 100644 --- a/keymanager/src/leap/keymanager/tests/test_openpgp.py +++ b/keymanager/src/leap/keymanager/tests/test_openpgp.py @@ -309,52 +309,37 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          self.assertEqual(self.count, 2)      @inlineCallbacks -    def test_self_repair_five_active_docs(self): +    def test_self_repair_six_active_docs(self):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=self.gpg_binary_path) -        get_from_index = self._soledad.get_from_index -        delete_doc = self._soledad.delete_doc - -        def my_get_from_index(*args): -            if (args[0] == TYPE_ADDRESS_PRIVATE_INDEX and -                    args[2] == ADDRESS): -                k1 = OpenPGPKey(ADDRESS, fingerprint="1", -                                last_audited_at=datetime(2005, 1, 1)) -                k2 = OpenPGPKey(ADDRESS, fingerprint="2", -                                last_audited_at=datetime(2007, 1, 1)) -                k3 = OpenPGPKey(ADDRESS, fingerprint="3", -                                last_audited_at=datetime(2007, 1, 1), -                                encr_used=True, sign_used=True) -                k4 = OpenPGPKey(ADDRESS, fingerprint="4", -                                last_audited_at=datetime(2007, 1, 1), -                                sign_used=True) -                k5 = OpenPGPKey(ADDRESS, fingerprint="5", -                                last_audited_at=datetime(2007, 1, 1), -                                encr_used=True) -                deferreds = [] -                for k in (k1, k2, k3, k4, k5): -                    d = self._soledad.create_doc_from_json( -                        k.get_active_json()) -                    deferreds.append(d) -                return gatherResults(deferreds) -            elif args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX: -                fingerprint = args[2] -                self.assertEqual(fingerprint, "3") -                k = OpenPGPKey(ADDRESS, fingerprint="3") -                return succeed( -                    [self._soledad.create_doc_from_json(k.get_json())]) -            return get_from_index(*args) +        k1 = OpenPGPKey(ADDRESS, fingerprint="1", +                        refreshed_at=datetime(2005, 1, 1)) +        k2 = OpenPGPKey(ADDRESS, fingerprint="2", +                        refreshed_at=datetime(2007, 1, 1)) +        k3 = OpenPGPKey(ADDRESS, fingerprint="3", +                        refreshed_at=datetime(2007, 1, 1), +                        encr_used=True, sign_used=True) +        k4 = OpenPGPKey(ADDRESS, fingerprint="4", +                        refreshed_at=datetime(2007, 1, 1), +                        sign_used=True) +        k5 = OpenPGPKey(ADDRESS, fingerprint="5", +                        refreshed_at=datetime(2007, 1, 1), +                        encr_used=True) +        k6 = OpenPGPKey(ADDRESS, fingerprint="6", +                        refreshed_at=datetime(2006, 1, 1), +                        encr_used=True, sign_used=True) +        keys = (k1, k2, k3, k4, k5, k6) +        for key in keys: +            yield self._soledad.create_doc_from_json(key.get_json()) +            yield self._soledad.create_doc_from_json(key.get_active_json()) -        self._soledad.get_from_index = my_get_from_index -        self._soledad.delete_doc = Mock(return_value=succeed(None)) +        delete_doc = self._mock_delete_doc() -        try: -            yield pgp.get_key(ADDRESS, private=False) -            self.assertEqual(self._soledad.delete_doc.call_count, 4) -        finally: -            self._soledad.get_from_index = get_from_index -            self._soledad.delete_doc = delete_doc +        key = yield pgp.get_key(ADDRESS, private=False) +        self._soledad.delete_doc = delete_doc +        self.assertEqual(self.count, 5) +        self.assertEqual(key.fingerprint, "3")      def _assert_key_not_found(self, pgp, address, private=False):          d = pgp.get_key(address, private=private) | 
