summaryrefslogtreecommitdiff
path: root/src/leap/soledad/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/server')
-rw-r--r--src/leap/soledad/server/__init__.py192
-rw-r--r--src/leap/soledad/server/_blobs.py234
-rw-r--r--src/leap/soledad/server/_config.py82
-rw-r--r--src/leap/soledad/server/_resource.py86
-rw-r--r--src/leap/soledad/server/_server_info.py44
-rw-r--r--src/leap/soledad/server/_wsgi.py65
-rw-r--r--src/leap/soledad/server/auth.py173
-rw-r--r--src/leap/soledad/server/caching.py32
-rw-r--r--src/leap/soledad/server/entrypoint.py50
-rw-r--r--src/leap/soledad/server/gzip_middleware.py67
-rw-r--r--src/leap/soledad/server/interfaces.py72
-rw-r--r--src/leap/soledad/server/session.py107
-rw-r--r--src/leap/soledad/server/state.py141
-rw-r--r--src/leap/soledad/server/sync.py305
-rw-r--r--src/leap/soledad/server/url_mapper.py77
15 files changed, 1727 insertions, 0 deletions
diff --git a/src/leap/soledad/server/__init__.py b/src/leap/soledad/server/__init__.py
new file mode 100644
index 00000000..a4080f13
--- /dev/null
+++ b/src/leap/soledad/server/__init__.py
@@ -0,0 +1,192 @@
+# -*- coding: utf-8 -*-
+# server.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+The Soledad Server allows for recovery document storage and database
+synchronization.
+"""
+
+import six.moves.urllib.parse as urlparse
+import sys
+
+from leap.soledad.common.l2db.remote import http_app, utils
+from leap.soledad.common import SHARED_DB_NAME
+
+from .sync import SyncResource
+from .sync import MAX_REQUEST_SIZE
+from .sync import MAX_ENTRY_SIZE
+
+from ._version import get_versions
+from ._config import get_config
+
+
+__all__ = [
+ 'SoledadApp',
+ 'get_config',
+ '__version__',
+]
+
+
+# ----------------------------------------------------------------------------
+# Soledad WSGI application
+# ----------------------------------------------------------------------------
+
+
class SoledadApp(http_app.HTTPApp):
    """
    WSGI application that serves Soledad on top of the u1db HTTP app.
    """

    # The name of the shared database that holds user's encrypted secrets.
    SHARED_DB_NAME = SHARED_DB_NAME

    # The limits imported from .sync are scaled by 1024 * 1024 here.
    max_request_size = MAX_REQUEST_SIZE * 1024 ** 2
    max_entry_size = MAX_ENTRY_SIZE * 1024 ** 2

    def __call__(self, environ, start_response):
        """
        Handle a WSGI call by delegating to the parent HTTP application.

        @param environ: Dictionary containing CGI variables.
        @type environ: dict
        @param start_response: Callable of the form start_response(status,
            response_headers, exc_info=None).
        @type start_response: callable

        @return: HTTP application results.
        @rtype: list
        """
        parent_call = http_app.HTTPApp.__call__
        return parent_call(self, environ, start_response)
+
+
+# ----------------------------------------------------------------------------
+# WSGI resources registration
+# ----------------------------------------------------------------------------
+
# monkey patch u1db with a new resource map: the module-level map is replaced
# by a fresh URLToResource instance, so only the resources registered below
# are present in it.
http_app.url_to_resource = http_app.URLToResource()

# register u1db unmodified resources
http_app.url_to_resource.register(http_app.GlobalResource)
http_app.url_to_resource.register(http_app.DatabaseResource)
http_app.url_to_resource.register(http_app.DocsResource)
http_app.url_to_resource.register(http_app.DocResource)

# register Soledad's new or modified resources (SyncResource comes from
# the local .sync module)
http_app.url_to_resource.register(SyncResource)
+
+
+# ----------------------------------------------------------------------------
+# Modified HTTP method invocation (to account for split sync)
+# ----------------------------------------------------------------------------
+
class HTTPInvocationByMethodWithBody(
        http_app.HTTPInvocationByMethodWithBody):
    """
    Invoke methods on a resource.
    """

    def __call__(self):
        """
        Call an HTTP method of a resource.

        This method was rewritten to allow for a sync flow which uses one POST
        request for each transferred document (back and forth).

        Usual U1DB sync process transfers all documents from client to server
        and back in only one POST request. This is inconvenient for some
        reasons, as lack of possibility of gracefully interrupting the sync
        process, and possible timeouts for when dealing with large documents
        that have to be retrieved and encrypted/decrypted. Because of those,
        we split the sync process into many POST requests.

        @return: Whatever the looked-up resource method returns.

        @raise http_app.BadRequest: If the query string cannot be decoded,
            the content length is not positive or exceeds the maximum request
            size, the content type is missing or unsupported, or the sync
            stream is malformed.
        """

        def to_text(value):
            # parse_qsl yields byte strings on Python 2 and text on Python 3;
            # only byte strings need (and support) decoding.
            if isinstance(value, bytes):
                return value.decode('utf-8')
            return value

        args = urlparse.parse_qsl(self.environ['QUERY_STRING'],
                                  strict_parsing=False)
        try:
            args = dict((to_text(k), to_text(v)) for k, v in args)
        except ValueError:
            # UnicodeDecodeError is a ValueError subclass
            raise http_app.BadRequest()

        method = self.environ['REQUEST_METHOD'].lower()
        if method in ('get', 'delete'):
            meth = self._lookup(method)
            return meth(args, None)

        # we expect content-length > 0, reconsider if we move
        # to support chunked encoding
        try:
            content_length = int(self.environ['CONTENT_LENGTH'])
        except (ValueError, KeyError):
            # a missing or unparseable length falls back to the maximum
            # allowed size instead of being rejected
            content_length = self.max_request_size
        if content_length <= 0:
            raise http_app.BadRequest()
        if content_length > self.max_request_size:
            raise http_app.BadRequest()

        reader = http_app._FencedReader(
            self.environ['wsgi.input'], content_length,
            self.max_entry_size)

        # A request without a Content-Type header must be answered with
        # BadRequest; normalising None to '' avoids an AttributeError on
        # the startswith() call below.
        content_type = self.environ.get('CONTENT_TYPE') or ''

        if content_type == 'application/json':
            meth = self._lookup(method)
            # sys.maxsize exists on both Python 2 and 3 (sys.maxint does
            # not); it just means "read everything the fence allows".
            body = reader.read_chunk(sys.maxsize)
            return meth(args, body)

        if not content_type.startswith('application/x-soledad-sync'):
            raise http_app.BadRequest()

        # read one line and validate it
        body_getline = reader.getline
        if body_getline().strip() != '[':
            raise http_app.BadRequest()
        line = body_getline()
        line, comma = utils.check_and_strip_comma(line.strip())
        meth_args = self._lookup('%s_args' % method)
        meth_args(args, line)

        # handle incoming documents
        if content_type == 'application/x-soledad-sync-put':
            meth_put = self._lookup('%s_put' % method)
            meth_end = self._lookup('%s_end' % method)
            while True:
                entry = body_getline().strip()
                if entry == ']':  # end of incoming document stream
                    break
                if not entry or not comma:  # empty or no prec comma
                    raise http_app.BadRequest()
                entry, comma = utils.check_and_strip_comma(entry)
                content = body_getline().strip()
                content, comma = utils.check_and_strip_comma(content)
                meth_put({'content': content or None}, entry)
            if comma or body_getline():  # extra comma or data
                raise http_app.BadRequest()
            return meth_end()

        # handle outgoing documents
        if content_type == 'application/x-soledad-sync-get':
            meth_get = self._lookup('%s_get' % method)
            return meth_get()

        raise http_app.BadRequest()
+
+
# monkey patch server with new http invocation: the name in http_app is
# rebound to the subclass defined in this module.
http_app.HTTPInvocationByMethodWithBody = HTTPInvocationByMethodWithBody


# get_versions is imported from ._version; once the version string has been
# read, the helper name is removed from this module's namespace.
__version__ = get_versions()['version']
del get_versions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py
new file mode 100644
index 00000000..10678360
--- /dev/null
+++ b/src/leap/soledad/server/_blobs.py
@@ -0,0 +1,234 @@
+# -*- coding: utf-8 -*-
+# _blobs.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Blobs Server implementation.
+
+This is a very simplistic implementation for the time being.
+Clients should be able to opt-in until the feature is complete.
+
+A more performant BlobsBackend can (and should) be implemented for production
+environments.
+"""
+import os
+import base64
+import json
+import re
+
+from twisted.logger import Logger
+from twisted.web import static
+from twisted.web import resource
+from twisted.web.client import FileBodyProducer
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import utils, defer
+
+from zope.interface import implementer
+
+from leap.common.files import mkdir_p
+from leap.soledad.server import interfaces
+
+
+__all__ = ['BlobsResource']
+
+
# Module-level twisted logger used by the backend and the resource below.
logger = Logger()
+
# Used for sanitizers, we accept only letters, numbers, '-' and '_'.
# The pattern ends with \Z instead of $ because $ also matches right before
# a trailing newline, which would let a value such as 'abc\n' through.
VALID_STRINGS = re.compile(r'^[a-zA-Z0-9_-]+\Z')
+
+
+# for the future:
+# [ ] isolate user avatar in a safer way
+# [ ] catch timeout in the server (and delete incomplete upload)
+# [ ] chunking (should we do it on the client or on the server?)
+
+
@implementer(interfaces.IBlobsBackend)
class FilesystemBlobsBackend(object):
    """
    Blobs backend that stores every blob as a file below a per-user directory.
    """

    def __init__(self, blobs_path='/tmp/blobs/', quota=200 * 1024):
        """
        :param blobs_path: root directory for all blobs; created if missing.
        :param quota: per-user limit, compared against the number reported
                      by ``du`` (presumably 1 KiB blocks -- TODO confirm).
        """
        self.quota = quota
        if not os.path.isdir(blobs_path):
            os.makedirs(blobs_path)
        self.path = blobs_path

    def read_blob(self, user, blob_id, request):
        """
        Serve the blob's file through a twisted static.File resource.

        :return: the result of rendering a GET on the static file.
        """
        logger.info('reading blob: %s - %s' % (user, blob_id))
        path = self._get_path(user, blob_id)
        logger.debug('blob path: %s' % path)
        _file = static.File(path, defaultType='application/octet-stream')
        return _file.render_GET(request)

    @defer.inlineCallbacks
    def write_blob(self, user, blob_id, request):
        """
        Store the request body as a new blob.

        Answers 409 if the blob already exists and 507 if the user's current
        usage is above the quota; in both cases nothing is written.

        :return: a deferred that fires when the body has been written.
        """
        path = self._get_path(user, blob_id)
        try:
            mkdir_p(os.path.split(path)[0])
        except OSError:
            # best effort: if the directory really is unusable, opening the
            # file below fails and the caller's errback handles it
            pass
        if os.path.isfile(path):
            # 409 - Conflict
            request.setResponseCode(409)
            request.write("Blob already exists: %s" % blob_id)
            defer.returnValue(None)
        used = yield self.get_total_storage(user)
        if used > self.quota:
            logger.error("Error 507: Quota exceeded for user: %s" % user)
            request.setResponseCode(507)
            request.write('Quota Exceeded!')
            defer.returnValue(None)
        logger.info('writing blob: %s - %s' % (user, blob_id))
        fbp = FileBodyProducer(request.content)
        # the context manager guarantees the file handle is closed whether
        # the upload finishes or fails
        with open(path, 'wb') as blob_file:
            yield fbp.startProducing(blob_file)

    def delete_blob(self, user, blob_id):
        """
        Remove the blob's file from disk.
        """
        blob_path = self._get_path(user, blob_id)
        os.unlink(blob_path)

    def get_blob_size(self, user, blob_id):
        """
        Not implemented yet.

        :raise NotImplementedError: always.
        """
        raise NotImplementedError

    def list_blobs(self, user, request):
        """
        List the user's blobs.

        :return: a JSON-encoded list with every file name found below the
                 user's directory.
        """
        blob_ids = []
        base_path = self._get_path(user)
        for _, _, filenames in os.walk(base_path):
            blob_ids += filenames
        return json.dumps(blob_ids)

    def get_total_storage(self, user):
        """
        :return: a deferred firing with the disk usage of the user's directory.
        """
        return self._get_disk_usage(self._get_path(user))

    def add_tag_header(self, user, blob_id, request):
        """
        Set the 'Tag' response header to the urlsafe-base64 encoding of the
        last 16 bytes of the blob's file.
        """
        # binary mode: seeking relative to the end of the file is not
        # supported for text-mode files on Python 3, and the tag is raw bytes
        with open(self._get_path(user, blob_id), 'rb') as doc_file:
            doc_file.seek(-16, 2)
            tag = base64.urlsafe_b64encode(doc_file.read())
            request.responseHeaders.setRawHeaders('Tag', [tag])

    @defer.inlineCallbacks
    def _get_disk_usage(self, start_path):
        """
        :return: a deferred firing with 0 if ``start_path`` is not a
                 directory, otherwise with the first number printed by
                 ``du -s -c``.
        """
        if not os.path.isdir(start_path):
            defer.returnValue(0)
        cmd = ['/usr/bin/du', '-s', '-c', start_path]
        output = yield utils.getProcessOutput(cmd[0], cmd[1:])
        size = output.split()[0]
        defer.returnValue(int(size))

    def _validate_path(self, desired_path, user, blob_id):
        """
        Check the identifiers and make sure the resolved path stays inside
        the user's own directory.

        :return: the resolved (real) path.
        :raise Exception: on invalid characters or if the path escapes the
                          user's directory.
        """
        if not VALID_STRINGS.match(user):
            raise Exception("Invalid characters on user: %s" % user)
        if blob_id and not VALID_STRINGS.match(blob_id):
            raise Exception("Invalid characters on blob_id: %s" % blob_id)
        desired_path = os.path.realpath(desired_path)  # expand path references
        user_root = os.path.join(os.path.realpath(self.path), user)
        # Compare against the directory itself or the directory plus a
        # separator; a bare prefix check would accept a sibling directory
        # whose name merely starts with the user's name ('bob' vs 'bobby').
        inside_user_root = (
            desired_path == user_root or
            desired_path.startswith(user_root + os.sep))
        if not inside_user_root:
            err = "User %s tried accessing a invalid path: %s" % (user,
                                                                  desired_path)
            raise Exception(err)
        return desired_path

    def _get_path(self, user, blob_id=False):
        """
        Build and validate the path of a user's directory or of one blob.

        Blobs are sharded into nested directories named after the first 1, 3
        and 6 characters of the blob id.
        """
        parts = [user]
        if blob_id:
            parts += [blob_id[0], blob_id[0:3], blob_id[0:6]]
            parts += [blob_id]
        path = os.path.join(self.path, *parts)
        return self._validate_path(path, user, blob_id)
