"""Key handling: RSA keys, password-based secret encryption, and an
on-disk key store."""

# These require PyCrypto.
import Crypto.PublicKey.RSA
import Crypto.Hash.SHA256
import Crypto.Cipher.AES

import cPickle as pickle
import binascii
import logging
import os
import struct
import sys
import simplejson
import getpass

import thandy.formats
import thandy.util

class PublicKey:
    """Abstract base for keys.  Subclasses implement format(), sign(),
       checkSignature() and getKeyID(); this class only tracks roles.
    """
    def __init__(self):
        # Confusingly, these roles are the ones used for a private key to
        # remember what we're willing to do with it.
        self._roles = []
    def format(self):
        """Return a serializable representation of the key.  Abstract."""
        raise NotImplementedError()
    def sign(self, data):
        """Sign 'data'.  Abstract."""
        # returns a list of method,signature tuples.
        raise NotImplementedError()
    def checkSignature(self, method, data, signature):
        """Check 'signature' over 'data' made with 'method'.  Abstract."""
        # returns True, False, or raises UnknownMethod.
        raise NotImplementedError()
    def getKeyID(self):
        """Return an identifier for this key.  Abstract."""
        raise NotImplementedError()
    def getRoles(self):
        """Return the list of (role, path) tuples attached to this key.
           The list returned is the internal one, not a copy."""
        return self._roles
    def addRole(self, role, path):
        """Attach the pair (role, path) to this key.  'role' must be a
           member of thandy.formats.ALL_ROLES."""
        assert role in thandy.formats.ALL_ROLES
        self._roles.append((role, path))
    def clearRoles(self):
        """Remove every role from this key, in place."""
        del self._roles[:]
    def hasRole(self, role, path):
        """Return True iff some stored (r, p) has r == role and
           thandy.formats.rolePathMatches(p, path) is true."""
        for r, p in self._roles:
            if r == role and thandy.formats.rolePathMatches(p, path):
                return True
        return False

# Pick an intToBinary implementation based on how hex() renders a long:
# with a trailing "L" (strip it), without one, or neither.
if hex(long(1)).upper() == "0X1L":
    def intToBinary(number):
        """Convert an int or long into a big-endian series of bytes.
        """
        # This "convert-to-hex, then use binascii" approach may look silly,
        # but it's over 10x faster than the Crypto.Util.number approach.
        h = hex(long(number))
        h = h[2:-1]             # Drop the leading "0x" and trailing "L".
        if len(h)%2:
            h = "0"+h           # a2b_hex needs an even number of digits.
        return binascii.a2b_hex(h)
elif hex(long(1)).upper() == "0X1":
    def intToBinary(number):
        """Convert an int or long into a big-endian series of bytes.
        """
        h = hex(long(number))
        h = h[2:]               # Drop the leading "0x".
        if len(h)%2:
            h = "0"+h
        return binascii.a2b_hex(h)
else:
    import Crypto.Util.number
    intToBinary = Crypto.Util.number.long_to_bytes
    # NOTE(review): this assertion always fails, so unless Python runs
    # with -O the fallback above is never usable.  Presumably a deliberate
    # "should never happen" tripwire -- confirm before removing.
    assert None

def binaryToInt(binary):
    """Convert a big-endian series of bytes into a long.
    """
    return long(binascii.b2a_hex(binary), 16)

def intToBase64(number):
    """Encode 'number' as big-endian bytes, then format those bytes with
       thandy.formats.formatBase64."""
    return thandy.formats.formatBase64(intToBinary(number))

def base64ToInt(number):
    """Inverse of intToBase64: parse 'number' with
       thandy.formats.parseBase64 and read the bytes as a big-endian long."""
    return binaryToInt(thandy.formats.parseBase64(number))

def _pkcs1_padding(m, size):
    """Return 00 01 | FF...FF | 00 | m, using enough FF bytes to make the
       result 'size'-1 bytes long (none if 'm' is already too long)."""
    # I'd rather use OAEP+, but apparently PyCrypto barely supports
    # signature verification, and doesn't seem to support signature
    # verification with nondeterministic padding.  "argh."

    s = [ "\x00\x01", "\xff"* (size-3-len(m)), "\x00", m ]
    r = "".join(s)
    return r

def _xor(a,b):
    """Logical exclusive-or: 'not b' if 'a' is true, else 'b' itself."""
    if a:
        return not b
    else:
        return b

