summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/_blobs.py
blob: f41f3cf4febd07e27ded7e9cf2d0a104b57ee192 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""
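Client-side blob storage: a local SQLCipher-backed blob store plus a manager
that uploads and downloads encrypted blobs to and from a remote server.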
"""

from copy import copy
from uuid import uuid4
import os.path

from io import BytesIO
from functools import partial

from sqlite3 import Binary

from twisted.logger import Logger
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor
from twisted.web.client import FileBodyProducer

import treq

from leap.soledad.client.sqlcipher import SQLCipherOptions
from leap.soledad.client import pragmas

from _crypto import DocInfo, BlobEncryptor, BlobDecryptor


logger = Logger()


class _ConnectionPool(adbapi.ConnectionPool):
    """
    adbapi connection pool that can also hand out SQLite blob handles.
    """

    def blob(self, table, column, key, value):
        """
        Open a blob handle on ``column`` of the ``table`` row where
        ``key = value``.

        SQLite's incremental blob API addresses a row by its integer
        ``rowid``, not by an arbitrary key. The rowid of the matching row is
        therefore looked up first and then passed to the driver.

        :param table: table name. It is interpolated into SQL, so it must be
            a trusted constant.
        :param column: name of the BLOB column to open.
        :param key: column used to locate the row. It is interpolated into
            SQL, so it must be a trusted constant.
        :param value: value of ``key`` identifying the row (bound as a
            query parameter).
        :return: the driver's blob handle, opened with flags ``1``
            (non-zero means read/write for ``sqlite3_blob_open``).
        :raises KeyError: if no row has ``key = value``.
        """
        conn = self.connectionFactory(self)
        cursor = conn.cursor()
        try:
            # Identifiers cannot be bound as parameters; only the value is.
            cursor.execute(
                'SELECT rowid FROM %s WHERE %s = ?' % (table, key), (value,))
            row = cursor.fetchone()
        finally:
            cursor.close()
        if row is None:
            raise KeyError(value)
        blob = conn.blob(table, column, row[0], 1)
        logger.debug('opened blob handle for {key}={value}',
                     key=key, value=value)
        return blob


class DecrypterBuffer(object):
    """
    Sink that feeds downloaded ciphertext chunks into a BlobDecryptor.

    It is meant to be used as the collector callable for ``treq.collect``.
    The first chunk lazily creates the decryptor and starts decryption;
    every later chunk is forwarded to it.
    """

    def __init__(self, doc_id, rev, secret):
        self.decrypter = None
        self.buffer = BytesIO()
        self.doc_info = DocInfo(doc_id, rev)
        self.secret = secret
        # Deferred returned by BlobDecryptor.decrypt(); stays None until the
        # first chunk arrives.
        self.d = None

    def write(self, data):
        """
        Consume one chunk of ciphertext.

        :param data: the next chunk of the downloaded body.
        """
        if self.decrypter is None:
            # The first chunk is staged in ``self.buffer``, which is handed to
            # the decryptor (presumably so it can read the preamble first --
            # TODO confirm against BlobDecryptor).
            self.buffer.write(data)
            self.decrypter = BlobDecryptor(
                self.doc_info, self.buffer,
                secret=self.secret,
                armor=True,
                start_stream=False)
            self.d = self.decrypter.decrypt()
        else:
            self.decrypter.write(data)

    def close(self):
        """
        Finish the stream.

        :return: a Deferred firing with ``(result, size)``, where ``result``
            is what ``decrypt()`` produced and ``size`` is the decryptor's
            ``size``. If no data was ever written, the Deferred fires with
            ``(None, 0)`` instead of this method returning ``None``, so
            callers can always unpack the result.
        """
        if self.d is None:
            return defer.succeed((None, 0))
        self.d.addCallback(lambda result: (result, self.decrypter.size))
        return self.d


class BlobManager(object):
    """
    Ideally, the decrypting flow goes like this:

    - GET a blob from remote server.
    - Decrypt the preamble
    - Allocate a zeroblob in the sqlcipher sink
    - Mark the blob as unusable (ie, not verified)
    - Decrypt the payload incrementally, and write chunks to sqlcipher
      ** Is it possible to use a small buffer for the aes writer w/o
      ** allocating all the memory in openssl?
    - Finalize the AES decryption
    - If preamble + payload verifies correctly, mark the blob as usable

    """

    def __init__(self, local_path, remote, key, secret, user):
        self.local = SQLiteBlobBackend(local_path, key)
        self.remote = remote
        self.secret = secret
        self.user = user

    def _blob_uri(self, blob_id):
        """
        Return the remote URI of ``blob_id``.

        Upload and download both use this helper so they always address the
        same resource. A trailing slash on ``self.remote`` is tolerated.
        """
        return '%s/%s/%s' % (self.remote.rstrip('/'), self.user, blob_id)

    @defer.inlineCallbacks
    def put(self, doc):
        """
        Store ``doc``'s blob locally, then encrypt and upload it.

        :param doc: object exposing ``blob_id``, ``blob_fd``, ``doc_id`` and
            ``rev``.
        """
        fd = doc.blob_fd
        # TODO this is a tee really, but ok... could do db and upload
        # concurrently. not sure if we'd gain something.
        yield self.local.put(doc.blob_id, fd)
        fd.seek(0)
        yield self._encrypt_and_upload(doc.blob_id, doc.doc_id, doc.rev, fd)

    @defer.inlineCallbacks
    def get(self, blob_id, doc_id, rev):
        """
        Return a file-like object with the decrypted content of ``blob_id``.

        The local store is tried first. Otherwise the blob is downloaded,
        decrypted, saved locally and returned rewound to offset 0. If no
        blob could be obtained, an error is logged and ``None`` is returned.
        """
        local_blob = yield self.local.get(blob_id)
        if local_blob:
            logger.debug('got local blob {blob_id}', blob_id=blob_id)
            defer.returnValue(local_blob)

        result = yield self._download_and_decrypt(blob_id, doc_id, rev)
        # Do not crash on tuple unpacking if nothing came back.
        blob, size = result if result else (None, None)
        logger.debug('downloaded blob {blob_id}, size: {size}',
                     blob_id=blob_id, size=size)

        if blob:
            logger.debug('saving decrypted blob {blob_id} in local store',
                         blob_id=blob_id)
            blob.seek(0)
            yield self.local.put(blob_id, blob, size=size)
            blob.seek(0)
            defer.returnValue(blob)
        else:
            # XXX we shouldn't get here, but we will...
            # lots of ugly error handling possible:
            # 1. retry, might be network error
            # 2. try later, maybe didn't finished streaming
            # 3.. resignation, might be error while verifying
            logger.error('sorry, dunno what happened')

    @defer.inlineCallbacks
    def _encrypt_and_upload(self, blob_id, doc_id, rev, payload):
        """
        Encrypt ``payload`` for (``doc_id``, ``rev``) and PUT it remotely.

        :param payload: readable file-like object with the plaintext blob.
        """
        # TODO ------------------------------------------
        # this is wrong, is doing 2 stages.
        # the crypto producer can be passed to
        # the uploader and react as data is written.
        # try to rewrite as a tube: pass the fd to aes and let aes writer
        # produce data to the treq request fd.
        # ------------------------------------------------
        doc_info = DocInfo(doc_id, rev)
        uri = self._blob_uri(blob_id)
        crypter = BlobEncryptor(doc_info, payload, secret=self.secret,
                                armor=True)
        result = yield crypter.encrypt()
        yield treq.put(uri, data=result)

    @defer.inlineCallbacks
    def _download_and_decrypt(self, blob_id, doc_id, rev):
        """
        GET ``blob_id`` from the remote server and decrypt it.

        :return: a Deferred firing with whatever ``DecrypterBuffer.close()``
            yields (a ``(blob, size)`` pair).
        """
        # TODO this needs to be connected in a tube
        uri = self._blob_uri(blob_id)
        buf = DecrypterBuffer(doc_id, rev, self.secret)
        response = yield treq.get(uri)
        yield treq.collect(response, buf.write)
        blob = yield buf.close()
        defer.returnValue(blob)


class SQLiteBlobBackend(object):
    """
    Local blob store kept in an SQLCipher database at
    ``<path>/soledad_blob.db``.
    """

    def __init__(self, path, key=None):
        self.path = os.path.abspath(
            os.path.join(path, 'soledad_blob.db'))
        if not key:
            raise ValueError('key cannot be None')
        backend = 'pysqlcipher.dbapi2'
        opts = SQLCipherOptions('/tmp/ignored', key)
        pragmafun = partial(pragmas.set_init_pragmas, opts=opts)
        openfun = _sqlcipherInitFactory(pragmafun)

        self.dbpool = _ConnectionPool(
            backend, self.path, check_same_thread=False, timeout=5,
            cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool')

    @defer.inlineCallbacks
    def put(self, blob_id, blob_fd, size=None):
        """
        Store the content of ``blob_fd`` under ``blob_id``.

        :param blob_id: key of the new row.
        :param blob_fd: seekable file-like object holding the payload. It is
            left open; a copy is what gets consumed.
        :param size: payload size in bytes. When ``None``, it is computed by
            seeking to the end of ``blob_fd``.
        """
        if size is None:
            # Incremental blob I/O can only overwrite an existing blob, never
            # grow it, so zeroblob() must reserve the full size up front.
            blob_fd.seek(0, os.SEEK_END)
            size = blob_fd.tell()
        insert = 'INSERT INTO blobs VALUES (?, zeroblob(?))'
        yield self.dbpool.runOperation(insert, (blob_id, size))
        dbblob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id)
        blob_fd.seek(0)
        # XXX I have to copy the buffer here so that I'm able to
        # return a non-closed file to the caller (blobmanager.get)
        # FIXME should remove this duplication!
        # have a look at how treq does cope with closing the handle
        # for uploading a file
        producer = FileBodyProducer(copy(blob_fd))
        done = yield producer.startProducing(dbblob)
        defer.returnValue(done)

    @defer.inlineCallbacks
    def get(self, blob_id):
        """
        Return a BytesIO with the stored payload of ``blob_id``, or ``None``
        if no such row exists.
        """
        # TODO we can also stream the blob value using sqlite
        # incremental interface for blobs - and just return the raw fd instead
        select = 'SELECT payload FROM blobs WHERE blob_id = ?'
        result = yield self.dbpool.runQuery(select, (blob_id,))
        if result:
            defer.returnValue(BytesIO(str(result[0][0])))
        defer.returnValue(None)


def _init_blob_table(conn):
    maybe_create = (
        "CREATE TABLE IF NOT EXISTS "
        "blobs ("
        "blob_id PRIMARY KEY, "
        "payload BLOB)")
    conn.execute(maybe_create)


def _sqlcipherInitFactory(fun):
    def _initialize(conn):
        fun(conn)
        _init_blob_table(conn)
    return _initialize

# --------------------8<----------------------------------------------
#class BlobDoc(object):
#
    # TODO probably not needed, but convenient for testing for now.
#
    #def __init__(self, doc_id, rev, content, blob_id=None):
#
        #self.doc_id = doc_id
        #self.rev = rev
        #self.is_blob = True
        #self.blob_fd = content
        #if blob_id is None:
            #blob_id = uuid4().get_hex()
        #self.blob_id = blob_id
# --------------------8<----------------------------------------------

@defer.inlineCallbacks
def testit(reactor):

    # TODO convert this into proper unittests

    import sys
    try:
        cmd = sys.argv[1]
    except:
        cmd = ''

    if cmd == 'upload':
        src = sys.argv[2]
        blob_id = sys.argv[3]

        doc_info = DocInfo('mydoc', '1')
        print "DOC INFO", doc_info

        # I don't use BlobManager here because I need to avoid
        # putting the blob on local db on upload
        crypter = BlobEncryptor(
            doc_info, open(src, 'r'), 'A' * 32, armor=True)
        print "UPLOADING WITH ENCRYPTOR"
        result = yield crypter.encrypt()
        yield treq.put('http://localhost:9000/user/' + blob_id, data=result)

    elif cmd == 'download':
        blob_id = sys.argv[2]
        manager = BlobManager(
            '/tmp/blobs', 'http://localhost:9000/',
            'A' * 32, 'secret', 'user')
        result = yield manager.get(blob_id, 'mydoc', '1')
        print result.getvalue()

    else:
        print "Usage:"
        print "cd server/src/leap/soledad/server/ && python _blobs.py"
        print "python _blobs.py upload /path/to/file blob_id"
        print "python _blobs.py download blob_id"


if __name__ == '__main__':
    # Run the manual harness: react() starts the reactor, calls testit with
    # it, and stops once the returned Deferred fires.
    from twisted.internet import task
    task.react(testit)