+
+
class ImproperlyConfiguredException(Exception):
    """Raised when the blobs resource is set up with an unknown backend."""
+
+
class BlobsResource(resource.Resource):
    """
    Twisted web resource that exposes a blobs backend over GET/PUT/DELETE.

    Requests are expected to carry the user and the blob id as path
    segments after the resource's own path.
    """

    isLeaf = True

    # Allowed backend classes are defined here
    handlers = {"filesystem": FilesystemBlobsBackend}

    def __init__(self, backend, blobs_path, **backend_kwargs):
        """
        :param backend: key into ``handlers`` selecting the backend class.
        :param blobs_path: passed to the backend as ``blobs_path``.
        :param backend_kwargs: extra keyword arguments for the backend.
        :raise ImproperlyConfiguredException: if ``backend`` is unknown.
        """
        resource.Resource.__init__(self)
        self._blobs_path = blobs_path
        backend_kwargs.update({'blobs_path': blobs_path})
        if backend not in self.handlers:
            # the message must be formatted here: passing the name as a
            # second argument would leave a literal '%s' in the error
            raise ImproperlyConfiguredException(
                "No such backend: %s" % backend)
        self._handler = self.handlers[backend](**backend_kwargs)
        assert interfaces.IBlobsBackend.providedBy(self._handler)

    # TODO double check credentials, we can have then
    # under request.

    def render_GET(self, request):
        """
        List the user's blobs when no blob id is given, otherwise serve the
        blob with its 'Tag' header set.
        """
        logger.info("http get: %s" % request.path)
        user, blob_id = self._validate(request)
        if not blob_id:
            return self._handler.list_blobs(user, request)
        self._handler.add_tag_header(user, blob_id, request)
        return self._handler.read_blob(user, blob_id, request)

    def render_DELETE(self, request):
        """
        Delete the requested blob and answer with an empty body.
        """
        logger.info("http delete: %s" % request.path)
        user, blob_id = self._validate(request)
        self._handler.delete_blob(user, blob_id)
        return ''

    def render_PUT(self, request):
        """
        Store the request body as a blob; the response is finished
        asynchronously once the backend's deferred fires.
        """
        logger.info("http put: %s" % request.path)
        user, blob_id = self._validate(request)
        d = self._handler.write_blob(user, blob_id, request)
        d.addCallback(lambda _: request.finish())
        d.addErrback(self._error, request)
        return NOT_DONE_YET

    def _error(self, e, request):
        """
        Errback: log the failure and finish the request with a 500.
        """
        logger.error('Error processing request: %s' % e.getErrorMessage())
        request.setResponseCode(500)
        request.finish()

    def _validate(self, request):
        """
        Reject path segments with characters outside VALID_STRINGS.

        :return: ``request.postpath``; callers unpack it as (user, blob_id).
        :raise Exception: if a non-empty segment has invalid characters.
        """
        for arg in request.postpath:
            if arg and not VALID_STRINGS.match(arg):
                raise Exception('Invalid blob resource argument: %s' % arg)
        return request.postpath
