From 3e94cafa43d464d73815e21810b97a4faf54136d Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 18 Jun 2017 11:18:10 -0300 Subject: [pkg] unify client and server into a single python package We have been discussing about this merge for a while. Its main goal is to simplify things: code navigation, but also packaging. The rationale is that the code is more cohesive in this way, and there's only one source package to install. Dependencies that are only for the server or the client will not be installed by default, and they are expected to be provided by the environment. There are setuptools extras defined for the client and the server. Debianization is still expected to split the single source package into 3 binaries. Another avantage is that the documentation can now install a single package with a single step, and therefore include the docstrings into the generated docs. - Resolves: #8896 --- src/leap/soledad/common/l2db/remote/__init__.py | 15 + src/leap/soledad/common/l2db/remote/http_app.py | 660 +++++++++++++++++++++ src/leap/soledad/common/l2db/remote/http_client.py | 178 ++++++ .../soledad/common/l2db/remote/http_database.py | 158 +++++ src/leap/soledad/common/l2db/remote/http_errors.py | 48 ++ src/leap/soledad/common/l2db/remote/http_target.py | 125 ++++ .../soledad/common/l2db/remote/server_state.py | 68 +++ .../common/l2db/remote/ssl_match_hostname.py | 65 ++ src/leap/soledad/common/l2db/remote/utils.py | 23 + 9 files changed, 1340 insertions(+) create mode 100644 src/leap/soledad/common/l2db/remote/__init__.py create mode 100644 src/leap/soledad/common/l2db/remote/http_app.py create mode 100644 src/leap/soledad/common/l2db/remote/http_client.py create mode 100644 src/leap/soledad/common/l2db/remote/http_database.py create mode 100644 src/leap/soledad/common/l2db/remote/http_errors.py create mode 100644 src/leap/soledad/common/l2db/remote/http_target.py create mode 100644 src/leap/soledad/common/l2db/remote/server_state.py create mode 100644 src/leap/soledad/common/l2db/remote/ssl_match_hostname.py create mode 100644 src/leap/soledad/common/l2db/remote/utils.py (limited to 'src/leap/soledad/common/l2db/remote') diff --git a/src/leap/soledad/common/l2db/remote/__init__.py b/src/leap/soledad/common/l2db/remote/__init__.py new file mode 100644 index 00000000..3f32e381 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see . diff --git a/src/leap/soledad/common/l2db/remote/http_app.py b/src/leap/soledad/common/l2db/remote/http_app.py new file mode 100644 index 00000000..a4eddb36 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_app.py @@ -0,0 +1,660 @@ +# Copyright 2011-2012 Canonical Ltd. +# Copyright 2016 LEAP Encryption Access Project +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see . + +""" +HTTP Application exposing U1DB. +""" +# TODO -- deprecate, use twisted/txaio. + +import functools +import six.moves.http_client as httplib +import inspect +import json +import sys +import six.moves.urllib.parse as urlparse + +import routes.mapper + +from leap.soledad.common.l2db import ( + __version__ as _u1db_version, + DBNAME_CONSTRAINTS, Document, + errors, sync) +from leap.soledad.common.l2db.remote import http_errors, utils + + +def parse_bool(expression): + """Parse boolean querystring parameter.""" + if expression == 'true': + return True + return False + + +def parse_list(expression): + if not expression: + return [] + return [t.strip() for t in expression.split(',')] + + +def none_or_str(expression): + if expression is None: + return None + return str(expression) + + +class BadRequest(Exception): + """Bad request.""" + + +class _FencedReader(object): + """Read and get lines from a file but not past a given length.""" + + MAXCHUNK = 8192 + + def __init__(self, rfile, total, max_entry_size): + self.rfile = rfile + self.remaining = total + self.max_entry_size = max_entry_size + self._kept = None + + def read_chunk(self, atmost): + if self._kept is not None: + # ignore atmost, kept data should be a subchunk anyway + kept, self._kept = self._kept, None + return kept + if self.remaining == 0: + return '' + data = self.rfile.read(min(self.remaining, atmost)) + self.remaining -= len(data) + return data + + def getline(self): + line_parts = [] + size = 0 + while True: + chunk = self.read_chunk(self.MAXCHUNK) + if chunk == '': + break + nl = chunk.find("\n") + if nl != -1: + size += nl + 1 + if size > self.max_entry_size: + raise BadRequest + line_parts.append(chunk[:nl + 1]) + rest = chunk[nl + 1:] + self._kept = rest or None + break + else: + size += len(chunk) + if size > self.max_entry_size: + raise BadRequest + line_parts.append(chunk) + return ''.join(line_parts) + + +def http_method(**control): + """Decoration for handling of query arguments and content for a HTTP + method. + + args and content here are the query arguments and body of the incoming + HTTP requests. + + Match query arguments to python method arguments: + w = http_method()(f) + w(self, args, content) => args["content"]=content; + f(self, **args) + + JSON deserialize content to arguments: + w = http_method(content_as_args=True,...)(f) + w(self, args, content) => args.update(json.loads(content)); + f(self, **args) + + Support conversions (e.g int): + w = http_method(Arg=Conv,...)(f) + w(self, args, content) => args["Arg"]=Conv(args["Arg"]); + f(self, **args) + + Enforce no use of query arguments: + w = http_method(no_query=True,...)(f) + w(self, args, content) raises BadRequest if args is not empty + + Argument mismatches, deserialisation failures produce BadRequest. + """ + content_as_args = control.pop('content_as_args', False) + no_query = control.pop('no_query', False) + conversions = control.items() + + def wrap(f): + argspec = inspect.getargspec(f) + assert argspec.args[0] == "self" + nargs = len(argspec.args) + ndefaults = len(argspec.defaults or ()) + required_args = set(argspec.args[1:nargs - ndefaults]) + all_args = set(argspec.args) + + @functools.wraps(f) + def wrapper(self, args, content): + if no_query and args: + raise BadRequest() + if content is not None: + if content_as_args: + try: + args.update(json.loads(content)) + except ValueError: + raise BadRequest() + else: + args["content"] = content + if not (required_args <= set(args) <= all_args): + raise BadRequest("Missing required arguments.") + for name, conv in conversions: + if name not in args: + continue + try: + args[name] = conv(args[name]) + except ValueError: + raise BadRequest() + return f(self, **args) + + return wrapper + + return wrap + + +class URLToResource(object): + """Mappings from URLs to resources.""" + + def __init__(self): + self._map = routes.mapper.Mapper(controller_scan=None) + + def register(self, resource_cls): + # register + self._map.connect(None, resource_cls.url_pattern, + resource_cls=resource_cls, + requirements={"dbname": DBNAME_CONSTRAINTS}) + self._map.create_regs() + return resource_cls + + def match(self, path): + params = self._map.match(path) + if params is None: + return None, None + resource_cls = params.pop('resource_cls') + return resource_cls, params + + +url_to_resource = URLToResource() + + +@url_to_resource.register +class GlobalResource(object): + """Global (root) resource.""" + + url_pattern = "/" + + def __init__(self, state, responder): + self.state = state + self.responder = responder + + @http_method() + def get(self): + info = self.state.global_info() + info['version'] = _u1db_version + self.responder.send_response_json(**info) + + +@url_to_resource.register +class DatabaseResource(object): + """Database resource.""" + + url_pattern = "/{dbname}" + + def __init__(self, dbname, state, responder): + self.dbname = dbname + self.state = state + self.responder = responder + + @http_method() + def get(self): + self.state.check_database(self.dbname) + self.responder.send_response_json(200) + + @http_method(content_as_args=True) + def put(self): + self.state.ensure_database(self.dbname) + self.responder.send_response_json(200, ok=True) + + @http_method() + def delete(self): + self.state.delete_database(self.dbname) + self.responder.send_response_json(200, ok=True) + + +@url_to_resource.register +class DocsResource(object): + """Documents resource.""" + + url_pattern = "/{dbname}/docs" + + def __init__(self, dbname, state, responder): + self.responder = responder + self.db = state.open_database(dbname) + + @http_method(doc_ids=parse_list, check_for_conflicts=parse_bool, + include_deleted=parse_bool) + def get(self, doc_ids=None, check_for_conflicts=True, + include_deleted=False): + if doc_ids is None: + raise errors.MissingDocIds + docs = self.db.get_docs(doc_ids, include_deleted=include_deleted) + self.responder.content_type = 'application/json' + self.responder.start_response(200) + self.responder.start_stream(), + for doc in docs: + entry = dict( + doc_id=doc.doc_id, doc_rev=doc.rev, content=doc.get_json(), + has_conflicts=doc.has_conflicts) + self.responder.stream_entry(entry) + self.responder.end_stream() + self.responder.finish_response() + + +@url_to_resource.register +class AllDocsResource(object): + """All Documents resource.""" + + url_pattern = "/{dbname}/all-docs" + + def __init__(self, dbname, state, responder): + self.responder = responder + self.db = state.open_database(dbname) + + @http_method(include_deleted=parse_bool) + def get(self, include_deleted=False): + gen, docs = self.db.get_all_docs(include_deleted=include_deleted) + self.responder.content_type = 'application/json' + # returning a x-u1db-generation header is optional + # HTTPDatabase will fallback to return -1 if it's missing + self.responder.start_response(200, + headers={'x-u1db-generation': str(gen)}) + self.responder.start_stream(), + for doc in docs: + entry = dict( + doc_id=doc.doc_id, doc_rev=doc.rev, content=doc.get_json(), + has_conflicts=doc.has_conflicts) + self.responder.stream_entry(entry) + self.responder.end_stream() + self.responder.finish_response() + + +@url_to_resource.register +class DocResource(object): + """Document resource.""" + + url_pattern = "/{dbname}/doc/{id:.*}" + + def __init__(self, dbname, id, state, responder): + self.id = id + self.responder = responder + self.db = state.open_database(dbname) + + @http_method(old_rev=str) + def put(self, content, old_rev=None): + doc = Document(self.id, old_rev, content) + doc_rev = self.db.put_doc(doc) + if old_rev is None: + status = 201 # created + else: + status = 200 + self.responder.send_response_json(status, rev=doc_rev) + + @http_method(old_rev=str) + def delete(self, old_rev=None): + doc = Document(self.id, old_rev, None) + self.db.delete_doc(doc) + self.responder.send_response_json(200, rev=doc.rev) + + @http_method(include_deleted=parse_bool) + def get(self, include_deleted=False): + doc = self.db.get_doc(self.id, include_deleted=include_deleted) + if doc is None: + wire_descr = errors.DocumentDoesNotExist.wire_description + self.responder.send_response_json( + http_errors.wire_description_to_status[wire_descr], + error=wire_descr, + headers={ + 'x-u1db-rev': '', + 'x-u1db-has-conflicts': 'false' + }) + return + headers = { + 'x-u1db-rev': doc.rev, + 'x-u1db-has-conflicts': json.dumps(doc.has_conflicts) + } + if doc.is_tombstone(): + self.responder.send_response_json( + http_errors.wire_description_to_status[ + errors.DOCUMENT_DELETED], + error=errors.DOCUMENT_DELETED, + headers=headers) + else: + self.responder.send_response_content( + doc.get_json(), headers=headers) + + +@url_to_resource.register +class SyncResource(object): + """Sync endpoint resource.""" + + # maximum allowed request body size + max_request_size = 15 * 1024 * 1024 # 15Mb + # maximum allowed entry/line size in request body + max_entry_size = 10 * 1024 * 1024 # 10Mb + + url_pattern = "/{dbname}/sync-from/{source_replica_uid}" + + # pluggable + sync_exchange_class = sync.SyncExchange + + def __init__(self, dbname, source_replica_uid, state, responder): + self.source_replica_uid = source_replica_uid + self.responder = responder + self.state = state + self.dbname = dbname + self.replica_uid = None + + def get_target(self): + return self.state.open_database(self.dbname).get_sync_target() + + @http_method() + def get(self): + result = self.get_target().get_sync_info(self.source_replica_uid) + self.responder.send_response_json( + target_replica_uid=result[0], target_replica_generation=result[1], + target_replica_transaction_id=result[2], + source_replica_uid=self.source_replica_uid, + source_replica_generation=result[3], + source_transaction_id=result[4]) + + @http_method(generation=int, + content_as_args=True, no_query=True) + def put(self, generation, transaction_id): + self.get_target().record_sync_info(self.source_replica_uid, + generation, + transaction_id) + self.responder.send_response_json(ok=True) + + # Implements the same logic as LocalSyncTarget.sync_exchange + + @http_method(last_known_generation=int, last_known_trans_id=none_or_str, + content_as_args=True) + def post_args(self, last_known_generation, last_known_trans_id=None, + ensure=False): + if ensure: + db, self.replica_uid = self.state.ensure_database(self.dbname) + else: + db = self.state.open_database(self.dbname) + db.validate_gen_and_trans_id( + last_known_generation, last_known_trans_id) + self.sync_exch = self.sync_exchange_class( + db, self.source_replica_uid, last_known_generation) + + @http_method(content_as_args=True) + def post_stream_entry(self, id, rev, content, gen, trans_id): + doc = Document(id, rev, content) + self.sync_exch.insert_doc_from_source(doc, gen, trans_id) + + def post_end(self): + + def send_doc(doc, gen, trans_id): + entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + gen=gen, trans_id=trans_id) + self.responder.stream_entry(entry) + + new_gen = self.sync_exch.find_changes_to_return() + self.responder.content_type = 'application/x-u1db-sync-stream' + self.responder.start_response(200) + self.responder.start_stream(), + header = {"new_generation": new_gen, + "new_transaction_id": self.sync_exch.new_trans_id} + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.sync_exch.return_docs(send_doc) + self.responder.end_stream() + self.responder.finish_response() + + +class HTTPResponder(object): + """Encode responses from the server back to the client.""" + + # a multi document response will put args and documents + # each on one line of the response body + + def __init__(self, start_response): + self._started = False + self._stream_state = -1 + self._no_initial_obj = True + self.sent_response = False + self._start_response = start_response + self._write = None + self.content_type = 'application/json' + self.content = [] + + def start_response(self, status, obj_dic=None, headers={}): + """start sending response with optional first json object.""" + if self._started: + return + self._started = True + status_text = httplib.responses[status] + self._write = self._start_response( + '%d %s' % (status, status_text), + [('content-type', self.content_type), + ('cache-control', 'no-cache')] + + headers.items()) + # xxx version in headers + if obj_dic is not None: + self._no_initial_obj = False + self._write(json.dumps(obj_dic) + "\r\n") + + def finish_response(self): + """finish sending response.""" + self.sent_response = True + + def send_response_json(self, status=200, headers={}, **kwargs): + """send and finish response with json object body from keyword args.""" + content = json.dumps(kwargs) + "\r\n" + self.send_response_content(content, headers=headers, status=status) + + def send_response_content(self, content, status=200, headers={}): + """send and finish response with content""" + headers['content-length'] = str(len(content)) + self.start_response(status, headers=headers) + if self._stream_state == 1: + self.content = [',\r\n', content] + else: + self.content = [content] + self.finish_response() + + def start_stream(self): + "start stream (array) as part of the response." + assert self._started and self._no_initial_obj + self._stream_state = 0 + self._write("[") + + def stream_entry(self, entry): + "send stream entry as part of the response." + assert self._stream_state != -1 + if self._stream_state == 0: + self._stream_state = 1 + self._write('\r\n') + else: + self._write(',\r\n') + if type(entry) == dict: + entry = json.dumps(entry) + self._write(entry) + + def end_stream(self): + "end stream (array)." + assert self._stream_state != -1 + self._write("\r\n]\r\n") + + +class HTTPInvocationByMethodWithBody(object): + """Invoke methods on a resource.""" + + def __init__(self, resource, environ, parameters): + self.resource = resource + self.environ = environ + self.max_request_size = getattr( + resource, 'max_request_size', parameters.max_request_size) + self.max_entry_size = getattr( + resource, 'max_entry_size', parameters.max_entry_size) + + def _lookup(self, method): + try: + return getattr(self.resource, method) + except AttributeError: + raise BadRequest() + + def __call__(self): + args = urlparse.parse_qsl(self.environ['QUERY_STRING'], + strict_parsing=False) + try: + args = dict( + (k.decode('utf-8'), v.decode('utf-8')) for k, v in args) + except ValueError: + raise BadRequest() + method = self.environ['REQUEST_METHOD'].lower() + if method in ('get', 'delete'): + meth = self._lookup(method) + return meth(args, None) + else: + # we expect content-length > 0, reconsider if we move + # to support chunked enconding + try: + content_length = int(self.environ['CONTENT_LENGTH']) + except (ValueError, KeyError): + raise BadRequest + if content_length <= 0: + raise BadRequest + if content_length > self.max_request_size: + raise BadRequest + reader = _FencedReader(self.environ['wsgi.input'], content_length, + self.max_entry_size) + content_type = self.environ.get('CONTENT_TYPE', '') + content_type = content_type.split(';', 1)[0].strip() + if content_type == 'application/json': + meth = self._lookup(method) + body = reader.read_chunk(sys.maxint) + return meth(args, body) + elif content_type == 'application/x-u1db-sync-stream': + meth_args = self._lookup('%s_args' % method) + meth_entry = self._lookup('%s_stream_entry' % method) + meth_end = self._lookup('%s_end' % method) + body_getline = reader.getline + if body_getline().strip() != '[': + raise BadRequest() + line = body_getline() + line, comma = utils.check_and_strip_comma(line.strip()) + meth_args(args, line) + while True: + line = body_getline() + entry = line.strip() + if entry == ']': + break + if not entry or not comma: # empty or no prec comma + raise BadRequest + entry, comma = utils.check_and_strip_comma(entry) + meth_entry({}, entry) + if comma or body_getline(): # extra comma or data + raise BadRequest + return meth_end() + else: + raise BadRequest() + + +class HTTPApp(object): + + # maximum allowed request body size + max_request_size = 15 * 1024 * 1024 # 15Mb + # maximum allowed entry/line size in request body + max_entry_size = 10 * 1024 * 1024 # 10Mb + + def __init__(self, state): + self.state = state + + def _lookup_resource(self, environ, responder): + resource_cls, params = url_to_resource.match(environ['PATH_INFO']) + if resource_cls is None: + raise BadRequest # 404 instead? + resource = resource_cls( + state=self.state, responder=responder, **params) + return resource + + def __call__(self, environ, start_response): + responder = HTTPResponder(start_response) + self.request_begin(environ) + try: + resource = self._lookup_resource(environ, responder) + HTTPInvocationByMethodWithBody(resource, environ, self)() + except errors.U1DBError as e: + self.request_u1db_error(environ, e) + status = http_errors.wire_description_to_status.get( + e.wire_description, 500) + responder.send_response_json(status, error=e.wire_description) + except BadRequest: + self.request_bad_request(environ) + responder.send_response_json(400, error="bad request") + except KeyboardInterrupt: + raise + except: + self.request_failed(environ) + raise + else: + self.request_done(environ) + return responder.content + + # hooks for tracing requests + + def request_begin(self, environ): + """Hook called at the beginning of processing a request.""" + pass + + def request_done(self, environ): + """Hook called when done processing a request.""" + pass + + def request_u1db_error(self, environ, exc): + """Hook called when processing a request resulted in a U1DBError. + + U1DBError passed as exc. + """ + pass + + def request_bad_request(self, environ): + """Hook called when processing a bad request. + + No actual processing was done. + """ + pass + + def request_failed(self, environ): + """Hook called when processing a request failed unexpectedly. + + Invoked from an except block, so there's interpreter exception + information available. + """ + pass diff --git a/src/leap/soledad/common/l2db/remote/http_client.py b/src/leap/soledad/common/l2db/remote/http_client.py new file mode 100644 index 00000000..1124b038 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_client.py @@ -0,0 +1,178 @@ +# Copyright 2011-2012 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see . + +"""Base class to make requests to a remote HTTP server.""" + +import json +import socket +import ssl +import sys +import urllib +import six.moves.urllib.parse as urlparse +import six.moves.http_client as httplib +from time import sleep +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import http_errors + +from leap.soledad.common.l2db.remote.ssl_match_hostname import match_hostname + +# Ubuntu/debian +# XXX other... +CA_CERTS = "/etc/ssl/certs/ca-certificates.crt" + + +def _encode_query_parameter(value): + """Encode query parameter.""" + if isinstance(value, bool): + if value: + value = 'true' + else: + value = 'false' + return unicode(value).encode('utf-8') + + +class _VerifiedHTTPSConnection(httplib.HTTPSConnection): + """HTTPSConnection verifying server side certificates.""" + # derived from httplib.py + + def connect(self): + "Connect to a host on a given (SSL) port." + + sock = socket.create_connection((self.host, self.port), + self.timeout, self.source_address) + if self._tunnel_host: + self.sock = sock + self._tunnel() + if sys.platform.startswith('linux'): + cert_opts = { + 'cert_reqs': ssl.CERT_REQUIRED, + 'ca_certs': CA_CERTS + } + else: + # XXX no cert verification implemented elsewhere for now + cert_opts = {} + self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, + ssl_version=ssl.PROTOCOL_SSLv3, + **cert_opts + ) + if cert_opts: + match_hostname(self.sock.getpeercert(), self.host) + + +class HTTPClientBase(object): + """Base class to make requests to a remote HTTP server.""" + + # Will use these delays to retry on 503 befor finally giving up. The final + # 0 is there to not wait after the final try fails. + _delays = (1, 1, 2, 4, 0) + + def __init__(self, url, creds=None): + self._url = urlparse.urlsplit(url) + self._conn = None + self._creds = {} + if creds is not None: + if len(creds) != 1: + raise errors.UnknownAuthMethod() + auth_meth, credentials = creds.items()[0] + try: + set_creds = getattr(self, 'set_%s_credentials' % auth_meth) + except AttributeError: + raise errors.UnknownAuthMethod(auth_meth) + set_creds(**credentials) + + def _ensure_connection(self): + if self._conn is not None: + return + if self._url.scheme == 'https': + connClass = _VerifiedHTTPSConnection + else: + connClass = httplib.HTTPConnection + self._conn = connClass(self._url.hostname, self._url.port) + + def close(self): + if self._conn: + self._conn.close() + self._conn = None + + # xxx retry mechanism? + + def _error(self, respdic): + descr = respdic.get("error") + exc_cls = errors.wire_description_to_exc.get(descr) + if exc_cls is not None: + message = respdic.get("message") + raise exc_cls(message) + + def _response(self): + resp = self._conn.getresponse() + body = resp.read() + headers = dict(resp.getheaders()) + if resp.status in (200, 201): + return body, headers + elif resp.status in http_errors.ERROR_STATUSES: + try: + respdic = json.loads(body) + except ValueError: + pass + else: + self._error(respdic) + # special case + if resp.status == 503: + raise errors.Unavailable(body, headers) + raise errors.HTTPError(resp.status, body, headers) + + def _sign_request(self, method, url_query, params): + raise NotImplementedError + + def _request(self, method, url_parts, params=None, body=None, + content_type=None): + self._ensure_connection() + unquoted_url = url_query = self._url.path + if url_parts: + if not url_query.endswith('/'): + url_query += '/' + unquoted_url = url_query + url_query += '/'.join(urllib.quote(part, safe='') + for part in url_parts) + # oauth performs its own quoting + unquoted_url += '/'.join(url_parts) + encoded_params = {} + if params: + for key, value in params.items(): + key = unicode(key).encode('utf-8') + encoded_params[key] = _encode_query_parameter(value) + url_query += ('?' + urllib.urlencode(encoded_params)) + if body is not None and not isinstance(body, basestring): + body = json.dumps(body) + content_type = 'application/json' + headers = {} + if content_type: + headers['content-type'] = content_type + headers.update( + self._sign_request(method, unquoted_url, encoded_params)) + for delay in self._delays: + try: + self._conn.request(method, url_query, body, headers) + return self._response() + except errors.Unavailable as e: + sleep(delay) + raise e + + def _request_json(self, method, url_parts, params=None, body=None, + content_type=None): + res, headers = self._request(method, url_parts, params, body, + content_type) + return json.loads(res), headers diff --git a/src/leap/soledad/common/l2db/remote/http_database.py b/src/leap/soledad/common/l2db/remote/http_database.py new file mode 100644 index 00000000..7e61e5a4 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_database.py @@ -0,0 +1,158 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see . + +"""HTTPDatabase to access a remote db over the HTTP API.""" + +import json +import uuid + +from leap.soledad.common.l2db import ( + Database, + Document, + errors) +from leap.soledad.common.l2db.remote import ( + http_client, + http_errors, + http_target) + + +DOCUMENT_DELETED_STATUS = http_errors.wire_description_to_status[ + errors.DOCUMENT_DELETED] + + +class HTTPDatabase(http_client.HTTPClientBase, Database): + """Implement the Database API to a remote HTTP server.""" + + def __init__(self, url, document_factory=None, creds=None): + super(HTTPDatabase, self).__init__(url, creds=creds) + self._factory = document_factory or Document + + def set_document_factory(self, factory): + self._factory = factory + + @staticmethod + def open_database(url, create): + db = HTTPDatabase(url) + db.open(create) + return db + + @staticmethod + def delete_database(url): + db = HTTPDatabase(url) + db._delete() + db.close() + + def open(self, create): + if create: + self._ensure() + else: + self._check() + + def _check(self): + return self._request_json('GET', [])[0] + + def _ensure(self): + self._request_json('PUT', [], {}, {}) + + def _delete(self): + self._request_json('DELETE', [], {}, {}) + + def put_doc(self, doc): + if doc.doc_id is None: + raise errors.InvalidDocId() + params = {} + if doc.rev is not None: + params['old_rev'] = doc.rev + res, headers = self._request_json('PUT', ['doc', doc.doc_id], params, + doc.get_json(), 'application/json') + doc.rev = res['rev'] + return res['rev'] + + def get_doc(self, doc_id, include_deleted=False): + try: + res, headers = self._request( + 'GET', ['doc', doc_id], {"include_deleted": include_deleted}) + except errors.DocumentDoesNotExist: + return None + except errors.HTTPError as e: + if (e.status == DOCUMENT_DELETED_STATUS and + 'x-u1db-rev' in e.headers): + res = None + headers = e.headers + else: + raise + doc_rev = headers['x-u1db-rev'] + has_conflicts = json.loads(headers['x-u1db-has-conflicts']) + doc = self._factory(doc_id, doc_rev, res) + doc.has_conflicts = has_conflicts + return doc + + def _build_docs(self, res): + for doc_dict in json.loads(res): + doc = self._factory( + doc_dict['doc_id'], doc_dict['doc_rev'], doc_dict['content']) + doc.has_conflicts = doc_dict['has_conflicts'] + yield doc + + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): + if not doc_ids: + return [] + doc_ids = ','.join(doc_ids) + res, headers = self._request( + 'GET', ['docs'], { + "doc_ids": doc_ids, "include_deleted": include_deleted, + "check_for_conflicts": check_for_conflicts}) + return self._build_docs(res) + + def get_all_docs(self, include_deleted=False): + res, headers = self._request( + 'GET', ['all-docs'], {"include_deleted": include_deleted}) + gen = -1 + if 'x-u1db-generation' in headers: + gen = int(headers['x-u1db-generation']) + return gen, list(self._build_docs(res)) + + def _allocate_doc_id(self): + return 'D-%s' % (uuid.uuid4().hex,) + + def create_doc(self, content, doc_id=None): + if not isinstance(content, dict): + raise errors.InvalidContent + json_string = json.dumps(content) + return self.create_doc_from_json(json_string, doc_id) + + def create_doc_from_json(self, content, doc_id=None): + if doc_id is None: + doc_id = self._allocate_doc_id() + res, headers = self._request_json('PUT', ['doc', doc_id], {}, + content, 'application/json') + new_doc = self._factory(doc_id, res['rev'], content) + return new_doc + + def delete_doc(self, doc): + if doc.doc_id is None: + raise errors.InvalidDocId() + params = {'old_rev': doc.rev} + res, headers = self._request_json( + 'DELETE', ['doc', doc.doc_id], params) + doc.make_tombstone() + doc.rev = res['rev'] + + def get_sync_target(self): + st = http_target.HTTPSyncTarget(self._url.geturl()) + st._creds = self._creds + return st diff --git a/src/leap/soledad/common/l2db/remote/http_errors.py b/src/leap/soledad/common/l2db/remote/http_errors.py new file mode 100644 index 00000000..ee4cfefa --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_errors.py @@ -0,0 +1,48 @@ +# Copyright 2011-2012 Canonical Ltd. +# Copyright 2016 LEAP Encryption Access Project +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see . + +""" +Information about the encoding of errors over HTTP. +""" + +from leap.soledad.common.l2db import errors + + +# error wire descriptions mapping to HTTP status codes +wire_description_to_status = dict([ + (errors.InvalidDocId.wire_description, 400), + (errors.MissingDocIds.wire_description, 400), + (errors.Unauthorized.wire_description, 401), + (errors.DocumentTooBig.wire_description, 403), + (errors.UserQuotaExceeded.wire_description, 403), + (errors.SubscriptionNeeded.wire_description, 403), + (errors.DatabaseDoesNotExist.wire_description, 404), + (errors.DocumentDoesNotExist.wire_description, 404), + (errors.DocumentAlreadyDeleted.wire_description, 404), + (errors.RevisionConflict.wire_description, 409), + (errors.InvalidGeneration.wire_description, 409), + (errors.InvalidReplicaUID.wire_description, 409), + (errors.InvalidTransactionId.wire_description, 409), + (errors.Unavailable.wire_description, 503), + # without matching exception + (errors.DOCUMENT_DELETED, 404) +]) + + +ERROR_STATUSES = set(wire_description_to_status.values()) +# 400 included explicitly for tests +ERROR_STATUSES.add(400) diff --git a/src/leap/soledad/common/l2db/remote/http_target.py b/src/leap/soledad/common/l2db/remote/http_target.py new file mode 100644 index 00000000..38804f01 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_target.py @@ -0,0 +1,125 @@ +# Copyright 2011-2012 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see . + +"""SyncTarget API implementation to a remote HTTP server.""" + +import json + +from leap.soledad.common.l2db import Document, SyncTarget +from leap.soledad.common.l2db.errors import BrokenSyncStream +from leap.soledad.common.l2db.remote import ( + http_client, utils) + + +class HTTPSyncTarget(http_client.HTTPClientBase, SyncTarget): + """Implement the SyncTarget api to a remote HTTP server.""" + + @staticmethod + def connect(url): + return HTTPSyncTarget(url) + + def get_sync_info(self, source_replica_uid): + self._ensure_connection() + res, _ = self._request_json('GET', ['sync-from', source_replica_uid]) + return (res['target_replica_uid'], res['target_replica_generation'], + res['target_replica_transaction_id'], + res['source_replica_generation'], res['source_transaction_id']) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_transaction_id): + self._ensure_connection() + if self._trace_hook: # for tests + self._trace_hook('record_sync_info') + self._request_json('PUT', ['sync-from', source_replica_uid], {}, + {'generation': source_replica_generation, + 'transaction_id': source_transaction_id}) + + def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): + parts = data.splitlines() # one at a time + if not parts or parts[0] != '[': + raise BrokenSyncStream + data = parts[1:-1] + comma = False + if data: + line, comma = utils.check_and_strip_comma(data[0]) + res = json.loads(line) + if ensure_callback and 'replica_uid' in res: + ensure_callback(res['replica_uid']) + for entry in data[1:]: + if not comma: # missing in between comma + raise BrokenSyncStream + line, comma = utils.check_and_strip_comma(entry) + entry = json.loads(line) + doc = Document(entry['id'], entry['rev'], entry['content']) + return_doc_cb(doc, entry['gen'], entry['trans_id']) + if parts[-1] != ']': + try: + partdic = json.loads(parts[-1]) + except ValueError: + pass + else: + if isinstance(partdic, dict): + self._error(partdic) + raise BrokenSyncStream + if not data or comma: # no entries or bad extra comma + raise BrokenSyncStream + return res + + def sync_exchange(self, docs_by_generations, source_replica_uid, + last_known_generation, last_known_trans_id, + return_doc_cb, ensure_callback=None): + self._ensure_connection() + if self._trace_hook: # for tests + self._trace_hook('sync_exchange') + url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) + self._conn.putrequest('POST', url) + self._conn.putheader('content-type', 'application/x-u1db-sync-stream') + for header_name, header_value in self._sign_request('POST', url, {}): + self._conn.putheader(header_name, header_value) + entries = ['['] + size = 1 + + def prepare(**dic): + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + + comma = '' + size += prepare( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + comma = ',' + for doc, gen, trans_id in docs_by_generations: + size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + gen=gen, trans_id=trans_id) + entries.append('\r\n]') + size += len(entries[-1]) + self._conn.putheader('content-length', str(size)) + self._conn.endheaders() + for entry in entries: + self._conn.send(entry) + entries = None + data, _ = self._response() + res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) + data = None + return res['new_generation'], res['new_transaction_id'] + + # for tests + _trace_hook = None + + def _set_trace_hook_shallow(self, cb): + self._trace_hook = cb diff --git a/src/leap/soledad/common/l2db/remote/server_state.py b/src/leap/soledad/common/l2db/remote/server_state.py new file mode 100644 index 00000000..d4c3c45f --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/server_state.py @@ -0,0 +1,68 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see . + +"""State for servers exposing a set of U1DB databases.""" + + +class ServerState(object): + """Passed to a Request when it is instantiated. + + This is used to track server-side state, such as working-directory, open + databases, etc. + """ + + def __init__(self): + self._workingdir = None + + def set_workingdir(self, path): + self._workingdir = path + + def global_info(self): + """Return global information about the server.""" + return {} + + def _relpath(self, relpath): + # Note: We don't want to allow absolute paths here, because we + # don't want to expose the filesystem. We should also check that + # relpath doesn't have '..' in it, etc. + return self._workingdir + '/' + relpath + + def open_database(self, path): + """Open a database at the given location.""" + from leap.soledad.client._db import sqlite + full_path = self._relpath(path) + return sqlite.SQLiteDatabase.open_database(full_path, create=False) + + def check_database(self, path): + """Check if the database at the given location exists. + + Simply returns if it does or raises DatabaseDoesNotExist. + """ + db = self.open_database(path) + db.close() + + def ensure_database(self, path): + """Ensure database at the given location.""" + from leap.soledad.client._db import sqlite + full_path = self._relpath(path) + db = sqlite.SQLiteDatabase.open_database(full_path, create=True) + return db, db._replica_uid + + def delete_database(self, path): + """Delete database at the given location.""" + from leap.soledad.client._db import sqlite + full_path = self._relpath(path) + sqlite.SQLiteDatabase.delete_database(full_path) diff --git a/src/leap/soledad/common/l2db/remote/ssl_match_hostname.py b/src/leap/soledad/common/l2db/remote/ssl_match_hostname.py new file mode 100644 index 00000000..ce82f1b2 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/ssl_match_hostname.py @@ -0,0 +1,65 @@ +"""The match_hostname() function from Python 3.2, essential when using SSL.""" +# XXX put it here until it's packaged + +import re + +__version__ = '3.2a3' + + +class CertificateError(ValueError): + pass + + +def _dnsname_to_pat(dn): + pats = [] + for frag in dn.split(r'.'): + if frag == '*': + # When '*' is a fragment by itself, it matches a non-empty dotless + # fragment. + pats.append('[^.]+') + else: + # Otherwise, '*' matches any dotless fragment. + frag = re.escape(frag) + pats.append(frag.replace(r'\*', '[^.]*')) + return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE) + + +def match_hostname(cert, hostname): + """Verify that *cert* (in decoded format as returned by + SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 rules + are mostly followed, but IP addresses are not accepted for *hostname*. + + CertificateError is raised on failure. On success, the function + returns nothing. + """ + if not cert: + raise ValueError("empty or no certificate") + dnsnames = [] + san = cert.get('subjectAltName', ()) + for key, value in san: + if key == 'DNS': + if _dnsname_to_pat(value).match(hostname): + return + dnsnames.append(value) + if not san: + # The subject is only checked when subjectAltName is empty + for sub in cert.get('subject', ()): + for key, value in sub: + # XXX according to RFC 2818, the most specific Common Name + # must be used. + if key == 'commonName': + if _dnsname_to_pat(value).match(hostname): + return + dnsnames.append(value) + if len(dnsnames) > 1: + raise CertificateError( + "hostname %r doesn't match either of %s" + % (hostname, ', '.join(map(repr, dnsnames)))) + elif len(dnsnames) == 1: + raise CertificateError( + "hostname %r doesn't match %r" + % (hostname, dnsnames[0])) + else: + raise CertificateError( + "no appropriate commonName or " + "subjectAltName fields were found") diff --git a/src/leap/soledad/common/l2db/remote/utils.py b/src/leap/soledad/common/l2db/remote/utils.py new file mode 100644 index 00000000..14cedea9 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/utils.py @@ -0,0 +1,23 @@ +# Copyright 2012 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see . + +"""Utilities for details of the procotol.""" + + +def check_and_strip_comma(line): + if line and line[-1] == ',': + return line[:-1], True + return line, False -- cgit v1.2.3