diff options
Diffstat (limited to 'common/src/leap')
-rw-r--r-- | common/src/leap/soledad/common/backend.py | 51 | ||||
-rw-r--r-- | common/src/leap/soledad/common/couch/__init__.py | 100 | ||||
-rw-r--r-- | common/src/leap/soledad/common/couch/support.py | 115 | ||||
-rw-r--r-- | common/src/leap/soledad/common/errors.py | 3 | ||||
-rw-r--r-- | common/src/leap/soledad/common/tests/fixture_soledad.conf | 1 | ||||
-rw-r--r-- | common/src/leap/soledad/common/tests/test_encdecpool.py | 26 | ||||
-rw-r--r-- | common/src/leap/soledad/common/tests/test_server.py | 33 |
7 files changed, 277 insertions, 52 deletions
diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py index deed5ac2..53426fb5 100644 --- a/common/src/leap/soledad/common/backend.py +++ b/common/src/leap/soledad/common/backend.py @@ -33,6 +33,7 @@ from leap.soledad.common.document import ServerDocument class SoledadBackend(CommonBackend): + BATCH_SUPPORT = False """ A U1DB backend implementation. @@ -53,9 +54,30 @@ class SoledadBackend(CommonBackend): self._cache = None self._dbname = database._dbname self._database = database + self.batching = False if replica_uid is not None: self._set_replica_uid(replica_uid) + def batch_start(self): + if not self.BATCH_SUPPORT: + return + self.batching = True + self.after_batch_callbacks = {} + self._database.batch_start() + if not self._cache: + # batching needs cache + self._cache = {} + self._get_generation() # warm up gen info + + def batch_end(self): + if not self.BATCH_SUPPORT: + return + self.batching = False + self._database.batch_end() + for name in self.after_batch_callbacks: + self.after_batch_callbacks[name]() + self.after_batch_callbacks = None + @property def cache(self): if self._cache is not None: @@ -154,11 +176,7 @@ class SoledadBackend(CommonBackend): :raise SoledadError: Raised by database on operation failure """ - if self.replica_uid + '_gen' in self.cache: - response = self.cache[self.replica_uid + '_gen'] - return response cur_gen, newest_trans_id = self._database.get_generation_info() - self.cache[self.replica_uid + '_gen'] = (cur_gen, newest_trans_id) return (cur_gen, newest_trans_id) def _get_trans_id_for_gen(self, generation): @@ -253,14 +271,8 @@ class SoledadBackend(CommonBackend): :param doc: The document to be put. :type doc: ServerDocument """ - last_transaction =\ - self._database.save_document(old_doc, doc, - self._allocate_transaction_id()) - if self.replica_uid + '_gen' in self.cache: - gen, trans = self.cache[self.replica_uid + '_gen'] - gen += 1 - trans = last_transaction - self.cache[self.replica_uid + '_gen'] = (gen, trans) + self._database.save_document(old_doc, doc, + self._allocate_transaction_id()) def put_doc(self, doc): """ @@ -383,7 +395,10 @@ class SoledadBackend(CommonBackend): """ if other_replica_uid in self.cache: return self.cache[other_replica_uid] - return self._database.get_replica_gen_and_trans_id(other_replica_uid) + gen, trans_id = \ + self._database.get_replica_gen_and_trans_id(other_replica_uid) + self.cache[other_replica_uid] = (gen, trans_id) + return (gen, trans_id) def _set_replica_gen_and_trans_id(self, other_replica_uid, other_generation, other_transaction_id): @@ -423,9 +438,13 @@ class SoledadBackend(CommonBackend): generation. :type other_transaction_id: str """ - self._set_replica_gen_and_trans_id(other_replica_uid, - other_generation, - other_transaction_id) + function = self._set_replica_gen_and_trans_id + args = [other_replica_uid, other_generation, other_transaction_id] + callback = lambda: function(*args) + if self.batching: + self.after_batch_callbacks['set_source_info'] = callback + else: + callback() def _force_doc_sync_conflict(self, doc): """ diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index bd8b08b7..dae460cb 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -33,7 +33,6 @@ from multiprocessing.pool import ThreadPool from couchdb.client import Server, Database -from couchdb.multipart import MultipartWriter from couchdb.http import ( ResourceConflict, ResourceNotFound, @@ -53,6 +52,7 @@ from u1db.remote import http_app from leap.soledad.common import ddocs from .errors import raise_server_error from .errors import raise_missing_design_doc_error +from .support import MultipartWriter from leap.soledad.common.errors import InvalidURLError from leap.soledad.common.document import ServerDocument from leap.soledad.common.backend import SoledadBackend @@ -153,10 +153,24 @@ class CouchDatabase(object): self._url = url self._dbname = dbname self._database = self.get_couch_database(url, dbname) + self.batching = False + self.batch_generation = None + self.batch_docs = {} if ensure_ddocs: self.ensure_ddocs_on_db() self.ensure_security_ddoc(database_security) + def batch_start(self): + self.batching = True + self.batch_generation = self.get_generation_info() + ids = set(row.id for row in self._database.view('_all_docs')) + self.batched_ids = ids + + def batch_end(self): + self.batching = False + self.batch_generation = None + self.__perform_batch() + def get_couch_database(self, url, dbname): """ Generate a couchdb.Database instance given a url and dbname. @@ -185,7 +199,8 @@ class CouchDatabase(object): """ for ddoc_name in ['docs', 'syncs', 'transactions']: try: - self.json_from_resource(['_design', ddoc_name, '_info'], + self.json_from_resource(['_design'] + + ddoc_name.split('/') + ['_info'], check_missing_ddoc=False) except ResourceNotFound: ddoc = json.loads( @@ -337,13 +352,49 @@ class CouchDatabase(object): :return: The document. :rtype: ServerDocument """ - # get document with all attachments (u1db content and eventual - # conflicts) + doc_from_batch = self.__check_batch_before_get(doc_id) + if doc_from_batch: + return doc_from_batch + if self.batching and doc_id not in self.batched_ids: + return None if doc_id not in self._database: return None + # get document with all attachments (u1db content and eventual + # conflicts) result = self.json_from_resource([doc_id], attachments=True) return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts) + def __check_batch_before_get(self, doc_id): + """ + If doc_id is staged for batching, then we need to commit the batch + before going ahead. This avoids consistency problems, like trying to + get a document that isn't persisted and processing like it is missing. + + :param doc_id: The unique document identifier + :type doc_id: str + """ + if doc_id in self.batch_docs: + couch_doc = self.batch_docs[doc_id] + rev = self.__perform_batch(doc_id) + couch_doc['_rev'] = rev + self.batched_ids.add(doc_id) + return self.__parse_doc_from_couch(couch_doc, doc_id, True) + return None + + def __perform_batch(self, doc_id=None): + status = self._database.update(self.batch_docs.values()) + rev = None + for ok, stored_doc_id, rev_or_error in status: + if not ok: + error = rev_or_error + if type(error) is ResourceConflict: + raise RevisionConflict + raise error + elif doc_id == stored_doc_id: + rev = rev_or_error + self.batch_docs.clear() + return rev + def __parse_doc_from_couch(self, result, doc_id, check_for_conflicts=False): # restrict to u1db documents @@ -571,6 +622,8 @@ class CouchDatabase(object): :return: A tuple containing the current generation and transaction id. :rtype: (int, str) """ + if self.batching and self.batch_generation: + return self.batch_generation # query a couch list function ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] info = self.json_from_resource(ddoc_path) @@ -691,20 +744,31 @@ class CouchDatabase(object): if old_doc is not None and hasattr(old_doc, 'couch_rev'): couch_doc['_rev'] = old_doc.couch_rev # prepare the multipart PUT - buf = StringIO() - headers = {} - envelope = MultipartWriter(buf, headers=headers, subtype='related') - envelope.add('application/json', json.dumps(couch_doc)) - for part in parts: - envelope.add('application/octet-stream', part) - envelope.close() - # try to save and fail if there's a revision conflict - try: - resource = self._new_resource() - resource.put_json( - doc.doc_id, body=str(buf.getvalue()), headers=headers) - except ResourceConflict: - raise RevisionConflict() + if not self.batching: + buf = StringIO() + envelope = MultipartWriter(buf) + envelope.add('application/json', json.dumps(couch_doc)) + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict + try: + resource = self._new_resource() + resource.put_json( + doc.doc_id, body=str(buf.getvalue()), + headers=envelope.headers) + except ResourceConflict: + raise RevisionConflict() + else: + for name, attachment in attachments.items(): + del attachment['follows'] + del attachment['length'] + index = 0 if name is 'u1db_content' else 1 + attachment['data'] = binascii.b2a_base64(parts[index]).strip() + couch_doc['_attachments'] = attachments + self.batch_docs[doc.doc_id] = couch_doc + last_gen, last_trans_id = self.batch_generation + self.batch_generation = (last_gen + 1, transaction_id) return transactions[-1][1] def _new_resource(self, *path): diff --git a/common/src/leap/soledad/common/couch/support.py b/common/src/leap/soledad/common/couch/support.py new file mode 100644 index 00000000..bfc4fef6 --- /dev/null +++ b/common/src/leap/soledad/common/couch/support.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# support.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import sys + + +""" +Monkey patches and temporary code that may be removed with version changes. +""" + + +# for bigcouch +# TODO: Remove if bigcouch support is dropped +class MultipartWriter(object): + + """ + A multipart writer adapted from python-couchdb's one so we can PUT + documents using couch's multipart PUT. + + This stripped down version does not allow for nested structures, and + contains only the essential things we need to PUT SoledadDocuments to the + couch backend. Also, please note that this is a patch. The couchdb lib has + another implementation that works fine with CouchDB 1.6, but removing this + now will break compatibility with bigcouch. + """ + + CRLF = '\r\n' + + def __init__(self, fileobj, headers=None, boundary=None): + """ + Initialize the multipart writer. + """ + self.fileobj = fileobj + if boundary is None: + boundary = self._make_boundary() + self._boundary = boundary + self._build_headers('related', headers) + + def add(self, mimetype, content, headers={}): + """ + Add a part to the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + self.fileobj.write(self.CRLF) + headers['Content-Type'] = mimetype + self._write_headers(headers) + if content: + # XXX: throw an exception if a boundary appears in the content?? + self.fileobj.write(content) + self.fileobj.write(self.CRLF) + + def close(self): + """ + Close the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + # be careful not to have anything after '--', otherwise old couch + # versions (including bigcouch) will fail. + self.fileobj.write('--') + + def _make_boundary(self): + """ + Create a boundary to discern multi parts. + """ + try: + from uuid import uuid4 + return '==' + uuid4().hex + '==' + except ImportError: + from random import randrange + token = randrange(sys.maxint) + format = '%%0%dd' % len(repr(sys.maxint - 1)) + return '===============' + (format % token) + '==' + + def _write_headers(self, headers): + """ + Write a part header in the buffer stream. + """ + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.fileobj.write(name) + self.fileobj.write(': ') + self.fileobj.write(value) + self.fileobj.write(self.CRLF) + self.fileobj.write(self.CRLF) + + def _build_headers(self, subtype, headers): + """ + Build the main headers of the multipart stream. + + This is here so we can send headers separete from content using + python-couchdb API. + """ + self.headers = {} + self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ + (subtype, self._boundary) + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.headers[name] = value diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 7a8a8929..0b6bb4e6 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -145,9 +145,10 @@ class InvalidURLError(Exception): """ -@register_exception class BackendNotReadyError(SoledadError): """ Generic exception raised when the backend is not ready to dispatch a client request. """ + wire_description = "backend not ready" + status = 500 diff --git a/common/src/leap/soledad/common/tests/fixture_soledad.conf b/common/src/leap/soledad/common/tests/fixture_soledad.conf index c0ffacf6..8d8161c3 100644 --- a/common/src/leap/soledad/common/tests/fixture_soledad.conf +++ b/common/src/leap/soledad/common/tests/fixture_soledad.conf @@ -2,6 +2,7 @@ couch_url = http://soledad:passwd@localhost:5984 create_cmd = sudo -u soledad-admin /usr/bin/create-user-db admin_netrc = /etc/couchdb/couchdb-soledad-admin.netrc +batching = 0 [database-security] members = user1, user2 diff --git a/common/src/leap/soledad/common/tests/test_encdecpool.py b/common/src/leap/soledad/common/tests/test_encdecpool.py index 793bfa1a..694eb7ad 100644 --- a/common/src/leap/soledad/common/tests/test_encdecpool.py +++ b/common/src/leap/soledad/common/tests/test_encdecpool.py @@ -18,6 +18,7 @@ Tests for encryption and decryption pool. """ import json +from random import shuffle from twisted.internet.defer import inlineCallbacks @@ -171,20 +172,21 @@ class TestSyncDecrypterPool(BaseSoledadTest): DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1) def _assert_doc_was_decrypted_and_inserted(_): + self.assertEqual(1, len(self._inserted_docs)) self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")]) self._pool.deferred.addCallback( _assert_doc_was_decrypted_and_inserted) return self._pool.deferred - def test_insert_encrypted_received_doc_many(self): + def test_insert_encrypted_received_doc_many(self, many=100): """ Test that many encrypted documents added to the pool are decrypted and inserted using the callback. """ crypto = self._soledad._crypto - many = 100 self._pool.start(many) + docs = [] # insert many encrypted docs in the pool for i in xrange(many): @@ -198,9 +200,12 @@ class TestSyncDecrypterPool(BaseSoledadTest): doc_id=doc_id, rev=rev, json=json.dumps(content)) encrypted_content = json.loads(crypto.encrypt_doc(doc)) + docs.append((doc_id, rev, encrypted_content, gen, + trans_id, idx)) + shuffle(docs) - self._pool.insert_encrypted_received_doc( - doc_id, rev, encrypted_content, gen, trans_id, idx) + for doc in docs: + self._pool.insert_encrypted_received_doc(*doc) def _assert_docs_were_decrypted_and_inserted(_): self.assertEqual(many, len(self._inserted_docs)) @@ -223,3 +228,16 @@ class TestSyncDecrypterPool(BaseSoledadTest): self._pool.deferred.addCallback( _assert_docs_were_decrypted_and_inserted) return self._pool.deferred + + @inlineCallbacks + def test_pool_reuse(self): + """ + The pool is reused between syncs, this test verifies that + reusing is fine. + """ + for i in xrange(3): + yield self.test_insert_encrypted_received_doc_many(5) + self._inserted_docs = [] + decrypted_docs = yield self._pool._get_docs(encrypted=False) + # check that decrypted docs staging is clean + self.assertEquals([], decrypted_docs) diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index e1129a9f..20fe8579 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -640,18 +640,25 @@ class ConfigurationParsingTest(unittest.TestCase): config = load_configuration(config_path) # then - expected = {'soledad-server': { - 'couch_url': - 'http://soledad:passwd@localhost:5984', - 'create_cmd': - 'sudo -u soledad-admin /usr/bin/create-user-db', - 'admin_netrc': - '/etc/couchdb/couchdb-soledad-admin.netrc', - }, - 'database-security': { - 'members': ['user1', 'user2'], + expected = {'members': ['user1', 'user2'], 'members_roles': ['role1', 'role2'], 'admins': ['user3', 'user4'], - 'admins_roles': ['role3', 'role3'], - }} - self.assertDictEqual(expected, config) + 'admins_roles': ['role3', 'role3']} + self.assertDictEqual(expected, config['database-security']) + + def test_server_values_configuration(self): + # given + config_path = resource_filename('leap.soledad.common.tests', + 'fixture_soledad.conf') + # when + config = load_configuration(config_path) + + # then + expected = {'couch_url': + 'http://soledad:passwd@localhost:5984', + 'create_cmd': + 'sudo -u soledad-admin /usr/bin/create-user-db', + 'admin_netrc': + '/etc/couchdb/couchdb-soledad-admin.netrc', + 'batching': False} + self.assertDictEqual(expected, config['soledad-server']) |