+
+
if __name__ == '__main__':
    # A dummy blob server for manual testing. Example requests:
    # curl -X PUT --data-binary @/tmp/book.pdf localhost:9000/user/someid
    # curl -X GET -o /dev/null localhost:9000/user/somerandomstring
    from twisted.python import log
    import sys
    log.startLogging(sys.stdout)

    from twisted.web.server import Site
    from twisted.internet import reactor

    # parse command line arguments
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--port', default=9000, type=int)
    parser.add_argument('--path', default='/tmp/blobs/user')
    args = parser.parse_args()

    # the blobs resource itself is the site root, backed by the filesystem
    # backend rooted at --path
    root = BlobsResource("filesystem", args.path)
    # I picture something like
    # BlobsResource(backend="filesystem", backend_opts={'path': '/tmp/blobs'})

    factory = Site(root)
    reactor.listenTCP(args.port, factory)
    reactor.run()
diff --git a/src/leap/soledad/server/_config.py b/src/leap/soledad/server/_config.py
new file mode 100644
index 00000000..e89e70d6
--- /dev/null
+++ b/src/leap/soledad/server/_config.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+# config.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import configparser
+
+
+__all__ = ['get_config']
+
+
+CONFIG_DEFAULTS = {
+ 'soledad-server': {
+ 'couch_url': 'http://localhost:5984',
+ 'create_cmd': None,
+ 'admin_netrc': '/etc/couchdb/couchdb-admin.netrc',
+ 'batching': True,
+ 'blobs': False,
+ 'blobs_path': '/srv/leap/soledad/blobs',
+ },
+ 'database-security': {
+ 'members': ['soledad'],
+ 'members_roles': [],
+ 'admins': [],
+ 'admins_roles': []
+ }
+}
+
+
+_config = None
+
+
+def get_config(section='soledad-server'):
+ global _config
+ if not _config:
+ _config = _load_config('/etc/soledad/soledad-server.conf')
+ return _config[section]
+
+
+def _load_config(file_path):
+ """
+ Load server configuration from file.
+
+ @param file_path: The path to the configuration file.
+ @type file_path: str
+
+ @return: A dictionary with the configuration.
+ @rtype: dict
+ """
+ conf = dict(CONFIG_DEFAULTS)
+ config = configparser.SafeConfigParser()
+ config.read(file_path)
+ for section in conf:
+ if not config.has_section(section):
+ continue
+ for key, value in conf[section].items():
+ if not config.has_option(section, key):
+ continue
+ elif type(value) == bool:
+ conf[section][key] = config.getboolean(section, key)
+ elif type(value) == list:
+ values = config.get(section, key).split(',')
+ values = [v.strip() for v in values]
+ conf[section][key] = values
+ else:
+ conf[section][key] = config.get(section, key)
+ # TODO: implement basic parsing/sanitization of options comming from
+ # config file.
+ return conf
diff --git a/src/leap/soledad/server/_resource.py b/src/leap/soledad/server/_resource.py
new file mode 100644
index 00000000..49c4b742
--- /dev/null
+++ b/src/leap/soledad/server/_resource.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+# resource.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A twisted resource that serves the Soledad Server.
+"""
+from twisted.web.resource import Resource
+
+from ._server_info import ServerInfo
+from ._wsgi import get_sync_resource
+
+
+__all__ = ['SoledadResource', 'SoledadAnonResource']
+
+
class _Robots(Resource):
    """
    Serve a robots.txt body that disallows everything for every user agent.
    """

    def render_GET(self, request):
        body_lines = (
            'User-agent: *',
            'Disallow: /',
            '# you are not a robot, are you???',
        )
        return '\n'.join(body_lines)
+
+
class SoledadAnonResource(Resource):

    """
    The minimal resource tree offered to unauthenticated users: the server
    information at the root and a robots.txt. Anything else is absent from
    this tree, so a non-authenticated user gets a 404 for it.
    """

    def __init__(self, enable_blobs=False):
        Resource.__init__(self)
        children = (
            ('', ServerInfo(enable_blobs)),
            ('robots.txt', _Robots()),
        )
        for child_path, child in children:
            self.putChild(child_path, child)
+
+
class SoledadResource(Resource):
    """
    Root twisted resource for authenticated users. It only wires together
    the different entry points of the Soledad Server.
    """

    def __init__(self, blobs_resource=None, sync_pool=None):
        """
        Initialize the Soledad resource.

        :param blobs_resource: a resource to serve blobs, if enabled.
        :type blobs_resource: _blobs.BlobsResource

        :param sync_pool: A pool to pass to the WSGI sync resource.
        :type sync_pool: twisted.python.threadpool.ThreadPool
        """
        Resource.__init__(self)

        # "/" answers with server information; blobs are announced as
        # enabled exactly when a blobs resource was given
        blobs_enabled = bool(blobs_resource)
        self.putChild('', ServerInfo(blobs_enabled))

        # "/blobs" is only mounted when blobs are enabled
        if blobs_resource:
            self.putChild('blobs', blobs_resource)

        # everything else ends up in the legacy sync resource
        self._sync_resource = get_sync_resource(sync_pool)

    def getChild(self, path, request):
        """
        Route requests to legacy WSGI sync resource dynamically.

        The last pre-path segment is moved back to the front of the
        post-path before the sync resource is returned.
        """
        segment = request.prepath.pop()
        request.postpath.insert(0, segment)
        return self._sync_resource
diff --git a/src/leap/soledad/server/_server_info.py b/src/leap/soledad/server/_server_info.py
new file mode 100644
index 00000000..50659338
--- /dev/null
+++ b/src/leap/soledad/server/_server_info.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+# _server_info.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Resource that announces information about the server.
+"""
+import json
+
+from twisted.web.resource import Resource
+
+from leap.soledad.server import __version__
+
+
+__all__ = ['ServerInfo']
+
+
class ServerInfo(Resource):
    """
    Return information about the server.
    """

    isLeaf = True

    def __init__(self, blobs_enabled):
        """
        :param blobs_enabled: whether the server announces blobs support.
        :type blobs_enabled: bool
        """
        # initialise the base class like every other Resource subclass in
        # the server does, so the instance has the base class' state
        Resource.__init__(self)
        self._info = {
            "blobs": blobs_enabled,
            "version": __version__
        }

    def render_GET(self, request):
        """
        :return: the server information encoded as JSON.
        :rtype: str
        """
        return json.dumps(self._info)
