# License?

"""A U1DB implementation for using Object Stores as its persistence layer."""

import hashlib
import hmac
import os
import random
import string

from leap.soledad.backends import sqlcipher
from leap.soledad.util import GPGWrapper


class Soledad(object):
    """
    Client-side entry point tying together an OpenPGP keypair, a symmetric
    secret stored GPG-encrypted on disk, and a local sqlcipher-backed
    database.
    """

    # paths
    # expanduser() yields the same location as $HOME on POSIX when HOME is
    # set, but does not raise KeyError at import time when it is missing.
    PREFIX = os.path.join(os.path.expanduser('~'), '.config', 'leap',
                          'soledad')
    SECRET_PATH = os.path.join(PREFIX, 'secret.gpg')
    GNUPG_HOME = os.path.join(PREFIX, 'gnupg')
    LOCAL_DB_PATH = os.path.join(PREFIX, 'soledad.u1db')

    # other configs
    SECRET_LENGTH = 50

    def __init__(self, user_email, gpghome=None, initialize=True):
        """
        Create the configuration directory and the GPG wrapper.

        :param user_email: identity used to look up / generate the OpenPGP
            keypair.
        :param gpghome: GnuPG home directory; falls back to GNUPG_HOME when
            falsy.
        :param initialize: when true, immediately load (or generate) the
            keypair and secret and open the local database.
        """
        self._user_email = user_email
        # Stays None until _initialize() opens the database, so close() is
        # safe on an instance created with initialize=False.
        self._db = None
        # EAFP instead of isdir()+makedirs(): no window in which another
        # process can create the directory between the check and the call.
        try:
            os.makedirs(self.PREFIX)
        except OSError:
            if not os.path.isdir(self.PREFIX):
                raise
        if not gpghome:
            gpghome = self.GNUPG_HOME
        self._gpg = GPGWrapper(gpghome=gpghome)
        if initialize:
            self._initialize()

    def _initialize(self):
        """
        Load (generating first if absent) the OpenPGP keypair and the
        symmetric secret, then open the local database.
        """
        # load/generate OpenPGP keypair
        if not self._has_openpgp_keypair():
            self._gen_openpgp_keypair()
        self._load_openpgp_keypair()
        # load/generate secret
        if not self._has_secret():
            self._gen_secret()
        self._load_secret()
        # instantiate u1db
        # TODO: verify if secret for sqlcipher should be the same as the one
        # for symmetric encryption.
        # NOTE(review): the positional True is presumably a "create" flag --
        # confirm against sqlcipher.open.
        self._db = sqlcipher.open(self.LOCAL_DB_PATH, True, self._secret,
                                  soledad=self)

    def close(self):
        """
        Close the local database, if one has been opened.
        """
        if self._db is not None:
            self._db.close()

    #-------------------------------------------------------------------------
    # Management of secret for symmetric encryption
    #-------------------------------------------------------------------------

    def _has_secret(self):
        """
        Verify if secret for symmetric encryption exists on local encrypted
        file.

        :return: True when SECRET_PATH is an existing regular file.
        """
        # TODO: verify if file is a GPG-encrypted file and if we have the
        # corresponding private key for decryption.
        return os.path.isfile(self.SECRET_PATH)

    def _load_secret(self):
        """
        Load secret for symmetric encryption from local encrypted file.

        :raise IOError: when the secret file cannot be opened or read.
        """
        # Only the file access is guarded, so an error raised while
        # decrypting is not misreported as a failure to open the file.
        try:
            with open(self.SECRET_PATH) as f:
                ciphertext = f.read()
        except IOError:
            raise IOError('Failed to open secret file %s.'
                          % self.SECRET_PATH)
        self._secret = str(self._gpg.decrypt(ciphertext))

    def _gen_secret(self):
        """
        Generate a secret for symmetric encryption and store in a local
        encrypted file.
        """
        # The secret protects user data, so draw it from the OS entropy
        # source rather than the default (predictable) Mersenne Twister.
        rng = random.SystemRandom()
        alphabet = string.ascii_uppercase + string.digits
        self._secret = ''.join(rng.choice(alphabet)
                               for _ in range(self.SECRET_LENGTH))
        # NOTE(review): the third positional argument is presumably the
        # signing key -- confirm against GPGWrapper.encrypt.
        ciphertext = self._gpg.encrypt(self._secret, self._fingerprint,
                                       self._fingerprint)
        # Context manager guarantees the handle is closed even if the write
        # fails part-way.
        with open(self.SECRET_PATH, 'w') as f:
            f.write(str(ciphertext))

    #-------------------------------------------------------------------------
    # Management of OpenPGP keypair
    #-------------------------------------------------------------------------

    def _has_openpgp_keypair(self):
        """
        Verify if there exists an OpenPGP keypair for this user.

        :return: True when the GPG wrapper finds a key for the user's email,
            False when the lookup raises LookupError.
        """
        # TODO: verify if we have the corresponding private key.
        try:
            self._gpg.find_key(self._user_email)
            return True
        except LookupError:
            return False

    def _gen_openpgp_keypair(self):
        """
        Generate an OpenPGP keypair for this user.
        """
        params = self._gpg.gen_key_input(
            key_type='RSA',
            key_length=4096,
            name_real=self._user_email,
            name_email=self._user_email,
            name_comment='Generated by LEAP Soledad.')
        self._gpg.gen_key(params)

    def _load_openpgp_keypair(self):
        """
        Find fingerprint for this user's OpenPGP keypair.
        """
        self._fingerprint = self._gpg.find_key(self._user_email)['fingerprint']

    def publish_pubkey(self, keyserver):
        """
        Publish OpenPGP public key to a keyserver.

        Not implemented yet: currently does nothing.
        """
        # TODO: this has to talk to LEAP's Nickserver.
        pass

    #-------------------------------------------------------------------------
    # Data encryption and decryption
    #-------------------------------------------------------------------------

    def _doc_passphrase(self, doc_id):
        """
        Derive the per-document passphrase used for symmetric encryption.

        :param doc_id: identifier of the document.
        :return: hex digest of HMAC(secret, doc_id).
        """
        # MD5 was the implicit default digest of hmac.new(); it is spelled
        # out so the derived passphrases stay identical to the ones already
        # used for existing documents.
        return hmac.new(self._secret, doc_id, hashlib.md5).hexdigest()

    def encrypt(self, data, sign=None, passphrase=None, symmetric=False):
        """
        Encrypt data.

        The user's own key fingerprint is passed as the recipient; the
        remaining arguments are forwarded to the GPG wrapper unchanged.

        :return: the encryption result converted with str().
        """
        return str(self._gpg.encrypt(data, self._fingerprint, sign=sign,
                                     passphrase=passphrase,
                                     symmetric=symmetric))

    def encrypt_symmetric(self, doc_id, data, sign=None):
        """
        Encrypt data using symmetric secret.

        The passphrase is derived from the secret and doc_id, so each
        document gets its own passphrase.
        """
        return self.encrypt(data, sign=sign,
                            passphrase=self._doc_passphrase(doc_id),
                            symmetric=True)

    def decrypt(self, data, passphrase=None, symmetric=False):
        """
        Decrypt data.

        :param symmetric: accepted for signature parity with encrypt() but
            currently not forwarded to the GPG wrapper.
        :return: the decryption result converted with str().
        """
        return str(self._gpg.decrypt(data, passphrase=passphrase))

    def decrypt_symmetric(self, doc_id, data):
        """
        Decrypt data using symmetric secret.

        Uses the same per-document passphrase derivation as
        encrypt_symmetric().
        """
        return self.decrypt(data, passphrase=self._doc_passphrase(doc_id))

    #-------------------------------------------------------------------------
    # Document storage, retrieval and sync
    #-------------------------------------------------------------------------

    def put_doc(self, doc):
        """
        Update a document in the local encrypted database.
        """
        return self._db.put_doc(doc)

    def delete_doc(self, doc):
        """
        Delete a document from the local encrypted database.
        """
        return self._db.delete_doc(doc)

    def get_doc(self, doc_id, include_deleted=False):
        """
        Retrieve a document from the local encrypted database.
        """
        return self._db.get_doc(doc_id, include_deleted=include_deleted)

    def get_docs(self, doc_ids, check_for_conflicts=True,
                 include_deleted=False):
        """
        Get the content for many documents.
        """
        return self._db.get_docs(doc_ids,
                                 check_for_conflicts=check_for_conflicts,
                                 include_deleted=include_deleted)

    def create_doc(self, content, doc_id=None):
        """
        Create a new document in the local encrypted database.
        """
        return self._db.create_doc(content, doc_id=doc_id)

    def get_doc_conflicts(self, doc_id):
        """
        Get the list of conflicts for the given document.
        """
        return self._db.get_doc_conflicts(doc_id)

    def resolve_doc(self, doc, conflicted_doc_revs):
        """
        Mark a document as no longer conflicted.
        """
        return self._db.resolve_doc(doc, conflicted_doc_revs)

    def sync(self, url):
        """
        Synchronize the local encrypted database with LEAP server.

        No credentials are sent yet (creds=None).
        """
        # TODO: create authentication scheme for sync with server.
        return self._db.sync(url, creds=None, autocreate=True)


__all__ = ['util']