[refactor] make get_blob_size() return a deferred
[soledad.git] / src / leap / soledad / server / _blobs.py
1 # -*- coding: utf-8 -*-
2 # _blobs.py
3 # Copyright (C) 2017 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 Blobs Server implementation.
20
21 This is a very simplistic implementation for the time being.
22 Clients should be able to opt-in util the feature is complete.
23
24 A more performant BlobsBackend can (and should) be implemented for production
25 environments.
26 """
27 import os
28 import base64
29 import json
30 import re
31
32 from twisted.web import static
33 from twisted.web import resource
34 from twisted.web.client import FileBodyProducer
35 from twisted.web.server import NOT_DONE_YET
36 from twisted.internet import utils, defer
37
38 from zope.interface import implementer
39
40 from leap.common.files import mkdir_p
41 from leap.soledad.common.log import getLogger
42 from leap.soledad.server import interfaces
43 from leap.soledad.common.blobs import ACCEPTED_FLAGS
44 from leap.soledad.common.blobs import InvalidFlag
45
46
47 __all__ = ['BlobsResource']
48
49
50 logger = getLogger(__name__)
51
52 # Used for sanitizers, we accept only letters, numbers, '-' and '_'
53 VALID_STRINGS = re.compile('^[a-zA-Z0-9_-]+$')
54
55
56 # for the future:
57 # [ ] isolate user avatar in a safer way
58 # [ ] catch timeout in the server (and delete incomplete upload)
59 # [ ] chunking (should we do it on the client or on the server?)
60
61
62 class BlobNotFound(Exception):
63     """
64     Raised when a blob is not found in data storage backend.
65     """
66
67
68 class BlobExists(Exception):
69     """
70     Raised when a blob already exists in data storage backend.
71     """
72
73
74 class QuotaExceeded(Exception):
75     """
76     Raised when the quota would be exceeded if an operation would be held.
77     """
78
79
80 @implementer(interfaces.IBlobsBackend)
81 class FilesystemBlobsBackend(object):
82
83     def __init__(self, blobs_path='/tmp/blobs/', quota=200 * 1024,
84                  concurrent_writes=50):
85         self.quota = quota
86         self.semaphore = defer.DeferredSemaphore(concurrent_writes)
87         if not os.path.isdir(blobs_path):
88             os.makedirs(blobs_path)
89         self.path = blobs_path
90
91     def __touch(self, path):
92         open(path, 'a')
93
94     def read_blob(self, user, blob_id, namespace=''):
95         logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace))
96         path = self._get_path(user, blob_id, namespace)
97         logger.debug('blob path: %s' % path)
98         res = static.File(path, defaultType='application/octet-stream')
99         return res
100
101     def get_flags(self, user, blob_id, namespace=''):
102         path = self._get_path(user, blob_id, namespace)
103         if not os.path.isfile(path):
104             raise BlobNotFound
105         if not os.path.isfile(path + '.flags'):
106             return []
107         with open(path + '.flags', 'r') as flags_file:
108             return json.loads(flags_file.read())
109
110     def set_flags(self, user, blob_id, flags, namespace=''):
111         path = self._get_path(user, blob_id, namespace)
112         if not os.path.isfile(path):
113             raise BlobNotFound
114         for flag in flags:
115             if flag not in ACCEPTED_FLAGS:
116                 raise InvalidFlag(flag)
117         with open(path + '.flags', 'w') as flags_file:
118             raw_flags = json.dumps(flags)
119             flags_file.write(raw_flags)
120
121     @defer.inlineCallbacks
122     def write_blob(self, user, blob_id, fd, namespace=''):
123         yield self.semaphore.acquire()
124         path = self._get_path(user, blob_id, namespace)
125         try:
126             mkdir_p(os.path.split(path)[0])
127         except OSError as e:
128             logger.warn("Got exception trying to create directory: %r" % e)
129         if os.path.isfile(path):
130             raise BlobExists
131         used = yield self.get_total_storage(user)
132         if used > self.quota:
133             raise QuotaExceeded
134         logger.info('writing blob: %s - %s' % (user, blob_id))
135         fbp = FileBodyProducer(fd)
136         with open(path, 'wb') as blobfile:
137             yield fbp.startProducing(blobfile)
138         yield self.semaphore.release()
139
140     def delete_blob(self, user, blob_id, namespace=''):
141         blob_path = self._get_path(user, blob_id, namespace)
142         if not os.path.isfile(blob_path):
143             raise BlobNotFound
144         self.__touch(blob_path + '.deleted')
145         os.unlink(blob_path)
146         try:
147             os.unlink(blob_path + '.flags')
148         except Exception:
149             pass
150         return defer.succeed(None)
151
152     def get_blob_size(self, user, blob_id, namespace=''):
153         blob_path = self._get_path(user, blob_id, namespace)
154         size = os.stat(blob_path).st_size
155         return defer.succeed(size)
156
157     def count(self, user, namespace=''):
158         base_path = self._get_path(user, namespace=namespace)
159         count = 0
160         for _, _, filenames in os.walk(base_path):
161             count += len(filter(lambda i: not i.endswith('.flags'), filenames))
162         return json.dumps({"count": count})
163
164     def list_blobs(self, user, namespace='', order_by=None, deleted=False,
165                    filter_flag=False):
166         namespace = namespace or 'default'
167         blob_ids = []
168         base_path = self._get_path(user, namespace=namespace)
169
170         def match(name):
171             if deleted:
172                 return name.endswith('.deleted')
173             return VALID_STRINGS.match(name)
174         for root, dirs, filenames in os.walk(base_path):
175             blob_ids += [os.path.join(root, name) for name in filenames
176                          if match(name)]
177         if order_by in ['date', '+date']:
178             blob_ids.sort(key=lambda x: os.path.getmtime(x))
179         elif order_by == '-date':
180             blob_ids.sort(key=lambda x: os.path.getmtime(x), reverse=True)
181         elif order_by:
182             raise Exception("Unsupported order_by parameter: %s" % order_by)
183         if filter_flag:
184             blob_ids = list(self._filter_flag(blob_ids, filter_flag))
185         blob_ids = [os.path.basename(path).replace('.deleted', '')
186                     for path in blob_ids]
187         return blob_ids
188
189     def _filter_flag(self, blob_paths, flag):
190         for blob_path in blob_paths:
191             flag_path = blob_path + '.flags'
192             if not os.path.isfile(flag_path):
193                 continue
194             with open(flag_path, 'r') as flags_file:
195                 blob_flags = json.loads(flags_file.read())
196             if flag in blob_flags:
197                 yield blob_path
198
199     def get_total_storage(self, user):
200         return self._get_disk_usage(self._get_path(user))
201
202     def get_tag(self, user, blob_id, namespace=''):
203         blob_path = self._get_path(user, blob_id, namespace)
204         if not os.path.isfile(blob_path):
205             raise BlobNotFound
206         with open(blob_path) as doc_file:
207             doc_file.seek(-16, 2)
208             tag = base64.urlsafe_b64encode(doc_file.read())
209             return tag
210
211     @defer.inlineCallbacks
212     def _get_disk_usage(self, start_path):
213         if not os.path.isdir(start_path):
214             defer.returnValue(0)
215         cmd = ['/usr/bin/du', '-s', '-c', start_path]
216         output = yield utils.getProcessOutput(cmd[0], cmd[1:])
217         size = output.split()[0]
218         defer.returnValue(int(size))
219
220     def _validate_path(self, desired_path, user, blob_id):
221         if not VALID_STRINGS.match(user):
222             raise Exception("Invalid characters on user: %s" % user)
223         if blob_id and not VALID_STRINGS.match(blob_id):
224             raise Exception("Invalid characters on blob_id: %s" % blob_id)
225         desired_path = os.path.realpath(desired_path)  # expand path references
226         root = os.path.realpath(self.path)
227         if not desired_path.startswith(root + os.sep + user):
228             err = "User %s tried accessing a invalid path: %s" % (user,
229                                                                   desired_path)
230             raise Exception(err)
231         return desired_path
232
233     def exists(self, user, blob_id, namespace):
234         return os.path.isfile(
235             self._get_path(user, blob_id=blob_id, namespace=namespace))
236
237     def _get_path(self, user, blob_id='', namespace=''):
238         parts = [user]
239         if blob_id:
240             namespace = namespace or 'default'
241             parts += self._get_path_parts(blob_id, namespace)
242         elif namespace and not blob_id:
243             parts += [namespace]  # namespace path
244         else:
245             pass  # root path
246         path = os.path.join(self.path, *parts)
247         return self._validate_path(path, user, blob_id)
248
249     def _get_path_parts(self, blob_id, custom):
250         if custom and not blob_id:
251             return [custom]
252         return [custom] + [blob_id[0], blob_id[0:3], blob_id[0:6]] + [blob_id]
253
254
255 class ImproperlyConfiguredException(Exception):
256     pass
257
258
259 class BlobsResource(resource.Resource):
260
261     isLeaf = True
262
263     # Allowed backend classes are defined here
264     handlers = {"filesystem": FilesystemBlobsBackend}
265
266     def __init__(self, backend, blobs_path, **backend_kwargs):
267         resource.Resource.__init__(self)
268         self._blobs_path = blobs_path
269         backend_kwargs.update({'blobs_path': blobs_path})
270         if backend not in self.handlers:
271             raise ImproperlyConfiguredException("No such backend: %s", backend)
272         self._handler = self.handlers[backend](**backend_kwargs)
273         assert interfaces.IBlobsBackend.providedBy(self._handler)
274
275     # TODO double check credentials, we can have then
276     # under request.
277
278     def render_GET(self, request):
279         logger.info("http get: %s" % request.path)
280         user, blob_id, namespace = self._validate(request)
281         if not blob_id and request.args.get('only_count', [False])[0]:
282             return self._handler.count(user, namespace)
283         elif not blob_id:
284             order = request.args.get('order_by', [None])[0]
285             filter_flag = request.args.get('filter_flag', [False])[0]
286             deleted = request.args.get('deleted', [False])[0]
287             blobs = self._handler.list_blobs(user, namespace,
288                                              order_by=order, deleted=deleted,
289                                              filter_flag=filter_flag)
290             return json.dumps(blobs)
291         only_flags = request.args.get('only_flags', [False])[0]
292         try:
293             if only_flags:
294                 flags = self._handler.get_flags(user, blob_id, namespace)
295                 return json.dumps(flags)
296             tag = self._handler.get_tag(user, blob_id, namespace)
297             request.responseHeaders.setRawHeaders('Tag', [tag])
298         except BlobNotFound:
299             # 404 - Not Found
300             request.setResponseCode(404)
301             return "Blob doesn't exists: %s" % blob_id
302         res = self._handler.read_blob(user, blob_id, namespace=namespace)
303         return res.render_GET(request)
304
305     def render_DELETE(self, request):
306         logger.info("http put: %s" % request.path)
307         user, blob_id, namespace = self._validate(request)
308
309         def catchBlobNotFound(failure):
310             failure.trap(BlobNotFound)
311             request.setResponseCode(404)
312             return "Blob doesn't exists: %s" % blob_id
313
314         d = self._handler.delete_blob(user, blob_id, namespace=namespace)
315         d.addCallback(lambda _: request.finish())
316         d.addErrback(catchBlobNotFound)
317         return NOT_DONE_YET
318
319     def render_PUT(self, request):
320         logger.info("http put: %s" % request.path)
321         user, blob_id, namespace = self._validate(request)
322
323         def catchBlobExists(failure):
324             failure.trap(BlobExists)
325             request.setResponseCode(409)
326             request.write("Blob already exists: %s" % blob_id)
327             request.finish()
328
329         def catchQuotaExceeded(failure):
330             failure.trap(QuotaExceeded)
331             logger.error("Error 507: Quota exceeded for user: %s" % user)
332             request.setResponseCode(507)
333             request.write('Quota Exceeded!')
334             request.finish()
335
336         fd = request.content
337         d = self._handler.write_blob(user, blob_id, fd, namespace=namespace)
338         d.addCallback(lambda _: request.finish())
339         d.addErrback(catchBlobExists)
340         d.addErrback(catchQuotaExceeded)
341         d.addErrback(self._error, request)
342         return NOT_DONE_YET
343
344     def render_POST(self, request):
345         logger.info("http post: %s" % request.path)
346         user, blob_id, namespace = self._validate(request)
347         raw_flags = request.content.read()
348         flags = json.loads(raw_flags)
349         try:
350             self._handler.set_flags(user, blob_id, flags, namespace=namespace)
351         except BlobNotFound:
352             # 404 - Not Found
353             request.setResponseCode(404)
354             return "Blob doesn't exists: %s" % blob_id
355         except InvalidFlag as e:
356             request.setResponseCode(406)
357             flag = e.message
358             return "Invalid flag: %s" % str(flag)
359         return ''
360
361     def _error(self, e, request):
362         logger.error('Error processing request: %s' % e.getErrorMessage())
363         request.setResponseCode(500)
364         request.finish()
365
366     def _validate(self, request):
367         for arg in request.postpath:
368             if arg and not VALID_STRINGS.match(arg):
369                 raise Exception('Invalid blob resource argument: %s' % arg)
370         namespace = request.args.get('namespace', ['default'])[0]
371         if namespace and not VALID_STRINGS.match(namespace):
372             raise Exception('Invalid blob namespace: %s' % namespace)
373         return request.postpath + [namespace]
374
375
376 if __name__ == '__main__':
377     # A dummy blob server
378     # curl -X PUT --data-binary @/tmp/book.pdf localhost:9000/user/someid
379     # curl -X GET -o /dev/null localhost:9000/user/somerandomstring
380     from twisted.python import log
381     import sys
382     log.startLogging(sys.stdout)
383
384     from twisted.web.server import Site
385     from twisted.internet import reactor
386
387     # parse command line arguments
388     import argparse
389
390     parser = argparse.ArgumentParser()
391     parser.add_argument('--port', default=9000, type=int)
392     parser.add_argument('--path', default='/tmp/blobs/user')
393     args = parser.parse_args()
394
395     root = BlobsResource("filesystem", args.path)
396     # I picture somethink like
397     # BlobsResource(backend="filesystem", backend_opts={'path': '/tmp/blobs'})
398
399     factory = Site(root)
400     reactor.listenTCP(args.port, factory)
401     reactor.run()
402
403
404 class BlobsServerState(object):
405     """
406     Given a backend name, it gives a instance of IBlobsBackend
407     """
408     # Allowed backend classes are defined here
409     handlers = {"filesystem": FilesystemBlobsBackend}
410
411     def __init__(self, backend, **backend_kwargs):
412         if backend not in self.handlers:
413             raise ImproperlyConfiguredException("No such backend: %s", backend)
414         self.backend = self.handlers[backend](**backend_kwargs)
415
416     def open_database(self, user_id):
417         """
418         That method is just for compatibility with CouchServerState, so
419         IncomingAPI can change backends.
420         """
421         # TODO: deprecate/refactor it as it's here for compatibility.
422         return self.backend