Diffstat (limited to 'client/src/leap/soledad/client/_db/blobs.py')
-rw-r--r--  client/src/leap/soledad/client/_db/blobs.py  554
1 file changed, 0 insertions(+), 554 deletions(-)
diff --git a/client/src/leap/soledad/client/_db/blobs.py b/client/src/leap/soledad/client/_db/blobs.py
deleted file mode 100644
index 10b90c71..00000000
--- a/client/src/leap/soledad/client/_db/blobs.py
+++ /dev/null
@@ -1,554 +0,0 @@
-# -*- coding: utf-8 -*-
-# blobs.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Clientside BlobBackend Storage.
-"""
-
-from urlparse import urljoin
-
-import binascii
-import os
-import base64
-
-from io import BytesIO
-from functools import partial
-
-from twisted.logger import Logger
-from twisted.enterprise import adbapi
-from twisted.internet import defer
-from twisted.web.client import FileBodyProducer
-
-import treq
-
-from leap.soledad.common.errors import SoledadError
-from leap.common.files import mkdir_p
-
-from .._document import BlobDoc
-from .._crypto import DocInfo
-from .._crypto import BlobEncryptor
-from .._crypto import BlobDecryptor
-from .._http import HTTPClient
-from .._pipes import TruncatedTailPipe
-from .._pipes import PreamblePipe
-
-from . import pragmas
-from . import sqlcipher
-
-
-logger = Logger()
-FIXED_REV = 'ImmutableRevision' # Blob content is immutable
-
-
-class BlobAlreadyExistsError(SoledadError):
- pass
-
-
-class ConnectionPool(adbapi.ConnectionPool):
-
- def insertAndGetLastRowid(self, *args, **kwargs):
- """
- Execute an SQL query and return the last rowid.
-
- See: https://sqlite.org/c3ref/last_insert_rowid.html
- """
- return self.runInteraction(
- self._insertAndGetLastRowid, *args, **kwargs)
-
- def _insertAndGetLastRowid(self, trans, *args, **kw):
- trans.execute(*args, **kw)
- return trans.lastrowid
-
- def blob(self, table, column, irow, flags):
- """
- Open a BLOB for incremental I/O.
-
- Return a handle to the BLOB that would be selected by:
-
- SELECT column FROM table WHERE rowid = irow;
-
- See: https://sqlite.org/c3ref/blob_open.html
-
- :param table: The table in which to look up the blob.
- :type table: str
- :param column: The column where the BLOB is located.
- :type column: str
- :param irow: The rowid of the BLOB.
- :type irow: int
- :param flags: If zero, the BLOB is opened read-only; if non-zero,
- it is opened read-write.
- :type flags: int
-
- :return: A BLOB handle.
- :rtype: pysqlcipher.dbapi2.Blob
- """
- return self.runInteraction(self._blob, table, column, irow, flags)
-
- def _blob(self, trans, table, column, irow, flags):
- # TODO: should not use transaction private variable here
- handle = trans._connection.blob(table, column, irow, flags)
- return handle
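-
-# A usage sketch for the pool helpers above (hypothetical names `pool`,
-# `blob_id`, `size` and `data`; SQLiteBlobBackend.put below does exactly
-# this against the real pool):
-#
-# irow = yield pool.insertAndGetLastRowid(
-#     'INSERT INTO blobs (blob_id, payload) VALUES (?, zeroblob(?))',
-#     (blob_id, size))
-# handle = yield pool.blob('blobs', 'payload', irow, 1)  # non-zero: R/W
-# handle.write(data)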
-
-
-def check_http_status(code):
- if code == 409:
- raise BlobAlreadyExistsError()
- elif code != 200:
- raise SoledadError("Server Error")
-
-
-class DecrypterBuffer(object):
-
- def __init__(self, blob_id, secret, tag):
- self.doc_info = DocInfo(blob_id, FIXED_REV)
- self.secret = secret
- self.tag = tag
- self.preamble_pipe = PreamblePipe(self._make_decryptor)
-
- def _make_decryptor(self, preamble):
- self.decrypter = BlobDecryptor(
- self.doc_info, preamble,
- secret=self.secret,
- armor=False,
- start_stream=False,
- tag=self.tag)
- return TruncatedTailPipe(self.decrypter, tail_size=len(self.tag))
-
- def write(self, data):
- self.preamble_pipe.write(data)
-
- def close(self):
- real_size = self.decrypter.decrypted_content_size
- return self.decrypter._end_stream(), real_size
-
-
-class BlobManager(object):
- """
- Ideally, the decrypting flow goes like this:
-
- - GET a blob from remote server.
- - Decrypt the preamble
- - Allocate a zeroblob in the sqlcipher sink
- - Mark the blob as unusable (ie, not verified)
- - Decrypt the payload incrementally, and write chunks to sqlcipher
- ** Is it possible to use a small buffer for the aes writer w/o
- ** allocating all the memory in openssl?
- - Finalize the AES decryption
- - If preamble + payload verifies correctly, mark the blob as usable
-
- """
-
- def __init__(
- self, local_path, remote, key, secret, user, token=None,
- cert_file=None):
- if local_path:
- mkdir_p(os.path.dirname(local_path))
- self.local = SQLiteBlobBackend(local_path, key)
- self.remote = remote
- self.secret = secret
- self.user = user
- self._client = HTTPClient(user, token, cert_file)
-
- def close(self):
- if hasattr(self, 'local') and self.local:
- return self.local.close()
-
- @defer.inlineCallbacks
- def remote_list(self):
- uri = urljoin(self.remote, self.user + '/')
- data = yield self._client.get(uri)
- defer.returnValue((yield data.json()))
-
- def local_list(self):
- return self.local.list()
-
- @defer.inlineCallbacks
- def send_missing(self):
- our_blobs = yield self.local_list()
- server_blobs = yield self.remote_list()
- missing = [b_id for b_id in our_blobs if b_id not in server_blobs]
- logger.info("Amount of documents missing on server: %s" % len(missing))
- # TODO: Send concurrently when we are able to stream directly from db
- for blob_id in missing:
- fd = yield self.local.get(blob_id)
- logger.info("Upload local blob: %s" % blob_id)
- yield self._encrypt_and_upload(blob_id, fd)
-
- @defer.inlineCallbacks
- def fetch_missing(self):
- # TODO: Use something to prioritize user requests over general new docs
- our_blobs = yield self.local_list()
- server_blobs = yield self.remote_list()
- docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs]
- logger.info("Fetching new docs from server: %s" % len(docs_we_want))
- # TODO: Fetch concurrently when we are able to stream directly into db
- for blob_id in docs_we_want:
- logger.info("Fetching new doc: %s" % blob_id)
- yield self.get(blob_id)
-
- @defer.inlineCallbacks
- def put(self, doc, size):
- if (yield self.local.exists(doc.blob_id)):
- error_message = "Blob already exists: %s" % doc.blob_id
- raise BlobAlreadyExistsError(error_message)
- fd = doc.blob_fd
- # TODO this is a tee really, but ok... could do db and upload
- # concurrently. not sure if we'd gain something.
- yield self.local.put(doc.blob_id, fd, size=size)
- # In fact, some kind of pipe is needed here, where each write on db
- # handle gets forwarded into a write on the connection handle
- fd = yield self.local.get(doc.blob_id)
- yield self._encrypt_and_upload(doc.blob_id, fd)
-
- @defer.inlineCallbacks
- def get(self, blob_id):
- local_blob = yield self.local.get(blob_id)
- if local_blob:
- logger.info("Found blob in local database: %s" % blob_id)
- defer.returnValue(local_blob)
-
- result = yield self._download_and_decrypt(blob_id)
-
- if not result:
- defer.returnValue(None)
- blob, size = result
-
- if blob:
- logger.info("Got decrypted blob of type: %s" % type(blob))
- blob.seek(0)
- yield self.local.put(blob_id, blob, size=size)
- defer.returnValue((yield self.local.get(blob_id)))
- else:
- # XXX we shouldn't get here, but we will...
- # lots of ugly error handling possible:
- # 1. retry, might be network error
- # 2. try later, maybe it didn't finish streaming
- # 3. resignation, might be an error while verifying
- logger.error('sorry, dunno what happened')
-
- @defer.inlineCallbacks
- def _encrypt_and_upload(self, blob_id, fd):
- # TODO ------------------------------------------
- # this is wrong: it is doing 2 stages.
- # the crypto producer can be passed to
- # the uploader and react as data is written.
- # try to rewrite as a tube: pass the fd to aes and let aes writer
- # produce data to the treq request fd.
- # ------------------------------------------------
- logger.info("Staring upload of blob: %s" % blob_id)
- doc_info = DocInfo(blob_id, FIXED_REV)
- uri = urljoin(self.remote, self.user + "/" + blob_id)
- crypter = BlobEncryptor(doc_info, fd, secret=self.secret,
- armor=False)
- fd = yield crypter.encrypt()
- response = yield self._client.put(uri, data=fd)
- check_http_status(response.code)
- logger.info("Finished upload: %s" % (blob_id,))
-
- @defer.inlineCallbacks
- def _download_and_decrypt(self, blob_id):
- logger.info("Staring download of blob: %s" % blob_id)
- # TODO this needs to be connected in a tube
- uri = urljoin(self.remote, self.user + '/' + blob_id)
- data = yield self._client.get(uri)
-
- if data.code == 404:
- logger.warn("Blob not found in server: %s" % blob_id)
- defer.returnValue(None)
- elif not data.headers.hasHeader('Tag'):
- logger.error("Server didn't send a tag header for: %s" % blob_id)
- defer.returnValue(None)
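- # the server sends the blob's MAC tag urlsafe-base64-encoded in the
- # 'Tag' header; DecrypterBuffer needs it to verify the decrypted content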
- tag = data.headers.getRawHeaders('Tag')[0]
- tag = base64.urlsafe_b64decode(tag)
- buf = DecrypterBuffer(blob_id, self.secret, tag)
-
- # incrementally collect the body of the response
- yield treq.collect(data, buf.write)
- fd, size = buf.close()
- logger.info("Finished download: (%s, %d)" % (blob_id, size))
- defer.returnValue((fd, size))
-
- @defer.inlineCallbacks
- def delete(self, blob_id):
- logger.info("Staring deletion of blob: %s" % blob_id)
- yield self._delete_from_remote(blob_id)
- if (yield self.local.exists(blob_id)):
- yield self.local.delete(blob_id)
-
- def _delete_from_remote(self, blob_id):
- # TODO this needs to be connected in a tube
- uri = urljoin(self.remote, self.user + '/' + blob_id)
- return self._client.delete(uri)
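-
-# Illustrative round-trip through BlobManager (hypothetical paths and ids,
-# not part of the original module; see also the `testit` helpers at the end
-# of this file):
-#
-# manager = BlobManager('/tmp/blobs', 'http://localhost:9000/',
-#                       'A' * 32, 'secret', 'user')
-# with open('/tmp/payload', 'rb') as fd:
-#     doc = BlobDoc(fd, 'blob-1')
-#     yield manager.put(doc, size=os.path.getsize('/tmp/payload'))
-# fd = yield manager.get('blob-1')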
-
-
-class SQLiteBlobBackend(object):
-
- def __init__(self, path, key=None):
- self.path = os.path.abspath(
- os.path.join(path, 'soledad_blob.db'))
- mkdir_p(os.path.dirname(self.path))
- if not key:
- raise ValueError('key cannot be None')
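- # NOTE: the path handed to SQLCipherOptions below is not opened here;
- # the options object is only used for its key and pragma settings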
- backend = 'pysqlcipher.dbapi2'
- opts = sqlcipher.SQLCipherOptions(
- '/tmp/ignored', binascii.b2a_hex(key),
- is_raw_key=True, create=True)
- pragmafun = partial(pragmas.set_init_pragmas, opts=opts)
- openfun = _sqlcipherInitFactory(pragmafun)
-
- self.dbpool = ConnectionPool(
- backend, self.path, check_same_thread=False, timeout=5,
- cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool')
-
- def close(self):
- from twisted._threads import AlreadyQuit
- try:
- self.dbpool.close()
- except AlreadyQuit:
- pass
-
- @defer.inlineCallbacks
- def put(self, blob_id, blob_fd, size=None):
- logger.info("Saving blob in local database...")
- insert = 'INSERT INTO blobs (blob_id, payload) VALUES (?, zeroblob(?))'
- irow = yield self.dbpool.insertAndGetLastRowid(insert, (blob_id, size))
- handle = yield self.dbpool.blob('blobs', 'payload', irow, 1)
- blob_fd.seek(0)
- # XXX I have to copy the buffer here so that I'm able to
- # return a non-closed file to the caller (blobmanager.get)
- # FIXME should remove this duplication!
- # have a look at how treq copes with closing the handle
- # for uploading a file
- producer = FileBodyProducer(blob_fd)
- done = yield producer.startProducing(handle)
- logger.info("Finished saving blob in local database.")
- defer.returnValue(done)
-
- @defer.inlineCallbacks
- def get(self, blob_id):
- # TODO we can also stream the blob value using the sqlite
- # incremental interface for blobs - and just return the raw fd instead
- select = 'SELECT payload FROM blobs WHERE blob_id = ?'
- result = yield self.dbpool.runQuery(select, (blob_id,))
- if result:
- defer.returnValue(BytesIO(str(result[0][0])))
-
- @defer.inlineCallbacks
- def list(self):
- query = 'SELECT blob_id FROM blobs'
- result = yield self.dbpool.runQuery(query)
- if result:
- defer.returnValue([b_id[0] for b_id in result])
- else:
- defer.returnValue([])
-
- @defer.inlineCallbacks
- def exists(self, blob_id):
- query = 'SELECT blob_id FROM blobs WHERE blob_id = ?'
- result = yield self.dbpool.runQuery(query, (blob_id,))
- defer.returnValue(bool(len(result)))
-
- def delete(self, blob_id):
- query = 'DELETE FROM blobs WHERE blob_id = ?'
- return self.dbpool.runQuery(query, (blob_id,))
-
-
-def _init_blob_table(conn):
- maybe_create = (
- "CREATE TABLE IF NOT EXISTS "
- "blobs ("
- "blob_id PRIMARY KEY, "
- "payload BLOB)")
- conn.execute(maybe_create)
-
-
-def _sqlcipherInitFactory(fun):
- def _initialize(conn):
- fun(conn)
- _init_blob_table(conn)
- return _initialize
-
-
-#
-# testing facilities
-#
-
-@defer.inlineCallbacks
-def testit(reactor):
- # configure logging to stdout
- from twisted.python import log
- import sys
- log.startLogging(sys.stdout)
-
- # parse command line arguments
- import argparse
-
- parser = argparse.ArgumentParser()
- parser.add_argument('--url', default='http://localhost:9000/')
- parser.add_argument('--path', default='/tmp/blobs')
- parser.add_argument('--secret', default='secret')
- parser.add_argument('--uuid', default='user')
- parser.add_argument('--token', default=None)
- parser.add_argument('--cert-file', default='')
-
- subparsers = parser.add_subparsers(help='sub-command help', dest='action')
-
- # parse upload command
- parser_upload = subparsers.add_parser(
- 'upload', help='upload blob and bypass local db')
- parser_upload.add_argument('payload')
- parser_upload.add_argument('blob_id')
-
- # parse download command
- parser_download = subparsers.add_parser(
- 'download', help='download blob and bypass local db')
- parser_download.add_argument('blob_id')
- parser_download.add_argument('--output-file', default='/tmp/incoming-file')
-
- # parse put command
- parser_put = subparsers.add_parser(
- 'put', help='put blob in local db and upload')
- parser_put.add_argument('payload')
- parser_put.add_argument('blob_id')
-
- # parse get command
- parser_get = subparsers.add_parser(
- 'get', help='get blob from local db, fetch from server if needed')
- parser_get.add_argument('blob_id')
-
- # parse delete command
- parser_delete = subparsers.add_parser(
- 'delete', help='delete blob from local and remote db')
- parser_delete.add_argument('blob_id')
-
- # parse list command
- parser_list = subparsers.add_parser(
- 'list', help='list local and remote blob ids')
-
- # parse send_missing command
- parser_send = subparsers.add_parser(
- 'send_missing', help='send all pending upload blobs')
-
- # parse fetch_missing command
- parser_fetch = subparsers.add_parser(
- 'fetch_missing', help='fetch all new server blobs')
-
- # parse arguments
- args = parser.parse_args()
-
- # TODO convert these into proper unittests
-
- def _manager():
- mkdir_p(os.path.dirname(args.path))
- manager = BlobManager(
- args.path, args.url,
- 'A' * 32, args.secret,
- args.uuid, args.token, args.cert_file)
- return manager
-
- @defer.inlineCallbacks
- def _upload(blob_id, payload):
- logger.info(":: Starting upload only: %s" % str((blob_id, payload)))
- manager = _manager()
- with open(payload, 'rb') as fd:
- yield manager._encrypt_and_upload(blob_id, fd)
- logger.info(":: Finished upload only: %s" % str((blob_id, payload)))
-
- @defer.inlineCallbacks
- def _download(blob_id):
- logger.info(":: Starting download only: %s" % blob_id)
- manager = _manager()
- result = yield manager._download_and_decrypt(blob_id)
- logger.info(":: Result of download: %s" % str(result))
- if result:
- fd, _ = result
- with open(args.output_file, 'wb') as f:
- logger.info(":: Writing data to %s" % args.output_file)
- f.write(fd.read())
- logger.info(":: Finished download only: %s" % blob_id)
-
- @defer.inlineCallbacks
- def _put(blob_id, payload):
- logger.info(":: Starting full put: %s" % blob_id)
- manager = _manager()
- size = os.path.getsize(payload)
- with open(payload, 'rb') as fd:
- doc = BlobDoc(fd, blob_id)
- result = yield manager.put(doc, size=size)
- logger.info(":: Result of put: %s" % str(result))
- logger.info(":: Finished full put: %s" % blob_id)
-
- @defer.inlineCallbacks
- def _get(blob_id):
- logger.info(":: Starting full get: %s" % blob_id)
- manager = _manager()
- fd = yield manager.get(blob_id)
- if fd:
- logger.info(":: Result of get: " + fd.getvalue())
- logger.info(":: Finished full get: %s" % blob_id)
-
- @defer.inlineCallbacks
- def _delete(blob_id):
- logger.info(":: Starting deletion of: %s" % blob_id)
- manager = _manager()
- yield manager.delete(blob_id)
- logger.info(":: Finished deletion of: %s" % blob_id)
-
- @defer.inlineCallbacks
- def _list():
- logger.info(":: Listing local blobs")
- manager = _manager()
- local_list = yield manager.local_list()
- logger.info(":: Local list: %s" % local_list)
- logger.info(":: Listing remote blobs")
- remote_list = yield manager.remote_list()
- logger.info(":: Remote list: %s" % remote_list)
-
- @defer.inlineCallbacks
- def _send_missing():
- logger.info(":: Sending local pending upload docs")
- manager = _manager()
- yield manager.send_missing()
- logger.info(":: Finished sending missing docs")
-
- @defer.inlineCallbacks
- def _fetch_missing():
- logger.info(":: Fetching remote new docs")
- manager = _manager()
- yield manager.fetch_missing()
- logger.info(":: Finished fetching new docs")
-
- if args.action == 'upload':
- yield _upload(args.blob_id, args.payload)
- elif args.action == 'download':
- yield _download(args.blob_id)
- elif args.action == 'put':
- yield _put(args.blob_id, args.payload)
- elif args.action == 'get':
- yield _get(args.blob_id)
- elif args.action == 'delete':
- yield _delete(args.blob_id)
- elif args.action == 'list':
- yield _list()
- elif args.action == 'send_missing':
- yield _send_missing()
- elif args.action == 'fetch_missing':
- yield _fetch_missing()
-
-
-if __name__ == '__main__':
- from twisted.internet.task import react
- react(testit)
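-
-# Example invocations of the test script above (assuming a blobs server is
-# listening at the default --url, http://localhost:9000/):
-#
-# python blobs.py put /tmp/payload my-blob-id
-# python blobs.py get my-blob-id
-# python blobs.py delete my-blob-id
-# python blobs.py list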