f9166ce02646ff2f19edc75b03ff8ffebb2e8f0c
[soledad.git] / src / leap / soledad / server / _blobs.py
1 # -*- coding: utf-8 -*-
2 # _blobs.py
3 # Copyright (C) 2017 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 Blobs Server implementation.
20
21 This is a very simplistic implementation for the time being.
22 Clients should be able to opt-in util the feature is complete.
23
24 A more performant BlobsBackend can (and should) be implemented for production
25 environments.
26 """
27 import os
28 import base64
29 import json
30 import re
31
32 from twisted.web import static
33 from twisted.web import resource
34 from twisted.web.client import FileBodyProducer
35 from twisted.web.server import NOT_DONE_YET
36 from twisted.internet import utils, defer
37
38 from zope.interface import implementer
39
40 from leap.common.files import mkdir_p
41 from leap.soledad.common.log import getLogger
42 from leap.soledad.server import interfaces
43 from leap.soledad.common.blobs import ACCEPTED_FLAGS
44 from leap.soledad.common.blobs import InvalidFlag
45
46
47 __all__ = ['BlobsResource']
48
49
50 logger = getLogger(__name__)
51
52 # Used for sanitizers, we accept only letters, numbers, '-' and '_'
53 VALID_STRINGS = re.compile('^[a-zA-Z0-9_-]+$')
54
55
56 # for the future:
57 # [ ] isolate user avatar in a safer way
58 # [ ] catch timeout in the server (and delete incomplete upload)
59 # [ ] chunking (should we do it on the client or on the server?)
60
61
62 class BlobNotFound(Exception):
63     """
64     Raised when a blob is not found in data storage backend.
65     """
66
67
68 class BlobExists(Exception):
69     """
70     Raised when a blob already exists in data storage backend.
71     """
72
73
74 class QuotaExceeded(Exception):
75     """
76     Raised when the quota would be exceeded if an operation would be held.
77     """
78
79
80 @implementer(interfaces.IBlobsBackend)
81 class FilesystemBlobsBackend(object):
82
83     def __init__(self, blobs_path='/tmp/blobs/', quota=200 * 1024,
84                  concurrent_writes=50):
85         self.quota = quota
86         self.semaphore = defer.DeferredSemaphore(concurrent_writes)
87         if not os.path.isdir(blobs_path):
88             os.makedirs(blobs_path)
89         self.path = blobs_path
90
91     def __touch(self, path):
92         open(path, 'a')
93
94     def read_blob(self, user, blob_id, namespace=''):
95         logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace))
96         path = self._get_path(user, blob_id, namespace)
97         logger.debug('blob path: %s' % path)
98         res = static.File(path, defaultType='application/octet-stream')
99         return res
100
101     def get_flags(self, user, blob_id, namespace=''):
102         path = self._get_path(user, blob_id, namespace)
103         if not os.path.isfile(path):
104             raise BlobNotFound
105         if not os.path.isfile(path + '.flags'):
106             return []
107         with open(path + '.flags', 'r') as flags_file:
108             return json.loads(flags_file.read())
109
110     def set_flags(self, user, blob_id, flags, namespace=''):
111         path = self._get_path(user, blob_id, namespace)
112         if not os.path.isfile(path):
113             raise BlobNotFound
114         for flag in flags:
115             if flag not in ACCEPTED_FLAGS:
116                 raise InvalidFlag(flag)
117         with open(path + '.flags', 'w') as flags_file:
118             raw_flags = json.dumps(flags)
119             flags_file.write(raw_flags)
120
121     @defer.inlineCallbacks
122     def write_blob(self, user, blob_id, fd, namespace=''):
123         yield self.semaphore.acquire()
124         path = self._get_path(user, blob_id, namespace)
125         try:
126             mkdir_p(os.path.split(path)[0])
127         except OSError as e:
128             logger.warn("Got exception trying to create directory: %r" % e)
129         if os.path.isfile(path):
130             raise BlobExists
131         used = yield self.get_total_storage(user)
132         if used > self.quota:
133             raise QuotaExceeded
134         logger.info('writing blob: %s - %s' % (user, blob_id))
135         fbp = FileBodyProducer(fd)
136         with open(path, 'wb') as blobfile:
137             yield fbp.startProducing(blobfile)
138         yield self.semaphore.release()
139
140     def delete_blob(self, user, blob_id, namespace=''):
141         blob_path = self._get_path(user, blob_id, namespace)
142         if not os.path.isfile(blob_path):
143             raise BlobNotFound
144         self.__touch(blob_path + '.deleted')
145         os.unlink(blob_path)
146         try:
147             os.unlink(blob_path + '.flags')
148         except Exception:
149             pass
150         return defer.succeed(None)
151
152     def get_blob_size(self, user, blob_id, namespace=''):
153         blob_path = self._get_path(user, blob_id, namespace)
154         return os.stat(blob_path).st_size
155
156     def count(self, user, namespace=''):
157         base_path = self._get_path(user, namespace=namespace)
158         count = 0
159         for _, _, filenames in os.walk(base_path):
160             count += len(filter(lambda i: not i.endswith('.flags'), filenames))
161         return json.dumps({"count": count})
162
163     def list_blobs(self, user, namespace='', order_by=None, deleted=False,
164                    filter_flag=False):
165         namespace = namespace or 'default'
166         blob_ids = []
167         base_path = self._get_path(user, namespace=namespace)
168
169         def match(name):
170             if deleted:
171                 return name.endswith('.deleted')
172             return VALID_STRINGS.match(name)
173         for root, dirs, filenames in os.walk(base_path):
174             blob_ids += [os.path.join(root, name) for name in filenames
175                          if match(name)]
176         if order_by in ['date', '+date']:
177             blob_ids.sort(key=lambda x: os.path.getmtime(x))
178         elif order_by == '-date':
179             blob_ids.sort(key=lambda x: os.path.getmtime(x), reverse=True)
180         elif order_by:
181             raise Exception("Unsupported order_by parameter: %s" % order_by)
182         if filter_flag:
183             blob_ids = list(self._filter_flag(blob_ids, filter_flag))
184         blob_ids = [os.path.basename(path).replace('.deleted', '')
185                     for path in blob_ids]
186         return blob_ids
187
188     def _filter_flag(self, blob_paths, flag):
189         for blob_path in blob_paths:
190             flag_path = blob_path + '.flags'
191             if not os.path.isfile(flag_path):
192                 continue
193             with open(flag_path, 'r') as flags_file:
194                 blob_flags = json.loads(flags_file.read())
195             if flag in blob_flags:
196                 yield blob_path
197
198     def get_total_storage(self, user):
199         return self._get_disk_usage(self._get_path(user))
200
201     def get_tag(self, user, blob_id, namespace=''):
202         blob_path = self._get_path(user, blob_id, namespace)
203         if not os.path.isfile(blob_path):
204             raise BlobNotFound
205         with open(blob_path) as doc_file:
206             doc_file.seek(-16, 2)
207             tag = base64.urlsafe_b64encode(doc_file.read())
208             return tag
209
210     @defer.inlineCallbacks
211     def _get_disk_usage(self, start_path):
212         if not os.path.isdir(start_path):
213             defer.returnValue(0)
214         cmd = ['/usr/bin/du', '-s', '-c', start_path]
215         output = yield utils.getProcessOutput(cmd[0], cmd[1:])
216         size = output.split()[0]
217         defer.returnValue(int(size))
218
219     def _validate_path(self, desired_path, user, blob_id):
220         if not VALID_STRINGS.match(user):
221             raise Exception("Invalid characters on user: %s" % user)
222         if blob_id and not VALID_STRINGS.match(blob_id):
223             raise Exception("Invalid characters on blob_id: %s" % blob_id)
224         desired_path = os.path.realpath(desired_path)  # expand path references
225         root = os.path.realpath(self.path)
226         if not desired_path.startswith(root + os.sep + user):
227             err = "User %s tried accessing a invalid path: %s" % (user,
228                                                                   desired_path)
229             raise Exception(err)
230         return desired_path
231
232     def exists(self, user, blob_id, namespace):
233         return os.path.isfile(
234             self._get_path(user, blob_id=blob_id, namespace=namespace))
235
236     def _get_path(self, user, blob_id='', namespace=''):
237         parts = [user]
238         if blob_id:
239             namespace = namespace or 'default'
240             parts += self._get_path_parts(blob_id, namespace)
241         elif namespace and not blob_id:
242             parts += [namespace]  # namespace path
243         else:
244             pass  # root path
245         path = os.path.join(self.path, *parts)
246         return self._validate_path(path, user, blob_id)
247
248     def _get_path_parts(self, blob_id, custom):
249         if custom and not blob_id:
250             return [custom]
251         return [custom] + [blob_id[0], blob_id[0:3], blob_id[0:6]] + [blob_id]
252
253
254 class ImproperlyConfiguredException(Exception):
255     pass
256
257
258 class BlobsResource(resource.Resource):
259
260     isLeaf = True
261
262     # Allowed backend classes are defined here
263     handlers = {"filesystem": FilesystemBlobsBackend}
264
265     def __init__(self, backend, blobs_path, **backend_kwargs):
266         resource.Resource.__init__(self)
267         self._blobs_path = blobs_path
268         backend_kwargs.update({'blobs_path': blobs_path})
269         if backend not in self.handlers:
270             raise ImproperlyConfiguredException("No such backend: %s", backend)
271         self._handler = self.handlers[backend](**backend_kwargs)
272         assert interfaces.IBlobsBackend.providedBy(self._handler)
273
274     # TODO double check credentials, we can have then
275     # under request.
276
277     def render_GET(self, request):
278         logger.info("http get: %s" % request.path)
279         user, blob_id, namespace = self._validate(request)
280         if not blob_id and request.args.get('only_count', [False])[0]:
281             return self._handler.count(user, namespace)
282         elif not blob_id:
283             order = request.args.get('order_by', [None])[0]
284             filter_flag = request.args.get('filter_flag', [False])[0]
285             deleted = request.args.get('deleted', [False])[0]
286             blobs = self._handler.list_blobs(user, namespace,
287                                              order_by=order, deleted=deleted,
288                                              filter_flag=filter_flag)
289             return json.dumps(blobs)
290         only_flags = request.args.get('only_flags', [False])[0]
291         try:
292             if only_flags:
293                 flags = self._handler.get_flags(user, blob_id, namespace)
294                 return json.dumps(flags)
295             tag = self._handler.get_tag(user, blob_id, namespace)
296             request.responseHeaders.setRawHeaders('Tag', [tag])
297         except BlobNotFound:
298             # 404 - Not Found
299             request.setResponseCode(404)
300             return "Blob doesn't exists: %s" % blob_id
301         res = self._handler.read_blob(user, blob_id, namespace=namespace)
302         return res.render_GET(request)
303
304     def render_DELETE(self, request):
305         logger.info("http put: %s" % request.path)
306         user, blob_id, namespace = self._validate(request)
307
308         def catchBlobNotFound(failure):
309             failure.trap(BlobNotFound)
310             request.setResponseCode(404)
311             return "Blob doesn't exists: %s" % blob_id
312
313         d = self._handler.delete_blob(user, blob_id, namespace=namespace)
314         d.addCallback(lambda _: request.finish())
315         d.addErrback(catchBlobNotFound)
316         return NOT_DONE_YET
317
318     def render_PUT(self, request):
319         logger.info("http put: %s" % request.path)
320         user, blob_id, namespace = self._validate(request)
321
322         def catchBlobExists(failure):
323             failure.trap(BlobExists)
324             request.setResponseCode(409)
325             request.write("Blob already exists: %s" % blob_id)
326             request.finish()
327
328         def catchQuotaExceeded(failure):
329             failure.trap(QuotaExceeded)
330             logger.error("Error 507: Quota exceeded for user: %s" % user)
331             request.setResponseCode(507)
332             request.write('Quota Exceeded!')
333             request.finish()
334
335         fd = request.content
336         d = self._handler.write_blob(user, blob_id, fd, namespace=namespace)
337         d.addCallback(lambda _: request.finish())
338         d.addErrback(catchBlobExists)
339         d.addErrback(catchQuotaExceeded)
340         d.addErrback(self._error, request)
341         return NOT_DONE_YET
342
343     def render_POST(self, request):
344         logger.info("http post: %s" % request.path)
345         user, blob_id, namespace = self._validate(request)
346         raw_flags = request.content.read()
347         flags = json.loads(raw_flags)
348         try:
349             self._handler.set_flags(user, blob_id, flags, namespace=namespace)
350         except BlobNotFound:
351             # 404 - Not Found
352             request.setResponseCode(404)
353             return "Blob doesn't exists: %s" % blob_id
354         except InvalidFlag as e:
355             request.setResponseCode(406)
356             flag = e.message
357             return "Invalid flag: %s" % str(flag)
358         return ''
359
360     def _error(self, e, request):
361         logger.error('Error processing request: %s' % e.getErrorMessage())
362         request.setResponseCode(500)
363         request.finish()
364
365     def _validate(self, request):
366         for arg in request.postpath:
367             if arg and not VALID_STRINGS.match(arg):
368                 raise Exception('Invalid blob resource argument: %s' % arg)
369         namespace = request.args.get('namespace', ['default'])[0]
370         if namespace and not VALID_STRINGS.match(namespace):
371             raise Exception('Invalid blob namespace: %s' % namespace)
372         return request.postpath + [namespace]
373
374
375 if __name__ == '__main__':
376     # A dummy blob server
377     # curl -X PUT --data-binary @/tmp/book.pdf localhost:9000/user/someid
378     # curl -X GET -o /dev/null localhost:9000/user/somerandomstring
379     from twisted.python import log
380     import sys
381     log.startLogging(sys.stdout)
382
383     from twisted.web.server import Site
384     from twisted.internet import reactor
385
386     # parse command line arguments
387     import argparse
388
389     parser = argparse.ArgumentParser()
390     parser.add_argument('--port', default=9000, type=int)
391     parser.add_argument('--path', default='/tmp/blobs/user')
392     args = parser.parse_args()
393
394     root = BlobsResource("filesystem", args.path)
395     # I picture somethink like
396     # BlobsResource(backend="filesystem", backend_opts={'path': '/tmp/blobs'})
397
398     factory = Site(root)
399     reactor.listenTCP(args.port, factory)
400     reactor.run()
401
402
403 class BlobsServerState(object):
404     """
405     Given a backend name, it gives a instance of IBlobsBackend
406     """
407     # Allowed backend classes are defined here
408     handlers = {"filesystem": FilesystemBlobsBackend}
409
410     def __init__(self, backend, **backend_kwargs):
411         if backend not in self.handlers:
412             raise ImproperlyConfiguredException("No such backend: %s", backend)
413         self.backend = self.handlers[backend](**backend_kwargs)
414
415     def open_database(self, user_id):
416         """
417         That method is just for compatibility with CouchServerState, so
418         IncomingAPI can change backends.
419         """
420         # TODO: deprecate/refactor it as it's here for compatibility.
421         return self.backend