diff --git a/src/leap/soledad/server/_wsgi.py b/src/leap/soledad/server/_wsgi.py
new file mode 100644
index 00000000..f6ff6b26
--- /dev/null
+++ b/src/leap/soledad/server/_wsgi.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+# application.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A WSGI application that serves Soledad synchronization.
+"""
+from twisted.internet import reactor
+from twisted.web.wsgi import WSGIResource
+
+from leap.soledad.server import SoledadApp
+from leap.soledad.server.gzip_middleware import GzipMiddleware
+from leap.soledad.common.backend import SoledadBackend
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.log import getLogger
+
+from twisted.logger import Logger
+log = Logger()
+
+__all__ = ['init_couch_state', 'get_sync_resource']
+
+
def _get_couch_state(conf):
    """
    Build the couch server state from the configuration and set the
    backend's batching support flag (off when 'batching' is not configured).
    """
    couch_state = CouchServerState(
        conf['couch_url'],
        create_cmd=conf['create_cmd'],
        check_schema_versions=True)
    batching = conf.get('batching', False)
    SoledadBackend.BATCH_SUPPORT = batching
    return couch_state
+
+
# The app is created without a state; init_couch_state() assigns it later.
_app = SoledadApp(None)  # delay state init
# The WSGI callable exposed by this module is the app wrapped in gzip
# middleware.
wsgi_application = GzipMiddleware(_app)
+
+
+# During its initialization, the couch state verifies if all user databases
+# contain a config document with the correct couch schema version stored, and
+# will log an error and raise an exception if that is not the case.
+#
+# If this verification is made too early (i.e. before the reactor has started and
+# the twistd web logging facilities have been setup), the logging will not
+# work. Because of that, we delay couch state initialization until the reactor
+# is running.
+
def init_couch_state(conf):
    """
    Create the couch state and attach it to the WSGI app.

    Any exception raised while doing so is logged and the reactor is stopped.
    """
    try:
        couch_state = _get_couch_state(conf)
        _app.state = couch_state
    except Exception as exc:
        getLogger().error(str(exc))
        reactor.stop()
+
+
def get_sync_resource(pool):
    """
    Wrap the module's WSGI application in a twisted WSGIResource that uses
    the given pool.
    """
    sync_resource = WSGIResource(reactor, pool, wsgi_application)
    return sync_resource
diff --git a/src/leap/soledad/server/auth.py b/src/leap/soledad/server/auth.py
new file mode 100644
index 00000000..1357b289
--- /dev/null
+++ b/src/leap/soledad/server/auth.py
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+# auth.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Twisted http token auth.
+"""
+import binascii
+import time
+
+from hashlib import sha512
+from zope.interface import implementer
+
+from twisted.cred import error
+from twisted.cred.checkers import ICredentialsChecker
+from twisted.cred.credentials import IUsernamePassword
+from twisted.cred.credentials import IAnonymous
+from twisted.cred.credentials import Anonymous
+from twisted.cred.credentials import UsernamePassword
+from twisted.cred.portal import IRealm
+from twisted.cred.portal import Portal
+from twisted.internet import defer
+from twisted.logger import Logger
+from twisted.web.iweb import ICredentialFactory
+from twisted.web.resource import IResource
+
+from leap.soledad.common.couch import couch_server
+
+from ._resource import SoledadResource, SoledadAnonResource
+from ._blobs import BlobsResource
+from ._config import get_config
+
+
+log = Logger()
+
+
@implementer(IRealm)
class SoledadRealm(object):
    """
    Realm that hands out the anonymous or the authenticated resource tree.
    """

    def __init__(self, sync_pool, conf=None):
        """
        :param sync_pool: pool passed on to the authenticated resource;
                          must not be None.
        :param conf: configuration mapping; loaded with get_config() when
                     omitted.
        """
        assert sync_pool is not None
        if conf is None:
            conf = get_config()

        blobs_enabled = conf['blobs']
        if blobs_enabled:
            blobs_resource = BlobsResource("filesystem", conf['blobs_path'])
        else:
            blobs_resource = None

        self.anon_resource = SoledadAnonResource(enable_blobs=blobs_enabled)
        self.auth_resource = SoledadResource(
            blobs_resource=blobs_resource, sync_pool=sync_pool)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return an (interface, resource, logout) triple for the avatar.

        :raise NotImplementedError: for an authenticated avatar when
                                    IResource was not requested.
        """
        # Anonymous access
        if IAnonymous.providedBy(avatarId):
            return (IResource, self.anon_resource, lambda: None)

        # Authenticated access
        if IResource not in interfaces:
            raise NotImplementedError()
        return (IResource, self.auth_resource, lambda: None)
+
+
@implementer(ICredentialsChecker)
class TokenChecker(object):
    """
    Credentials checker that validates (uuid, token) pairs against the
    tokens database in couch; anonymous credentials are accepted as such.
    """

    credentialInterfaces = [IUsernamePassword, IAnonymous]

    TOKENS_DB_PREFIX = "tokens_"
    TOKENS_DB_EXPIRE = 30 * 24 * 3600  # 30 days in seconds
    TOKENS_TYPE_KEY = "type"
    TOKENS_TYPE_DEF = "Token"
    TOKENS_USER_ID_KEY = "user_id"

    def __init__(self):
        self._couch_url = get_config().get('couch_url')

    def _get_server(self):
        return couch_server(self._couch_url)

    def _tokens_dbname(self):
        # the tokens db rotates every 30 days, and the current db name is
        # "tokens_NNN", where NNN is the number of seconds since epoch
        # divided by the rotate period in seconds. When rotating, old and
        # new tokens db coexist during a certain window of time and valid
        # tokens are replicated from the old db to the new one. See:
        # https://leap.se/code/issues/6785
        dbname = self.TOKENS_DB_PREFIX + \
            str(int(time.time() / self.TOKENS_DB_EXPIRE))
        return dbname

    def _tokens_db(self):
        dbname = self._tokens_dbname()

        # TODO -- leaking abstraction here: this module shouldn't need
        # to known anything about the context manager. hide that in the couch
        # module
        with self._get_server() as server:
            db = server[dbname]
        return db

    def requestAvatarId(self, credentials):
        """
        Check the given credentials.

        :return: a deferred that fires with an Anonymous avatar id for
                 anonymous credentials, with the uuid for a valid token, or
                 fails with UnauthorizedLogin otherwise.
        """
        # function-scope import keeps this block self-contained; hmac is
        # part of the standard library
        import hmac

        if IAnonymous.providedBy(credentials):
            return defer.succeed(Anonymous())

        uuid = credentials.username
        token = credentials.password

        # lookup key is a hash of the token to prevent timing attacks.
        # TODO cache the tokens already!

        db = self._tokens_db()
        token_doc = db.get(sha512(token).hexdigest())
        if token_doc is None:
            return defer.fail(error.UnauthorizedLogin())

        # The uuids are hashed and the digests are compared with a
        # constant-time comparison, so the check does not stop at the first
        # non-matching byte the way the builtin comparison operator does.
        couch_uuid_hash = sha512(
            token_doc[self.TOKENS_USER_ID_KEY]).digest()
        req_uuid_hash = sha512(uuid).digest()
        is_token = token_doc[self.TOKENS_TYPE_KEY] == self.TOKENS_TYPE_DEF
        same_uuid = hmac.compare_digest(couch_uuid_hash, req_uuid_hash)
        if not is_token or not same_uuid:
            return defer.fail(error.UnauthorizedLogin())

        return defer.succeed(uuid)
+
+
@implementer(ICredentialFactory)
class TokenCredentialFactory(object):
    """
    Credential factory for the 'token' authentication scheme.
    """

    scheme = 'token'

    def getChallenge(self, request):
        """
        :return: an empty challenge.
        """
        return {}

    def decode(self, response, request):
        """
        Decode a base64 'username:password' response.

        :return: the decoded credentials.
        :rtype: UsernamePassword
        :raise error.LoginFailed: if the response is not valid base64 or
                                  does not contain a ':' separator.
        """
        try:
            # extra padding is appended before decoding
            decoded = binascii.a2b_base64(response + b'===')
        except binascii.Error:
            raise error.LoginFailed('Invalid credentials')

        parts = decoded.split(b':', 1)
        if len(parts) != 2:
            raise error.LoginFailed('Invalid credentials')
        username, password = parts
        return UsernamePassword(username, password)
+
+
def portalFactory(sync_pool):
    """
    Build a Portal from a SoledadRealm using the given sync pool and a
    TokenChecker.
    """
    soledad_realm = SoledadRealm(sync_pool=sync_pool)
    checkers = [TokenChecker()]
    return Portal(soledad_realm, checkers)
+
+
# Module-level instance of the 'token' scheme credential factory.
credentialFactory = TokenCredentialFactory()
diff --git a/src/leap/soledad/server/caching.py b/src/leap/soledad/server/caching.py
new file mode 100644
index 00000000..9a049a39
--- /dev/null
+++ b/src/leap/soledad/server/caching.py
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+# caching.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server side caching. Using beaker for now.
+"""
+from beaker.cache import CacheManager
+
+
def setup_caching():
    """
    Create and return a beaker CacheManager of type 'memory'.
    """
    return CacheManager(type='memory')