class RSAKey(PublicKey):
    """
    >>> k = RSAKey.generate(bits=512)
    >>> obj = k.format()
    >>> obj['_keytype']
    'rsa'
    >>> base64ToInt(obj['e'])
    65537L
    >>> k1 = RSAKey.fromJSon(obj)
    >>> k1.key.e == k.key.e
    True
    >>> k1.key.n == k.key.n
    True
    >>> k.getKeyID() == k1.getKeyID()
    True
    >>> s = { 'A B C' : "D", "E" : [ "F", "g", 99] }
    >>> method, sig = k.sign(obj=s)
    >>> k.checkSignature(method, sig, obj=s)
    True
    >>> s2 = [ s ]
    >>> k.checkSignature(method, sig, obj=s2)
    False
    """
    def __init__(self, key):
        """Wrap 'key', an RSA key object as produced by
           Crypto.PublicKey.RSA.generate or .construct."""
        PublicKey.__init__(self)
        self.key = key
        self.keyid = None       # Computed lazily by getKeyID().

    @staticmethod
    def generate(bits=2048):
        """Generate a new 'bits'-bit RSA key, drawing randomness from
           os.urandom, and return it as an RSAKey."""
        key = Crypto.PublicKey.RSA.generate(bits=bits, randfunc=os.urandom)
        return RSAKey(key)

    @staticmethod
    def fromJSon(obj):
        """Build an RSAKey from 'obj', a decoded key object as produced by
           format().  If 'obj' has a 'd' member it is treated as a private
           key and must also carry 'p', 'q' and 'u'.  Any 'roles' listed
           are added to the result."""
        # obj must match RSAKEY_SCHEMA

        thandy.formats.RSAKEY_SCHEMA.checkMatch(obj)
        n = base64ToInt(obj['n'])
        e = base64ToInt(obj['e'])
        if 'd' in obj:
            d = base64ToInt(obj['d'])
            p = base64ToInt(obj['p'])
            q = base64ToInt(obj['q'])
            u = base64ToInt(obj['u'])
            key = Crypto.PublicKey.RSA.construct((n, e, d, p, q, u))
        else:
            key = Crypto.PublicKey.RSA.construct((n, e))

        result = RSAKey(key)
        if 'roles' in obj:
            for r, p in obj['roles']:
                result.addRole(r,p)

        return result

    def isPrivateKey(self):
        """Return true iff the wrapped key object has a 'd' attribute."""
        return hasattr(self.key, 'd')

    def format(self, private=False, includeRoles=False):
        """Return this key as a dict with '_keytype', 'e' and 'n' members
           (numbers base64-encoded).  With 'private', also include 'd',
           'p', 'q' and 'u'.  With 'includeRoles', include 'roles'."""
        n = intToBase64(self.key.n)
        e = intToBase64(self.key.e)
        result = { '_keytype' : 'rsa',
                   'e' : e,
                   'n' : n }
        if private:
            result['d'] = intToBase64(self.key.d)
            result['p'] = intToBase64(self.key.p)
            result['q'] = intToBase64(self.key.q)
            result['u'] = intToBase64(self.key.u)
        if includeRoles:
            result['roles'] = self.getRoles()
        return result

    def getKeyID(self):
        """Return the formatted SHA256 digest of the public format() of
           this key.  The value is computed once and then cached."""
        if self.keyid is None:
            d_obj = Crypto.Hash.SHA256.new()
            thandy.formats.getDigest(self.format(), d_obj)
            self.keyid = thandy.formats.formatHash(d_obj.digest())
        return self.keyid

    def _digest(self, obj, method=None):
        """Return (method-name, digest) for 'obj'.  Only "sha256-pkcs1"
           (also the default when 'method' is None) is supported; any
           other method raises thandy.UnknownMethod."""
        if method in (None, "sha256-pkcs1"):
            d_obj = Crypto.Hash.SHA256.new()
            thandy.formats.getDigest(obj, d_obj)
            digest = d_obj.digest()
            return ("sha256-pkcs1", digest)

        raise thandy.UnknownMethod(method)

    def sign(self, obj=None, digest=None):
        """Sign either the object 'obj' or a precomputed 'digest' (exactly
           one must be given).  Return a tuple (method, base64-signature).
        """
        assert _xor(obj is None, digest is None)
        # A caller-supplied digest is taken to be a sha256-pkcs1 digest;
        # that is the only method _digest() can produce.
        method = "sha256-pkcs1"
        if digest is None:
            method, digest = self._digest(obj)
        m = _pkcs1_padding(digest, (self.key.size()+1) // 8)
        sig = intToBase64(self.key.sign(m, "")[0])
        return (method, sig)

    def checkSignature(self, method, sig, obj=None, digest=None):
        """Return True iff 'sig' (base64) is a valid 'method' signature by
           this key over 'obj' or over the precomputed 'digest' (exactly
           one must be given).  Raise thandy.UnknownMethod if 'method' is
           not "sha256-pkcs1"."""
        assert _xor(obj is None, digest is None)
        if method != "sha256-pkcs1":
            raise thandy.UnknownMethod(method)
        if digest is None:
            method, digest = self._digest(obj, method)
        sig = base64ToInt(sig)
        m = _pkcs1_padding(digest, (self.key.size()+1) // 8)
        return bool(self.key.verify(m, (sig,)))

SALTLEN=16

def secretToKey(salt, secret):
    """Convert 'secret' to a 32-byte key, using a version of the algorithm
       from RFC2440.  The salt must be SALTLEN+1 bytes long, and should
       be random, except for the last byte, which encodes how time-
       consuming the computation should be.

       (The goal is to make offline password-guessing attacks harder by
       increasing the time required to convert a password to a key, and to
       make precomputed password tables impossible to generate by
       mixing a random salt into the computation.)
    """
    assert len(salt) == SALTLEN+1

    # The algorithm is basically, 'call the last byte of the salt the
    # "difficulty", and all other bytes of the salt S.  Now make
    # an infinite stream of S|secret|S|secret|..., and hash the
    # first N bytes of that, where N is determined by the difficulty.
    #
    # Obviously, this wants a hash algorithm that's tricky to
    # parallelize.
    #
    # Unlike RFC2440, we use a 16-byte salt.  Because CPU times
    # have improved, we start at 16 times the previous minimum.

    difficulty = ord(salt[-1])
    count = (long(16)+(difficulty & 15)) << ((difficulty >> 4) + 10)

    # Make 'data' nice and long, so that we don't need to call update()
    # a zillion times.
    data = salt[:-1]+secret
    if len(data)<1024:
        data *= (1024 // len(data))+1

    d = Crypto.Hash.SHA256.new()
    # Hash exactly 'count' bytes: 'iters' whole copies of data, then a
    # partial copy for the remainder.
    iters, leftover = divmod(count, len(data))
    for _ in xrange(iters):
        d.update(data)
    if leftover:
        d.update(data[:leftover])

    return d.digest()

def encryptSecret(secret, password, difficulty=0x80):
    """Encrypt the secret 'secret' using the password 'password',
       and return the encrypted result.  'difficulty' (0..255) is stored
       as the last salt byte and controls the cost of secretToKey."""
    # The encrypted format is:
    #    "GKEY1"  -- 5 octets, fixed, denotes data format.
    #    SALT     -- 17 bytes, used to hash password
    #    IV       -- 16 bytes; salt for encryption
    #    ENCRYPTED IN AES256-OFB, using a key=s2k(password, salt) and IV=IV:
    #       SLEN   -- 4 bytes; length of secret, big-endian.
    #       SECRET -- len(secret) bytes
    #       D      -- 32 bytes; SHA256 hash of (salt|secret|salt).
    #
    # This format leaks the secret length, obviously.
    assert 0 <= difficulty < 256
    salt = os.urandom(SALTLEN)+chr(difficulty)
    key = secretToKey(salt, password)

    d_obj = Crypto.Hash.SHA256.new()
    d_obj.update(salt)
    d_obj.update(secret)
    d_obj.update(salt)
    d = d_obj.digest()

    iv = os.urandom(16)
    e = Crypto.Cipher.AES.new(key, Crypto.Cipher.AES.MODE_OFB, iv)

    # Stupidly, pycrypto doesn't accept that stream ciphers don't need to
    # take their input in blocks.  So pad it, then ignore the padded output.

    padlen = 16-((len(secret)+len(d)+4) % 16)
    if padlen == 16: padlen = 0
    pad = '\x00' * padlen

    slen = struct.pack("!L",len(secret))
    encrypted = e.encrypt("%s%s%s%s" % (slen, secret, d, pad))
    if padlen:
        encrypted = encrypted[:-padlen]
    return "GKEY1%s%s%s"%(salt, iv, encrypted)

def decryptSecret(encrypted, password):
    """Decrypt a value encrypted with encryptSecret.  Raises UnknownFormat
       or FormatException if 'encrypted' was not generated with
       encryptSecret.  Raises BadPassword if the password was not correct.
    """
    if encrypted[:5] != "GKEY1":
        raise thandy.UnknownFormat()
    encrypted = encrypted[5:]
    if len(encrypted) < SALTLEN+1+16:
        raise thandy.FormatException()

    salt = encrypted[:SALTLEN+1]
    iv = encrypted[SALTLEN+1:SALTLEN+1+16]
    encrypted = encrypted[SALTLEN+1+16:]

    # Anything encryptSecret produced holds at least the 4-byte length and
    # the digest.  Reject shorter input here, rather than letting
    # struct.unpack fail or misreporting truncation as a bad password.
    if len(encrypted) < 4+Crypto.Hash.SHA256.digest_size:
        raise thandy.FormatException()

    key = secretToKey(salt, password)

    e = Crypto.Cipher.AES.new(key, Crypto.Cipher.AES.MODE_OFB, iv)

    # Pad to a whole number of blocks for pycrypto's benefit; the extra
    # output lands past the fields we slice out below.
    padlen = 16-(len(encrypted) % 16)
    if padlen == 16: padlen = 0
    pad = '\x00' * padlen

    decrypted = e.decrypt("%s%s"%(encrypted,pad))
    slen = struct.unpack("!L", decrypted[:4])[0]
    secret = decrypted[4:4+slen]
    storedDigest = decrypted[4+slen:4+slen+Crypto.Hash.SHA256.digest_size]

    d = Crypto.Hash.SHA256.new()
    d.update(salt)
    d.update(secret)
    d.update(salt)

    # With the wrong key, slen and the digest decrypt to garbage, so this
    # comparison is what detects a bad password.
    if d.digest() != storedDigest:
        raise thandy.BadPassword()

    return secret

class KeyStore(thandy.formats.KeyDB):
    """A KeyDB whose keys are loaded from and saved to the file 'fname'
       as JSON, optionally encrypted with a password via
       encryptSecret/decryptSecret.
    """
    def __init__(self, fname, encrypted=True):
        thandy.formats.KeyDB.__init__(self)

        self._loaded = None         # Set to True once load() has run.
        self._fname = fname
        self._passwd = None         # Last password known to work, if any.
        self._encrypted = encrypted

    def getpass(self, reprompt=False):
        """Return the remembered password if there is one.  Otherwise
           prompt for one on stderr; if 'reprompt' is true, ask twice
           and retry until both entries match."""
        if self._passwd is not None:
            return self._passwd
        while 1:
            pwd = getpass.getpass("Password: ", sys.stderr)
            if not reprompt:
                return pwd

            pwd2 = getpass.getpass("Confirm: ", sys.stderr)
            if pwd == pwd2:
                return pwd
            else:
                print("Mismatch; try again.")

    def load(self, password=None):
        """Read the key file, decrypting it with 'password' if this store
           is encrypted (prompting if no password is given), and add every
           key found.  A missing file is treated as an empty store."""
        logging.info("Loading private keys from %r...", self._fname)
        if not os.path.exists(self._fname):
            logging.info("...no such file.")
            self._loaded = True
            return

        if password is None and self._encrypted:
            password = self.getpass()

        f = open(self._fname, 'rb')
        try:
            contents = f.read()
        finally:
            f.close()
        if self._encrypted:
            contents = decryptSecret(contents, password)

        listOfKeys = simplejson.loads(contents)
        self._passwd = password # It worked.
        if 'keys' not in listOfKeys:
            listOfKeys['keys'] = []
        for obj in listOfKeys['keys']:
            key = RSAKey.fromJSon(obj)
            self.addKey(key)
            logging.info("Loaded key %s", key.getKeyID())

        self._loaded = True

    def setPassword(self, passwd):
        """Remember 'passwd' so that getpass() need not prompt."""
        self._passwd = passwd

    def clearPassword(self):
        """Forget any remembered password."""
        self._passwd = None

    def save(self, password=None):
        """Write every key (private parts and roles included) to the key
           file, encrypting with 'password' if this store is encrypted
           (prompting, with confirmation, if no password is given).  An
           encrypted store that has not been loaded yet is loaded first."""
        if not self._loaded and self._encrypted:
            self.load(password)

        # Only an encrypted store needs a password; don't prompt otherwise.
        if password is None and self._encrypted:
            password = self.getpass(True)

        logging.info("Saving private keys into %r...", self._fname)
        listOfKeys = { 'keys' :
                       [ key.format(private=True, includeRoles=True) for key in
                         self._keys.values() ]
                       }
        contents = simplejson.dumps(listOfKeys)
        if self._encrypted:
            contents = encryptSecret(contents, password)
        thandy.util.replaceFile(self._fname, contents)

        self._passwd = password # It worked.
        logging.info("Done.")