summaryrefslogtreecommitdiff
path: root/src/leap/soledad/server
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2017-06-18 11:18:10 -0300
committerKali Kaneko <kali@leap.se>2017-06-24 00:49:17 +0200
commit3e94cafa43d464d73815e21810b97a4faf54136d (patch)
tree04f5152e3dfc9f27b7dca2368c0eb8b3f094f5b5 /src/leap/soledad/server
parent7d8ee786b086e47264619df3efa73e74440fd068 (diff)
[pkg] unify client and server into a single python package
We have been discussing about this merge for a while. Its main goal is to simplify things: code navigation, but also packaging. The rationale is that the code is more cohesive in this way, and there's only one source package to install. Dependencies that are only for the server or the client will not be installed by default, and they are expected to be provided by the environment. There are setuptools extras defined for the client and the server. Debianization is still expected to split the single source package into 3 binaries. Another avantage is that the documentation can now install a single package with a single step, and therefore include the docstrings into the generated docs. - Resolves: #8896
Diffstat (limited to 'src/leap/soledad/server')
-rw-r--r--src/leap/soledad/server/__init__.py192
-rw-r--r--src/leap/soledad/server/_blobs.py234
-rw-r--r--src/leap/soledad/server/_config.py82
-rw-r--r--src/leap/soledad/server/_resource.py86
-rw-r--r--src/leap/soledad/server/_server_info.py44
-rw-r--r--src/leap/soledad/server/_wsgi.py65
-rw-r--r--src/leap/soledad/server/auth.py173
-rw-r--r--src/leap/soledad/server/caching.py32
-rw-r--r--src/leap/soledad/server/entrypoint.py50
-rw-r--r--src/leap/soledad/server/gzip_middleware.py67
-rw-r--r--src/leap/soledad/server/interfaces.py72
-rw-r--r--src/leap/soledad/server/session.py107
-rw-r--r--src/leap/soledad/server/state.py141
-rw-r--r--src/leap/soledad/server/sync.py305
-rw-r--r--src/leap/soledad/server/url_mapper.py77
15 files changed, 1727 insertions, 0 deletions
diff --git a/src/leap/soledad/server/__init__.py b/src/leap/soledad/server/__init__.py
new file mode 100644
index 00000000..a4080f13
--- /dev/null
+++ b/src/leap/soledad/server/__init__.py
@@ -0,0 +1,192 @@
+# -*- coding: utf-8 -*-
+# server.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+The Soledad Server allows for recovery document storage and database
+synchronization.
+"""
+
+import six.moves.urllib.parse as urlparse
+import sys
+
+from leap.soledad.common.l2db.remote import http_app, utils
+from leap.soledad.common import SHARED_DB_NAME
+
+from .sync import SyncResource
+from .sync import MAX_REQUEST_SIZE
+from .sync import MAX_ENTRY_SIZE
+
+from ._version import get_versions
+from ._config import get_config
+
+
+__all__ = [
+ 'SoledadApp',
+ 'get_config',
+ '__version__',
+]
+
+
+# ----------------------------------------------------------------------------
+# Soledad WSGI application
+# ----------------------------------------------------------------------------
+
+
+class SoledadApp(http_app.HTTPApp):
+ """
+ Soledad WSGI application
+ """
+
+ SHARED_DB_NAME = SHARED_DB_NAME
+ """
+ The name of the shared database that holds user's encrypted secrets.
+ """
+
+ max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
+ max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024
+
+ def __call__(self, environ, start_response):
+ """
+ Handle a WSGI call to the Soledad application.
+
+ @param environ: Dictionary containing CGI variables.
+ @type environ: dict
+ @param start_response: Callable of the form start_response(status,
+ response_headers, exc_info=None).
+ @type start_response: callable
+
+ @return: HTTP application results.
+ @rtype: list
+ """
+ return http_app.HTTPApp.__call__(self, environ, start_response)
+
+
+# ----------------------------------------------------------------------------
+# WSGI resources registration
+# ----------------------------------------------------------------------------
+
+# monkey patch u1db with a new resource map
+http_app.url_to_resource = http_app.URLToResource()
+
+# register u1db unmodified resources
+http_app.url_to_resource.register(http_app.GlobalResource)
+http_app.url_to_resource.register(http_app.DatabaseResource)
+http_app.url_to_resource.register(http_app.DocsResource)
+http_app.url_to_resource.register(http_app.DocResource)
+
+# register Soledad's new or modified resources
+http_app.url_to_resource.register(SyncResource)
+
+
+# ----------------------------------------------------------------------------
+# Modified HTTP method invocation (to account for splitted sync)
+# ----------------------------------------------------------------------------
+
+class HTTPInvocationByMethodWithBody(
+ http_app.HTTPInvocationByMethodWithBody):
+ """
+ Invoke methods on a resource.
+ """
+
+ def __call__(self):
+ """
+ Call an HTTP method of a resource.
+
+ This method was rewritten to allow for a sync flow which uses one POST
+ request for each transferred document (back and forth).
+
+ Usual U1DB sync process transfers all documents from client to server
+ and back in only one POST request. This is inconvenient for some
+ reasons, as lack of possibility of gracefully interrupting the sync
+ process, and possible timeouts for when dealing with large documents
+ that have to be retrieved and encrypted/decrypted. Because of those,
+ we split the sync process into many POST requests.
+ """
+ args = urlparse.parse_qsl(self.environ['QUERY_STRING'],
+ strict_parsing=False)
+ try:
+ args = dict(
+ (k.decode('utf-8'), v.decode('utf-8')) for k, v in args)
+ except ValueError:
+ raise http_app.BadRequest()
+ method = self.environ['REQUEST_METHOD'].lower()
+ if method in ('get', 'delete'):
+ meth = self._lookup(method)
+ return meth(args, None)
+ else:
+ # we expect content-length > 0, reconsider if we move
+ # to support chunked enconding
+ try:
+ content_length = int(self.environ['CONTENT_LENGTH'])
+ except (ValueError, KeyError):
+ # raise http_app.BadRequest
+ content_length = self.max_request_size
+ if content_length <= 0:
+ raise http_app.BadRequest
+ if content_length > self.max_request_size:
+ raise http_app.BadRequest
+ reader = http_app._FencedReader(
+ self.environ['wsgi.input'], content_length,
+ self.max_entry_size)
+ content_type = self.environ.get('CONTENT_TYPE')
+ if content_type == 'application/json':
+ meth = self._lookup(method)
+ body = reader.read_chunk(sys.maxint)
+ return meth(args, body)
+ elif content_type.startswith('application/x-soledad-sync'):
+ # read one line and validate it
+ body_getline = reader.getline
+ if body_getline().strip() != '[':
+ raise http_app.BadRequest()
+ line = body_getline()
+ line, comma = utils.check_and_strip_comma(line.strip())
+ meth_args = self._lookup('%s_args' % method)
+ meth_args(args, line)
+ # handle incoming documents
+ if content_type == 'application/x-soledad-sync-put':
+ meth_put = self._lookup('%s_put' % method)
+ meth_end = self._lookup('%s_end' % method)
+ while True:
+ entry = body_getline().strip()
+ if entry == ']': # end of incoming document stream
+ break
+ if not entry or not comma: # empty or no prec comma
+ raise http_app.BadRequest
+ entry, comma = utils.check_and_strip_comma(entry)
+ content = body_getline().strip()
+ content, comma = utils.check_and_strip_comma(content)
+ meth_put({'content': content or None}, entry)
+ if comma or body_getline(): # extra comma or data
+ raise http_app.BadRequest
+ return meth_end()
+ # handle outgoing documents
+ elif content_type == 'application/x-soledad-sync-get':
+ meth_get = self._lookup('%s_get' % method)
+ return meth_get()
+ else:
+ raise http_app.BadRequest()
+ else:
+ raise http_app.BadRequest()
+
+
+# monkey patch server with new http invocation
+http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody
+
+
+__version__ = get_versions()['version']
+del get_versions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py
new file mode 100644
index 00000000..10678360
--- /dev/null
+++ b/src/leap/soledad/server/_blobs.py
@@ -0,0 +1,234 @@
+# -*- coding: utf-8 -*-
+# _blobs.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Blobs Server implementation.
+
+This is a very simplistic implementation for the time being.
+Clients should be able to opt-in util the feature is complete.
+
+A more performant BlobsBackend can (and should) be implemented for production
+environments.
+"""
+import os
+import base64
+import json
+import re
+
+from twisted.logger import Logger
+from twisted.web import static
+from twisted.web import resource
+from twisted.web.client import FileBodyProducer
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import utils, defer
+
+from zope.interface import implementer
+
+from leap.common.files import mkdir_p
+from leap.soledad.server import interfaces
+
+
+__all__ = ['BlobsResource']
+
+
+logger = Logger()
+
+# Used for sanitizers, we accept only letters, numbers, '-' and '_'
+VALID_STRINGS = re.compile('^[a-zA-Z0-9_-]+$')
+
+
+# for the future:
+# [ ] isolate user avatar in a safer way
+# [ ] catch timeout in the server (and delete incomplete upload)
+# [ ] chunking (should we do it on the client or on the server?)
+
+
+@implementer(interfaces.IBlobsBackend)
+class FilesystemBlobsBackend(object):
+
+ def __init__(self, blobs_path='/tmp/blobs/', quota=200 * 1024):
+ self.quota = quota
+ if not os.path.isdir(blobs_path):
+ os.makedirs(blobs_path)
+ self.path = blobs_path
+
+ def read_blob(self, user, blob_id, request):
+ logger.info('reading blob: %s - %s' % (user, blob_id))
+ path = self._get_path(user, blob_id)
+ logger.debug('blob path: %s' % path)
+ _file = static.File(path, defaultType='application/octet-stream')
+ return _file.render_GET(request)
+
+ @defer.inlineCallbacks
+ def write_blob(self, user, blob_id, request):
+ path = self._get_path(user, blob_id)
+ try:
+ mkdir_p(os.path.split(path)[0])
+ except OSError:
+ pass
+ if os.path.isfile(path):
+ # 409 - Conflict
+ request.setResponseCode(409)
+ request.write("Blob already exists: %s" % blob_id)
+ defer.returnValue(None)
+ used = yield self.get_total_storage(user)
+ if used > self.quota:
+ logger.error("Error 507: Quota exceeded for user: %s" % user)
+ request.setResponseCode(507)
+ request.write('Quota Exceeded!')
+ defer.returnValue(None)
+ logger.info('writing blob: %s - %s' % (user, blob_id))
+ fbp = FileBodyProducer(request.content)
+ yield fbp.startProducing(open(path, 'wb'))
+
+ def delete_blob(self, user, blob_id):
+ blob_path = self._get_path(user, blob_id)
+ os.unlink(blob_path)
+
+ def get_blob_size(user, blob_id):
+ raise NotImplementedError
+
+ def list_blobs(self, user, request):
+ blob_ids = []
+ base_path = self._get_path(user)
+ for _, _, filenames in os.walk(base_path):
+ blob_ids += filenames
+ return json.dumps(blob_ids)
+
+ def get_total_storage(self, user):
+ return self._get_disk_usage(self._get_path(user))
+
+ def add_tag_header(self, user, blob_id, request):
+ with open(self._get_path(user, blob_id)) as doc_file:
+ doc_file.seek(-16, 2)
+ tag = base64.urlsafe_b64encode(doc_file.read())
+ request.responseHeaders.setRawHeaders('Tag', [tag])
+
+ @defer.inlineCallbacks
+ def _get_disk_usage(self, start_path):
+ if not os.path.isdir(start_path):
+ defer.returnValue(0)
+ cmd = ['/usr/bin/du', '-s', '-c', start_path]
+ output = yield utils.getProcessOutput(cmd[0], cmd[1:])
+ size = output.split()[0]
+ defer.returnValue(int(size))
+
+ def _validate_path(self, desired_path, user, blob_id):
+ if not VALID_STRINGS.match(user):
+ raise Exception("Invalid characters on user: %s" % user)
+ if blob_id and not VALID_STRINGS.match(blob_id):
+ raise Exception("Invalid characters on blob_id: %s" % blob_id)
+ desired_path = os.path.realpath(desired_path) # expand path references
+ root = os.path.realpath(self.path)
+ if not desired_path.startswith(root + os.sep + user):
+ err = "User %s tried accessing a invalid path: %s" % (user,
+ desired_path)
+ raise Exception(err)
+ return desired_path
+
+ def _get_path(self, user, blob_id=False):
+ parts = [user]
+ if blob_id:
+ parts += [blob_id[0], blob_id[0:3], blob_id[0:6]]
+ parts += [blob_id]
+ path = os.path.join(self.path, *parts)
+ return self._validate_path(path, user, blob_id)
+
+
+class ImproperlyConfiguredException(Exception):
+ pass
+
+
+class BlobsResource(resource.Resource):
+
+ isLeaf = True
+
+ # Allowed backend classes are defined here
+ handlers = {"filesystem": FilesystemBlobsBackend}
+
+ def __init__(self, backend, blobs_path, **backend_kwargs):
+ resource.Resource.__init__(self)
+ self._blobs_path = blobs_path
+ backend_kwargs.update({'blobs_path': blobs_path})
+ if backend not in self.handlers:
+ raise ImproperlyConfiguredException("No such backend: %s", backend)
+ self._handler = self.handlers[backend](**backend_kwargs)
+ assert interfaces.IBlobsBackend.providedBy(self._handler)
+
+ # TODO double check credentials, we can have then
+ # under request.
+
+ def render_GET(self, request):
+ logger.info("http get: %s" % request.path)
+ user, blob_id = self._validate(request)
+ if not blob_id:
+ return self._handler.list_blobs(user, request)
+ self._handler.add_tag_header(user, blob_id, request)
+ return self._handler.read_blob(user, blob_id, request)
+
+ def render_DELETE(self, request):
+ logger.info("http put: %s" % request.path)
+ user, blob_id = self._validate(request)
+ self._handler.delete_blob(user, blob_id)
+ return ''
+
+ def render_PUT(self, request):
+ logger.info("http put: %s" % request.path)
+ user, blob_id = self._validate(request)
+ d = self._handler.write_blob(user, blob_id, request)
+ d.addCallback(lambda _: request.finish())
+ d.addErrback(self._error, request)
+ return NOT_DONE_YET
+
+ def _error(self, e, request):
+ logger.error('Error processing request: %s' % e.getErrorMessage())
+ request.setResponseCode(500)
+ request.finish()
+
+ def _validate(self, request):
+ for arg in request.postpath:
+ if arg and not VALID_STRINGS.match(arg):
+ raise Exception('Invalid blob resource argument: %s' % arg)
+ return request.postpath
+
+
+if __name__ == '__main__':
+ # A dummy blob server
+ # curl -X PUT --data-binary @/tmp/book.pdf localhost:9000/user/someid
+ # curl -X GET -o /dev/null localhost:9000/user/somerandomstring
+ from twisted.python import log
+ import sys
+ log.startLogging(sys.stdout)
+
+ from twisted.web.server import Site
+ from twisted.internet import reactor
+
+ # parse command line arguments
+ import argparse
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--port', default=9000, type=int)
+ parser.add_argument('--path', default='/tmp/blobs/user')
+ args = parser.parse_args()
+
+ root = BlobsResource("filesystem", args.path)
+ # I picture somethink like
+ # BlobsResource(backend="filesystem", backend_opts={'path': '/tmp/blobs'})
+
+ factory = Site(root)
+ reactor.listenTCP(args.port, factory)
+ reactor.run()
diff --git a/src/leap/soledad/server/_config.py b/src/leap/soledad/server/_config.py
new file mode 100644
index 00000000..e89e70d6
--- /dev/null
+++ b/src/leap/soledad/server/_config.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+# config.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import configparser
+
+
+__all__ = ['get_config']
+
+
+CONFIG_DEFAULTS = {
+ 'soledad-server': {
+ 'couch_url': 'http://localhost:5984',
+ 'create_cmd': None,
+ 'admin_netrc': '/etc/couchdb/couchdb-admin.netrc',
+ 'batching': True,
+ 'blobs': False,
+ 'blobs_path': '/srv/leap/soledad/blobs',
+ },
+ 'database-security': {
+ 'members': ['soledad'],
+ 'members_roles': [],
+ 'admins': [],
+ 'admins_roles': []
+ }
+}
+
+
+_config = None
+
+
+def get_config(section='soledad-server'):
+ global _config
+ if not _config:
+ _config = _load_config('/etc/soledad/soledad-server.conf')
+ return _config[section]
+
+
+def _load_config(file_path):
+ """
+ Load server configuration from file.
+
+ @param file_path: The path to the configuration file.
+ @type file_path: str
+
+ @return: A dictionary with the configuration.
+ @rtype: dict
+ """
+ conf = dict(CONFIG_DEFAULTS)
+ config = configparser.SafeConfigParser()
+ config.read(file_path)
+ for section in conf:
+ if not config.has_section(section):
+ continue
+ for key, value in conf[section].items():
+ if not config.has_option(section, key):
+ continue
+ elif type(value) == bool:
+ conf[section][key] = config.getboolean(section, key)
+ elif type(value) == list:
+ values = config.get(section, key).split(',')
+ values = [v.strip() for v in values]
+ conf[section][key] = values
+ else:
+ conf[section][key] = config.get(section, key)
+ # TODO: implement basic parsing/sanitization of options comming from
+ # config file.
+ return conf
diff --git a/src/leap/soledad/server/_resource.py b/src/leap/soledad/server/_resource.py
new file mode 100644
index 00000000..49c4b742
--- /dev/null
+++ b/src/leap/soledad/server/_resource.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+# resource.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A twisted resource that serves the Soledad Server.
+"""
+from twisted.web.resource import Resource
+
+from ._server_info import ServerInfo
+from ._wsgi import get_sync_resource
+
+
+__all__ = ['SoledadResource', 'SoledadAnonResource']
+
+
+class _Robots(Resource):
+ def render_GET(self, request):
+ return (
+ 'User-agent: *\n'
+ 'Disallow: /\n'
+ '# you are not a robot, are you???')
+
+
+class SoledadAnonResource(Resource):
+
+ """
+ The parts of Soledad Server that unauthenticated users can see.
+ This is nice because this means that a non-authenticated user will get 404
+ for anything that is not in this minimal resource tree.
+ """
+
+ def __init__(self, enable_blobs=False):
+ Resource.__init__(self)
+ server_info = ServerInfo(enable_blobs)
+ self.putChild('', server_info)
+ self.putChild('robots.txt', _Robots())
+
+
+class SoledadResource(Resource):
+ """
+ This is a dummy twisted resource, used only to allow different entry points
+ for the Soledad Server.
+ """
+
+ def __init__(self, blobs_resource=None, sync_pool=None):
+ """
+ Initialize the Soledad resource.
+
+ :param blobs_resource: a resource to serve blobs, if enabled.
+ :type blobs_resource: _blobs.BlobsResource
+
+ :param sync_pool: A pool to pass to the WSGI sync resource.
+ :type sync_pool: twisted.python.threadpool.ThreadPool
+ """
+ Resource.__init__(self)
+
+ # requests to / return server information
+ server_info = ServerInfo(bool(blobs_resource))
+ self.putChild('', server_info)
+
+ # requests to /blobs will serve blobs if enabled
+ if blobs_resource:
+ self.putChild('blobs', blobs_resource)
+
+ # other requests are routed to legacy sync resource
+ self._sync_resource = get_sync_resource(sync_pool)
+
+ def getChild(self, path, request):
+ """
+ Route requests to legacy WSGI sync resource dynamically.
+ """
+ request.postpath.insert(0, request.prepath.pop())
+ return self._sync_resource
diff --git a/src/leap/soledad/server/_server_info.py b/src/leap/soledad/server/_server_info.py
new file mode 100644
index 00000000..50659338
--- /dev/null
+++ b/src/leap/soledad/server/_server_info.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+# _server_info.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Resource that announces information about the server.
+"""
+import json
+
+from twisted.web.resource import Resource
+
+from leap.soledad.server import __version__
+
+
+__all__ = ['ServerInfo']
+
+
+class ServerInfo(Resource):
+ """
+ Return information about the server.
+ """
+
+ isLeaf = True
+
+ def __init__(self, blobs_enabled):
+ self._info = {
+ "blobs": blobs_enabled,
+ "version": __version__
+ }
+
+ def render_GET(self, request):
+ return json.dumps(self._info)
diff --git a/src/leap/soledad/server/_wsgi.py b/src/leap/soledad/server/_wsgi.py
new file mode 100644
index 00000000..f6ff6b26
--- /dev/null
+++ b/src/leap/soledad/server/_wsgi.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+# application.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A WSGI application that serves Soledad synchronization.
+"""
+from twisted.internet import reactor
+from twisted.web.wsgi import WSGIResource
+
+from leap.soledad.server import SoledadApp
+from leap.soledad.server.gzip_middleware import GzipMiddleware
+from leap.soledad.common.backend import SoledadBackend
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.log import getLogger
+
+from twisted.logger import Logger
+log = Logger()
+
+__all__ = ['init_couch_state', 'get_sync_resource']
+
+
+def _get_couch_state(conf):
+ state = CouchServerState(conf['couch_url'], create_cmd=conf['create_cmd'],
+ check_schema_versions=True)
+ SoledadBackend.BATCH_SUPPORT = conf.get('batching', False)
+ return state
+
+
+_app = SoledadApp(None) # delay state init
+wsgi_application = GzipMiddleware(_app)
+
+
+# During its initialization, the couch state verifies if all user databases
+# contain a config document with the correct couch schema version stored, and
+# will log an error and raise an exception if that is not the case.
+#
+# If this verification made too early (i.e. before the reactor has started and
+# the twistd web logging facilities have been setup), the logging will not
+# work. Because of that, we delay couch state initialization until the reactor
+# is running.
+
+def init_couch_state(conf):
+ try:
+ _app.state = _get_couch_state(conf)
+ except Exception as e:
+ logger = getLogger()
+ logger.error(str(e))
+ reactor.stop()
+
+
+def get_sync_resource(pool):
+ return WSGIResource(reactor, pool, wsgi_application)
diff --git a/src/leap/soledad/server/auth.py b/src/leap/soledad/server/auth.py
new file mode 100644
index 00000000..1357b289
--- /dev/null
+++ b/src/leap/soledad/server/auth.py
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+# auth.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Twisted http token auth.
+"""
+import binascii
+import time
+
+from hashlib import sha512
+from zope.interface import implementer
+
+from twisted.cred import error
+from twisted.cred.checkers import ICredentialsChecker
+from twisted.cred.credentials import IUsernamePassword
+from twisted.cred.credentials import IAnonymous
+from twisted.cred.credentials import Anonymous
+from twisted.cred.credentials import UsernamePassword
+from twisted.cred.portal import IRealm
+from twisted.cred.portal import Portal
+from twisted.internet import defer
+from twisted.logger import Logger
+from twisted.web.iweb import ICredentialFactory
+from twisted.web.resource import IResource
+
+from leap.soledad.common.couch import couch_server
+
+from ._resource import SoledadResource, SoledadAnonResource
+from ._blobs import BlobsResource
+from ._config import get_config
+
+
+log = Logger()
+
+
+@implementer(IRealm)
+class SoledadRealm(object):
+
+ def __init__(self, sync_pool, conf=None):
+ assert sync_pool is not None
+ if conf is None:
+ conf = get_config()
+ blobs = conf['blobs']
+ blobs_resource = BlobsResource("filesystem",
+ conf['blobs_path']) if blobs else None
+ self.anon_resource = SoledadAnonResource(
+ enable_blobs=blobs)
+ self.auth_resource = SoledadResource(
+ blobs_resource=blobs_resource,
+ sync_pool=sync_pool)
+
+ def requestAvatar(self, avatarId, mind, *interfaces):
+
+ # Anonymous access
+ if IAnonymous.providedBy(avatarId):
+ return (IResource, self.anon_resource,
+ lambda: None)
+
+ # Authenticated access
+ else:
+ if IResource in interfaces:
+ return (IResource, self.auth_resource,
+ lambda: None)
+ raise NotImplementedError()
+
+
+@implementer(ICredentialsChecker)
+class TokenChecker(object):
+
+ credentialInterfaces = [IUsernamePassword, IAnonymous]
+
+ TOKENS_DB_PREFIX = "tokens_"
+ TOKENS_DB_EXPIRE = 30 * 24 * 3600 # 30 days in seconds
+ TOKENS_TYPE_KEY = "type"
+ TOKENS_TYPE_DEF = "Token"
+ TOKENS_USER_ID_KEY = "user_id"
+
+ def __init__(self):
+ self._couch_url = get_config().get('couch_url')
+
+ def _get_server(self):
+ return couch_server(self._couch_url)
+
+ def _tokens_dbname(self):
+ # the tokens db rotates every 30 days, and the current db name is
+ # "tokens_NNN", where NNN is the number of seconds since epoch
+ # divide dby the rotate period in seconds. When rotating, old and
+ # new tokens db coexist during a certain window of time and valid
+ # tokens are replicated from the old db to the new one. See:
+ # https://leap.se/code/issues/6785
+ dbname = self.TOKENS_DB_PREFIX + \
+ str(int(time.time() / self.TOKENS_DB_EXPIRE))
+ return dbname
+
+ def _tokens_db(self):
+ dbname = self._tokens_dbname()
+
+ # TODO -- leaking abstraction here: this module shouldn't need
+ # to known anything about the context manager. hide that in the couch
+ # module
+ with self._get_server() as server:
+ db = server[dbname]
+ return db
+
+ def requestAvatarId(self, credentials):
+ if IAnonymous.providedBy(credentials):
+ return defer.succeed(Anonymous())
+
+ uuid = credentials.username
+ token = credentials.password
+
+ # lookup key is a hash of the token to prevent timing attacks.
+ # TODO cache the tokens already!
+
+ db = self._tokens_db()
+ token = db.get(sha512(token).hexdigest())
+ if token is None:
+ return defer.fail(error.UnauthorizedLogin())
+
+ # TODO -- use cryptography constant time builtin comparison.
+ # we compare uuid hashes to avoid possible timing attacks that
+ # might exploit python's builtin comparison operator behaviour,
+ # which fails immediatelly when non-matching bytes are found.
+ couch_uuid_hash = sha512(token[self.TOKENS_USER_ID_KEY]).digest()
+ req_uuid_hash = sha512(uuid).digest()
+ if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \
+ or couch_uuid_hash != req_uuid_hash:
+ return defer.fail(error.UnauthorizedLogin())
+
+ return defer.succeed(uuid)
+
+
+@implementer(ICredentialFactory)
+class TokenCredentialFactory(object):
+
+ scheme = 'token'
+
+ def getChallenge(self, request):
+ return {}
+
+ def decode(self, response, request):
+ try:
+ creds = binascii.a2b_base64(response + b'===')
+ except binascii.Error:
+ raise error.LoginFailed('Invalid credentials')
+
+ creds = creds.split(b':', 1)
+ if len(creds) == 2:
+ return UsernamePassword(*creds)
+ else:
+ raise error.LoginFailed('Invalid credentials')
+
+
+def portalFactory(sync_pool):
+ realm = SoledadRealm(sync_pool=sync_pool)
+ checker = TokenChecker()
+ return Portal(realm, [checker])
+
+
+credentialFactory = TokenCredentialFactory()
diff --git a/src/leap/soledad/server/caching.py b/src/leap/soledad/server/caching.py
new file mode 100644
index 00000000..9a049a39
--- /dev/null
+++ b/src/leap/soledad/server/caching.py
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+# caching.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server side caching. Using beaker for now.
+"""
+from beaker.cache import CacheManager
+
+
+def setup_caching():
+ _cache_manager = CacheManager(type='memory')
+ return _cache_manager
+
+
+_cache_manager = setup_caching()
+
+
+def get_cache_for(key, expire=3600):
+ return _cache_manager.get_cache(key, expire=expire)
diff --git a/src/leap/soledad/server/entrypoint.py b/src/leap/soledad/server/entrypoint.py
new file mode 100644
index 00000000..c06b740e
--- /dev/null
+++ b/src/leap/soledad/server/entrypoint.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# entrypoint.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+The entrypoint for Soledad server.
+
+This is the entrypoint for the application that is loaded from the initscript
+or the systemd script.
+"""
+
+from twisted.internet import reactor
+from twisted.python import threadpool
+
+from .auth import portalFactory
+from .session import SoledadSession
+from ._config import get_config
+from ._wsgi import init_couch_state
+
+
+# load configuration from file
+conf = get_config()
+
+
+class SoledadEntrypoint(SoledadSession):
+
+ def __init__(self):
+ pool = threadpool.ThreadPool(name='wsgi')
+ reactor.callWhenRunning(pool.start)
+ reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)
+ portal = portalFactory(pool)
+ SoledadSession.__init__(self, portal)
+
+
+# see the comments in application.py recarding why couch state has to be
+# initialized when the reactor is running
+
+reactor.callWhenRunning(init_couch_state, conf)
diff --git a/src/leap/soledad/server/gzip_middleware.py b/src/leap/soledad/server/gzip_middleware.py
new file mode 100644
index 00000000..c77f9f67
--- /dev/null
+++ b/src/leap/soledad/server/gzip_middleware.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# gzip_middleware.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Gzip middleware for WSGI apps.
+"""
+from six import StringIO
+from gzip import GzipFile
+
+
+class GzipMiddleware(object):
+ """
+ GzipMiddleware class for WSGI.
+ """
+ def __init__(self, app, compresslevel=9):
+ self.app = app
+ self.compresslevel = compresslevel
+
+ def __call__(self, environ, start_response):
+ if 'gzip' not in environ.get('HTTP_ACCEPT_ENCODING', ''):
+ return self.app(environ, start_response)
+
+ buffer = StringIO.StringIO()
+ output = GzipFile(
+ mode='wb',
+ compresslevel=self.compresslevel,
+ fileobj=buffer
+ )
+
+ start_response_args = []
+
+ def dummy_start_response(status, headers, exc_info=None):
+ start_response_args.append(status)
+ start_response_args.append(headers)
+ start_response_args.append(exc_info)
+ return output.write
+
+ app_iter = self.app(environ, dummy_start_response)
+ for line in app_iter:
+ output.write(line)
+ if hasattr(app_iter, 'close'):
+ app_iter.close()
+ output.close()
+ buffer.seek(0)
+ result = buffer.getvalue()
+ headers = []
+ for name, value in start_response_args[1]:
+ if name.lower() != 'content-length':
+ headers.append((name, value))
+ headers.append(('Content-Length', str(len(result))))
+ headers.append(('Content-Encoding', 'gzip'))
+ start_response(start_response_args[0], headers, start_response_args[2])
+ buffer.close()
+ return [result]
diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py
new file mode 100644
index 00000000..67b04bc3
--- /dev/null
+++ b/src/leap/soledad/server/interfaces.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+# interfaces.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+from zope.interface import Interface
+
+
+class IBlobsBackend(Interface):
+
+ """
+ An interface for a BlobsBackend.
+ """
+
+ def read_blob(user, blob_id, request):
+ """
+ Read blob with a given blob_id, and write it to the passed request.
+
+ :returns: a deferred that fires upon finishing.
+ """
+
+ def write_blob(user, blob_id, request):
+ """
+ Write blob to the storage, reading it from the passed request.
+
+ :returns: a deferred that fires upon finishing.
+ """
+
+ def delete_blob(user, blob_id):
+ """
+ Delete the given blob_id.
+ """
+
+ def get_blob_size(user, blob_id):
+ """
+ Get the size of the given blob id.
+ """
+
+ def list_blobs(user, request):
+ """
+ Returns a json-encoded list of ids from user's blob.
+
+ :returns: a deferred that fires upon finishing.
+ """
+
+ def get_total_storage(user):
+ """
+ Get the size used by a given user as the sum of all the blobs stored
+ unders its namespace.
+ """
+
+ def add_tag_header(user, blob_id, request):
+ """
+ Adds a header 'Tag' to the passed request object, containing the last
+ 16 bytes of the encoded blob, which according to the spec contains the
+ tag.
+
+ :returns: a deferred that fires upon finishing.
+ """
diff --git a/src/leap/soledad/server/session.py b/src/leap/soledad/server/session.py
new file mode 100644
index 00000000..1c1b5345
--- /dev/null
+++ b/src/leap/soledad/server/session.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+# session.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Twisted resource containing an authenticated Soledad session.
+"""
+from zope.interface import implementer
+
+from twisted.cred.credentials import Anonymous
+from twisted.cred import error
+from twisted.python import log
+from twisted.web import util
+from twisted.web._auth import wrapper
+from twisted.web.guard import HTTPAuthSessionWrapper
+from twisted.web.resource import ErrorPage
+from twisted.web.resource import IResource
+
+from leap.soledad.server.auth import credentialFactory
+from leap.soledad.server.url_mapper import URLMapper
+
+
+@implementer(IResource)
+class UnauthorizedResource(wrapper.UnauthorizedResource):
+ isLeaf = True
+
+ def __init__(self):
+ pass
+
+ def render(self, request):
+ request.setResponseCode(401)
+ if request.method == b'HEAD':
+ return b''
+ return b'Unauthorized'
+
+ def getChildWithDefault(self, path, request):
+ return self
+
+
+@implementer(IResource)
+class SoledadSession(HTTPAuthSessionWrapper):
+
+ def __init__(self, portal):
+ self._mapper = URLMapper()
+ self._portal = portal
+ self._credentialFactory = credentialFactory
+ # expected by the contract of the parent class
+ self._credentialFactories = [credentialFactory]
+
+ def _matchPath(self, request):
+ match = self._mapper.match(request.path, request.method)
+ return match
+
+ def _parseHeader(self, header):
+ elements = header.split(b' ')
+ scheme = elements[0].lower()
+ if scheme == self._credentialFactory.scheme:
+ return (b' '.join(elements[1:]))
+ return None
+
+ def _authorizedResource(self, request):
+ # check whether the path of the request exists in the app
+ match = self._matchPath(request)
+ if not match:
+ return UnauthorizedResource()
+
+ # get authorization header or fail
+ header = request.getHeader(b'authorization')
+ if not header:
+ return util.DeferredResource(self._login(Anonymous()))
+
+ # parse the authorization header
+ auth_data = self._parseHeader(header)
+ if not auth_data:
+ return UnauthorizedResource()
+
+ # decode the credentials from the parsed header
+ try:
+ credentials = self._credentialFactory.decode(auth_data, request)
+ except error.LoginFailed:
+ return UnauthorizedResource()
+ except:
+ # If you port this to the newer log facility, be aware that
+ # the tests rely on the error to be logged.
+ log.err(None, "Unexpected failure from credentials factory")
+ return ErrorPage(500, None, None)
+
+ # make sure the uuid given in path corresponds to the one given in
+ # the credentials
+ request_uuid = match.get('uuid')
+ if request_uuid and request_uuid != credentials.username:
+ return ErrorPage(500, None, None)
+
+ # if all checks pass, try to login with credentials
+ return util.DeferredResource(self._login(credentials))
diff --git a/src/leap/soledad/server/state.py b/src/leap/soledad/server/state.py
new file mode 100644
index 00000000..f269b77e
--- /dev/null
+++ b/src/leap/soledad/server/state.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+# state.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server side synchronization infrastructure.
+"""
+from leap.soledad.server import caching
+
+
+class ServerSyncState(object):
+ """
+ The state of one sync session, as stored on backend server.
+
+ On server side, the ongoing syncs metadata is maintained in
+ a caching layer.
+ """
+
+ def __init__(self, source_replica_uid, sync_id):
+ """
+ Initialize the sync state object.
+
+ :param sync_id: The id of current sync
+ :type sync_id: str
+ :param source_replica_uid: The source replica uid
+ :type source_replica_uid: str
+ """
+ self._source_replica_uid = source_replica_uid
+ self._sync_id = sync_id
+ caching_key = source_replica_uid + sync_id
+ self._storage = caching.get_cache_for(caching_key)
+
+ def _put_dict_info(self, key, value):
+ """
+ Put some information about the sync state.
+
+ :param key: The key for the info to be put.
+ :type key: str
+ :param value: The value for the info to be put.
+ :type value: str
+ """
+ if key not in self._storage:
+ self._storage[key] = []
+ info_list = self._storage.get(key)
+ info_list.append(value)
+ self._storage[key] = info_list
+
+ def put_seen_id(self, seen_id, gen):
+ """
+ Put one seen id on the sync state.
+
+ :param seen_id: The doc_id of a document seen during sync.
+ :type seen_id: str
+ :param gen: The corresponding db generation.
+ :type gen: int
+ """
+ self._put_dict_info(
+ 'seen_id',
+ (seen_id, gen))
+
+ def seen_ids(self):
+ """
+ Return all document ids seen during the sync.
+
+ :return: A dict with doc ids seen during the sync.
+ :rtype: dict
+ """
+ if 'seen_id' in self._storage:
+ seen_ids = self._storage.get('seen_id')
+ else:
+ seen_ids = []
+ return dict(seen_ids)
+
+ def put_changes_to_return(self, gen, trans_id, changes_to_return):
+ """
+ Put the calculated changes to return in the backend sync state.
+
+ :param gen: The target database generation that will be synced.
+ :type gen: int
+ :param trans_id: The target database transaction id that will be
+ synced.
+ :type trans_id: str
+ :param changes_to_return: A list of tuples with the changes to be
+ returned during the sync process.
+ :type changes_to_return: list
+ """
+ self._put_dict_info(
+ 'changes_to_return',
+ {
+ 'gen': gen,
+ 'trans_id': trans_id,
+ 'changes_to_return': changes_to_return,
+ }
+ )
+
+ def sync_info(self):
+ """
+ Return information about the current sync state.
+
+ :return: The generation and transaction id of the target database
+ which will be synced, and the number of documents to return,
+ or a tuple of Nones if those have not already been sent to
+ server.
+ :rtype: tuple
+ """
+ gen = trans_id = number_of_changes = None
+ if 'changes_to_return' in self._storage:
+ info = self._storage.get('changes_to_return')[0]
+ gen = info['gen']
+ trans_id = info['trans_id']
+ number_of_changes = len(info['changes_to_return'])
+ return gen, trans_id, number_of_changes
+
+ def next_change_to_return(self, received):
+ """
+ Return the next change to be returned to the source syncing replica.
+
+ :param received: How many documents the source replica has already
+ received during the current sync process.
+ :type received: int
+ """
+ gen = trans_id = next_change_to_return = None
+ if 'changes_to_return' in self._storage:
+ info = self._storage.get('changes_to_return')[0]
+ gen = info['gen']
+ trans_id = info['trans_id']
+ if received < len(info['changes_to_return']):
+ next_change_to_return = (info['changes_to_return'][received])
+ return gen, trans_id, next_change_to_return
diff --git a/src/leap/soledad/server/sync.py b/src/leap/soledad/server/sync.py
new file mode 100644
index 00000000..6791c06c
--- /dev/null
+++ b/src/leap/soledad/server/sync.py
@@ -0,0 +1,305 @@
+# -*- coding: utf-8 -*-
+# sync.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server side synchronization infrastructure.
+"""
+import time
+from six.moves import zip as izip
+
+from leap.soledad.common.l2db import sync
+from leap.soledad.common.l2db.remote import http_app
+from leap.soledad.server.caching import get_cache_for
+from leap.soledad.server.state import ServerSyncState
+from leap.soledad.common.document import ServerDocument
+
+
+MAX_REQUEST_SIZE = float('inf') # It's a stream.
+MAX_ENTRY_SIZE = 200 # in Mb
+ENTRY_CACHE_SIZE = 8192 * 1024
+
+
+class SyncExchange(sync.SyncExchange):
+
+ def __init__(self, db, source_replica_uid, last_known_generation, sync_id):
+ """
+ :param db: The target syncing database.
+ :type db: SoledadBackend
+ :param source_replica_uid: The uid of the source syncing replica.
+ :type source_replica_uid: str
+ :param last_known_generation: The last target replica generation the
+ source replica knows about.
+ :type last_known_generation: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+ """
+ self._db = db
+ self.source_replica_uid = source_replica_uid
+ self.source_last_known_generation = last_known_generation
+ self.sync_id = sync_id
+ self.new_gen = None
+ self.new_trans_id = None
+ self._trace_hook = None
+ # recover sync state
+ self._sync_state = ServerSyncState(self.source_replica_uid, sync_id)
+
+ def find_changes_to_return(self):
+ """
+ Find changes to return.
+
+ Find changes since last_known_generation in db generation
+ order using whats_changed. It excludes documents ids that have
+ already been considered (superseded by the sender, etc).
+
+ :return: the generation of this database, which the caller can
+ consider themselves to be synchronized after processing
+ allreturned documents, and the amount of documents to be sent
+ to the source syncing replica.
+ :rtype: int
+ """
+ # check if changes to return have already been calculated
+ new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info()
+ if number_of_changes is None:
+ self._trace('before whats_changed')
+ new_gen, new_trans_id, changes = self._db.whats_changed(
+ self.source_last_known_generation)
+ self._trace('after whats_changed')
+ seen_ids = self._sync_state.seen_ids()
+ # changed docs that weren't superseded by or converged with
+ self.changes_to_return = [
+ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
+ # there was a subsequent update
+ if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
+ self._sync_state.put_changes_to_return(
+ new_gen, new_trans_id, self.changes_to_return)
+ number_of_changes = len(self.changes_to_return)
+ self.new_gen = new_gen
+ self.new_trans_id = new_trans_id
+ return self.new_gen, number_of_changes
+
+ def return_docs(self, return_doc_cb):
+ """Return the changed documents and their last change generation
+ repeatedly invoking the callback return_doc_cb.
+
+ The final step of a sync exchange.
+
+ :param: return_doc_cb(doc, gen, trans_id): is a callback
+ used to return the documents with their last change generation
+ to the target replica.
+ :return: None
+ """
+ changes_to_return = self.changes_to_return
+ # return docs, including conflicts.
+ # content as a file-object (will be read when writing)
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
+ docs = self._db.get_docs(
+ changed_doc_ids, check_for_conflicts=False,
+ include_deleted=True, read_content=False)
+
+ docs_by_gen = izip(
+ docs, (gen for _, gen, _ in changes_to_return),
+ (trans_id for _, _, trans_id in changes_to_return))
+ for doc, gen, trans_id in docs_by_gen:
+ return_doc_cb(doc, gen, trans_id)
+
+ def batched_insert_from_source(self, entries, sync_id):
+ if not entries:
+ return
+ self._db.batch_start()
+ for entry in entries:
+ doc, gen, trans_id, number_of_docs, doc_idx = entry
+ self.insert_doc_from_source(doc, gen, trans_id, number_of_docs,
+ doc_idx, sync_id)
+ self._db.batch_end()
+
+ def insert_doc_from_source(
+ self, doc, source_gen, trans_id,
+ number_of_docs=None, doc_idx=None, sync_id=None):
+ """Try to insert synced document from source.
+
+ Conflicting documents are not inserted but will be sent over
+ to the sync source.
+
+ It keeps track of progress by storing the document source
+ generation as well.
+
+ The 1st step of a sync exchange is to call this repeatedly to
+ try insert all incoming documents from the source.
+
+ :param doc: A Document object.
+ :type doc: Document
+ :param source_gen: The source generation of doc.
+ :type source_gen: int
+ :param trans_id: The transaction id of that document change.
+ :type trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+ """
+ state, at_gen = self._db._put_doc_if_newer(
+ doc, save_conflict=False, replica_uid=self.source_replica_uid,
+ replica_gen=source_gen, replica_trans_id=trans_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)
+ if state == 'inserted':
+ self._sync_state.put_seen_id(doc.doc_id, at_gen)
+ elif state == 'converged':
+ # magical convergence
+ self._sync_state.put_seen_id(doc.doc_id, at_gen)
+ elif state == 'superseded':
+ # we have something newer that we will return
+ pass
+ else:
+ # conflict that we will returne
+ assert state == 'conflicted'
+
+
+class SyncResource(http_app.SyncResource):
+
+ max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
+ max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024
+
+ sync_exchange_class = SyncExchange
+
+ @http_app.http_method(
+ last_known_generation=int, last_known_trans_id=http_app.none_or_str,
+ sync_id=http_app.none_or_str, content_as_args=True)
+ def post_args(self, last_known_generation, last_known_trans_id=None,
+ sync_id=None, ensure=False):
+ """
+ Handle the initial arguments for the sync POST request from client.
+
+ :param last_known_generation: The last server replica generation the
+ client knows about.
+ :type last_known_generation: int
+ :param last_known_trans_id: The last server replica transaction_id the
+ client knows about.
+ :type last_known_trans_id: str
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+ :param ensure: Whether the server replica should be created if it does
+ not already exist.
+ :type ensure: bool
+ """
+ # create or open the database
+ cache = get_cache_for('db-' + sync_id + self.dbname, expire=120)
+ if ensure:
+ db, self.replica_uid = self.state.ensure_database(self.dbname)
+ else:
+ db = self.state.open_database(self.dbname)
+ db.init_caching(cache)
+ # validate the information the client has about server replica
+ db.validate_gen_and_trans_id(
+ last_known_generation, last_known_trans_id)
+ # get a sync exchange object
+ self.sync_exch = self.sync_exchange_class(
+ db, self.source_replica_uid, last_known_generation, sync_id)
+ self._sync_id = sync_id
+ self._staging = []
+ self._staging_size = 0
+
+ @http_app.http_method(content_as_args=True)
+ def post_put(
+ self, id, rev, content, gen,
+ trans_id, number_of_docs, doc_idx):
+ """
+ Put one incoming document into the server replica.
+
+ :param id: The id of the incoming document.
+ :type id: str
+ :param rev: The revision of the incoming document.
+ :type rev: str
+ :param content: The content of the incoming document.
+ :type content: dict
+ :param gen: The source replica generation corresponding to the
+ revision of the incoming document.
+ :type gen: int
+ :param trans_id: The source replica transaction id corresponding to
+ the revision of the incoming document.
+ :type trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document.
+ :type doc_idx: int
+ """
+ doc = ServerDocument(id, rev, json=content)
+ self._staging_size += len(content or '')
+ self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
+ if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs:
+ self.sync_exch.batched_insert_from_source(self._staging,
+ self._sync_id)
+ self._staging = []
+ self._staging_size = 0
+
+ def post_get(self):
+ """
+ Return syncing documents to the client.
+ """
+ def send_doc(doc, gen, trans_id):
+ entry = dict(id=doc.doc_id, rev=doc.rev,
+ gen=gen, trans_id=trans_id)
+ self.responder.stream_entry(entry)
+ content_reader = doc.get_json()
+ if content_reader:
+ content = content_reader.read()
+ self.responder.stream_entry(content)
+ content_reader.close()
+ # throttle at 5mb/s
+ # FIXME: twistd cant control througput
+ # we need to either use gunicorn or go async
+ time.sleep(len(content) / (5.0 * 1024 * 1024))
+ else:
+ self.responder.stream_entry('')
+
+ new_gen, number_of_changes = \
+ self.sync_exch.find_changes_to_return()
+ self.responder.content_type = 'application/x-u1db-sync-response'
+ self.responder.start_response(200)
+ self.responder.start_stream(),
+ header = {
+ "new_generation": new_gen,
+ "new_transaction_id": self.sync_exch.new_trans_id,
+ "number_of_changes": number_of_changes,
+ }
+ if self.replica_uid is not None:
+ header['replica_uid'] = self.replica_uid
+ self.responder.stream_entry(header)
+ self.sync_exch.return_docs(send_doc)
+ self.responder.end_stream()
+ self.responder.finish_response()
+
+ def post_end(self):
+ """
+ Return the current generation and transaction_id after inserting one
+ incoming document.
+ """
+ self.responder.content_type = 'application/x-soledad-sync-response'
+ self.responder.start_response(200)
+ self.responder.start_stream(),
+ new_gen, new_trans_id = self.sync_exch._db._get_generation_info()
+ header = {
+ "new_generation": new_gen,
+ "new_transaction_id": new_trans_id,
+ }
+ if self.replica_uid is not None:
+ header['replica_uid'] = self.replica_uid
+ self.responder.stream_entry(header)
+ self.responder.end_stream()
+ self.responder.finish_response()
diff --git a/src/leap/soledad/server/url_mapper.py b/src/leap/soledad/server/url_mapper.py
new file mode 100644
index 00000000..b50a81cd
--- /dev/null
+++ b/src/leap/soledad/server/url_mapper.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+# url_mapper.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+An URL mapper that represents authorized paths.
+"""
+from routes.mapper import Mapper
+
+from leap.soledad.common import SHARED_DB_NAME
+from leap.soledad.common.l2db import DBNAME_CONSTRAINTS
+
+
+class URLMapper(object):
+ """
+ Maps the URLs users can access.
+ """
+
+ def __init__(self):
+ self._map = Mapper(controller_scan=None)
+ self._connect_urls()
+ self._map.create_regs()
+
+ def match(self, path, method):
+ environ = {'PATH_INFO': path, 'REQUEST_METHOD': method}
+ return self._map.match(environ=environ)
+
+ def _connect(self, pattern, http_methods):
+ self._map.connect(
+ None, pattern, http_methods=http_methods,
+ conditions=dict(method=http_methods),
+ requirements={'dbname': DBNAME_CONSTRAINTS})
+
+ def _connect_urls(self):
+ """
+ Register the authorization info in the mapper using C{SHARED_DB_NAME}
+ as the user's database name.
+
+ This method sets up the following authorization rules:
+
+ URL path | Authorized actions
+ ----------------------------------------------------
+ / | GET
+ /robots.txt | GET
+ /shared-db | GET
+ /shared-db/doc/{any_id} | GET, PUT, DELETE
+ /user-{uuid}/sync-from/{source} | GET, PUT, POST
+ /blobs/{uuid}/{blob_id} | GET, PUT, POST
+ /blobs/{uuid} | GET
+ """
+ # auth info for global resource
+ self._connect('/', ['GET'])
+ # robots
+ self._connect('/robots.txt', ['GET'])
+ # auth info for shared-db database resource
+ self._connect('/%s' % SHARED_DB_NAME, ['GET'])
+ # auth info for shared-db doc resource
+ self._connect('/%s/doc/{id:.*}' % SHARED_DB_NAME,
+ ['GET', 'PUT', 'DELETE'])
+ # auth info for user-db sync resource
+ self._connect('/user-{uuid}/sync-from/{source_replica_uid}',
+ ['GET', 'PUT', 'POST'])
+ # auth info for blobs resource
+ self._connect('/blobs/{uuid}/{blob_id}', ['GET', 'PUT'])
+ self._connect('/blobs/{uuid}', ['GET'])