diff options
| -rw-r--r-- | src/leap/common/keymanager/__init__.py | 246 | ||||
| -rw-r--r-- | src/leap/common/keymanager/keys.py | 72 | ||||
| -rw-r--r-- | src/leap/common/tests/test_keymanager.py | 113 | 
3 files changed, 288 insertions, 143 deletions
| diff --git a/src/leap/common/keymanager/__init__.py b/src/leap/common/keymanager/__init__.py index 30a9146..ad9bb3b 100644 --- a/src/leap/common/keymanager/__init__.py +++ b/src/leap/common/keymanager/__init__.py @@ -34,6 +34,7 @@ from leap.common.keymanager.errors import (  )  from leap.common.keymanager.keys import (      build_key_from_dict, +    KEYMANAGER_KEY_TAG,  )  from leap.common.keymanager.openpgp import (      OpenPGPKey, @@ -42,6 +43,10 @@ from leap.common.keymanager.openpgp import (  ) +# +# key indexing constants. +# +  TAGS_INDEX = 'by-tags'  TAGS_AND_PRIVATE_INDEX = 'by-tags-and-private'  INDEXES = { @@ -50,9 +55,21 @@ INDEXES = {  } +# +# The Key Manager +# +  class KeyManager(object): -    def __init__(self, address, nickserver_url, soledad, token=None): +    # +    # server's key storage constants +    # + +    OPENPGP_KEY = 'openpgp' +    PUBKEY_KEY = "user[public_key]" + +    def __init__(self, address, nickserver_uri, soledad, session_id=None, +                 ca_cert_path=None, api_uri=None, api_version=None, uid=None):          """          Initialize a Key Manager for user's C{address} with provider's          nickserver reachable in C{url}. @@ -63,17 +80,35 @@ class KeyManager(object):          @type url: str          @param soledad: A Soledad instance for local storage of keys.          @type soledad: leap.soledad.Soledad +        @param session_id: The session ID for interacting with the webapp API. +        @type session_id: str +        @param ca_cert_path: The path to the CA certificate. +        @type ca_cert_path: str +        @param api_uri: The URI of the webapp API. +        @type api_uri: str +        @param api_version: The version of the webapp API. +        @type api_version: str +        @param uid: The users' UID. +        @type uid: str          """          self._address = address -        self._nickserver_url = nickserver_url +        self._nickserver_uri = nickserver_uri          self._soledad = soledad -        self.token = token +        self._session_id = session_id +        self.ca_cert_path = ca_cert_path +        self.api_uri = api_uri +        self.api_version = api_version +        self.uid = uid +        # a dict to map key types to their handlers          self._wrapper_map = {              OpenPGPKey: OpenPGPScheme(soledad),              # other types of key will be added to this mapper.          } +        # initialize the indexes needed to query the database          self._init_indexes() +        # the following are used to perform https requests          self._fetcher = requests +        self._session = self._fetcher.session()      #      # utilities @@ -107,24 +142,81 @@ class KeyManager(object):              self._soledad.delete_index(name)              self._soledad.create_index(name, *expression) -    def _get_dict_from_http_json(self, path): +    def _get(self, uri, data=None):          """ -        Make a GET HTTP request and return a dictionary containing the -        response. +        Send a GET request to C{uri} containing C{data}. + +        @param uri: The URI of the request. +        @type uri: str +        @param data: The body of the request. +        @type data: dict, str or file + +        @return: The response to the request. +        @rtype: requests.Response          """ -        response = self._fetcher.get(self._nickserver_url+path) -        leap_assert(response.status_code == 200, 'Invalid response.')          leap_assert( -            response.headers['content-type'].startswith('application/json') -            is True, +            self._ca_cert_path is not None, +            'We need the CA certificate path!') +        res = self._fetcher.get(uri, data=data, verify=self._ca_cert_path) +        # assert that the response is valid +        res.raise_for_status() +        leap_assert( +            res.headers['content-type'].startswith('application/json'),              'Content-type is not JSON.') -        return response.json() +        return res + +    def _put(self, uri, data=None): +        """ +        Send a PUT request to C{uri} containing C{data}. + +        The request will be sent using the configured CA certificate path to +        verify the server certificate and the configured session id for +        authentication. + +        @param uri: The URI of the request. +        @type uri: str +        @param data: The body of the request. +        @type data: dict, str or file + +        @return: The response to the request. +        @rtype: requests.Response +        """ +        leap_assert( +            self._ca_cert_path is not None, +            'We need the CA certificate path!') +        leap_assert( +            self._session_id is not None, +            'We need a session_id to interact with webapp!') +        res = self._fetcher.put( +            uri, data=data, verify=self._ca_cert_path, +            cookies={'_session_id': self._session_id}) +        # assert that the response is valid +        res.raise_for_status() +        return res + +    def _fetch_keys_from_server(self, address): +        """ +        Fetch keys bound to C{address} from nickserver and insert them in +        local database. + +        @param address: The address bound to the keys. +        @type address: str + +        @raise KeyNotFound: If the key was not found on nickserver. +        """ +        # request keys from the nickserver +        server_keys = self._get( +            self._nickserver_uri, {'address': address}).json() +        # insert keys in local database +        if self.OPENPGP_KEY in server_keys: +            self._wrapper_map[OpenPGPKey].put_ascii_key( +                server_keys['openpgp'])      #      # key management      # -    def send_key(self, ktype, send_private=False, password=None): +    def send_key(self, ktype):          """          Send user's key of type C{ktype} to provider. @@ -140,33 +232,22 @@ class KeyManager(object):          @param ktype: The type of the key.          @type ktype: KeyType -        @raise httplib.HTTPException: -        @raise KeyNotFound: If the key was not found both locally and in -            keyserver. +        @raise KeyNotFound: If the key was not found in local database.          """ +        leap_assert( +            ktype is OpenPGPKey, +            'For now we only know how to send OpenPGP public keys.')          # prepare the public key bound to address          pubkey = self.get_key(              self._address, ktype, private=False, fetch_remote=False)          data = { -            'address': self._address, -            'keys': [ -                json.loads(pubkey.get_json()), -            ] +            self.PUBKEY_KEY: pubkey.key_data          } -        # prepare the private key bound to address -        if send_private: -            if password is None or password == '': -                raise NoPasswordGiven('Can\'t send unencrypted private keys!') -            privkey = self.get_key( -                self._address, ktype, private=True, fetch_remote=False) -            privkey = json.loads(privkey.get_json()) -            privkey.key_data = encrypt_sym( -                privkey.key_data, passphrase=password) -            data['keys'].append(privkey) -        self._fetcher.put( -            self._nickserver_url + '/key/' + self._address, -            data=data, -            auth=(self._address, self._token)) +        uri = "%s/%s/users/%s.json" % ( +            self._api_uri, +            self._api_version, +            self._uid) +        self._put(uri, data)      def get_key(self, address, ktype, private=False, fetch_remote=True):          """ @@ -191,48 +272,15 @@ class KeyManager(object):              ktype in self._wrapper_map,              'Unkown key type: %s.' % str(ktype))          try: +            # return key if it exists in local database              return self._wrapper_map[ktype].get_key(address, private=private)          except KeyNotFound:              # we will only try to fetch a key from nickserver if fetch_remote              # is True and the key is not private.              if fetch_remote is False or private is True:                  raise -            # fetch keys from server and discard unwanted types. -            keys = filter(lambda k: isinstance(k, ktype), -                          self.fetch_keys_from_server(address)) -            if len(keys) is 0: -                raise KeyNotFound() -            leap_assert( -                len(keys) == 1, -                'Got more than one key of type %s for %s.' % -                (str(ktype), address)) -            self._wrapper_map[ktype].put_key(keys[0]) -            return self._wrapper_map[ktype].get_key(address, private=private) - -    def fetch_keys_from_server(self, address): -        """ -        Fetch keys bound to C{address} from nickserver. - -        @param address: The address bound to the keys. -        @type address: str - -        @return: A list of keys bound to C{address}. -        @rtype: list of EncryptionKey -        @raise KeyNotFound: If the key was not found on nickserver. -        @raise httplib.HTTPException: -        """ -        keydata = self._get_dict_from_http_json('/key/%s' % address) -        leap_assert( -            keydata['address'] == address, -            "Fetched key for wrong address.") -        keys = [] -        for key in keydata['keys']: -            keys.append( -                build_key_from_dict( -                    self._key_class_from_type(key['type']), -                    address, -                    key)) -        return keys +            self._fetch_keys_from_server(address) +            return self._wrapper_map[ktype].get_key(address, private=False)      def get_all_keys_in_local_db(self, private=False):          """ @@ -248,7 +296,7 @@ class KeyManager(object):                  doc.content),              self._soledad.get_from_index(                  TAGS_AND_PRIVATE_INDEX, -                'keymanager-key', +                KEYMANAGER_KEY_TAG,                  '1' if private else '0'))      def refresh_keys(self): @@ -258,10 +306,11 @@ class KeyManager(object):          addresses = set(map(              lambda doc: doc.address,              self.get_all_keys_in_local_db(private=False))) -        # TODO: maybe we should not attempt to refresh our own public key?          for address in addresses: -            for key in self.fetch_keys_from_server(address): -                self._wrapper_map[key.__class__].put_key(key) +            # do not attempt to refresh our own key +            if address == self._address: +                continue +            self._fetch_keys_from_server(address)      def gen_key(self, ktype):          """ @@ -276,14 +325,51 @@ class KeyManager(object):          return self._wrapper_map[ktype].gen_key(self._address)      # -    # Token setter/getter +    # Setters/getters      # -    def _get_token(self): -        return self._token +    def _get_session_id(self): +        return self._session_id + +    def _set_session_id(self, session_id): +        self._session_id = session_id + +    session_id = property( +        _get_session_id, _set_session_id, doc='The session id.') + +    def _get_ca_cert_path(self): +        return self._ca_cert_path + +    def _set_ca_cert_path(self, ca_cert_path): +        self._ca_cert_path = ca_cert_path + +    ca_cert_path = property( +        _get_ca_cert_path, _set_ca_cert_path, +        doc='The path to the CA certificate.') + +    def _get_api_uri(self): +        return self._api_uri + +    def _set_api_uri(self, api_uri): +        self._api_uri = api_uri + +    api_uri = property( +        _get_api_uri, _set_api_uri, doc='The webapp API URI.') + +    def _get_api_version(self): +        return self._api_version + +    def _set_api_version(self, api_version): +        self._api_version = api_version + +    api_version = property( +        _get_api_version, _set_api_version, doc='The webapp API version.') + +    def _get_uid(self): +        return self._uid -    def _set_token(self, token): -        self._token = token +    def _set_uid(self, uid): +        self._uid = uid -    token = property( -        _get_token, _set_token, doc='The auth token.') +    uid = property( +        _get_uid, _set_uid, doc='The uid of the user.') diff --git a/src/leap/common/keymanager/keys.py b/src/leap/common/keymanager/keys.py index d98bd02..1d87858 100644 --- a/src/leap/common/keymanager/keys.py +++ b/src/leap/common/keymanager/keys.py @@ -34,6 +34,30 @@ from leap.common.check import leap_assert  # +# Dictionary keys used for storing cryptographic keys. +# + +KEY_ADDRESS_KEY = 'address' +KEY_TYPE_KEY = 'type' +KEY_ID_KEY = 'key_id' +KEY_FINGERPRINT_KEY = 'fingerprint' +KEY_DATA_KEY = 'key_data' +KEY_PRIVATE_KEY = 'private' +KEY_LENGTH_KEY = 'length' +KEY_EXPIRY_DATE_KEY = 'expiry_date' +KEY_FIRST_SEEN_AT_KEY = 'first_seen_at' +KEY_LAST_AUDITED_AT_KEY = 'last_audited_at' +KEY_VALIDATION_KEY = 'validation' +KEY_TAGS_KEY = 'tags' + + +# +# Key storage constants +# + +KEYMANAGER_KEY_TAG = 'keymanager-key' + +#  # Key handling utilities  # @@ -60,18 +84,20 @@ def build_key_from_dict(kClass, address, kdict):      @return: An instance of the key.      @rtype: C{kClass}      """ -    leap_assert(address == kdict['address'], 'Wrong address in key data.') +    leap_assert( +        address == kdict[KEY_ADDRESS_KEY], +        'Wrong address in key data.')      return kClass(          address, -        key_id=kdict['key_id'], -        fingerprint=kdict['fingerprint'], -        key_data=kdict['key_data'], -        private=kdict['private'], -        length=kdict['length'], -        expiry_date=kdict['expiry_date'], -        first_seen_at=kdict['first_seen_at'], -        last_audited_at=kdict['last_audited_at'], -        validation=kdict['validation'],  # TODO: verify for validation. +        key_id=kdict[KEY_ID_KEY], +        fingerprint=kdict[KEY_FINGERPRINT_KEY], +        key_data=kdict[KEY_DATA_KEY], +        private=kdict[KEY_PRIVATE_KEY], +        length=kdict[KEY_LENGTH_KEY], +        expiry_date=kdict[KEY_EXPIRY_DATE_KEY], +        first_seen_at=kdict[KEY_FIRST_SEEN_AT_KEY], +        last_audited_at=kdict[KEY_LAST_AUDITED_AT_KEY], +        validation=kdict[KEY_VALIDATION_KEY],  # TODO: verify for validation.      ) @@ -92,7 +118,7 @@ def keymanager_doc_id(ktype, address, private=False):      """      leap_assert(is_address(address), "Wrong address format: %s" % address)      ktype = str(ktype) -    visibility = 'private' if private else 'public' +    visibility = KEY_PRIVATE_KEY if private else 'public'      return sha256('keymanager-'+address+'-'+ktype+'-'+visibility).hexdigest() @@ -141,18 +167,18 @@ class EncryptionKey(object):          @rtype: str          """          return json.dumps({ -            'address': self.address, -            'type': str(self.__class__), -            'key_id': self.key_id, -            'fingerprint': self.fingerprint, -            'key_data': self.key_data, -            'private': self.private, -            'length': self.length, -            'expiry_date': self.expiry_date, -            'validation': self.validation, -            'first_seen_at': self.first_seen_at, -            'last_audited_at': self.last_audited_at, -            'tags': ['keymanager-key'], +            KEY_ADDRESS_KEY: self.address, +            KEY_TYPE_KEY: str(self.__class__), +            KEY_ID_KEY: self.key_id, +            KEY_FINGERPRINT_KEY: self.fingerprint, +            KEY_DATA_KEY: self.key_data, +            KEY_PRIVATE_KEY: self.private, +            KEY_LENGTH_KEY: self.length, +            KEY_EXPIRY_DATE_KEY: self.expiry_date, +            KEY_VALIDATION_KEY: self.validation, +            KEY_FIRST_SEEN_AT_KEY: self.first_seen_at, +            KEY_LAST_AUDITED_AT_KEY: self.last_audited_at, +            KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG],          })      def __repr__(self): diff --git a/src/leap/common/tests/test_keymanager.py b/src/leap/common/tests/test_keymanager.py index a7aa1ca..dcd525c 100644 --- a/src/leap/common/tests/test_keymanager.py +++ b/src/leap/common/tests/test_keymanager.py @@ -419,73 +419,106 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km = self._key_manager()          self.assertRaises(              KeyNotFound, -            km.send_key, OpenPGPKey, send_private=False) +            km.send_key, OpenPGPKey) -    def test_send_private_key_raises_key_not_found(self): -        km = self._key_manager() -        km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY) -        self.assertRaises( -            KeyNotFound, -            km.send_key, OpenPGPKey, send_private=True, -            password='123') - -    def test_send_private_key_without_password_raises(self): -        km = self._key_manager() -        km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY) -        self.assertRaises( -            NoPasswordGiven, -            km.send_key, OpenPGPKey, send_private=True) - -    def test_send_public_key(self): +    def test_send_key(self): +        """ +        Test that request is well formed when sending keys to server. +        """          km = self._key_manager()          km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY)          km._fetcher.put = Mock() -        km.token = '123' -        km.send_key(OpenPGPKey, send_private=False) -        # setup args +        # the following data will be used on the send +        km.ca_cert_path = 'capath' +        km.session_id = 'sessionid' +        km.uid = 'myuid' +        km.api_uri = 'apiuri' +        km.api_version = 'apiver' +        km.send_key(OpenPGPKey) +        # setup expected args          data = { -            'address': km._address, -            'keys': [ -                json.loads( -                    km.get_key( -                        km._address, OpenPGPKey).get_json()), -            ] +            km.PUBKEY_KEY: km.get_key(km._address, OpenPGPKey).key_data,          } -        url = km._nickserver_url + '/key/' + km._address - +        url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')          km._fetcher.put.assert_called_once_with( -            url, data=data, auth=(km._address, '123') +            url, data=data, verify='capath', +            cookies={'_session_id': 'sessionid'},          ) -    def test_fetch_keys_from_server(self): -        km = self._key_manager() -        # setup mock +    def test__fetch_keys_from_server(self): +        """ +        Test that the request is well formed when fetching keys from server. +        """ +        km = self._key_manager(url='http://nickserver.domain')          class Response(object):              status_code = 200              headers = {'content-type': 'application/json'}              def json(self): -                return {'address': ADDRESS_2, 'keys': []} +                return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2} + +            def raise_for_status(self): +                pass +        # mock the fetcher so it returns the key for ADDRESS_2          km._fetcher.get = Mock(              return_value=Response()) +        km.ca_cert_path = 'cacertpath'          # do the fetch -        km.fetch_keys_from_server(ADDRESS_2) +        km._fetch_keys_from_server(ADDRESS_2)          # and verify the call          km._fetcher.get.assert_called_once_with( -            km._nickserver_url + '/key/' + ADDRESS_2, +            'http://nickserver.domain', +            data={'address': ADDRESS_2}, +            verify='cacertpath',          ) -    def test_refresh_keys(self): -        # TODO: maybe we should not attempt to refresh our own public key? +    def test_refresh_keys_does_not_refresh_own_key(self): +        """ +        Test that refreshing keys will not attempt to refresh our own key. +        """          km = self._key_manager() +        # we add 2 keys but we expect it to only refresh the second one.          km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY) -        km.fetch_keys_from_server = Mock(return_value=[]) +        km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY_2) +        # mock the key fetching +        km._fetch_keys_from_server = Mock(return_value=[]) +        km.ca_cert_path = ''  # some bogus path so the km does not complain. +        # do the refreshing          km.refresh_keys() -        km.fetch_keys_from_server.assert_called_once_with( -            ADDRESS +        km._fetch_keys_from_server.assert_called_once_with( +            ADDRESS_2 +        ) + +    def test_get_key_fetches_from_server(self): +        """ +        Test that getting a key successfuly fetches from server. +        """ +        km = self._key_manager(url='http://nickserver.domain') + +        class Response(object): +            status_code = 200 +            headers = {'content-type': 'application/json'} + +            def json(self): +                return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2} + +            def raise_for_status(self): +                pass + +        # mock the fetcher so it returns the key for ADDRESS_2 +        km._fetcher.get = Mock(return_value=Response()) +        km.ca_cert_path = 'cacertpath' +        # try to key get without fetching from server +        self.assertRaises( +            KeyNotFound, km.get_key, ADDRESS_2, OpenPGPKey, +            fetch_remote=False          ) +        # try to get key fetching from server. +        key = km.get_key(ADDRESS_2, OpenPGPKey) +        self.assertIsInstance(key, OpenPGPKey) +        self.assertEqual(ADDRESS_2, key.address)  # Key material for testing | 