+
+
# Module-level cache manager, created at import time and used by
# get_cache_for().
_cache_manager = setup_caching()
+
+
def get_cache_for(key, expire=3600):
    """
    Return the cache registered under ``key`` in the module's cache manager.

    ``expire`` is handed to the manager unchanged (presumably seconds --
    TODO confirm against beaker).
    """
    cache = _cache_manager.get_cache(key, expire=expire)
    return cache
diff --git a/src/leap/soledad/server/entrypoint.py b/src/leap/soledad/server/entrypoint.py
new file mode 100644
index 00000000..c06b740e
--- /dev/null
+++ b/src/leap/soledad/server/entrypoint.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# entrypoint.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+The entrypoint for Soledad server.
+
+This is the entrypoint for the application that is loaded from the initscript
+or the systemd script.
+"""
+
+from twisted.internet import reactor
+from twisted.python import threadpool
+
+from .auth import portalFactory
+from .session import SoledadSession
+from ._config import get_config
+from ._wsgi import init_couch_state
+
+
# load configuration from file (the default section of get_config); this
# runs at import time and the result is handed to init_couch_state below
conf = get_config()
+
+
class SoledadEntrypoint(SoledadSession):
    """
    Session resource wired to a portal that uses a dedicated 'wsgi' thread
    pool.
    """

    def __init__(self):
        wsgi_pool = threadpool.ThreadPool(name='wsgi')
        # the pool is started once the reactor runs and stopped after the
        # reactor has shut down
        reactor.callWhenRunning(wsgi_pool.start)
        reactor.addSystemEventTrigger('after', 'shutdown', wsgi_pool.stop)
        SoledadSession.__init__(self, portalFactory(wsgi_pool))
+
+
# see the comments in application.py regarding why couch state has to be
# initialized when the reactor is running

reactor.callWhenRunning(init_couch_state, conf)
diff --git a/src/leap/soledad/server/gzip_middleware.py b/src/leap/soledad/server/gzip_middleware.py
new file mode 100644
index 00000000..c77f9f67
--- /dev/null
+++ b/src/leap/soledad/server/gzip_middleware.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# gzip_middleware.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Gzip middleware for WSGI apps.
+"""
+from six import StringIO
+from gzip import GzipFile
+
+
+class GzipMiddleware(object):
+    """
+    GzipMiddleware class for WSGI.
+
+    Compresses the response body of the wrapped application when the client
+    announces gzip support; other requests are passed through untouched.
+    """
+
+    def __init__(self, app, compresslevel=9):
+        """
+        :param app: The WSGI application to be wrapped.
+        :type app: callable
+        :param compresslevel: The compression level handed to GzipFile.
+        :type compresslevel: int
+        """
+        self.app = app
+        self.compresslevel = compresslevel
+
+    def __call__(self, environ, start_response):
+        """
+        Run the wrapped application and gzip its output if accepted.
+
+        :param environ: The WSGI environment of the request.
+        :type environ: dict
+        :param start_response: The WSGI start_response callable.
+        :type start_response: callable
+
+        :return: The response body: the wrapped application's own iterable
+                 when gzip is not accepted, otherwise a one-element list
+                 with the compressed body.
+        """
+        if 'gzip' not in environ.get('HTTP_ACCEPT_ENCODING', ''):
+            return self.app(environ, start_response)
+
+        # GzipFile writes bytes, so the in-memory sink must be a bytes
+        # buffer. (six.StringIO is a text class and has no ``StringIO``
+        # attribute, so it cannot be used here.)
+        from io import BytesIO
+
+        sink = BytesIO()
+        output = GzipFile(
+            mode='wb',
+            compresslevel=self.compresslevel,
+            fileobj=sink
+        )
+
+        start_response_args = []
+
+        def dummy_start_response(status, headers, exc_info=None):
+            # defer the real start_response until the compressed size is
+            # known; a repeated call replaces the previous arguments
+            start_response_args[:] = [status, headers, exc_info]
+            return output.write
+
+        app_iter = self.app(environ, dummy_start_response)
+        try:
+            for chunk in app_iter:
+                output.write(chunk)
+        finally:
+            # the iterable has to be closed even if iterating it failed
+            if hasattr(app_iter, 'close'):
+                app_iter.close()
+        output.close()
+        result = sink.getvalue()
+        sink.close()
+
+        # the original Content-Length no longer applies to the gzipped body
+        headers = [
+            (name, value) for name, value in start_response_args[1]
+            if name.lower() != 'content-length']
+        headers.append(('Content-Length', str(len(result))))
+        headers.append(('Content-Encoding', 'gzip'))
+        start_response(start_response_args[0], headers, start_response_args[2])
+        return [result]
diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py
new file mode 100644
index 00000000..67b04bc3
--- /dev/null
+++ b/src/leap/soledad/server/interfaces.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+# interfaces.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+from zope.interface import Interface
+
+
+class IBlobsBackend(Interface):
+    """
+    The operations a blobs backend has to provide.
+    """
+
+    def read_blob(user, blob_id, request):
+        """
+        Fetch the blob identified by blob_id and write it to the given
+        request.
+
+        :returns: a deferred that fires upon finishing.
+        """
+
+    def write_blob(user, blob_id, request):
+        """
+        Store a blob, taking its content from the given request.
+
+        :returns: a deferred that fires upon finishing.
+        """
+
+    def delete_blob(user, blob_id):
+        """
+        Remove the blob identified by blob_id.
+        """
+
+    def get_blob_size(user, blob_id):
+        """
+        Return the size of the blob identified by blob_id.
+        """
+
+    def list_blobs(user, request):
+        """
+        Return a json-encoded list with the ids of the user's blobs.
+
+        :returns: a deferred that fires upon finishing.
+        """
+
+    def get_total_storage(user):
+        """
+        Return the storage used by the given user, i.e. the sum of the sizes
+        of all the blobs stored under the user's namespace.
+        """
+
+    def add_tag_header(user, blob_id, request):
+        """
+        Add a 'Tag' header to the given request object. The header carries
+        the last 16 bytes of the encoded blob, which according to the spec
+        contain the tag.
+
+        :returns: a deferred that fires upon finishing.
+        """
diff --git a/src/leap/soledad/server/session.py b/src/leap/soledad/server/session.py
new file mode 100644
index 00000000..1c1b5345
--- /dev/null
+++ b/src/leap/soledad/server/session.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+# session.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Twisted resource containing an authenticated Soledad session.
+"""
+from zope.interface import implementer
+
+from twisted.cred.credentials import Anonymous
+from twisted.cred import error
+from twisted.python import log
+from twisted.web import util
+from twisted.web._auth import wrapper
+from twisted.web.guard import HTTPAuthSessionWrapper
+from twisted.web.resource import ErrorPage
+from twisted.web.resource import IResource
+
+from leap.soledad.server.auth import credentialFactory
+from leap.soledad.server.url_mapper import URLMapper
+
+
+@implementer(IResource)
+class UnauthorizedResource(wrapper.UnauthorizedResource):
+    """
+    A leaf resource that answers every request with a 401 response code.
+    """
+    isLeaf = True
+
+    def __init__(self):
+        # the parent initializer is not called
+        pass
+
+    def render(self, request):
+        """
+        Set the 401 response code and return the body: empty for HEAD
+        requests, b'Unauthorized' otherwise.
+        """
+        request.setResponseCode(401)
+        return b'' if request.method == b'HEAD' else b'Unauthorized'
+
+    def getChildWithDefault(self, path, request):
+        """
+        Return this same resource for any child path.
+        """
+        return self
+
+
+@implementer(IResource)
+class SoledadSession(HTTPAuthSessionWrapper):
+    """
+    An authentication wrapper that checks the request path against the URL
+    mapper and the request credentials before trying to log in.
+    """
+
+    def __init__(self, portal):
+        """
+        :param portal: The portal to authenticate against (stored as
+                       ``_portal`` for the parent class).
+        """
+        self._mapper = URLMapper()
+        self._portal = portal
+        self._credentialFactory = credentialFactory
+        # expected by the contract of the parent class
+        self._credentialFactories = [credentialFactory]
+
+    def _matchPath(self, request):
+        """
+        Match the path and method of the request against the URL mapper.
+
+        :return: Whatever the mapper returns for the request; a falsy value
+                 is treated as "no match" by the caller.
+        """
+        return self._mapper.match(request.path, request.method)
+
+    def _parseHeader(self, header):
+        """
+        Extract the authentication data from an authorization header.
+
+        :param header: The raw value of the authorization header.
+        :type header: bytes
+
+        :return: Everything that follows the scheme, or None if the scheme
+                 is not the one of the credential factory.
+        """
+        elements = header.split(b' ')
+        scheme = elements[0].lower()
+        if scheme == self._credentialFactory.scheme:
+            return (b' '.join(elements[1:]))
+        return None
+
+    def _authorizedResource(self, request):
+        """
+        Return the resource that should handle the given request.
+
+        :return: An UnauthorizedResource if the path is unknown or the
+                 authorization header cannot be used, an ErrorPage(500) if
+                 decoding the credentials fails unexpectedly or the uuid in
+                 the path differs from the username in the credentials, or
+                 a DeferredResource wrapping the login attempt otherwise.
+        """
+        # check whether the path of the request exists in the app
+        match = self._matchPath(request)
+        if not match:
+            return UnauthorizedResource()
+
+        # without an authorization header, try an anonymous login
+        header = request.getHeader(b'authorization')
+        if not header:
+            return util.DeferredResource(self._login(Anonymous()))
+
+        # parse the authorization header
+        auth_data = self._parseHeader(header)
+        if not auth_data:
+            return UnauthorizedResource()
+
+        # decode the credentials from the parsed header
+        try:
+            credentials = self._credentialFactory.decode(auth_data, request)
+        except error.LoginFailed:
+            return UnauthorizedResource()
+        except Exception:
+            # Only real errors are turned into a 500 page; SystemExit and
+            # KeyboardInterrupt are left to propagate.
+            # If you port this to the newer log facility, be aware that
+            # the tests rely on the error to be logged.
+            log.err(None, "Unexpected failure from credentials factory")
+            return ErrorPage(500, None, None)
+
+        # make sure the uuid given in path corresponds to the one given in
+        # the credentials
+        request_uuid = match.get('uuid')
+        if request_uuid and request_uuid != credentials.username:
+            return ErrorPage(500, None, None)
+
+        # if all checks pass, try to login with credentials
+        return util.DeferredResource(self._login(credentials))
diff --git a/src/leap/soledad/server/state.py b/src/leap/soledad/server/state.py
new file mode 100644
index 00000000..f269b77e
--- /dev/null
+++ b/src/leap/soledad/server/state.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+# state.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server side synchronization infrastructure.
+"""
+from leap.soledad.server import caching
+
+
+class ServerSyncState(object):
+    """
+    The state of one sync session, as stored on backend server.
+
+    The metadata of ongoing syncs is kept in a caching layer on the server
+    side.
+    """
+
+    def __init__(self, source_replica_uid, sync_id):
+        """
+        Initialize the sync state object.
+
+        :param source_replica_uid: The source replica uid
+        :type source_replica_uid: str
+        :param sync_id: The id of current sync
+        :type sync_id: str
+        """
+        self._source_replica_uid = source_replica_uid
+        self._sync_id = sync_id
+        # the cache is looked up by replica uid concatenated with sync id
+        self._storage = caching.get_cache_for(source_replica_uid + sync_id)
+
+    def _put_dict_info(self, key, value):
+        """
+        Append one piece of information to the list stored under a key.
+
+        :param key: The key for the info to be put.
+        :type key: str
+        :param value: The value for the info to be put.
+        :type value: str
+        """
+        store = self._storage
+        if key not in store:
+            store[key] = []
+        entries = store.get(key)
+        entries.append(value)
+        # write the list back so the storage holds the updated value
+        store[key] = entries
+
+    def put_seen_id(self, seen_id, gen):
+        """
+        Record one document id seen during the sync.
+
+        :param seen_id: The doc_id of a document seen during sync.
+        :type seen_id: str
+        :param gen: The corresponding db generation.
+        :type gen: int
+        """
+        self._put_dict_info('seen_id', (seen_id, gen))
+
+    def seen_ids(self):
+        """
+        Return all document ids seen during the sync.
+
+        :return: A dict mapping the doc ids seen during the sync to their
+                 generations.
+        :rtype: dict
+        """
+        if 'seen_id' not in self._storage:
+            return dict([])
+        return dict(self._storage.get('seen_id'))
+
+    def put_changes_to_return(self, gen, trans_id, changes_to_return):
+        """
+        Store the calculated changes to return in the backend sync state.
+
+        :param gen: The target database generation that will be synced.
+        :type gen: int
+        :param trans_id: The target database transaction id that will be
+                         synced.
+        :type trans_id: str
+        :param changes_to_return: A list of tuples with the changes to be
+                                  returned during the sync process.
+        :type changes_to_return: list
+        """
+        info = {
+            'gen': gen,
+            'trans_id': trans_id,
+            'changes_to_return': changes_to_return,
+        }
+        self._put_dict_info('changes_to_return', info)
+
+    def sync_info(self):
+        """
+        Return information about the current sync state.
+
+        :return: The generation and transaction id of the target database
+                 which will be synced, and the number of documents to return,
+                 or a tuple of Nones if those have not already been sent to
+                 server.
+        :rtype: tuple
+        """
+        if 'changes_to_return' not in self._storage:
+            return None, None, None
+        # only the first stored entry is considered
+        info = self._storage.get('changes_to_return')[0]
+        gen = info['gen']
+        trans_id = info['trans_id']
+        return gen, trans_id, len(info['changes_to_return'])
+
+    def next_change_to_return(self, received):
+        """
+        Return the next change to be returned to the source syncing replica.
+
+        :param received: How many documents the source replica has already
+                         received during the current sync process.
+        :type received: int
+
+        :return: The stored generation and transaction id plus the change at
+                 position ``received`` (None when there is no such change),
+                 or a tuple of Nones if no changes were stored.
+        :rtype: tuple
+        """
+        if 'changes_to_return' not in self._storage:
+            return None, None, None
+        info = self._storage.get('changes_to_return')[0]
+        gen = info['gen']
+        trans_id = info['trans_id']
+        changes = info['changes_to_return']
+        change = changes[received] if received < len(changes) else None
+        return gen, trans_id, change
diff --git a/src/leap/soledad/server/sync.py b/src/leap/soledad/server/sync.py
new file mode 100644
index 00000000..6791c06c
--- /dev/null
+++ b/src/leap/soledad/server/sync.py
@@ -0,0 +1,305 @@
+# -*- coding: utf-8 -*-
+# sync.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server side synchronization infrastructure.
+"""
+import time
+from six.moves import zip as izip
+
+from leap.soledad.common.l2db import sync
+from leap.soledad.common.l2db.remote import http_app
+from leap.soledad.server.caching import get_cache_for
+from leap.soledad.server.state import ServerSyncState
+from leap.soledad.common.document import ServerDocument
+
+
+# Requests are not limited in size (it's a stream).
+MAX_REQUEST_SIZE = float('inf') # It's a stream.
+# Maximum size of one entry; SyncResource multiplies it by 1024 * 1024.
+MAX_ENTRY_SIZE = 200 # in Mb
+# Staged content size (as summed up by SyncResource.post_put) above which the
+# staged documents are inserted as one batch.
+ENTRY_CACHE_SIZE = 8192 * 1024
+
+
+class SyncExchange(sync.SyncExchange):
+    """
+    A sync exchange that keeps its progress in a ServerSyncState.
+    """
+
+    def __init__(self, db, source_replica_uid, last_known_generation, sync_id):
+        """
+        :param db: The target syncing database.
+        :type db: SoledadBackend
+        :param source_replica_uid: The uid of the source syncing replica.
+        :type source_replica_uid: str
+        :param last_known_generation: The last target replica generation the
+                                      source replica knows about.
+        :type last_known_generation: int
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str
+        """
+        self._db = db
+        self.source_replica_uid = source_replica_uid
+        self.source_last_known_generation = last_known_generation
+        self.sync_id = sync_id
+        self.new_gen = None
+        self.new_trans_id = None
+        self._trace_hook = None
+        # recover sync state
+        self._sync_state = ServerSyncState(self.source_replica_uid, sync_id)
+
+    def find_changes_to_return(self):
+        """
+        Find changes to return.
+
+        Find changes since last_known_generation in db generation
+        order using whats_changed. It excludes documents ids that have
+        already been considered (superseded by the sender, etc).
+
+        :return: the generation of this database, which the caller can
+                 consider themselves to be synchronized after processing
+                 all returned documents, and the amount of documents to be
+                 sent to the source syncing replica.
+        :rtype: tuple
+        """
+        # check if changes to return have already been calculated
+        new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info()
+        if number_of_changes is None:
+            self._trace('before whats_changed')
+            new_gen, new_trans_id, changes = self._db.whats_changed(
+                self.source_last_known_generation)
+            self._trace('after whats_changed')
+            seen_ids = self._sync_state.seen_ids()
+            # changed docs that weren't superseded by or converged with
+            self.changes_to_return = [
+                (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
+                # there was a subsequent update
+                if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
+            self._sync_state.put_changes_to_return(
+                new_gen, new_trans_id, self.changes_to_return)
+            number_of_changes = len(self.changes_to_return)
+        else:
+            # the changes were already calculated for this sync session, so
+            # recover them from the sync state; otherwise return_docs()
+            # would find no ``changes_to_return`` attribute on this object
+            self.changes_to_return = [
+                self._sync_state.next_change_to_return(idx)[2]
+                for idx in range(number_of_changes)]
+        self.new_gen = new_gen
+        self.new_trans_id = new_trans_id
+        return self.new_gen, number_of_changes
+
+    def return_docs(self, return_doc_cb):
+        """Return the changed documents and their last change generation
+        repeatedly invoking the callback return_doc_cb.
+
+        The final step of a sync exchange. It uses the changes found by
+        find_changes_to_return(), which has to be called first.
+
+        :param: return_doc_cb(doc, gen, trans_id): is a callback
+                used to return the documents with their last change generation
+                to the target replica.
+        :return: None
+        """
+        changes_to_return = self.changes_to_return
+        # return docs, including conflicts.
+        # content as a file-object (will be read when writing)
+        changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
+        docs = self._db.get_docs(
+            changed_doc_ids, check_for_conflicts=False,
+            include_deleted=True, read_content=False)
+
+        docs_by_gen = izip(
+            docs, (gen for _, gen, _ in changes_to_return),
+            (trans_id for _, _, trans_id in changes_to_return))
+        for doc, gen, trans_id in docs_by_gen:
+            return_doc_cb(doc, gen, trans_id)
+
+    def batched_insert_from_source(self, entries, sync_id):
+        """
+        Insert a list of staged entries between batch_start() and
+        batch_end() calls on the database.
+
+        :param entries: Tuples of (doc, gen, trans_id, number_of_docs,
+                        doc_idx); nothing is done if the list is empty.
+        :type entries: list
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str
+        """
+        if not entries:
+            return
+        self._db.batch_start()
+        for entry in entries:
+            doc, gen, trans_id, number_of_docs, doc_idx = entry
+            self.insert_doc_from_source(doc, gen, trans_id, number_of_docs,
+                                        doc_idx, sync_id)
+        self._db.batch_end()
+
+    def insert_doc_from_source(
+            self, doc, source_gen, trans_id,
+            number_of_docs=None, doc_idx=None, sync_id=None):
+        """Try to insert synced document from source.
+
+        Conflicting documents are not inserted but will be sent over
+        to the sync source.
+
+        It keeps track of progress by storing the document source
+        generation as well.
+
+        The 1st step of a sync exchange is to call this repeatedly to
+        try insert all incoming documents from the source.
+
+        :param doc: A Document object.
+        :type doc: Document
+        :param source_gen: The source generation of doc.
+        :type source_gen: int
+        :param trans_id: The transaction id of that document change.
+        :type trans_id: str
+        :param number_of_docs: The total amount of documents sent on this sync
+                               session.
+        :type number_of_docs: int
+        :param doc_idx: The index of the current document.
+        :type doc_idx: int
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str
+        """
+        state, at_gen = self._db._put_doc_if_newer(
+            doc, save_conflict=False, replica_uid=self.source_replica_uid,
+            replica_gen=source_gen, replica_trans_id=trans_id,
+            number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)
+        if state == 'inserted':
+            self._sync_state.put_seen_id(doc.doc_id, at_gen)
+        elif state == 'converged':
+            # magical convergence
+            self._sync_state.put_seen_id(doc.doc_id, at_gen)
+        elif state == 'superseded':
+            # we have something newer that we will return
+            pass
+        else:
+            # conflict that we will return
+            assert state == 'conflicted'
+
+
+class SyncResource(http_app.SyncResource):
+    """
+    Sync resource that stages incoming documents and hands them to the
+    sync exchange in batches.
+    """
+
+    max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
+    max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024
+
+    sync_exchange_class = SyncExchange
+
+    @http_app.http_method(
+        last_known_generation=int, last_known_trans_id=http_app.none_or_str,
+        sync_id=http_app.none_or_str, content_as_args=True)
+    def post_args(self, last_known_generation, last_known_trans_id=None,
+                  sync_id=None, ensure=False):
+        """
+        Handle the initial arguments for the sync POST request from client.
+
+        :param last_known_generation: The last server replica generation the
+                                      client knows about.
+        :type last_known_generation: int
+        :param last_known_trans_id: The last server replica transaction_id the
+                                    client knows about.
+        :type last_known_trans_id: str
+        :param sync_id: The id of the current sync session.
+        :type sync_id: str
+        :param ensure: Whether the server replica should be created if it does
+                       not already exist.
+        :type ensure: bool
+        """
+        cache_key = 'db-' + sync_id + self.dbname
+        cache = get_cache_for(cache_key, expire=120)
+        # create or open the database
+        if not ensure:
+            db = self.state.open_database(self.dbname)
+        else:
+            db, self.replica_uid = self.state.ensure_database(self.dbname)
+        db.init_caching(cache)
+        # validate the information the client has about server replica
+        db.validate_gen_and_trans_id(
+            last_known_generation, last_known_trans_id)
+        # get a sync exchange object
+        self.sync_exch = self.sync_exchange_class(
+            db, self.source_replica_uid, last_known_generation, sync_id)
+        self._sync_id = sync_id
+        # incoming documents are staged here until they are batch-inserted
+        self._staging = []
+        self._staging_size = 0
+
+    @http_app.http_method(content_as_args=True)
+    def post_put(
+            self, id, rev, content, gen,
+            trans_id, number_of_docs, doc_idx):
+        """
+        Stage one incoming document and flush the staged documents into the
+        server replica when the staged size exceeds ENTRY_CACHE_SIZE or the
+        document index equals the number of documents.
+
+        :param id: The id of the incoming document.
+        :type id: str
+        :param rev: The revision of the incoming document.
+        :type rev: str
+        :param content: The content of the incoming document.
+        :type content: dict
+        :param gen: The source replica generation corresponding to the
+                    revision of the incoming document.
+        :type gen: int
+        :param trans_id: The source replica transaction id corresponding to
+                         the revision of the incoming document.
+        :type trans_id: str
+        :param number_of_docs: The total amount of documents sent on this sync
+                               session.
+        :type number_of_docs: int
+        :param doc_idx: The index of the current document.
+        :type doc_idx: int
+        """
+        doc = ServerDocument(id, rev, json=content)
+        self._staging_size += len(content or '')
+        self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
+        cache_full = self._staging_size > ENTRY_CACHE_SIZE
+        if cache_full or doc_idx == number_of_docs:
+            self.sync_exch.batched_insert_from_source(
+                self._staging, self._sync_id)
+            self._staging = []
+            self._staging_size = 0
+
+    def post_get(self):
+        """
+        Return syncing documents to the client.
+        """
+        # throttle at 5mb/s
+        bytes_per_second = 5.0 * 1024 * 1024
+
+        def send_doc(doc, gen, trans_id):
+            # each document is streamed as a metadata entry followed by a
+            # content entry (an empty string when there is no content)
+            self.responder.stream_entry(
+                dict(id=doc.doc_id, rev=doc.rev, gen=gen, trans_id=trans_id))
+            content_reader = doc.get_json()
+            if not content_reader:
+                self.responder.stream_entry('')
+                return
+            content = content_reader.read()
+            self.responder.stream_entry(content)
+            content_reader.close()
+            # FIXME: twistd can't control throughput
+            # we need to either use gunicorn or go async
+            time.sleep(len(content) / bytes_per_second)
+
+        new_gen, number_of_changes = \
+            self.sync_exch.find_changes_to_return()
+        self.responder.content_type = 'application/x-u1db-sync-response'
+        self.responder.start_response(200)
+        self.responder.start_stream()
+        header = {
+            "new_generation": new_gen,
+            "new_transaction_id": self.sync_exch.new_trans_id,
+            "number_of_changes": number_of_changes,
+        }
+        if self.replica_uid is not None:
+            header['replica_uid'] = self.replica_uid
+        self.responder.stream_entry(header)
+        self.sync_exch.return_docs(send_doc)
+        self.responder.end_stream()
+        self.responder.finish_response()
+
+    def post_end(self):
+        """
+        Return the current generation and transaction_id of the database
+        once the incoming documents have been handled.
+        """
+        self.responder.content_type = 'application/x-soledad-sync-response'
+        self.responder.start_response(200)
+        self.responder.start_stream()
+        new_gen, new_trans_id = self.sync_exch._db._get_generation_info()
+        header = {
+            "new_generation": new_gen,
+            "new_transaction_id": new_trans_id,
+        }
+        if self.replica_uid is not None:
+            header['replica_uid'] = self.replica_uid
+        self.responder.stream_entry(header)
+        self.responder.end_stream()
+        self.responder.finish_response()
diff --git a/src/leap/soledad/server/url_mapper.py b/src/leap/soledad/server/url_mapper.py
new file mode 100644
index 00000000..b50a81cd
--- /dev/null
+++ b/src/leap/soledad/server/url_mapper.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+# url_mapper.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+An URL mapper that represents authorized paths.
+"""
+from routes.mapper import Mapper
+
+from leap.soledad.common import SHARED_DB_NAME
+from leap.soledad.common.l2db import DBNAME_CONSTRAINTS
+
+
+class URLMapper(object):
+    """
+    Maps the URLs users can access.
+    """
+
+    def __init__(self):
+        self._map = Mapper(controller_scan=None)
+        self._connect_urls()
+        self._map.create_regs()
+
+    def match(self, path, method):
+        """
+        Match a request path and method against the registered URLs.
+
+        :return: Whatever the underlying mapper returns for the request.
+        """
+        return self._map.match(
+            environ={'PATH_INFO': path, 'REQUEST_METHOD': method})
+
+    def _connect(self, pattern, http_methods):
+        """
+        Register one URL pattern, restricted to the given HTTP methods.
+        """
+        self._map.connect(
+            None, pattern, http_methods=http_methods,
+            conditions={'method': http_methods},
+            requirements={'dbname': DBNAME_CONSTRAINTS})
+
+    def _connect_urls(self):
+        """
+        Register the authorization info in the mapper, using
+        C{SHARED_DB_NAME} as the name of the shared database.
+
+        This method sets up the following authorization rules:
+
+            URL path                           | Authorized actions
+            ------------------------------------------------------
+            /                                  | GET
+            /robots.txt                        | GET
+            /<SHARED_DB_NAME>                  | GET
+            /<SHARED_DB_NAME>/doc/{any_id}     | GET, PUT, DELETE
+            /user-{uuid}/sync-from/{source}    | GET, PUT, POST
+            /blobs/{uuid}/{blob_id}            | GET, PUT
+            /blobs/{uuid}                      | GET
+        """
+        # registered in this order: global resource, robots, shared-db
+        # database, shared-db docs, user-db sync and blobs
+        routes = (
+            ('/', ['GET']),
+            ('/robots.txt', ['GET']),
+            ('/%s' % SHARED_DB_NAME, ['GET']),
+            ('/%s/doc/{id:.*}' % SHARED_DB_NAME, ['GET', 'PUT', 'DELETE']),
+            ('/user-{uuid}/sync-from/{source_replica_uid}',
+             ['GET', 'PUT', 'POST']),
+            ('/blobs/{uuid}/{blob_id}', ['GET', 'PUT']),
+            ('/blobs/{uuid}', ['GET']),
+        )
+        for pattern, http_methods in routes:
+            self._connect(pattern, http_methods)