# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information. # These require PyCrypto. import Crypto.PublicKey.RSA import Crypto.Hash.SHA256 import Crypto.Cipher.AES import cPickle as pickle import binascii import logging import os import struct import sys import getpass import thandy.formats import thandy.util json = thandy.util.importJSON() class PublicKey: """Abstract base class for public keys.""" def __init__(self): self._roles = [] def format(self): raise NotImplemented() def sign(self, data): # returns a list of method,signature tuples. raise NotImplemented() def checkSignature(self, method, data, signature): # returns True, False, or raises UnknownMethod. raise NotImplemented() def getKeyID(self): raise NotImplemented() def getRoles(self): """Return a list of all roles supported by this key. A role is a doctype,pathPattern tuple. """ return self._roles def addRole(self, role, path): """Add a role to the list of roles supported by this key. A role is a permission to sign a given kind of document (one of thandy.format.ALL_ROLES) at a given set of relative paths. """ assert role in thandy.formats.ALL_ROLES if (role, path) not in self._roles: self._roles.append((role, path)) def clearRoles(self): """Remove all roles from this key.""" del self._roles[:] def hasRole(self, role, path): """Return true iff this key has a role that allows it to sign a document of type 'role' at location in the repository 'path'. """ for r, p in self._roles: if r == role and thandy.formats.rolePathMatches(p, path): return True return False if hex(1L).upper() == "0X1L": def intToBinary(number): """Convert an int or long into a big-endian series of bytes. """ # This "convert-to-hex, then use binascii" approach may look silly, # but it's over 10x faster than the Crypto.Util.number approach. h = hex(long(number)) h = h[2:-1] if len(h)%2: h = "0"+h return binascii.a2b_hex(h) elif hex(1L).upper() == "0X1": def intToBinary(number): "Variant for future versions of pythons that don't append 'L'." h = hex(long(number)) h = h[2:] if len(h)%2: h = "0"+h return binascii.a2b_hex(h) else: import Crypto.Util.number intToBinary = Crypto.Util.number.long_to_bytes assert None def binaryToInt(binary): """Convert a big-endian series of bytes into a long. """ return long(binascii.b2a_hex(binary), 16) def intToBase64(number): """Convert an int or long to a big-endian base64-encoded value.""" return thandy.formats.formatBase64(intToBinary(number)) def base64ToInt(number): """Convert a big-endian base64-encoded value to a long.""" return binaryToInt(thandy.formats.parseBase64(number)) def _pkcs1_padding(m, size): # I'd rather use OAEP+, but apparently PyCrypto barely supports # signature verification, and doesn't seem to support signature # verification with nondeterministic padding. "argh." s = [ "\x00\x01", "\xff"* (size-3-len(m)), "\x00", m ] r = "".join(s) return r def _xor(a,b): if a: return not b else: return b class RSAKey(PublicKey): """ >>> k = RSAKey.generate(bits=512) >>> obj = k.format() >>> obj['_keytype'] 'rsa' >>> base64ToInt(obj['e']) 65537L >>> k1 = RSAKey.fromJSon(obj) >>> k1.key.e == k.key.e True >>> k1.key.n == k.key.n True >>> k.getKeyID() == k1.getKeyID() True >>> s = { 'A B C' : "D", "E" : [ "F", "g", 99] } >>> method, sig = k.sign(obj=s) >>> k.checkSignature(method, sig, obj=s) True >>> s2 = [ s ] >>> k.checkSignature(method, sig, obj=s2) False """ def __init__(self, key): PublicKey.__init__(self) self.key = key self.keyid = None @staticmethod def generate(bits=2048): """Generate a new RSA key, with modulus length 'bits'.""" key = Crypto.PublicKey.RSA.generate(bits=bits, randfunc=os.urandom) return RSAKey(key) @staticmethod def fromJSon(obj): """Construct an RSA key from the output of the format() method. """ # obj must match RSAKEY_SCHEMA thandy.formats.RSAKEY_SCHEMA.checkMatch(obj) n = base64ToInt(obj['n']) e = base64ToInt(obj['e']) if obj.has_key('d'): d = base64ToInt(obj['d']) p = base64ToInt(obj['p']) q = base64ToInt(obj['q']) u = base64ToInt(obj['u']) key = Crypto.PublicKey.RSA.construct((n, e, d, p, q, u)) else: key = Crypto.PublicKey.RSA.construct((n, e)) result = RSAKey(key) if obj.has_key('roles'): for r, p in obj['roles']: result.addRole(r,p) return result def isPrivateKey(self): """Return true iff this key has private-key components""" return hasattr(self.key, 'd') def format(self, private=False, includeRoles=False): """Return a new object to represent this key in json format. If 'private', include private-key data. If 'includeRoles', include role information. """ n = intToBase64(self.key.n) e = intToBase64(self.key.e) result = { '_keytype' : 'rsa', 'e' : e, 'n' : n } if private: result['d'] = intToBase64(self.key.d) result['p'] = intToBase64(self.key.p) result['q'] = intToBase64(self.key.q) result['u'] = intToBase64(self.key.u) if includeRoles: result['roles'] = self.getRoles() return result def getKeyID(self): """Return the KeyID for this key. """ if self.keyid == None: d_obj = Crypto.Hash.SHA256.new() thandy.formats.getDigest(self.format(), d_obj) self.keyid = thandy.formats.formatHash(d_obj.digest()) return self.keyid def sign(self, obj=None, digest=None): assert _xor(obj == None, digest == None) method = "sha256-pkcs1" if digest == None: digest = thandy.formats.getDigest(obj) m = _pkcs1_padding(digest, (self.key.size()+1) // 8) sig = intToBase64(self.key.sign(m, "")[0]) return (method, sig) def checkSignature(self, method, sig, obj=None, digest=None): assert _xor(obj == None, digest == None) if method != "sha256-pkcs1": raise UnknownMethod(method) if digest == None: digest = thandy.formats.getDigest(obj) sig = base64ToInt(sig) m = _pkcs1_padding(digest, (self.key.size()+1) // 8) return bool(self.key.verify(m, (sig,))) SALTLEN=16 def secretToKey(salt, secret): """Convert 'secret' to a 32-byte key, using a version of the algorithm from RFC2440. The salt must be SALTLEN+1 bytes long, and should be random, except for the last byte, which encodes how time- consuming the computation should be. (The goal is to make offline password-guessing attacks harder by increasing the time required to convert a password to a key, and to make precomputed password tables impossible to generate by ) """ assert len(salt) == SALTLEN+1 # The algorithm is basically, 'call the last byte of the salt the # "difficulty", and all other bytes of the salt S. Now make # an infinite stream of S|secret|S|secret|..., and hash the # first N bytes of that, where N is determined by the difficulty. # # Obviously, this wants a hash algorithm that's tricky to # parallelize. # # Unlike RFC2440, we use a 16-byte salt. Because CPU times # have improved, we start at 16 times the previous minimum. difficulty = ord(salt[-1]) count = (16L+(difficulty & 15)) << ((difficulty >> 4) + 10) # Make 'data' nice and long, so that we don't need to call update() # a zillion times. data = salt[:-1]+secret if len(data)<1024: data *= (1024 // len(data))+1 d = Crypto.Hash.SHA256.new() iters, leftover = divmod(count, len(data)) for _ in xrange(iters): d.update(data) #count -= len(data) if leftover: d.update(data[:leftover]) #count -= leftover #assert count == 0 return d.digest() def encryptSecret(secret, password, difficulty=0x80): """Encrypt the secret 'secret' using the password 'password', and return the encrypted result.""" # The encrypted format is: # "GKEY1" -- 5 octets, fixed, denotes data format. # SALT -- 17 bytes, used to hash password # IV -- 16 bytes; salt for encryption # ENCRYPTED IN AES256-OFB, using a key=s2k(password, salt) and IV=IV: # SLEN -- 4 bytes; length of secret, big-endian. # SECRET -- len(secret) bytes # D -- 32 bytes; SHA256 hash of (salt|secret|salt). # # This format leaks the secret length, obviously. # # If the secret started out in unicode, we encode it using UTF-8 # and prepend the string "utf-8:" before we begin encryption. assert 0 <= difficulty < 256 salt = os.urandom(SALTLEN)+chr(difficulty) key = secretToKey(salt, password) if isinstance(secret, unicode): secret = "utf-8:"+secret.encode("utf-8") d_obj = Crypto.Hash.SHA256.new() d_obj.update(salt) d_obj.update(secret) d_obj.update(salt) d = d_obj.digest() iv = os.urandom(16) e = Crypto.Cipher.AES.new(key, Crypto.Cipher.AES.MODE_OFB, iv) # Stupidly, pycrypto doesn't accept that stream ciphers don't need to # take their input in blocks. So pad it, then ignore the padded output. padlen = 16-((len(secret)+len(d)+4) % 16) if padlen == 16: padlen = 0 pad = '\x00' * padlen slen = struct.pack("!L",len(secret)) encrypted = e.encrypt("%s%s%s%s" % (slen, secret, d, pad)) if padlen: encrypted = encrypted[:-padlen] return "GKEY1%s%s%s"%(salt, iv, encrypted) def decryptSecret(encrypted, password): """Decrypt a value encrypted with encryptSecret. Raises UnknownFormat or FormatError if 'encrypted' was not generated with encryptSecret. Raises BadPassword if the password was not correct. """ if encrypted[:5] != "GKEY1": raise thandy.UnknownFormat() encrypted = encrypted[5:] if len(encrypted) < SALTLEN+1+16: raise thandy.FormatException() salt = encrypted[:SALTLEN+1] iv = encrypted[SALTLEN+1:SALTLEN+1+16] encrypted = encrypted[SALTLEN+1+16:] key = secretToKey(salt, password) e = Crypto.Cipher.AES.new(key, Crypto.Cipher.AES.MODE_OFB, iv) padlen = 16-(len(encrypted) % 16) if padlen == 16: padlen = 0 pad = '\x00' * padlen decrypted = e.decrypt("%s%s"%(encrypted,pad)) slen = struct.unpack("!L", decrypted[:4])[0] secret = decrypted[4:4+slen] hash = decrypted[4+slen:4+slen+Crypto.Hash.SHA256.digest_size] d = Crypto.Hash.SHA256.new() d.update(salt) d.update(secret) d.update(salt) if d.digest() != hash: raise thandy.BadPassword() if secret.startswith("utf-8:"): secret = secret[6:].decode("utf-8") return secret class KeyStore(thandy.formats.KeyDB): """Helper to store private keys in an encrypted file.""" def __init__(self, fname, encrypted=True): thandy.formats.KeyDB.__init__(self) self._loaded = None self._fname = fname self._passwd = None self._encrypted = encrypted def getpass(self, reprompt=False): if self._passwd != None: return self._passwd while 1: sys.stderr.write("Password: ") pwd = getpass.getpass("") if not reprompt: return pwd sys.stderr.write("Confirm: ") pwd2 = getpass.getpass("") if pwd == pwd2: return pwd else: print "Mismatch; try again." def load(self, password=None): logging.info("Loading private keys from %r...", self._fname) if not os.path.exists(self._fname): logging.info("...no such file.") self._loaded = True return if password is None and self._encrypted: password = self.getpass() contents = open(self._fname, 'rb').read() if self._encrypted: contents = decryptSecret(contents, password) listOfKeys = json.loads(contents) self._passwd = password # It worked. if not listOfKeys.has_key('keys'): listOfKeys['keys'] = [] for obj in listOfKeys['keys']: key = RSAKey.fromJSon(obj) self.addKey(key) logging.info("Loaded key %s", key.getKeyID()) self._loaded = True def setPassword(self, passwd): self._passwd = passwd def clearPassword(self): self._passwd = None def save(self, password=None): if not self._loaded and self._encrypted: self.load(password) if password is None: password = self.getpass(True) logging.info("Saving private keys into %r...", self._fname) listOfKeys = { 'keys' : [ key.format(private=True, includeRoles=True) for key in self._keys.values() ] } contents = json.dumps(listOfKeys) if self._encrypted: contents = encryptSecret(contents, password) thandy.util.replaceFile(self._fname, contents) self._passwd = password # It worked. logging.info("Done.")