"""
Returns the x509 from the contents of this string
- @param string: certificate contents as downloaded
- @type string: str
+ :param string: certificate contents as downloaded
+ :type string: str
- @return: x509 or None
+ :return: x509 or None
"""
leap_assert(string, "We need something to load")
"""
Returns the private key from the contents of this string
- @param string: private key contents as downloaded
- @type string: str
+ :param string: private key contents as downloaded
+ :type string: str
- @return: private key or None
+ :return: private key or None
"""
leap_assert(string, "We need something to load")
"""
Returns the digest for the cert_data using the method specified
- @param cert_data: certificate data in string form
- @type cert_data: str
- @param method: method to be used for digest
- @type method: str
+ :param cert_data: certificate data in string form
+ :type cert_data: str
+ :param method: method to be used for digest
+ :type method: str
- @rtype: str
+ :rtype: str
"""
x509 = get_cert_from_string(cert_data)
digest = x509.digest(method).replace(":", "").lower()
Loads certificate and private key from a buffer, returns True if
everything went well, False otherwise
- @param string: buffer containing the cert and private key
- @type string: str or any kind of buffer
+ :param string: buffer containing the cert and private key
+ :type string: str or any kind of buffer
- @rtype: bool
+ :rtype: bool
"""
can_load = True
"""
Checks that the passed string is a valid pem certificate
- @param cert: String containing pem content
- @type cert: str
+ :param cert: String containing pem content
+ :type cert: str
- @rtype: bool
+ :rtype: bool
"""
leap_assert(cert, "We need a cert to load")
"""
Returns the time boundaries for the certificate saved in certfile
- @param certfile: path to certificate
- @type certfile: str
+ :param certfile: path to certificate
+ :type certfile: str
- @rtype: tuple (from, to)
+ :rtype: tuple (from, to)
"""
cert = get_cert_from_string(certfile)
leap_assert(cert, 'There was a problem loading the certificate')
"""
Returns True if any of the checks don't pass, False otherwise
- @param certfile: path to certificate
- @type certfile: str
- @param now: current date function, ONLY USED FOR TESTING
+ :param certfile: path to certificate
+ :type certfile: str
+ :param now: current date function, ONLY USED FOR TESTING
- @rtype: bool
+ :rtype: bool
"""
exists = os.path.isfile(certfile)
Asserts the condition and displays the message if that's not
met. It also logs the error and its backtrace.
- @param condition: condition to check
- @type condition: bool
- @param message: message to display if the condition isn't met
- @type message: str
+ :param condition: condition to check
+ :type condition: bool
+ :param message: message to display if the condition isn't met
+ :type message: str
"""
if not condition:
logger.error("Bug: %s" % (message,))
"""
Helper assert check for a variable's expected type
- @param var: variable to check
- @type var: any
- @param expectedType: type to check agains
- @type expectedType: type
+ :param var: variable to check
+ :type var: any
+ :param expectedType: type to check against
+ :type expectedType: type
"""
leap_assert(isinstance(var, expectedType),
"Expected type %r instead of %r" %
"""
Tries to return a value only if the config has already been loaded.
- @rtype: depends on the config structure, dict, str, array, int
- @return: returns the value for the specified key in the config
+ :rtype: depends on the config structure, dict, str, array, int
+ :return: returns the value for the specified key in the config
"""
leap_assert(self._config_checker, "Load the config first")
return self._config_checker.config.get(key, None)
"""
Returns the platform dependent path prefixer
- @param standalone: if True it will return the prefix for a
+ :param standalone: if True it will return the prefix for a
standalone application. Otherwise, it will return the system
default for configuration storage.
- @type standalone: bool
+ :type standalone: bool
"""
return ""
This method expects an env variable named LEAP_CLIENT_PATH if
standalone is used.
- @param standalone: if True it will return the prefix for a
+ :param standalone: if True it will return the prefix for a
standalone application. Otherwise, it will return the system
default for configuration storage.
- @type standalone: bool
+ :type standalone: bool
"""
config_dir = BaseDirectory.xdg_config_home
if not standalone:
This method expects an env variable named LEAP_CLIENT_PATH if
standalone is used.
- @param standalone: if True it will return the prefix for a
+ :param standalone: if True it will return the prefix for a
standalone application. Otherwise, it will return the system
default for configuration storage.
- @type standalone: bool
+ :type standalone: bool
"""
config_dir = BaseDirectory.xdg_config_home
if not standalone:
This method expects an env variable named LEAP_CLIENT_PATH if
standalone is used.
- @param standalone: if True it will return the prefix for a
+ :param standalone: if True it will return the prefix for a
standalone application. Otherwise, it will return the system
default for configuration storage.
- @type standalone: bool
+ :type standalone: bool
"""
config_dir = BaseDirectory.xdg_config_home
"""
Encrypt C{data} with C{key}, using C{method} encryption method.
- @param data: The data to be encrypted.
- @type data: str
- @param key: The key used to encrypt C{data} (must be 256 bits long).
- @type key: str
- @param method: The encryption method to use.
- @type method: str
-
- @return: A tuple with the initial value and the encrypted data.
- @rtype: (long, str)
+ :param data: The data to be encrypted.
+ :type data: str
+ :param key: The key used to encrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+
+ :return: A tuple with the initial value and the encrypted data.
+ :rtype: (long, str)
"""
leap_assert_type(key, str)
"""
Decrypt C{data} with C{key} using C{method} encryption method.
- @param data: The data to be decrypted.
- @type data: str
- @param key: The key used to decrypt C{data} (must be 256 bits long).
- @type key: str
- @param method: The encryption method to use.
- @type method: str
- @param kwargs: Other parameters specific to each encryption method.
- @type kwargs: dict
-
- @return: The decrypted data.
- @rtype: str
+ :param data: The data to be decrypted.
+ :type data: str
+ :param key: The key used to decrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+ :param kwargs: Other parameters specific to each encryption method.
+ :type kwargs: dict
+
+ :return: The decrypted data.
+ :rtype: str
"""
leap_assert_type(key, str)
returned for a synch request but nothing will be returned for an asynch
request.
- @param signal: the signal that causes the callback to be launched
- @type signal: int (see the `events.proto` file)
- @param callback: the callback to be called when the signal is received
- @type callback: function
- @param uid: a unique id for the callback
- @type uid: int
- @param replace: should an existent callback with same uid be replaced?
- @type replace: bool
- @param reqcbk: a callback to be called when a response from server is
+ :param signal: the signal that causes the callback to be launched
+ :type signal: int (see the `events.proto` file)
+ :param callback: the callback to be called when the signal is received
+ :type callback: function
+ :param uid: a unique id for the callback
+ :type uid: int
+ :param replace: should an existent callback with same uid be replaced?
+ :type replace: bool
+ :param reqcbk: a callback to be called when a response from server is
received
- @type reqcbk: function
+ :type reqcbk: function
callback(leap.common.events.events_pb2.EventResponse)
- @param timeout: the timeout for synch calls
- @type timeout: int
+ :param timeout: the timeout for synch calls
+ :type timeout: int
- @return: the response from server for synch calls or nothing for asynch
+ :return: the response from server for synch calls or nothing for asynch
calls
- @rtype: leap.common.events.events_pb2.EventsResponse or None
+ :rtype: leap.common.events.events_pb2.EventsResponse or None
"""
return component.register(signal, callback, uid, replace, reqcbk, timeout)
returned for a synch request but nothing will be returned for an asynch
request.
- @param signal: the signal that causes the callback to be launched
- @type signal: int (see the `events.proto` file)
- @param content: the contents of the event signal
- @type content: str
- @param mac_method: the method used to auth mac
- @type mac_method: str
- @param mac: the content of the auth mac
- @type mac: str
- @param reqcbk: a callback to be called when a response from server is
+ :param signal: the signal that causes the callback to be launched
+ :type signal: int (see the `events.proto` file)
+ :param content: the contents of the event signal
+ :type content: str
+ :param mac_method: the method used to auth mac
+ :type mac_method: str
+ :param mac: the content of the auth mac
+ :type mac: str
+ :param reqcbk: a callback to be called when a response from server is
received
- @type reqcbk: function
+ :type reqcbk: function
callback(leap.common.events.events_pb2.EventResponse)
- @param timeout: the timeout for synch calls
- @type timeout: int
+ :param timeout: the timeout for synch calls
+ :type timeout: int
- @return: the response from server for synch calls or nothing for asynch
+ :return: the response from server for synch calls or nothing for asynch
calls
- @rtype: leap.common.events.events_pb2.EventsResponse or None
+ :rtype: leap.common.events.events_pb2.EventsResponse or None
"""
return component.signal(signal, content, mac_method, mac, reqcbk, timeout)
Ensure the component daemon is running and listening for incoming
messages.
- @return: the daemon instance
- @rtype: EventsComponentDaemon
+ :return: the daemon instance
+ :rtype: EventsComponentDaemon
"""
import time
daemon = EventsComponentDaemon.ensure(0)
returned for a synch request but nothing will be returned for an asynch
request.
- @param signal: the signal that causes the callback to be launched
- @type signal: int (see the `events.proto` file)
- @param callback: the callback to be called when the signal is received
- @type callback: function
+ :param signal: the signal that causes the callback to be launched
+ :type signal: int (see the `events.proto` file)
+ :param callback: the callback to be called when the signal is received
+ :type callback: function
callback(leap.common.events.events_pb2.SignalRequest)
- @param uid: a unique id for the callback
- @type uid: int
- @param replace: should an existent callback with same uid be replaced?
- @type replace: bool
- @param reqcbk: a callback to be called when a response from server is
+ :param uid: a unique id for the callback
+ :type uid: int
+ :param replace: should an existent callback with same uid be replaced?
+ :type replace: bool
+ :param reqcbk: a callback to be called when a response from server is
received
- @type reqcbk: function
+ :type reqcbk: function
callback(leap.common.events.events_pb2.EventResponse)
- @param timeout: the timeout for synch calls
- @type timeout: int
+ :param timeout: the timeout for synch calls
+ :type timeout: int
Might raise a CallbackAlreadyRegistered exception if there's already a
callback identified by the given uid and replace is False.
- @return: the response from server for synch calls or nothing for asynch
+ :return: the response from server for synch calls or nothing for asynch
calls
- @rtype: leap.common.events.events_pb2.EventsResponse or None
+ :rtype: leap.common.events.events_pb2.EventsResponse or None
"""
ensure_component_daemon() # so we can receive registered signals
# register callback locally
returned for a synch request but nothing will be returned for an asynch
request.
- @param signal: the signal that causes the callback to be launched
- @type signal: int (see the `events.proto` file)
- @param content: the contents of the event signal
- @type content: str
- @param mac_method: the method used for auth mac
- @type mac_method: str
- @param mac: the content of the auth mac
- @type mac: str
- @param reqcbk: a callback to be called when a response from server is
+ :param signal: the signal that causes the callback to be launched
+ :type signal: int (see the `events.proto` file)
+ :param content: the contents of the event signal
+ :type content: str
+ :param mac_method: the method used for auth mac
+ :type mac_method: str
+ :param mac: the content of the auth mac
+ :type mac: str
+ :param reqcbk: a callback to be called when a response from server is
received
- @type reqcbk: function
+ :type reqcbk: function
callback(leap.common.events.events_pb2.EventResponse)
- @param timeout: the timeout for synch calls
- @type timeout: int
+ :param timeout: the timeout for synch calls
+ :type timeout: int
- @return: the response from server for synch calls or nothing for asynch
+ :return: the response from server for synch calls or nothing for asynch
calls
- @rtype: leap.common.events.events_pb2.EventsResponse or None
+ :rtype: leap.common.events.events_pb2.EventsResponse or None
"""
request = proto.SignalRequest()
request.event = signal
This method is called whenever a signal request is received from
server.
- @param controller: used to mediate a single method call
- @type controller: protobuf.socketrpc.controller.SocketRpcController
- @param request: the request received from the component
- @type request: leap.common.events.events_pb2.SignalRequest
- @param done: callback to be called when done
- @type done: protobuf.socketrpc.server.Callback
+ :param controller: used to mediate a single method call
+ :type controller: protobuf.socketrpc.controller.SocketRpcController
+ :param request: the request received from the component
+ :type request: leap.common.events.events_pb2.SignalRequest
+ :param done: callback to be called when done
+ :type done: protobuf.socketrpc.server.Callback
"""
logger.info('Received signal from server: %s' % str(request))
"""
Make sure the daemon is running on the given port.
- @param port: the port in which the daemon should listen
- @type port: int
+ :param port: the port in which the daemon should listen
+ :type port: int
- @return: a daemon instance
- @rtype: EventsComponentDaemon
+ :return: a daemon instance
+ :rtype: EventsComponentDaemon
"""
return cls.ensure_service(port, EventsComponentService())
"""
Initialize a RPC server.
- @param port: the port in which to listen for incoming messages
- @type port: int
- @param host: the address to bind to
- @type host: str
+ :param port: the port in which to listen for incoming messages
+ :type port: int
+ :param host: the address to bind to
+ :type host: str
"""
SocketRpcServer.__init__(self, port, host)
self._server = None
This is a static method disguised as instance method that actually
does the initialization of the daemon instance.
- @param port: the port in which to listen for incoming messages
- @type port: int
- @param service: the service to provide in this daemon
- @type service: google.protobuf.service.Service
+ :param port: the port in which to listen for incoming messages
+ :type port: int
+ :param service: the service to provide in this daemon
+ :type service: google.protobuf.service.Service
"""
threading.Thread.__init__(self)
self._port = port
with the appropriate service from the `events.proto` definitions, and
return the daemon instance.
- @param port: the port in which the daemon should be listening
- @type port: int
+ :param port: the port in which the daemon should be listening
+ :type port: int
- @return: a daemon instance
- @rtype: EventsSingletonDaemon
+ :return: a daemon instance
+ :rtype: EventsSingletonDaemon
"""
raise NotImplementedError(self.ensure)
Might return ServiceAlreadyRunningException
- @param port: the port in which the daemon should be listening
- @type port: int
+ :param port: the port in which the daemon should be listening
+ :type port: int
- @return: a daemon instance
- @rtype: EventsSingletonDaemon
+ :return: a daemon instance
+ :rtype: EventsSingletonDaemon
"""
daemon = cls(port, service)
if not daemon.is_alive():
"""
Retrieve singleton instance of this daemon.
- @return: a daemon instance
- @rtype: EventsSingletonDaemon
+ :return: a daemon instance
+ :rtype: EventsSingletonDaemon
"""
return cls.__instance
Retrieve the value of the port to which the service running in this
daemon is bound.
- @return: the port to which the daemon is binded to
- @rtype: int
+ :return: the port to which the daemon is bound
+ :rtype: int
"""
if self._port is 0:
self._port = self._server.port
Attempt to connect to given local port. Upon success, assume that the
events server has already been started. Upon failure, start events server.
- @param port: the port in which server should be listening
- @type port: int
+ :param port: the port in which server should be listening
+ :type port: int
- @return: the daemon instance or nothing
- @rtype: EventsServerDaemon or None
+ :return: the daemon instance or nothing
+ :rtype: EventsServerDaemon or None
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
"""
Register a component port to be signaled when specific events come in.
- @param controller: used to mediate a single method call
- @type controller: protobuf.socketrpc.controller.SocketRpcController
- @param request: the request received from the component
- @type request: leap.common.events.events_pb2.RegisterRequest
- @param done: callback to be called when done
- @type done: protobuf.socketrpc.server.Callback
+ :param controller: used to mediate a single method call
+ :type controller: protobuf.socketrpc.controller.SocketRpcController
+ :param request: the request received from the component
+ :type request: leap.common.events.events_pb2.RegisterRequest
+ :param done: callback to be called when done
+ :type done: protobuf.socketrpc.server.Callback
"""
logger.info("Received registration request: %s" % str(request))
# add component port to signal list
Perform an RPC call to signal all components registered to receive a
specific signal.
- @param controller: used to mediate a single method call
- @type controller: protobuf.socketrpc.controller.SocketRpcController
- @param request: the request received from the component
- @type request: leap.common.events.events_pb2.SignalRequest
- @param done: callback to be called when done
- @type done: protobuf.socketrpc.server.Callback
+ :param controller: used to mediate a single method call
+ :type controller: protobuf.socketrpc.controller.SocketRpcController
+ :param request: the request received from the component
+ :type request: leap.common.events.events_pb2.SignalRequest
+ :param done: callback to be called when done
+ :type done: protobuf.socketrpc.server.Callback
"""
logger.info('Received signal from component: %s', str(request))
# send signal to all registered components
"""
Make sure the daemon is running on the given port.
- @param port: the port in which the daemon should listen
- @type port: int
+ :param port: the port in which the daemon should listen
+ :type port: int
- @return: a daemon instance
- @rtype: EventsServerDaemon
+ :return: a daemon instance
+ :rtype: EventsServerDaemon
"""
return cls.ensure_service(port, EventsServerService())
Might raise OSError
- @param cert: Certificate path
- @type cert: str
+ :param cert: Certificate path
+ :type cert: str
"""
mode = stat.S_IMODE(os.stat(cert).st_mode)
"""
Returns the modified time or None if the file doesn't exist
- @param filename: path to check
- @type filename: str
+ :param filename: path to check
+ :type filename: str
- @rtype: str
+ :rtype: str
"""
try:
mtime = time.ctime(os.path.getmtime(filename)) + " GMT"
Might raise OSError
- @param path: path to create
- @type path: str
+ :param path: path to create
+ :type path: str
"""
try:
os.makedirs(path)
On MS-Windows the only flag that has any meaning is os.F_OK. Any other
flags will be ignored.
- @type name: C{str}
- @param name: The name for which to search.
+ :type name: C{str}
+ :param name: The name for which to search.
- @type flags: C{int}
- @param flags: Arguments to L{os.access}.
+ :type flags: C{int}
+ :param flags: Arguments to L{os.access}.
- @rtype: C{list}
- @param: A list of the full paths to files found, in the
+ :rtype: C{list}
+ :return: A list of the full paths to files found, in the
order in which they were found.
"""
Initialize a Key Manager for user's C{address} with provider's
nickserver reachable in C{url}.
- @param address: The address of the user of this Key Manager.
- @type address: str
- @param url: The URL of the nickserver.
- @type url: str
- @param soledad: A Soledad instance for local storage of keys.
- @type soledad: leap.soledad.Soledad
- @param session_id: The session ID for interacting with the webapp API.
- @type session_id: str
- @param ca_cert_path: The path to the CA certificate.
- @type ca_cert_path: str
- @param api_uri: The URI of the webapp API.
- @type api_uri: str
- @param api_version: The version of the webapp API.
- @type api_version: str
- @param uid: The users' UID.
- @type uid: str
+ :param address: The address of the user of this Key Manager.
+ :type address: str
+ :param url: The URL of the nickserver.
+ :type url: str
+ :param soledad: A Soledad instance for local storage of keys.
+ :type soledad: leap.soledad.Soledad
+ :param session_id: The session ID for interacting with the webapp API.
+ :type session_id: str
+ :param ca_cert_path: The path to the CA certificate.
+ :type ca_cert_path: str
+ :param api_uri: The URI of the webapp API.
+ :type api_uri: str
+ :param api_version: The version of the webapp API.
+ :type api_version: str
+ :param uid: The users' UID.
+ :type uid: str
"""
self._address = address
self._nickserver_uri = nickserver_uri
"""
Send a GET request to C{uri} containing C{data}.
- @param uri: The URI of the request.
- @type uri: str
- @param data: The body of the request.
- @type data: dict, str or file
+ :param uri: The URI of the request.
+ :type uri: str
+ :param data: The body of the request.
+ :type data: dict, str or file
- @return: The response to the request.
- @rtype: requests.Response
+ :return: The response to the request.
+ :rtype: requests.Response
"""
leap_assert(
self._ca_cert_path is not None,
verify the server certificate and the configured session id for
authentication.
- @param uri: The URI of the request.
- @type uri: str
- @param data: The body of the request.
- @type data: dict, str or file
+ :param uri: The URI of the request.
+ :type uri: str
+ :param data: The body of the request.
+ :type data: dict, str or file
- @return: The response to the request.
- @rtype: requests.Response
+ :return: The response to the request.
+ :rtype: requests.Response
"""
leap_assert(
self._ca_cert_path is not None,
Fetch keys bound to C{address} from nickserver and insert them in
local database.
- @param address: The address bound to the keys.
- @type address: str
+ :param address: The address bound to the keys.
+ :type address: str
:raise KeyNotFound: If the key was not found on nickserver.
"""
will be saved in the server in a way it is publicly retrievable
through the hash string.
- @param ktype: The type of the key.
- @type ktype: KeyType
+ :param ktype: The type of the key.
+ :type ktype: KeyType
:raise KeyNotFound: If the key was not found in local database.
"""
First, search for the key in local storage. If it is not available,
then try to fetch from nickserver.
- @param address: The address bound to the key.
- @type address: str
- @param ktype: The type of the key.
- @type ktype: KeyType
- @param private: Look for a private key instead of a public one?
- @type private: bool
+ :param address: The address bound to the key.
+ :type address: str
+ :param ktype: The type of the key.
+ :type ktype: KeyType
+ :param private: Look for a private key instead of a public one?
+ :type private: bool
- @return: A key of type C{ktype} bound to C{address}.
- @rtype: EncryptionKey
+ :return: A key of type C{ktype} bound to C{address}.
+ :rtype: EncryptionKey
:raise KeyNotFound: If the key was not found both locally and in
keyserver.
"""
"""
Return all keys stored in local database.
- @return: A list with all keys in local db.
- @rtype: list
+ :return: A list with all keys in local db.
+ :rtype: list
"""
return map(
lambda doc: build_key_from_dict(
"""
Generate a key of type C{ktype} bound to the user's address.
- @param ktype: The type of the key.
- @type ktype: KeyType
+ :param ktype: The type of the key.
+ :type ktype: KeyType
- @return: The generated key.
- @rtype: EncryptionKey
+ :return: The generated key.
+ :rtype: EncryptionKey
"""
return self._wrapper_map[ktype].gen_key(self._address)
"""
Initialize the packet listing handling class.
- @param gpg: GPG object instance.
- @type gpg: gnupg.GPG
+ :param gpg: GPG object instance.
+ :type gpg: gnupg.GPG
"""
self.gpg = gpg
self.nodata = None
"""
Handle one line of the --list-packets status message.
- @param key: The status message key.
- @type key: str
- @param value: The status message value.
- @type value: str
+ :param key: The status message key.
+ :type key: str
+ :param value: The status message value.
+ :type value: str
"""
# TODO: write tests for handle_status
if key == 'NODATA':
"""
Initialize a GnuPG process wrapper.
- @param gpgbinary: Name for GnuPG binary executable.
- @type gpgbinary: C{str}
- @param gpghome: Full pathname to directory containing the public and
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
+ :param gpghome: Full pathname to directory containing the public and
private keyrings.
- @type gpghome: C{str}
- @param keyring: Name of alternative keyring file to use. If specified,
+ :type gpghome: C{str}
+ :param keyring: Name of alternative keyring file to use. If specified,
the default keyring is not used.
- @param verbose: Should some verbose info be output?
- @type verbose: bool
- @param use_agent: Should pass `--use-agent` to GPG binary?
- @type use_agent: bool
- @param keyring: Path for the keyring to use.
- @type keyring: str
+ :param verbose: Should some verbose info be output?
+ :type verbose: bool
+ :param use_agent: Should pass `--use-agent` to GPG binary?
+ :type use_agent: bool
+ :param keyring: Path for the keyring to use.
+ :type keyring: str
:param options: A list of additional options to pass to the GPG binary.
- @type options: list
+ :type options: list
- @raise: RuntimeError with explanation message if there is a problem
+ :raise RuntimeError: with explanation message if there is a problem
invoking gpg.
"""
Find user's key based on their email.
- @param email: Email address of key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
+ :param email: Email address of key being searched for.
+ :type email: str
+ :param secret: Should we search for a secret key?
+ :type secret: bool
- @return: The fingerprint of the found key.
- @rtype: str
+ :return: The fingerprint of the found key.
+ :rtype: str
"""
for key in self.list_keys(secret=secret):
for uid in key['uids']:
"""
Find user's key based on a subkey fingerprint.
- @param email: Subkey fingerprint of the key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
+ :param email: Subkey fingerprint of the key being searched for.
+ :type email: str
+ :param secret: Should we search for a secret key?
+ :type secret: bool
- @return: The fingerprint of the found key.
- @rtype: str
+ :return: The fingerprint of the found key.
+ :rtype: str
"""
for key in self.list_keys(secret=secret):
for sub in key['subkeys']:
"""
Find user's key based on the key ID.
- @param email: The key ID of the key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
+ :param keyid: The key ID of the key being searched for.
+ :type keyid: str
+ :param secret: Should we search for a secret key?
+ :type secret: bool
- @return: The fingerprint of the found key.
- @rtype: str
+ :return: The fingerprint of the found key.
+ :rtype: str
"""
for key in self.list_keys(secret=secret):
if keyid == key['keyid']:
"""
Find user's key based on the key fingerprint.
- @param email: The fingerprint of the key being searched for.
- @type email: str
- @param secret: Should we search for a secret key?
- @type secret: bool
+ :param fingerprint: The fingerprint of the key being searched for.
+ :type fingerprint: str
+ :param secret: Should we search for a secret key?
+ :type secret: bool
- @return: The fingerprint of the found key.
- @rtype: str
+ :return: The fingerprint of the found key.
+ :rtype: str
"""
for key in self.list_keys(secret=secret):
if fingerprint == key['fingerprint']:
"""
Encrypt data using GPG.
- @param data: The data to be encrypted.
- @type data: str
- @param recipient: The address of the public key to be used.
- @type recipient: str
- @param sign: Should the encrypted content be signed?
- @type sign: bool
- @param always_trust: Skip key validation and assume that used keys
+ :param data: The data to be encrypted.
+ :type data: str
+ :param recipient: The address of the public key to be used.
+ :type recipient: str
+ :param sign: Should the encrypted content be signed?
+ :type sign: bool
+ :param always_trust: Skip key validation and assume that used keys
are always fully trusted?
- @type always_trust: bool
- @param passphrase: The passphrase to be used if symmetric encryption
+ :type always_trust: bool
+ :param passphrase: The passphrase to be used if symmetric encryption
is desired.
- @type passphrase: str
- @param symmetric: Should we encrypt to a password?
- @type symmetric: bool
+ :type passphrase: str
+ :param symmetric: Should we encrypt to a password?
+ :type symmetric: bool
- @return: An object with encrypted result in the `data` field.
- @rtype: gnupg.Crypt
+ :return: An object with encrypted result in the `data` field.
+ :rtype: gnupg.Crypt
"""
# TODO: devise a way so we don't need to "always trust".
return gnupg.GPG.encrypt(self, data, recipient, sign=sign,
"""
Decrypt data using GPG.
- @param data: The data to be decrypted.
- @type data: str
- @param always_trust: Skip key validation and assume that used keys
+ :param data: The data to be decrypted.
+ :type data: str
+ :param always_trust: Skip key validation and assume that used keys
are always fully trusted?
- @type always_trust: bool
- @param passphrase: The passphrase to be used if symmetric encryption
+ :type always_trust: bool
+ :param passphrase: The passphrase to be used if symmetric encryption
is desired.
- @type passphrase: str
+ :type passphrase: str
- @return: An object with decrypted result in the `data` field.
- @rtype: gnupg.Crypt
+ :return: An object with decrypted result in the `data` field.
+ :rtype: gnupg.Crypt
"""
# TODO: devise a way so we don't need to "always trust".
return gnupg.GPG.decrypt(self, data, always_trust=always_trust,
"""
Send keys to a keyserver
- @param keyserver: The keyserver to send the keys to.
- @type keyserver: str
- @param keyids: The key ids to send.
- @type keyids: list
+ :param keyserver: The keyserver to send the keys to.
+ :type keyserver: str
+ :param keyids: The key ids to send.
+ :type keyids: list
- @return: A list of keys sent to server.
- @rtype: gnupg.ListKeys
+ :return: A list of keys sent to server.
+ :rtype: gnupg.ListKeys
"""
# TODO: write tests for this.
# TODO: write a SendKeys class to handle status for this.
"""
Encrypt the message read from the file-like object 'file'.
- @param file: The file to be encrypted.
- @type data: file
- @param recipient: The address of the public key to be used.
- @type recipient: str
- @param sign: Should the encrypted content be signed?
- @type sign: bool
- @param always_trust: Skip key validation and assume that used keys
+ :param file: The file to be encrypted.
+ :type file: file
+ :param recipient: The address of the public key to be used.
+ :type recipient: str
+ :param sign: Should the encrypted content be signed?
+ :type sign: bool
+ :param always_trust: Skip key validation and assume that used keys
are always fully trusted?
- @type always_trust: bool
- @param passphrase: The passphrase to be used if symmetric encryption
+ :type always_trust: bool
+ :param passphrase: The passphrase to be used if symmetric encryption
is desired.
- @type passphrase: str
- @param armor: Create ASCII armored output?
- @type armor: bool
- @param output: Path of file to write results in.
- @type output: str
- @param symmetric: Should we encrypt to a password?
- @type symmetric: bool
- @param cipher_algo: Algorithm to use.
- @type cipher_algo: str
-
- @return: An object with encrypted result in the `data` field.
- @rtype: gnupg.Crypt
+ :type passphrase: str
+ :param armor: Create ASCII armored output?
+ :type armor: bool
+ :param output: Path of file to write results in.
+ :type output: str
+ :param symmetric: Should we encrypt to a password?
+ :type symmetric: bool
+ :param cipher_algo: Algorithm to use.
+ :type cipher_algo: str
+
+ :return: An object with encrypted result in the `data` field.
+ :rtype: gnupg.Crypt
"""
args = ['--encrypt']
if symmetric:
"""
List the sequence of packets.
- @param data: The data to extract packets from.
- @type data: str
+ :param data: The data to extract packets from.
+ :type data: str
- @return: An object with packet info.
- @rtype ListPackets
+ :return: An object with packet info.
+ :rtype: ListPackets
"""
args = ["--list-packets"]
result = self.result_map['list-packets'](self)
"""
Return the key to which data is encrypted to.
- @param data: The data to be examined.
- @type data: str
+ :param data: The data to be examined.
+ :type data: str
- @return: The fingerprint of the key to which data is encrypted to.
- @rtype: str
+ :return: The fingerprint of the key to which data is encrypted to.
+ :rtype: str
"""
# TODO: make this support multiple keys.
result = self.list_packets(data)
"""
Say whether some chunk of data is encrypted to a symmetric key.
- @param data: The data to be examined.
- @type data: str
+ :param data: The data to be examined.
+ :type data: str
- @return: Whether data is encrypted to a symmetric key.
- @rtype: bool
+ :return: Whether data is encrypted to a symmetric key.
+ :rtype: bool
"""
result = self.list_packets(data)
return bool(result.need_passphrase_sym)
"""
Say whether some chunk of data is encrypted to a private key.
- @param data: The data to be examined.
- @type data: str
+ :param data: The data to be examined.
+ :type data: str
- @return: Whether data is encrypted to a private key.
- @rtype: bool
+ :return: Whether data is encrypted to a private key.
+ :rtype: bool
"""
result = self.list_packets(data)
return bool(result.key)
"""
Say whether some chunk of data is encrypted to a key.
- @param data: The data to be examined.
- @type data: str
+ :param data: The data to be examined.
+ :type data: str
- @return: Whether data is encrypted to a key.
- @rtype: bool
+ :return: Whether data is encrypted to a key.
+ :rtype: bool
"""
return self.is_encrypted_asym(data) or self.is_encrypted_sym(data)
"""
Return whether the given C{address} is in the form user@provider.
- @param address: The address to be tested.
- @type address: str
- @return: Whether C{address} is in the form user@provider.
- @rtype: bool
+ :param address: The address to be tested.
+ :type address: str
+ :return: Whether C{address} is in the form user@provider.
+ :rtype: bool
"""
return bool(re.match('[\w.-]+@[\w.-]+', address))
"""
Build an C{kClass} key bound to C{address} based on info in C{kdict}.
- @param address: The address bound to the key.
- @type address: str
- @param kdict: Dictionary with key data.
- @type kdict: dict
- @return: An instance of the key.
- @rtype: C{kClass}
+ :param address: The address bound to the key.
+ :type address: str
+ :param kdict: Dictionary with key data.
+ :type kdict: dict
+ :return: An instance of the key.
+ :rtype: C{kClass}
"""
leap_assert(
address == kdict[KEY_ADDRESS_KEY],
Return the document id for the document containing a key for
C{address}.
- @param address: The type of the key.
- @type address: KeyType
- @param address: The address bound to the key.
- @type address: str
- @param private: Whether the key is private or not.
- @type private: bool
- @return: The document id for the document that stores a key bound to
+ :param ktype: The type of the key.
+ :type ktype: KeyType
+ :param address: The address bound to the key.
+ :type address: str
+ :param private: Whether the key is private or not.
+ :type private: bool
+ :return: The document id for the document that stores a key bound to
C{address}.
- @rtype: str
+ :rtype: str
"""
leap_assert(is_address(address), "Wrong address format: %s" % address)
ktype = str(ktype)
"""
Return a JSON string describing this key.
- @return: The JSON string describing this key.
- @rtype: str
+ :return: The JSON string describing this key.
+ :rtype: str
"""
return json.dumps({
KEY_ADDRESS_KEY: self.address,
"""
Initialize this Encryption Scheme.
- @param soledad: A Soledad instance for local storage of keys.
- @type soledad: leap.soledad.Soledad
+ :param soledad: A Soledad instance for local storage of keys.
+ :type soledad: leap.soledad.Soledad
"""
self._soledad = soledad
"""
Get key from local storage.
- @param address: The address bound to the key.
- @type address: str
- @param private: Look for a private key instead of a public one?
- @type private: bool
+ :param address: The address bound to the key.
+ :type address: str
+ :param private: Look for a private key instead of a public one?
+ :type private: bool
- @return: The key bound to C{address}.
- @rtype: EncryptionKey
+ :return: The key bound to C{address}.
+ :rtype: EncryptionKey
- @raise KeyNotFound: If the key was not found on local storage.
+ :raise KeyNotFound: If the key was not found on local storage.
"""
pass
"""
Put a key in local storage.
- @param key: The key to be stored.
- @type key: EncryptionKey
+ :param key: The key to be stored.
+ :type key: EncryptionKey
"""
pass
"""
Generate a new key.
- @param address: The address bound to the key.
- @type address: str
+ :param address: The address bound to the key.
+ :type address: str
- @return: The key bound to C{address}.
- @rtype: EncryptionKey
+ :return: The key bound to C{address}.
+ :rtype: EncryptionKey
"""
pass
"""
Remove C{key} from storage.
- @param key: The key to be removed.
- @type key: EncryptionKey
+ :param key: The key to be removed.
+ :type key: EncryptionKey
"""
pass
Returns a unitary gpg wrapper that implements context manager
protocol.
- @param key_data: ASCII armored key data.
- @type key_data: str
+ :param key_data: ASCII armored key data.
+ :type key_data: str
- @return: a GPGWrapper instance
- @rtype: GPGWrapper
+ :return: a GPGWrapper instance
+ :rtype: GPGWrapper
"""
# TODO do here checks on key_data
return TempGPGWrapper(keys=keys)
"""
def __init__(self, keys=None):
"""
- @param keys: OpenPGP key, or list of.
- @type keys: OpenPGPKey or list of OpenPGPKeys
+ :param keys: OpenPGP key, or list of.
+ :type keys: OpenPGPKey or list of OpenPGPKeys
"""
self._gpg = None
if not keys:
"""
Calls the unitary gpgwrapper initializer
- @return: A GPG wrapper with a unitary keyring.
- @rtype: gnupg.GPG
+ :return: A GPG wrapper with a unitary keyring.
+ :rtype: gnupg.GPG
"""
self._build_keyring()
return self._gpg
"""
Create an empty GPG keyring and import C{keys} into it.
- @param keys: List of keys to add to the keyring.
- @type keys: list of OpenPGPKey
+ :param keys: List of keys to add to the keyring.
+ :type keys: list of OpenPGPKey
- @return: A GPG wrapper with a unitary keyring.
- @rtype: gnupg.GPG
+ :return: A GPG wrapper with a unitary keyring.
+ :rtype: gnupg.GPG
"""
privkeys = [key for key in self._keys if key and key.private is True]
publkeys = [key for key in self._keys if key and key.private is False]
"""
- Encrypt C{data} using public @{key} and sign with C{sign} key.
+ Encrypt C{data} using public C{key} and sign with C{sign} key.
- @param data: The data to be encrypted.
- @type data: str
- @param pubkey: The key used to encrypt.
- @type pubkey: OpenPGPKey
- @param sign: The key used for signing.
- @type sign: OpenPGPKey
+ :param data: The data to be encrypted.
+ :type data: str
+ :param key: The key used to encrypt.
+ :type key: OpenPGPKey
+ :param sign: The key used for signing.
+ :type sign: OpenPGPKey
- @return: The encrypted data.
- @rtype: str
+ :return: The encrypted data.
+ :rtype: str
"""
leap_assert_type(key, OpenPGPKey)
leap_assert(key.private is False, 'Key is not public.')
"""
- Decrypt C{data} using private @{key} and verify with C{verify} key.
+ Decrypt C{data} using private C{key} and verify with C{verify} key.
- @param data: The data to be decrypted.
- @type data: str
- @param privkey: The key used to decrypt.
- @type privkey: OpenPGPKey
- @param verify: The key used to verify a signature.
- @type verify: OpenPGPKey
+ :param data: The data to be decrypted.
+ :type data: str
+ :param privkey: The key used to decrypt.
+ :type privkey: OpenPGPKey
+ :param verify: The key used to verify a signature.
+ :type verify: OpenPGPKey
- @return: The decrypted data.
- @rtype: str
+ :return: The decrypted data.
+ :rtype: str
- @raise InvalidSignature: Raised if unable to verify the signature with
+ :raise InvalidSignature: Raised if unable to verify the signature with
C{verify} key.
"""
Return whether C{data} was encrypted using OpenPGP.
- @param data: The data we want to know about.
- @type data: str
+ :param data: The data we want to know about.
+ :type data: str
- @return: Whether C{data} was encrypted using this wrapper.
- @rtype: bool
+ :return: Whether C{data} was encrypted using this wrapper.
+ :rtype: bool
"""
return lambda gpg: gpg.is_encrypted(data)
"""
Return whether C{data} was asymmetrically encrypted using OpenPGP.
- @param data: The data we want to know about.
- @type data: str
+ :param data: The data we want to know about.
+ :type data: str
- @return: Whether C{data} was encrypted using this wrapper.
- @rtype: bool
+ :return: Whether C{data} was encrypted using this wrapper.
+ :rtype: bool
"""
return lambda gpg: gpg.is_encrypted_asym(data)
"""
Sign C{data} with C{privkey}.
- @param data: The data to be signed.
- @type data: str
+ :param data: The data to be signed.
+ :type data: str
- @param privkey: The private key to be used to sign.
- @type privkey: OpenPGPKey
+ :param privkey: The private key to be used to sign.
+ :type privkey: OpenPGPKey
- @return: The ascii-armored signed data.
- @rtype: str
+ :return: The ascii-armored signed data.
+ :rtype: str
"""
leap_assert_type(privkey, OpenPGPKey)
leap_assert(privkey.private is True)
"""
Verify signed C{data} with C{pubkey}.
- @param data: The data to be verified.
- @type data: str
+ :param data: The data to be verified.
+ :type data: str
- @param pubkey: The public key to be used on verification.
- @type pubkey: OpenPGPKey
+ :param key: The public key to be used on verification.
+ :type key: OpenPGPKey
- @return: The ascii-armored signed data.
- @rtype: str
+ :return: The ascii-armored signed data.
+ :rtype: str
"""
leap_assert_type(key, OpenPGPKey)
leap_assert(key.private is False)
ASCII armored GPG key data has to be queried independently in this
wrapper, so we receive it in C{key_data}.
- @param address: The address bound to the key.
- @type address: str
- @param key: Key obtained from GPG storage.
- @type key: dict
- @param key_data: Key data obtained from GPG storage.
- @type key_data: str
- @return: An instance of the key.
- @rtype: OpenPGPKey
+ :param address: The address bound to the key.
+ :type address: str
+ :param key: Key obtained from GPG storage.
+ :type key: dict
+ :param key_data: Key data obtained from GPG storage.
+ :type key_data: str
+ :return: An instance of the key.
+ :rtype: OpenPGPKey
"""
return OpenPGPKey(
address,
"""
Initialize the OpenPGP wrapper.
- @param soledad: A Soledad instance for key storage.
- @type soledad: leap.soledad.Soledad
+ :param soledad: A Soledad instance for key storage.
+ :type soledad: leap.soledad.Soledad
"""
EncryptionScheme.__init__(self, soledad)
"""
Generate an OpenPGP keypair bound to C{address}.
- @param address: The address bound to the key.
- @type address: str
- @return: The key bound to C{address}.
- @rtype: OpenPGPKey
+ :param address: The address bound to the key.
+ :type address: str
+ :return: The key bound to C{address}.
+ :rtype: OpenPGPKey
- @raise KeyAlreadyExists: If key already exists in local database.
+ :raise KeyAlreadyExists: If key already exists in local database.
"""
# make sure the key does not already exist
"""
Get key bound to C{address} from local storage.
- @param address: The address bound to the key.
- @type address: str
- @param private: Look for a private key instead of a public one?
- @type private: bool
+ :param address: The address bound to the key.
+ :type address: str
+ :param private: Look for a private key instead of a public one?
+ :type private: bool
- @return: The key bound to C{address}.
- @rtype: OpenPGPKey
+ :return: The key bound to C{address}.
+ :rtype: OpenPGPKey
- @raise KeyNotFound: If the key was not found on local storage.
+ :raise KeyNotFound: If the key was not found on local storage.
"""
leap_assert(is_address(address), 'Not an user address: %s' % address)
"""
Put key contained in ascii-armored C{key_data} in local storage.
- @param key_data: The key data to be stored.
- @type key_data: str
+ :param key_data: The key data to be stored.
+ :type key_data: str
"""
leap_assert_type(key_data, str)
# TODO: add more checks for correct key data.
"""
Put C{key} in local storage.
- @param key: The key to be stored.
- @type key: OpenPGPKey
+ :param key: The key to be stored.
+ :type key: OpenPGPKey
"""
doc = self._get_key_doc(key.address, private=key.private)
if doc is None:
If C{private} is True, looks for a private key instead of a public.
- @param address: The address bound to the key.
- @type address: str
- @param private: Whether to look for a private key.
- @type private: bool
- @return: The document with the key or None if it does not exist.
- @rtype: leap.soledad.backends.leap_backend.LeapDocument
+ :param address: The address bound to the key.
+ :type address: str
+ :param private: Whether to look for a private key.
+ :type private: bool
+ :return: The document with the key or None if it does not exist.
+ :rtype: leap.soledad.backends.leap_backend.LeapDocument
"""
return self._soledad.get_doc(
keymanager_doc_id(OpenPGPKey, address, private))
"""
Remove C{key} from storage.
- @param key: The key to be removed.
- @type key: EncryptionKey
+ :param key: The key to be removed.
+ :type key: EncryptionKey
"""
leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
stored_key = self.get_key(key.address, private=key.private)
Raises NotImplementedError for this platform
if do_raise is True
- @param do_raise: flag to actually raise exception
- @type do_raise: bool
+ :param do_raise: flag to actually raise exception
+ :type do_raise: bool
"""
if do_raise:
raise NotImplementedError(
prepending the temporal dir associated with this
TestCase
- @param filename: the filename
- @type filename: str
+ :param filename: the filename
+ :type filename: str
"""
return os.path.join(self.tempdir, filename)
Touches a filepath, creating folders along
the way if needed.
- @param filepath: path to be touched
- @type filepath: str
+ :param filepath: path to be touched
+ :type filepath: str
"""
folder, filename = os.path.split(filepath)
if not os.path.isdir(folder):
"""
Chmods 600 a file
- @param filepath: filepath to be chmodded
- @type filepath: str
+ :param filepath: filepath to be chmodded
+ :type filepath: str
"""
check_and_fix_urw_only(filepath)
"""
Runs a given TestCase
- @param testcase: the testcase
- @type testcase: unittest.TestCase
+ :param testcase: the testcase
+ :type testcase: unittest.TestCase
"""
if not testcase:
return None