diff options
Diffstat (limited to 'src/leap/soledad/server')
-rw-r--r-- | src/leap/soledad/server/__init__.py | 192 | ||||
-rw-r--r-- | src/leap/soledad/server/_blobs.py | 234 | ||||
-rw-r--r-- | src/leap/soledad/server/_config.py | 82 | ||||
-rw-r--r-- | src/leap/soledad/server/_resource.py | 86 | ||||
-rw-r--r-- | src/leap/soledad/server/_server_info.py | 44 | ||||
-rw-r--r-- | src/leap/soledad/server/_wsgi.py | 65 | ||||
-rw-r--r-- | src/leap/soledad/server/auth.py | 173 | ||||
-rw-r--r-- | src/leap/soledad/server/caching.py | 32 | ||||
-rw-r--r-- | src/leap/soledad/server/entrypoint.py | 50 | ||||
-rw-r--r-- | src/leap/soledad/server/gzip_middleware.py | 67 | ||||
-rw-r--r-- | src/leap/soledad/server/interfaces.py | 72 | ||||
-rw-r--r-- | src/leap/soledad/server/session.py | 107 | ||||
-rw-r--r-- | src/leap/soledad/server/state.py | 141 | ||||
-rw-r--r-- | src/leap/soledad/server/sync.py | 305 | ||||
-rw-r--r-- | src/leap/soledad/server/url_mapper.py | 77 |
15 files changed, 1727 insertions, 0 deletions
diff --git a/src/leap/soledad/server/__init__.py b/src/leap/soledad/server/__init__.py new file mode 100644 index 00000000..a4080f13 --- /dev/null +++ b/src/leap/soledad/server/__init__.py @@ -0,0 +1,192 @@ +# -*- coding: utf-8 -*- +# server.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +The Soledad Server allows for recovery document storage and database +synchronization. +""" + +import six.moves.urllib.parse as urlparse +import sys + +from leap.soledad.common.l2db.remote import http_app, utils +from leap.soledad.common import SHARED_DB_NAME + +from .sync import SyncResource +from .sync import MAX_REQUEST_SIZE +from .sync import MAX_ENTRY_SIZE + +from ._version import get_versions +from ._config import get_config + + +__all__ = [ + 'SoledadApp', + 'get_config', + '__version__', +] + + +# ---------------------------------------------------------------------------- +# Soledad WSGI application +# ---------------------------------------------------------------------------- + + +class SoledadApp(http_app.HTTPApp): + """ + Soledad WSGI application + """ + + SHARED_DB_NAME = SHARED_DB_NAME + """ + The name of the shared database that holds user's encrypted secrets. + """ + + max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 + max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 + + def __call__(self, environ, start_response): + """ + Handle a WSGI call to the Soledad application. + + @param environ: Dictionary containing CGI variables. + @type environ: dict + @param start_response: Callable of the form start_response(status, + response_headers, exc_info=None). + @type start_response: callable + + @return: HTTP application results. + @rtype: list + """ + return http_app.HTTPApp.__call__(self, environ, start_response) + + +# ---------------------------------------------------------------------------- +# WSGI resources registration +# ---------------------------------------------------------------------------- + +# monkey patch u1db with a new resource map +http_app.url_to_resource = http_app.URLToResource() + +# register u1db unmodified resources +http_app.url_to_resource.register(http_app.GlobalResource) +http_app.url_to_resource.register(http_app.DatabaseResource) +http_app.url_to_resource.register(http_app.DocsResource) +http_app.url_to_resource.register(http_app.DocResource) + +# register Soledad's new or modified resources +http_app.url_to_resource.register(SyncResource) + + +# ---------------------------------------------------------------------------- +# Modified HTTP method invocation (to account for splitted sync) +# ---------------------------------------------------------------------------- + +class HTTPInvocationByMethodWithBody( + http_app.HTTPInvocationByMethodWithBody): + """ + Invoke methods on a resource. + """ + + def __call__(self): + """ + Call an HTTP method of a resource. + + This method was rewritten to allow for a sync flow which uses one POST + request for each transferred document (back and forth). + + Usual U1DB sync process transfers all documents from client to server + and back in only one POST request. This is inconvenient for some + reasons, as lack of possibility of gracefully interrupting the sync + process, and possible timeouts for when dealing with large documents + that have to be retrieved and encrypted/decrypted. Because of those, + we split the sync process into many POST requests. + """ + args = urlparse.parse_qsl(self.environ['QUERY_STRING'], + strict_parsing=False) + try: + args = dict( + (k.decode('utf-8'), v.decode('utf-8')) for k, v in args) + except ValueError: + raise http_app.BadRequest() + method = self.environ['REQUEST_METHOD'].lower() + if method in ('get', 'delete'): + meth = self._lookup(method) + return meth(args, None) + else: + # we expect content-length > 0, reconsider if we move + # to support chunked enconding + try: + content_length = int(self.environ['CONTENT_LENGTH']) + except (ValueError, KeyError): + # raise http_app.BadRequest + content_length = self.max_request_size + if content_length <= 0: + raise http_app.BadRequest + if content_length > self.max_request_size: + raise http_app.BadRequest + reader = http_app._FencedReader( + self.environ['wsgi.input'], content_length, + self.max_entry_size) + content_type = self.environ.get('CONTENT_TYPE') + if content_type == 'application/json': + meth = self._lookup(method) + body = reader.read_chunk(sys.maxint) + return meth(args, body) + elif content_type.startswith('application/x-soledad-sync'): + # read one line and validate it + body_getline = reader.getline + if body_getline().strip() != '[': + raise http_app.BadRequest() + line = body_getline() + line, comma = utils.check_and_strip_comma(line.strip()) + meth_args = self._lookup('%s_args' % method) + meth_args(args, line) + # handle incoming documents + if content_type == 'application/x-soledad-sync-put': + meth_put = self._lookup('%s_put' % method) + meth_end = self._lookup('%s_end' % method) + while True: + entry = body_getline().strip() + if entry == ']': # end of incoming document stream + break + if not entry or not comma: # empty or no prec comma + raise http_app.BadRequest + entry, comma = utils.check_and_strip_comma(entry) + content = body_getline().strip() + content, comma = utils.check_and_strip_comma(content) + meth_put({'content': content or None}, entry) + if comma or body_getline(): # extra comma or data + raise http_app.BadRequest + return meth_end() + # handle outgoing documents + elif content_type == 'application/x-soledad-sync-get': + meth_get = self._lookup('%s_get' % method) + return meth_get() + else: + raise http_app.BadRequest() + else: + raise http_app.BadRequest() + + +# monkey patch server with new http invocation +http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody + + +__version__ = get_versions()['version'] +del get_versions diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py new file mode 100644 index 00000000..10678360 --- /dev/null +++ b/src/leap/soledad/server/_blobs.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +# _blobs.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Blobs Server implementation. + +This is a very simplistic implementation for the time being. +Clients should be able to opt-in util the feature is complete. + +A more performant BlobsBackend can (and should) be implemented for production +environments. +""" +import os +import base64 +import json +import re + +from twisted.logger import Logger +from twisted.web import static +from twisted.web import resource +from twisted.web.client import FileBodyProducer +from twisted.web.server import NOT_DONE_YET +from twisted.internet import utils, defer + +from zope.interface import implementer + +from leap.common.files import mkdir_p +from leap.soledad.server import interfaces + + +__all__ = ['BlobsResource'] + + +logger = Logger() + +# Used for sanitizers, we accept only letters, numbers, '-' and '_' +VALID_STRINGS = re.compile('^[a-zA-Z0-9_-]+$') + + +# for the future: +# [ ] isolate user avatar in a safer way +# [ ] catch timeout in the server (and delete incomplete upload) +# [ ] chunking (should we do it on the client or on the server?) + + +@implementer(interfaces.IBlobsBackend) +class FilesystemBlobsBackend(object): + + def __init__(self, blobs_path='/tmp/blobs/', quota=200 * 1024): + self.quota = quota + if not os.path.isdir(blobs_path): + os.makedirs(blobs_path) + self.path = blobs_path + + def read_blob(self, user, blob_id, request): + logger.info('reading blob: %s - %s' % (user, blob_id)) + path = self._get_path(user, blob_id) + logger.debug('blob path: %s' % path) + _file = static.File(path, defaultType='application/octet-stream') + return _file.render_GET(request) + + @defer.inlineCallbacks + def write_blob(self, user, blob_id, request): + path = self._get_path(user, blob_id) + try: + mkdir_p(os.path.split(path)[0]) + except OSError: + pass + if os.path.isfile(path): + # 409 - Conflict + request.setResponseCode(409) + request.write("Blob already exists: %s" % blob_id) + defer.returnValue(None) + used = yield self.get_total_storage(user) + if used > self.quota: + logger.error("Error 507: Quota exceeded for user: %s" % user) + request.setResponseCode(507) + request.write('Quota Exceeded!') + defer.returnValue(None) + logger.info('writing blob: %s - %s' % (user, blob_id)) + fbp = FileBodyProducer(request.content) + yield fbp.startProducing(open(path, 'wb')) + + def delete_blob(self, user, blob_id): + blob_path = self._get_path(user, blob_id) + os.unlink(blob_path) + + def get_blob_size(user, blob_id): + raise NotImplementedError + + def list_blobs(self, user, request): + blob_ids = [] + base_path = self._get_path(user) + for _, _, filenames in os.walk(base_path): + blob_ids += filenames + return json.dumps(blob_ids) + + def get_total_storage(self, user): + return self._get_disk_usage(self._get_path(user)) + + def add_tag_header(self, user, blob_id, request): + with open(self._get_path(user, blob_id)) as doc_file: + doc_file.seek(-16, 2) + tag = base64.urlsafe_b64encode(doc_file.read()) + request.responseHeaders.setRawHeaders('Tag', [tag]) + + @defer.inlineCallbacks + def _get_disk_usage(self, start_path): + if not os.path.isdir(start_path): + defer.returnValue(0) + cmd = ['/usr/bin/du', '-s', '-c', start_path] + output = yield utils.getProcessOutput(cmd[0], cmd[1:]) + size = output.split()[0] + defer.returnValue(int(size)) + + def _validate_path(self, desired_path, user, blob_id): + if not VALID_STRINGS.match(user): + raise Exception("Invalid characters on user: %s" % user) + if blob_id and not VALID_STRINGS.match(blob_id): + raise Exception("Invalid characters on blob_id: %s" % blob_id) + desired_path = os.path.realpath(desired_path) # expand path references + root = os.path.realpath(self.path) + if not desired_path.startswith(root + os.sep + user): + err = "User %s tried accessing a invalid path: %s" % (user, + desired_path) + raise Exception(err) + return desired_path + + def _get_path(self, user, blob_id=False): + parts = [user] + if blob_id: + parts += [blob_id[0], blob_id[0:3], blob_id[0:6]] + parts += [blob_id] + path = os.path.join(self.path, *parts) + return self._validate_path(path, user, blob_id) + + +class ImproperlyConfiguredException(Exception): + pass + + +class BlobsResource(resource.Resource): + + isLeaf = True + + # Allowed backend classes are defined here + handlers = {"filesystem": FilesystemBlobsBackend} + + def __init__(self, backend, blobs_path, **backend_kwargs): + resource.Resource.__init__(self) + self._blobs_path = blobs_path + backend_kwargs.update({'blobs_path': blobs_path}) + if backend not in self.handlers: + raise ImproperlyConfiguredException("No such backend: %s", backend) + self._handler = self.handlers[backend](**backend_kwargs) + assert interfaces.IBlobsBackend.providedBy(self._handler) + + # TODO double check credentials, we can have then + # under request. + + def render_GET(self, request): + logger.info("http get: %s" % request.path) + user, blob_id = self._validate(request) + if not blob_id: + return self._handler.list_blobs(user, request) + self._handler.add_tag_header(user, blob_id, request) + return self._handler.read_blob(user, blob_id, request) + + def render_DELETE(self, request): + logger.info("http put: %s" % request.path) + user, blob_id = self._validate(request) + self._handler.delete_blob(user, blob_id) + return '' + + def render_PUT(self, request): + logger.info("http put: %s" % request.path) + user, blob_id = self._validate(request) + d = self._handler.write_blob(user, blob_id, request) + d.addCallback(lambda _: request.finish()) + d.addErrback(self._error, request) + return NOT_DONE_YET + + def _error(self, e, request): + logger.error('Error processing request: %s' % e.getErrorMessage()) + request.setResponseCode(500) + request.finish() + + def _validate(self, request): + for arg in request.postpath: + if arg and not VALID_STRINGS.match(arg): + raise Exception('Invalid blob resource argument: %s' % arg) + return request.postpath + + +if __name__ == '__main__': + # A dummy blob server + # curl -X PUT --data-binary @/tmp/book.pdf localhost:9000/user/someid + # curl -X GET -o /dev/null localhost:9000/user/somerandomstring + from twisted.python import log + import sys + log.startLogging(sys.stdout) + + from twisted.web.server import Site + from twisted.internet import reactor + + # parse command line arguments + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('--port', default=9000, type=int) + parser.add_argument('--path', default='/tmp/blobs/user') + args = parser.parse_args() + + root = BlobsResource("filesystem", args.path) + # I picture somethink like + # BlobsResource(backend="filesystem", backend_opts={'path': '/tmp/blobs'}) + + factory = Site(root) + reactor.listenTCP(args.port, factory) + reactor.run() diff --git a/src/leap/soledad/server/_config.py b/src/leap/soledad/server/_config.py new file mode 100644 index 00000000..e89e70d6 --- /dev/null +++ b/src/leap/soledad/server/_config.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +# config.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import configparser + + +__all__ = ['get_config'] + + +CONFIG_DEFAULTS = { + 'soledad-server': { + 'couch_url': 'http://localhost:5984', + 'create_cmd': None, + 'admin_netrc': '/etc/couchdb/couchdb-admin.netrc', + 'batching': True, + 'blobs': False, + 'blobs_path': '/srv/leap/soledad/blobs', + }, + 'database-security': { + 'members': ['soledad'], + 'members_roles': [], + 'admins': [], + 'admins_roles': [] + } +} + + +_config = None + + +def get_config(section='soledad-server'): + global _config + if not _config: + _config = _load_config('/etc/soledad/soledad-server.conf') + return _config[section] + + +def _load_config(file_path): + """ + Load server configuration from file. + + @param file_path: The path to the configuration file. + @type file_path: str + + @return: A dictionary with the configuration. + @rtype: dict + """ + conf = dict(CONFIG_DEFAULTS) + config = configparser.SafeConfigParser() + config.read(file_path) + for section in conf: + if not config.has_section(section): + continue + for key, value in conf[section].items(): + if not config.has_option(section, key): + continue + elif type(value) == bool: + conf[section][key] = config.getboolean(section, key) + elif type(value) == list: + values = config.get(section, key).split(',') + values = [v.strip() for v in values] + conf[section][key] = values + else: + conf[section][key] = config.get(section, key) + # TODO: implement basic parsing/sanitization of options comming from + # config file. + return conf diff --git a/src/leap/soledad/server/_resource.py b/src/leap/soledad/server/_resource.py new file mode 100644 index 00000000..49c4b742 --- /dev/null +++ b/src/leap/soledad/server/_resource.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# resource.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A twisted resource that serves the Soledad Server. +""" +from twisted.web.resource import Resource + +from ._server_info import ServerInfo +from ._wsgi import get_sync_resource + + +__all__ = ['SoledadResource', 'SoledadAnonResource'] + + +class _Robots(Resource): + def render_GET(self, request): + return ( + 'User-agent: *\n' + 'Disallow: /\n' + '# you are not a robot, are you???') + + +class SoledadAnonResource(Resource): + + """ + The parts of Soledad Server that unauthenticated users can see. + This is nice because this means that a non-authenticated user will get 404 + for anything that is not in this minimal resource tree. + """ + + def __init__(self, enable_blobs=False): + Resource.__init__(self) + server_info = ServerInfo(enable_blobs) + self.putChild('', server_info) + self.putChild('robots.txt', _Robots()) + + +class SoledadResource(Resource): + """ + This is a dummy twisted resource, used only to allow different entry points + for the Soledad Server. + """ + + def __init__(self, blobs_resource=None, sync_pool=None): + """ + Initialize the Soledad resource. + + :param blobs_resource: a resource to serve blobs, if enabled. + :type blobs_resource: _blobs.BlobsResource + + :param sync_pool: A pool to pass to the WSGI sync resource. + :type sync_pool: twisted.python.threadpool.ThreadPool + """ + Resource.__init__(self) + + # requests to / return server information + server_info = ServerInfo(bool(blobs_resource)) + self.putChild('', server_info) + + # requests to /blobs will serve blobs if enabled + if blobs_resource: + self.putChild('blobs', blobs_resource) + + # other requests are routed to legacy sync resource + self._sync_resource = get_sync_resource(sync_pool) + + def getChild(self, path, request): + """ + Route requests to legacy WSGI sync resource dynamically. + """ + request.postpath.insert(0, request.prepath.pop()) + return self._sync_resource diff --git a/src/leap/soledad/server/_server_info.py b/src/leap/soledad/server/_server_info.py new file mode 100644 index 00000000..50659338 --- /dev/null +++ b/src/leap/soledad/server/_server_info.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# _server_info.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Resource that announces information about the server. +""" +import json + +from twisted.web.resource import Resource + +from leap.soledad.server import __version__ + + +__all__ = ['ServerInfo'] + + +class ServerInfo(Resource): + """ + Return information about the server. + """ + + isLeaf = True + + def __init__(self, blobs_enabled): + self._info = { + "blobs": blobs_enabled, + "version": __version__ + } + + def render_GET(self, request): + return json.dumps(self._info) diff --git a/src/leap/soledad/server/_wsgi.py b/src/leap/soledad/server/_wsgi.py new file mode 100644 index 00000000..f6ff6b26 --- /dev/null +++ b/src/leap/soledad/server/_wsgi.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# application.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A WSGI application that serves Soledad synchronization. +""" +from twisted.internet import reactor +from twisted.web.wsgi import WSGIResource + +from leap.soledad.server import SoledadApp +from leap.soledad.server.gzip_middleware import GzipMiddleware +from leap.soledad.common.backend import SoledadBackend +from leap.soledad.common.couch.state import CouchServerState +from leap.soledad.common.log import getLogger + +from twisted.logger import Logger +log = Logger() + +__all__ = ['init_couch_state', 'get_sync_resource'] + + +def _get_couch_state(conf): + state = CouchServerState(conf['couch_url'], create_cmd=conf['create_cmd'], + check_schema_versions=True) + SoledadBackend.BATCH_SUPPORT = conf.get('batching', False) + return state + + +_app = SoledadApp(None) # delay state init +wsgi_application = GzipMiddleware(_app) + + +# During its initialization, the couch state verifies if all user databases +# contain a config document with the correct couch schema version stored, and +# will log an error and raise an exception if that is not the case. +# +# If this verification made too early (i.e. before the reactor has started and +# the twistd web logging facilities have been setup), the logging will not +# work. Because of that, we delay couch state initialization until the reactor +# is running. + +def init_couch_state(conf): + try: + _app.state = _get_couch_state(conf) + except Exception as e: + logger = getLogger() + logger.error(str(e)) + reactor.stop() + + +def get_sync_resource(pool): + return WSGIResource(reactor, pool, wsgi_application) diff --git a/src/leap/soledad/server/auth.py b/src/leap/soledad/server/auth.py new file mode 100644 index 00000000..1357b289 --- /dev/null +++ b/src/leap/soledad/server/auth.py @@ -0,0 +1,173 @@ +# -*- coding: utf-8 -*- +# auth.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Twisted http token auth. +""" +import binascii +import time + +from hashlib import sha512 +from zope.interface import implementer + +from twisted.cred import error +from twisted.cred.checkers import ICredentialsChecker +from twisted.cred.credentials import IUsernamePassword +from twisted.cred.credentials import IAnonymous +from twisted.cred.credentials import Anonymous +from twisted.cred.credentials import UsernamePassword +from twisted.cred.portal import IRealm +from twisted.cred.portal import Portal +from twisted.internet import defer +from twisted.logger import Logger +from twisted.web.iweb import ICredentialFactory +from twisted.web.resource import IResource + +from leap.soledad.common.couch import couch_server + +from ._resource import SoledadResource, SoledadAnonResource +from ._blobs import BlobsResource +from ._config import get_config + + +log = Logger() + + +@implementer(IRealm) +class SoledadRealm(object): + + def __init__(self, sync_pool, conf=None): + assert sync_pool is not None + if conf is None: + conf = get_config() + blobs = conf['blobs'] + blobs_resource = BlobsResource("filesystem", + conf['blobs_path']) if blobs else None + self.anon_resource = SoledadAnonResource( + enable_blobs=blobs) + self.auth_resource = SoledadResource( + blobs_resource=blobs_resource, + sync_pool=sync_pool) + + def requestAvatar(self, avatarId, mind, *interfaces): + + # Anonymous access + if IAnonymous.providedBy(avatarId): + return (IResource, self.anon_resource, + lambda: None) + + # Authenticated access + else: + if IResource in interfaces: + return (IResource, self.auth_resource, + lambda: None) + raise NotImplementedError() + + +@implementer(ICredentialsChecker) +class TokenChecker(object): + + credentialInterfaces = [IUsernamePassword, IAnonymous] + + TOKENS_DB_PREFIX = "tokens_" + TOKENS_DB_EXPIRE = 30 * 24 * 3600 # 30 days in seconds + TOKENS_TYPE_KEY = "type" + TOKENS_TYPE_DEF = "Token" + TOKENS_USER_ID_KEY = "user_id" + + def __init__(self): + self._couch_url = get_config().get('couch_url') + + def _get_server(self): + return couch_server(self._couch_url) + + def _tokens_dbname(self): + # the tokens db rotates every 30 days, and the current db name is + # "tokens_NNN", where NNN is the number of seconds since epoch + # divide dby the rotate period in seconds. When rotating, old and + # new tokens db coexist during a certain window of time and valid + # tokens are replicated from the old db to the new one. See: + # https://leap.se/code/issues/6785 + dbname = self.TOKENS_DB_PREFIX + \ + str(int(time.time() / self.TOKENS_DB_EXPIRE)) + return dbname + + def _tokens_db(self): + dbname = self._tokens_dbname() + + # TODO -- leaking abstraction here: this module shouldn't need + # to known anything about the context manager. hide that in the couch + # module + with self._get_server() as server: + db = server[dbname] + return db + + def requestAvatarId(self, credentials): + if IAnonymous.providedBy(credentials): + return defer.succeed(Anonymous()) + + uuid = credentials.username + token = credentials.password + + # lookup key is a hash of the token to prevent timing attacks. + # TODO cache the tokens already! + + db = self._tokens_db() + token = db.get(sha512(token).hexdigest()) + if token is None: + return defer.fail(error.UnauthorizedLogin()) + + # TODO -- use cryptography constant time builtin comparison. + # we compare uuid hashes to avoid possible timing attacks that + # might exploit python's builtin comparison operator behaviour, + # which fails immediatelly when non-matching bytes are found. + couch_uuid_hash = sha512(token[self.TOKENS_USER_ID_KEY]).digest() + req_uuid_hash = sha512(uuid).digest() + if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \ + or couch_uuid_hash != req_uuid_hash: + return defer.fail(error.UnauthorizedLogin()) + + return defer.succeed(uuid) + + +@implementer(ICredentialFactory) +class TokenCredentialFactory(object): + + scheme = 'token' + + def getChallenge(self, request): + return {} + + def decode(self, response, request): + try: + creds = binascii.a2b_base64(response + b'===') + except binascii.Error: + raise error.LoginFailed('Invalid credentials') + + creds = creds.split(b':', 1) + if len(creds) == 2: + return UsernamePassword(*creds) + else: + raise error.LoginFailed('Invalid credentials') + + +def portalFactory(sync_pool): + realm = SoledadRealm(sync_pool=sync_pool) + checker = TokenChecker() + return Portal(realm, [checker]) + + +credentialFactory = TokenCredentialFactory() diff --git a/src/leap/soledad/server/caching.py b/src/leap/soledad/server/caching.py new file mode 100644 index 00000000..9a049a39 --- /dev/null +++ b/src/leap/soledad/server/caching.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# caching.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Server side caching. Using beaker for now. +""" +from beaker.cache import CacheManager + + +def setup_caching(): + _cache_manager = CacheManager(type='memory') + return _cache_manager + + +_cache_manager = setup_caching() + + +def get_cache_for(key, expire=3600): + return _cache_manager.get_cache(key, expire=expire) diff --git a/src/leap/soledad/server/entrypoint.py b/src/leap/soledad/server/entrypoint.py new file mode 100644 index 00000000..c06b740e --- /dev/null +++ b/src/leap/soledad/server/entrypoint.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# entrypoint.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +The entrypoint for Soledad server. + +This is the entrypoint for the application that is loaded from the initscript +or the systemd script. +""" + +from twisted.internet import reactor +from twisted.python import threadpool + +from .auth import portalFactory +from .session import SoledadSession +from ._config import get_config +from ._wsgi import init_couch_state + + +# load configuration from file +conf = get_config() + + +class SoledadEntrypoint(SoledadSession): + + def __init__(self): + pool = threadpool.ThreadPool(name='wsgi') + reactor.callWhenRunning(pool.start) + reactor.addSystemEventTrigger('after', 'shutdown', pool.stop) + portal = portalFactory(pool) + SoledadSession.__init__(self, portal) + + +# see the comments in application.py recarding why couch state has to be +# initialized when the reactor is running + +reactor.callWhenRunning(init_couch_state, conf) diff --git a/src/leap/soledad/server/gzip_middleware.py b/src/leap/soledad/server/gzip_middleware.py new file mode 100644 index 00000000..c77f9f67 --- /dev/null +++ b/src/leap/soledad/server/gzip_middleware.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# gzip_middleware.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Gzip middleware for WSGI apps. +""" +from six import StringIO +from gzip import GzipFile + + +class GzipMiddleware(object): + """ + GzipMiddleware class for WSGI. + """ + def __init__(self, app, compresslevel=9): + self.app = app + self.compresslevel = compresslevel + + def __call__(self, environ, start_response): + if 'gzip' not in environ.get('HTTP_ACCEPT_ENCODING', ''): + return self.app(environ, start_response) + + buffer = StringIO.StringIO() + output = GzipFile( + mode='wb', + compresslevel=self.compresslevel, + fileobj=buffer + ) + + start_response_args = [] + + def dummy_start_response(status, headers, exc_info=None): + start_response_args.append(status) + start_response_args.append(headers) + start_response_args.append(exc_info) + return output.write + + app_iter = self.app(environ, dummy_start_response) + for line in app_iter: + output.write(line) + if hasattr(app_iter, 'close'): + app_iter.close() + output.close() + buffer.seek(0) + result = buffer.getvalue() + headers = [] + for name, value in start_response_args[1]: + if name.lower() != 'content-length': + headers.append((name, value)) + headers.append(('Content-Length', str(len(result)))) + headers.append(('Content-Encoding', 'gzip')) + start_response(start_response_args[0], headers, start_response_args[2]) + buffer.close() + return [result] diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py new file mode 100644 index 00000000..67b04bc3 --- /dev/null +++ b/src/leap/soledad/server/interfaces.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# interfaces.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +from zope.interface import Interface + + +class IBlobsBackend(Interface): + + """ + An interface for a BlobsBackend. + """ + + def read_blob(user, blob_id, request): + """ + Read blob with a given blob_id, and write it to the passed request. + + :returns: a deferred that fires upon finishing. + """ + + def write_blob(user, blob_id, request): + """ + Write blob to the storage, reading it from the passed request. + + :returns: a deferred that fires upon finishing. + """ + + def delete_blob(user, blob_id): + """ + Delete the given blob_id. + """ + + def get_blob_size(user, blob_id): + """ + Get the size of the given blob id. + """ + + def list_blobs(user, request): + """ + Returns a json-encoded list of ids from user's blob. + + :returns: a deferred that fires upon finishing. + """ + + def get_total_storage(user): + """ + Get the size used by a given user as the sum of all the blobs stored + unders its namespace. + """ + + def add_tag_header(user, blob_id, request): + """ + Adds a header 'Tag' to the passed request object, containing the last + 16 bytes of the encoded blob, which according to the spec contains the + tag. + + :returns: a deferred that fires upon finishing. + """ diff --git a/src/leap/soledad/server/session.py b/src/leap/soledad/server/session.py new file mode 100644 index 00000000..1c1b5345 --- /dev/null +++ b/src/leap/soledad/server/session.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# session.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Twisted resource containing an authenticated Soledad session. +""" +from zope.interface import implementer + +from twisted.cred.credentials import Anonymous +from twisted.cred import error +from twisted.python import log +from twisted.web import util +from twisted.web._auth import wrapper +from twisted.web.guard import HTTPAuthSessionWrapper +from twisted.web.resource import ErrorPage +from twisted.web.resource import IResource + +from leap.soledad.server.auth import credentialFactory +from leap.soledad.server.url_mapper import URLMapper + + +@implementer(IResource) +class UnauthorizedResource(wrapper.UnauthorizedResource): + isLeaf = True + + def __init__(self): + pass + + def render(self, request): + request.setResponseCode(401) + if request.method == b'HEAD': + return b'' + return b'Unauthorized' + + def getChildWithDefault(self, path, request): + return self + + +@implementer(IResource) +class SoledadSession(HTTPAuthSessionWrapper): + + def __init__(self, portal): + self._mapper = URLMapper() + self._portal = portal + self._credentialFactory = credentialFactory + # expected by the contract of the parent class + self._credentialFactories = [credentialFactory] + + def _matchPath(self, request): + match = self._mapper.match(request.path, request.method) + return match + + def _parseHeader(self, header): + elements = header.split(b' ') + scheme = elements[0].lower() + if scheme == self._credentialFactory.scheme: + return (b' '.join(elements[1:])) + return None + + def _authorizedResource(self, request): + # check whether the path of the request exists in the app + match = self._matchPath(request) + if not match: + return UnauthorizedResource() + + # get authorization header or fail + header = request.getHeader(b'authorization') + if not header: + return util.DeferredResource(self._login(Anonymous())) + + # parse the authorization header + auth_data = self._parseHeader(header) + if not auth_data: + return UnauthorizedResource() + + # decode the credentials from the parsed header + try: + credentials = self._credentialFactory.decode(auth_data, request) + except error.LoginFailed: + return UnauthorizedResource() + except: + # If you port this to the newer log facility, be aware that + # the tests rely on the error to be logged. + log.err(None, "Unexpected failure from credentials factory") + return ErrorPage(500, None, None) + + # make sure the uuid given in path corresponds to the one given in + # the credentials + request_uuid = match.get('uuid') + if request_uuid and request_uuid != credentials.username: + return ErrorPage(500, None, None) + + # if all checks pass, try to login with credentials + return util.DeferredResource(self._login(credentials)) diff --git a/src/leap/soledad/server/state.py b/src/leap/soledad/server/state.py new file mode 100644 index 00000000..f269b77e --- /dev/null +++ b/src/leap/soledad/server/state.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# state.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Server side synchronization infrastructure. +""" +from leap.soledad.server import caching + + +class ServerSyncState(object): + """ + The state of one sync session, as stored on backend server. + + On server side, the ongoing syncs metadata is maintained in + a caching layer. + """ + + def __init__(self, source_replica_uid, sync_id): + """ + Initialize the sync state object. + + :param sync_id: The id of current sync + :type sync_id: str + :param source_replica_uid: The source replica uid + :type source_replica_uid: str + """ + self._source_replica_uid = source_replica_uid + self._sync_id = sync_id + caching_key = source_replica_uid + sync_id + self._storage = caching.get_cache_for(caching_key) + + def _put_dict_info(self, key, value): + """ + Put some information about the sync state. + + :param key: The key for the info to be put. + :type key: str + :param value: The value for the info to be put. + :type value: str + """ + if key not in self._storage: + self._storage[key] = [] + info_list = self._storage.get(key) + info_list.append(value) + self._storage[key] = info_list + + def put_seen_id(self, seen_id, gen): + """ + Put one seen id on the sync state. + + :param seen_id: The doc_id of a document seen during sync. + :type seen_id: str + :param gen: The corresponding db generation. + :type gen: int + """ + self._put_dict_info( + 'seen_id', + (seen_id, gen)) + + def seen_ids(self): + """ + Return all document ids seen during the sync. + + :return: A dict with doc ids seen during the sync. + :rtype: dict + """ + if 'seen_id' in self._storage: + seen_ids = self._storage.get('seen_id') + else: + seen_ids = [] + return dict(seen_ids) + + def put_changes_to_return(self, gen, trans_id, changes_to_return): + """ + Put the calculated changes to return in the backend sync state. + + :param gen: The target database generation that will be synced. + :type gen: int + :param trans_id: The target database transaction id that will be + synced. + :type trans_id: str + :param changes_to_return: A list of tuples with the changes to be + returned during the sync process. + :type changes_to_return: list + """ + self._put_dict_info( + 'changes_to_return', + { + 'gen': gen, + 'trans_id': trans_id, + 'changes_to_return': changes_to_return, + } + ) + + def sync_info(self): + """ + Return information about the current sync state. + + :return: The generation and transaction id of the target database + which will be synced, and the number of documents to return, + or a tuple of Nones if those have not already been sent to + server. + :rtype: tuple + """ + gen = trans_id = number_of_changes = None + if 'changes_to_return' in self._storage: + info = self._storage.get('changes_to_return')[0] + gen = info['gen'] + trans_id = info['trans_id'] + number_of_changes = len(info['changes_to_return']) + return gen, trans_id, number_of_changes + + def next_change_to_return(self, received): + """ + Return the next change to be returned to the source syncing replica. + + :param received: How many documents the source replica has already + received during the current sync process. + :type received: int + """ + gen = trans_id = next_change_to_return = None + if 'changes_to_return' in self._storage: + info = self._storage.get('changes_to_return')[0] + gen = info['gen'] + trans_id = info['trans_id'] + if received < len(info['changes_to_return']): + next_change_to_return = (info['changes_to_return'][received]) + return gen, trans_id, next_change_to_return diff --git a/src/leap/soledad/server/sync.py b/src/leap/soledad/server/sync.py new file mode 100644 index 00000000..6791c06c --- /dev/null +++ b/src/leap/soledad/server/sync.py @@ -0,0 +1,305 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Server side synchronization infrastructure. +""" +import time +from six.moves import zip as izip + +from leap.soledad.common.l2db import sync +from leap.soledad.common.l2db.remote import http_app +from leap.soledad.server.caching import get_cache_for +from leap.soledad.server.state import ServerSyncState +from leap.soledad.common.document import ServerDocument + + +MAX_REQUEST_SIZE = float('inf') # It's a stream. +MAX_ENTRY_SIZE = 200 # in Mb +ENTRY_CACHE_SIZE = 8192 * 1024 + + +class SyncExchange(sync.SyncExchange): + + def __init__(self, db, source_replica_uid, last_known_generation, sync_id): + """ + :param db: The target syncing database. + :type db: SoledadBackend + :param source_replica_uid: The uid of the source syncing replica. + :type source_replica_uid: str + :param last_known_generation: The last target replica generation the + source replica knows about. + :type last_known_generation: int + :param sync_id: The id of the current sync session. + :type sync_id: str + """ + self._db = db + self.source_replica_uid = source_replica_uid + self.source_last_known_generation = last_known_generation + self.sync_id = sync_id + self.new_gen = None + self.new_trans_id = None + self._trace_hook = None + # recover sync state + self._sync_state = ServerSyncState(self.source_replica_uid, sync_id) + + def find_changes_to_return(self): + """ + Find changes to return. + + Find changes since last_known_generation in db generation + order using whats_changed. It excludes documents ids that have + already been considered (superseded by the sender, etc). + + :return: the generation of this database, which the caller can + consider themselves to be synchronized after processing + allreturned documents, and the amount of documents to be sent + to the source syncing replica. + :rtype: int + """ + # check if changes to return have already been calculated + new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info() + if number_of_changes is None: + self._trace('before whats_changed') + new_gen, new_trans_id, changes = self._db.whats_changed( + self.source_last_known_generation) + self._trace('after whats_changed') + seen_ids = self._sync_state.seen_ids() + # changed docs that weren't superseded by or converged with + self.changes_to_return = [ + (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes + # there was a subsequent update + if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] + self._sync_state.put_changes_to_return( + new_gen, new_trans_id, self.changes_to_return) + number_of_changes = len(self.changes_to_return) + self.new_gen = new_gen + self.new_trans_id = new_trans_id + return self.new_gen, number_of_changes + + def return_docs(self, return_doc_cb): + """Return the changed documents and their last change generation + repeatedly invoking the callback return_doc_cb. + + The final step of a sync exchange. + + :param: return_doc_cb(doc, gen, trans_id): is a callback + used to return the documents with their last change generation + to the target replica. + :return: None + """ + changes_to_return = self.changes_to_return + # return docs, including conflicts. + # content as a file-object (will be read when writing) + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, + include_deleted=True, read_content=False) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + for doc, gen, trans_id in docs_by_gen: + return_doc_cb(doc, gen, trans_id) + + def batched_insert_from_source(self, entries, sync_id): + if not entries: + return + self._db.batch_start() + for entry in entries: + doc, gen, trans_id, number_of_docs, doc_idx = entry + self.insert_doc_from_source(doc, gen, trans_id, number_of_docs, + doc_idx, sync_id) + self._db.batch_end() + + def insert_doc_from_source( + self, doc, source_gen, trans_id, + number_of_docs=None, doc_idx=None, sync_id=None): + """Try to insert synced document from source. + + Conflicting documents are not inserted but will be sent over + to the sync source. + + It keeps track of progress by storing the document source + generation as well. + + The 1st step of a sync exchange is to call this repeatedly to + try insert all incoming documents from the source. + + :param doc: A Document object. + :type doc: Document + :param source_gen: The source generation of doc. + :type source_gen: int + :param trans_id: The transaction id of that document change. + :type trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document. + :type doc_idx: int + :param sync_id: The id of the current sync session. + :type sync_id: str + """ + state, at_gen = self._db._put_doc_if_newer( + doc, save_conflict=False, replica_uid=self.source_replica_uid, + replica_gen=source_gen, replica_trans_id=trans_id, + number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id) + if state == 'inserted': + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'converged': + # magical convergence + self._sync_state.put_seen_id(doc.doc_id, at_gen) + elif state == 'superseded': + # we have something newer that we will return + pass + else: + # conflict that we will returne + assert state == 'conflicted' + + +class SyncResource(http_app.SyncResource): + + max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 + max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 + + sync_exchange_class = SyncExchange + + @http_app.http_method( + last_known_generation=int, last_known_trans_id=http_app.none_or_str, + sync_id=http_app.none_or_str, content_as_args=True) + def post_args(self, last_known_generation, last_known_trans_id=None, + sync_id=None, ensure=False): + """ + Handle the initial arguments for the sync POST request from client. + + :param last_known_generation: The last server replica generation the + client knows about. + :type last_known_generation: int + :param last_known_trans_id: The last server replica transaction_id the + client knows about. + :type last_known_trans_id: str + :param sync_id: The id of the current sync session. + :type sync_id: str + :param ensure: Whether the server replica should be created if it does + not already exist. + :type ensure: bool + """ + # create or open the database + cache = get_cache_for('db-' + sync_id + self.dbname, expire=120) + if ensure: + db, self.replica_uid = self.state.ensure_database(self.dbname) + else: + db = self.state.open_database(self.dbname) + db.init_caching(cache) + # validate the information the client has about server replica + db.validate_gen_and_trans_id( + last_known_generation, last_known_trans_id) + # get a sync exchange object + self.sync_exch = self.sync_exchange_class( + db, self.source_replica_uid, last_known_generation, sync_id) + self._sync_id = sync_id + self._staging = [] + self._staging_size = 0 + + @http_app.http_method(content_as_args=True) + def post_put( + self, id, rev, content, gen, + trans_id, number_of_docs, doc_idx): + """ + Put one incoming document into the server replica. + + :param id: The id of the incoming document. + :type id: str + :param rev: The revision of the incoming document. + :type rev: str + :param content: The content of the incoming document. + :type content: dict + :param gen: The source replica generation corresponding to the + revision of the incoming document. + :type gen: int + :param trans_id: The source replica transaction id corresponding to + the revision of the incoming document. + :type trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document. + :type doc_idx: int + """ + doc = ServerDocument(id, rev, json=content) + self._staging_size += len(content or '') + self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) + if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs: + self.sync_exch.batched_insert_from_source(self._staging, + self._sync_id) + self._staging = [] + self._staging_size = 0 + + def post_get(self): + """ + Return syncing documents to the client. + """ + def send_doc(doc, gen, trans_id): + entry = dict(id=doc.doc_id, rev=doc.rev, + gen=gen, trans_id=trans_id) + self.responder.stream_entry(entry) + content_reader = doc.get_json() + if content_reader: + content = content_reader.read() + self.responder.stream_entry(content) + content_reader.close() + # throttle at 5mb/s + # FIXME: twistd cant control througput + # we need to either use gunicorn or go async + time.sleep(len(content) / (5.0 * 1024 * 1024)) + else: + self.responder.stream_entry('') + + new_gen, number_of_changes = \ + self.sync_exch.find_changes_to_return() + self.responder.content_type = 'application/x-u1db-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + header = { + "new_generation": new_gen, + "new_transaction_id": self.sync_exch.new_trans_id, + "number_of_changes": number_of_changes, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.sync_exch.return_docs(send_doc) + self.responder.end_stream() + self.responder.finish_response() + + def post_end(self): + """ + Return the current generation and transaction_id after inserting one + incoming document. + """ + self.responder.content_type = 'application/x-soledad-sync-response' + self.responder.start_response(200) + self.responder.start_stream(), + new_gen, new_trans_id = self.sync_exch._db._get_generation_info() + header = { + "new_generation": new_gen, + "new_transaction_id": new_trans_id, + } + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.responder.end_stream() + self.responder.finish_response() diff --git a/src/leap/soledad/server/url_mapper.py b/src/leap/soledad/server/url_mapper.py new file mode 100644 index 00000000..b50a81cd --- /dev/null +++ b/src/leap/soledad/server/url_mapper.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# url_mapper.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +An URL mapper that represents authorized paths. +""" +from routes.mapper import Mapper + +from leap.soledad.common import SHARED_DB_NAME +from leap.soledad.common.l2db import DBNAME_CONSTRAINTS + + +class URLMapper(object): + """ + Maps the URLs users can access. + """ + + def __init__(self): + self._map = Mapper(controller_scan=None) + self._connect_urls() + self._map.create_regs() + + def match(self, path, method): + environ = {'PATH_INFO': path, 'REQUEST_METHOD': method} + return self._map.match(environ=environ) + + def _connect(self, pattern, http_methods): + self._map.connect( + None, pattern, http_methods=http_methods, + conditions=dict(method=http_methods), + requirements={'dbname': DBNAME_CONSTRAINTS}) + + def _connect_urls(self): + """ + Register the authorization info in the mapper using C{SHARED_DB_NAME} + as the user's database name. + + This method sets up the following authorization rules: + + URL path | Authorized actions + ---------------------------------------------------- + / | GET + /robots.txt | GET + /shared-db | GET + /shared-db/doc/{any_id} | GET, PUT, DELETE + /user-{uuid}/sync-from/{source} | GET, PUT, POST + /blobs/{uuid}/{blob_id} | GET, PUT, POST + /blobs/{uuid} | GET + """ + # auth info for global resource + self._connect('/', ['GET']) + # robots + self._connect('/robots.txt', ['GET']) + # auth info for shared-db database resource + self._connect('/%s' % SHARED_DB_NAME, ['GET']) + # auth info for shared-db doc resource + self._connect('/%s/doc/{id:.*}' % SHARED_DB_NAME, + ['GET', 'PUT', 'DELETE']) + # auth info for user-db sync resource + self._connect('/user-{uuid}/sync-from/{source_replica_uid}', + ['GET', 'PUT', 'POST']) + # auth info for blobs resource + self._connect('/blobs/{uuid}/{blob_id}', ['GET', 'PUT']) + self._connect('/blobs/{uuid}', ['GET']) |