From 34731d6208de517be3fa387e57fb90efb265b554 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 20 Nov 2015 15:20:33 -0400 Subject: [bug] do not register exception, breaks since the exception doesn't have the code and description, it breaks. we don't need those since the couch child exception describes them. --- common/src/leap/soledad/common/errors.py | 1 - 1 file changed, 1 deletion(-) diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 7a8a8929..f783404c 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -145,7 +145,6 @@ class InvalidURLError(Exception): """ -@register_exception class BackendNotReadyError(SoledadError): """ Generic exception raised when the backend is not ready to dispatch a client -- cgit v1.2.3 From 086f9fad3ebb1bdcc0ec3fa6161f801068a0c641 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 17 Nov 2015 16:58:15 -0300 Subject: [bug] put a monkeypatch back for bigcouch Current code was tested on couch 1.6 and a monkeypatch got removed during refactor. This commit re-adds it, but in a separate module that is intended to hold temporary code for compatibility that can be removed on version upgrades. --- common/src/leap/soledad/common/couch/__init__.py | 8 +- common/src/leap/soledad/common/couch/support.py | 115 +++++++++++++++++++++++ 2 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 common/src/leap/soledad/common/couch/support.py diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index bd8b08b7..fb3d57af 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -33,7 +33,6 @@ from multiprocessing.pool import ThreadPool from couchdb.client import Server, Database -from couchdb.multipart import MultipartWriter from couchdb.http import ( ResourceConflict, ResourceNotFound, @@ -53,6 +52,7 @@ from u1db.remote import http_app from leap.soledad.common import ddocs from .errors import raise_server_error from .errors import raise_missing_design_doc_error +from .support import MultipartWriter from leap.soledad.common.errors import InvalidURLError from leap.soledad.common.document import ServerDocument from leap.soledad.common.backend import SoledadBackend @@ -692,8 +692,7 @@ class CouchDatabase(object): couch_doc['_rev'] = old_doc.couch_rev # prepare the multipart PUT buf = StringIO() - headers = {} - envelope = MultipartWriter(buf, headers=headers, subtype='related') + envelope = MultipartWriter(buf) envelope.add('application/json', json.dumps(couch_doc)) for part in parts: envelope.add('application/octet-stream', part) @@ -702,7 +701,8 @@ class CouchDatabase(object): try: resource = self._new_resource() resource.put_json( - doc.doc_id, body=str(buf.getvalue()), headers=headers) + doc.doc_id, body=str(buf.getvalue()), + headers=envelope.headers) except ResourceConflict: raise RevisionConflict() return transactions[-1][1] diff --git a/common/src/leap/soledad/common/couch/support.py b/common/src/leap/soledad/common/couch/support.py new file mode 100644 index 00000000..bfc4fef6 --- /dev/null +++ b/common/src/leap/soledad/common/couch/support.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# support.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import sys + + +""" +Monkey patches and temporary code that may be removed with version changes. +""" + + +# for bigcouch +# TODO: Remove if bigcouch support is dropped +class MultipartWriter(object): + + """ + A multipart writer adapted from python-couchdb's one so we can PUT + documents using couch's multipart PUT. + + This stripped down version does not allow for nested structures, and + contains only the essential things we need to PUT SoledadDocuments to the + couch backend. Also, please note that this is a patch. The couchdb lib has + another implementation that works fine with CouchDB 1.6, but removing this + now will break compatibility with bigcouch. + """ + + CRLF = '\r\n' + + def __init__(self, fileobj, headers=None, boundary=None): + """ + Initialize the multipart writer. + """ + self.fileobj = fileobj + if boundary is None: + boundary = self._make_boundary() + self._boundary = boundary + self._build_headers('related', headers) + + def add(self, mimetype, content, headers={}): + """ + Add a part to the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + self.fileobj.write(self.CRLF) + headers['Content-Type'] = mimetype + self._write_headers(headers) + if content: + # XXX: throw an exception if a boundary appears in the content?? + self.fileobj.write(content) + self.fileobj.write(self.CRLF) + + def close(self): + """ + Close the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + # be careful not to have anything after '--', otherwise old couch + # versions (including bigcouch) will fail. + self.fileobj.write('--') + + def _make_boundary(self): + """ + Create a boundary to discern multi parts. + """ + try: + from uuid import uuid4 + return '==' + uuid4().hex + '==' + except ImportError: + from random import randrange + token = randrange(sys.maxint) + format = '%%0%dd' % len(repr(sys.maxint - 1)) + return '===============' + (format % token) + '==' + + def _write_headers(self, headers): + """ + Write a part header in the buffer stream. + """ + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.fileobj.write(name) + self.fileobj.write(': ') + self.fileobj.write(value) + self.fileobj.write(self.CRLF) + self.fileobj.write(self.CRLF) + + def _build_headers(self, subtype, headers): + """ + Build the main headers of the multipart stream. + + This is here so we can send headers separete from content using + python-couchdb API. + """ + self.headers = {} + self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ + (subtype, self._boundary) + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.headers[name] = value -- cgit v1.2.3 From 3537351ad94dd17d0c0a57ec77489b4a540b3d23 Mon Sep 17 00:00:00 2001 From: Bruno Wagner Date: Fri, 20 Nov 2015 11:02:14 -0200 Subject: [bug] BackendNotReadyError breaks without status The new BackendNotReadyError didn't have a status or a wire description, because of that, when you tried to use the leap.soledad.server package it would break trying to import this exception (because the annotation tries to use this variable). This was preventing soledad server from starting at all, after this change it works again --- common/src/leap/soledad/common/errors.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index f783404c..0b6bb4e6 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -150,3 +150,5 @@ class BackendNotReadyError(SoledadError): Generic exception raised when the backend is not ready to dispatch a client request. """ + wire_description = "backend not ready" + status = 500 -- cgit v1.2.3 From b44a2a9b3894e438bbb2b18da6e22f86602655af Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 24 Nov 2015 22:45:34 -0300 Subject: [bug] generation_info cant be cached per replica This info can be changed by another syncing replica and would not reflect real database generation. That would be ok inside of the same sync, but can cause trouble on concurrent syncs. The other calls are ok, since they hold info that doesnt change during concurrent syncs or are only read/write by the replica syncing. A global cache could fit better this removed case, but for now let's stay on the safe side. --- common/src/leap/soledad/common/backend.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py index deed5ac2..a1493adc 100644 --- a/common/src/leap/soledad/common/backend.py +++ b/common/src/leap/soledad/common/backend.py @@ -154,11 +154,7 @@ class SoledadBackend(CommonBackend): :raise SoledadError: Raised by database on operation failure """ - if self.replica_uid + '_gen' in self.cache: - response = self.cache[self.replica_uid + '_gen'] - return response cur_gen, newest_trans_id = self._database.get_generation_info() - self.cache[self.replica_uid + '_gen'] = (cur_gen, newest_trans_id) return (cur_gen, newest_trans_id) def _get_trans_id_for_gen(self, generation): @@ -253,14 +249,8 @@ class SoledadBackend(CommonBackend): :param doc: The document to be put. :type doc: ServerDocument """ - last_transaction =\ - self._database.save_document(old_doc, doc, - self._allocate_transaction_id()) - if self.replica_uid + '_gen' in self.cache: - gen, trans = self.cache[self.replica_uid + '_gen'] - gen += 1 - trans = last_transaction - self.cache[self.replica_uid + '_gen'] = (gen, trans) + self._database.save_document(old_doc, doc, + self._allocate_transaction_id()) def put_doc(self, doc): """ -- cgit v1.2.3 From 332ce9e68a34e2ddc31ef371c991565e5e38812b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 1 Dec 2015 13:07:14 -0300 Subject: [tests] encdec pool is being reused, adding a test This new test case will run the single insert test 5 times to ensure that using the same pool again is fine. This is needed due failures to shutdown the pool or inconsistency between syncs. --- common/src/leap/soledad/common/tests/test_encdecpool.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/common/src/leap/soledad/common/tests/test_encdecpool.py b/common/src/leap/soledad/common/tests/test_encdecpool.py index 793bfa1a..27247c94 100644 --- a/common/src/leap/soledad/common/tests/test_encdecpool.py +++ b/common/src/leap/soledad/common/tests/test_encdecpool.py @@ -171,12 +171,22 @@ class TestSyncDecrypterPool(BaseSoledadTest): DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1) def _assert_doc_was_decrypted_and_inserted(_): + self.assertEqual(1, len(self._inserted_docs)) self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")]) self._pool.deferred.addCallback( _assert_doc_was_decrypted_and_inserted) return self._pool.deferred + @inlineCallbacks + def test_pool_reuse(self): + """ + The pool is reused between syncs, this test verifies that + reusing is fine. + """ + for _ in xrange(5): + yield self.test_insert_encrypted_received_doc() + def test_insert_encrypted_received_doc_many(self): """ Test that many encrypted documents added to the pool are decrypted and -- cgit v1.2.3 From 55f45b770a57d1c5f54a66a490aeeea7edae0184 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 1 Dec 2015 15:58:57 -0300 Subject: [bug] fire callback after reseting instance vars If we reset the vars after firing the finish callback, other thread can pick up a dirty state on due concurrency. --- client/src/leap/soledad/client/encdecpool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 6d3c11b9..9333578b 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -807,6 +807,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._finish() def _finish(self): - self._deferred.callback(None) self._processed_docs = 0 self._last_inserted_idx = 0 + self._deferred.callback(None) -- cgit v1.2.3 From 202bdc553cc576bfbce1ba8a4c34569b1751c04d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 1 Dec 2015 16:02:47 -0300 Subject: [test] shuffle and decrypts 5 docs 3 times On real usage the docs will arrive shuffled and pool will be reused after many decrypts. This test asserts that everything ended up clear between execution and no inconsistency is left over for the next run. --- .../leap/soledad/common/tests/test_encdecpool.py | 34 +++++++++++++--------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/common/src/leap/soledad/common/tests/test_encdecpool.py b/common/src/leap/soledad/common/tests/test_encdecpool.py index 27247c94..694eb7ad 100644 --- a/common/src/leap/soledad/common/tests/test_encdecpool.py +++ b/common/src/leap/soledad/common/tests/test_encdecpool.py @@ -18,6 +18,7 @@ Tests for encryption and decryption pool. """ import json +from random import shuffle from twisted.internet.defer import inlineCallbacks @@ -178,23 +179,14 @@ class TestSyncDecrypterPool(BaseSoledadTest): _assert_doc_was_decrypted_and_inserted) return self._pool.deferred - @inlineCallbacks - def test_pool_reuse(self): - """ - The pool is reused between syncs, this test verifies that - reusing is fine. - """ - for _ in xrange(5): - yield self.test_insert_encrypted_received_doc() - - def test_insert_encrypted_received_doc_many(self): + def test_insert_encrypted_received_doc_many(self, many=100): """ Test that many encrypted documents added to the pool are decrypted and inserted using the callback. """ crypto = self._soledad._crypto - many = 100 self._pool.start(many) + docs = [] # insert many encrypted docs in the pool for i in xrange(many): @@ -208,9 +200,12 @@ class TestSyncDecrypterPool(BaseSoledadTest): doc_id=doc_id, rev=rev, json=json.dumps(content)) encrypted_content = json.loads(crypto.encrypt_doc(doc)) + docs.append((doc_id, rev, encrypted_content, gen, + trans_id, idx)) + shuffle(docs) - self._pool.insert_encrypted_received_doc( - doc_id, rev, encrypted_content, gen, trans_id, idx) + for doc in docs: + self._pool.insert_encrypted_received_doc(*doc) def _assert_docs_were_decrypted_and_inserted(_): self.assertEqual(many, len(self._inserted_docs)) @@ -233,3 +228,16 @@ class TestSyncDecrypterPool(BaseSoledadTest): self._pool.deferred.addCallback( _assert_docs_were_decrypted_and_inserted) return self._pool.deferred + + @inlineCallbacks + def test_pool_reuse(self): + """ + The pool is reused between syncs, this test verifies that + reusing is fine. + """ + for i in xrange(3): + yield self.test_insert_encrypted_received_doc_many(5) + self._inserted_docs = [] + decrypted_docs = yield self._pool._get_docs(encrypted=False) + # check that decrypted docs staging is clean + self.assertEquals([], decrypted_docs) -- cgit v1.2.3 From 841e6712ff9ff1ce2b8a5fe92012d89c2aec7077 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 1 Dec 2015 16:06:26 -0300 Subject: [bug] concurrency bug while querying and inserting This line was missing an yield and without it we end up inserting a document that is being retrieved and bad things happen. This is the core fix from yesterday debugging session. During sequential syncs the pool was inserting and querying at the same time and sometimes repeating or failing to delete documents. --- client/src/leap/soledad/client/encdecpool.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 9333578b..a01d3b71 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -70,11 +70,13 @@ class SyncEncryptDecryptPool(object): self._started = False def start(self): + if self.running: + return self._create_pool() self._started = True def stop(self): - if not self._started: + if not self.running: return self._started = False self._destroy_pool() @@ -650,14 +652,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): last_idx = self._last_inserted_idx for doc_id, rev, content, gen, trans_id, encrypted, idx in \ decrypted_docs: - # XXX for some reason, a document might not have been deleted from - # the database. This is a bug. In this point, already - # processed documents should have been removed from the sync - # database and we should not have to skip them here. We need - # to find out why this is happening, fix, and remove the - # skipping below. - if (idx < last_idx + 1): - continue if (idx != last_idx + 1): break insertable.append((doc_id, rev, content, gen, trans_id, idx)) @@ -763,6 +757,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) return self._runOperation(query) + @defer.inlineCallbacks def _collect_async_decryption_results(self): """ Collect the results of the asynchronous doc decryptions and re-raise @@ -773,7 +768,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): async_results = self._async_results[:] for res in async_results: if res.ready(): - self._decrypt_doc_cb(res.get()) # might raise an exception! + yield self._decrypt_doc_cb(res.get()) # might raise an exception! self._async_results.remove(res) @defer.inlineCallbacks @@ -796,7 +791,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if processed < pending: yield self._async_decrypt_received_docs() - self._collect_async_decryption_results() + yield self._collect_async_decryption_results() docs = yield self._process_decrypted_docs() yield self._delete_processed_docs(docs) # recurse -- cgit v1.2.3 From 3f327c4f472f43d281e3bd7be67aaa9ce3f7d822 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 1 Dec 2015 16:38:02 -0300 Subject: [style] fix pep8 --- client/src/leap/soledad/client/encdecpool.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index a01d3b71..0954c1df 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -768,7 +768,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): async_results = self._async_results[:] for res in async_results: if res.ready(): - yield self._decrypt_doc_cb(res.get()) # might raise an exception! + # XXX: might raise an exception! + yield self._decrypt_doc_cb(res.get()) self._async_results.remove(res) @defer.inlineCallbacks -- cgit v1.2.3 From 129db70b5237ffbc7b38d9931598629f46ce4763 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 2 Dec 2015 16:16:27 -0400 Subject: [docs] fix run-tests snippet --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 815db808..74a49d12 100644 --- a/README.rst +++ b/README.rst @@ -51,7 +51,7 @@ Client and server tests are both included in leap.soledad.common. If you want to run tests in development mode you must do the following:: scripts/develop_mode.sh - ./run_tests.sh + scripts/run_tests.sh Note that to run CouchDB tests, be sure you have `CouchDB`_ installed on your system. -- cgit v1.2.3 From 6fd543b9fd9679f4978aeedee32eeece5593acc3 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 13 Nov 2015 22:10:18 -0300 Subject: [feat] Adds support to batching limited by size u1db provides batching by default. Current Soledad HTTPS Sync Target was stuck at 1 doc per request. This commit adds batching capability, limiting the size to a predefined value. Default limit size: 500kB --- client/src/leap/soledad/client/http_target/send.py | 46 +++++++++++++++------- .../src/leap/soledad/client/http_target/support.py | 13 ++++-- 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index 80483f0d..c1252c13 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -29,6 +29,8 @@ class HTTPDocSender(object): They need to be encrypted and metadata prepared before sending. """ + MAX_BATCH_SIZE = 500 * 1000 # 500kB by default + @defer.inlineCallbacks def _send_docs(self, docs_by_generation, last_known_generation, last_known_trans_id, sync_id): @@ -43,25 +45,41 @@ class HTTPDocSender(object): sync_id=sync_id, ensure=self._ensure_callback is not None) total = len(docs_by_generation) - for idx, entry in enumerate(docs_by_generation, 1): - yield self._prepare_one_doc(entry, body, idx, total) - result = yield self._http_request( - self._url, - method='POST', - body=body.pop(1), - content_type='application/x-soledad-sync-put') - if self._defer_encryption: - self._delete_sent(idx, docs_by_generation) - _emit_send_status(idx, total) + while body.consumed < total: + result = yield self._send_batch(total, body, docs_by_generation) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] defer.returnValue([gen_after_send, trans_id_after_send]) - def _delete_sent(self, idx, docs_by_generation): - doc = docs_by_generation[idx - 1][0] - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) + def _delete_sent(self, docs): + for doc, gen, trans_id in docs: + self._sync_enc_pool.delete_encrypted_doc( + doc.doc_id, doc.rev) + + @defer.inlineCallbacks + def _send_batch(self, total, body, docs): + sent = [] + missing = total - body.consumed + for i in xrange(1, missing + 1): + if body.pending_size > self.MAX_BATCH_SIZE: + break + idx = body.consumed + i + entry = docs[idx - 1] + sent.append(entry) + yield self._prepare_one_doc(entry, body, idx, total) + result = yield self._send_request(body.pop()) + if self._defer_encryption: + self._delete_sent(sent) + _emit_send_status(body.consumed, total) + defer.returnValue(result) + + def _send_request(self, body): + return self._http_request( + self._url, + method='POST', + body=body, + content_type='application/x-soledad-sync-put') @defer.inlineCallbacks def _prepare_one_doc(self, entry, body, idx, total): diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 44cd7089..2625744c 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -152,6 +152,8 @@ class RequestBody(object): """ self.headers = header_dict self.entries = [] + self.consumed = 0 + self.pending_size = 0 def insert_info(self, **entry_dict): """ @@ -165,11 +167,11 @@ class RequestBody(object): """ entry = json.dumps(entry_dict) self.entries.append(entry) - return len(entry) + self.pending_size += len(entry) - def pop(self, number=1): + def pop(self): """ - Removes an amount of entries and returns it formatted and ready + Removes all entries and returns it formatted and ready to be sent. :param number: number of entries to pop and format @@ -178,7 +180,10 @@ class RequestBody(object): :return: formatted body ready to be sent :rtype: str """ - entries = [self.entries.pop(0) for i in xrange(number)] + entries = self.entries[:] + self.entries = [] + self.pending_size = 0 + self.consumed += len(entries) return self.entries_to_str(entries) def __str__(self): -- cgit v1.2.3 From 577abee147c98592753bcdc68e1693d1f4ab5a08 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 13 Nov 2015 23:02:28 -0300 Subject: [feat] prepare server to handle batches Created two methods on the backend to start and finish a batch. A dict of callbacks is available to defer actions for the last document, allowing temporary (changing often) metadata to be recorded only once. Using those methods we will also be able to put all docs in one go on the CouchDatabase implementation, but that is another step. --- client/changes/feat_send_batch | 1 + common/src/leap/soledad/common/backend.py | 26 ++++++++++++++++++++++---- server/changes/feat_handle_send_batch_better | 1 + server/src/leap/soledad/server/sync.py | 14 +++++++++++--- 4 files changed, 35 insertions(+), 7 deletions(-) create mode 100644 client/changes/feat_send_batch create mode 100644 server/changes/feat_handle_send_batch_better diff --git a/client/changes/feat_send_batch b/client/changes/feat_send_batch new file mode 100644 index 00000000..fbfce519 --- /dev/null +++ b/client/changes/feat_send_batch @@ -0,0 +1 @@ +o Client will now send documents at a limited size batch due to changes on SyncTarget. The default limit is 500kB. diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py index a1493adc..b083163e 100644 --- a/common/src/leap/soledad/common/backend.py +++ b/common/src/leap/soledad/common/backend.py @@ -53,9 +53,20 @@ class SoledadBackend(CommonBackend): self._cache = None self._dbname = database._dbname self._database = database + self.batching = False if replica_uid is not None: self._set_replica_uid(replica_uid) + def batch_start(self): + self.batching = True + self.after_batch_callbacks = {} + + def batch_end(self): + self.batching = False + for name in self.after_batch_callbacks: + self.after_batch_callbacks[name]() + self.after_batch_callbacks = None + @property def cache(self): if self._cache is not None: @@ -373,7 +384,10 @@ class SoledadBackend(CommonBackend): """ if other_replica_uid in self.cache: return self.cache[other_replica_uid] - return self._database.get_replica_gen_and_trans_id(other_replica_uid) + gen, trans_id = \ + self._database.get_replica_gen_and_trans_id(other_replica_uid) + self.cache[other_replica_uid] = (gen, trans_id) + return (gen, trans_id) def _set_replica_gen_and_trans_id(self, other_replica_uid, other_generation, other_transaction_id): @@ -413,9 +427,13 @@ class SoledadBackend(CommonBackend): generation. :type other_transaction_id: str """ - self._set_replica_gen_and_trans_id(other_replica_uid, - other_generation, - other_transaction_id) + function = self._set_replica_gen_and_trans_id + args = [other_replica_uid, other_generation, other_transaction_id] + callback = lambda: function(*args) + if self.batching: + self.after_batch_callbacks['set_source_info'] = callback + else: + callback() def _force_doc_sync_conflict(self, doc): """ diff --git a/server/changes/feat_handle_send_batch_better b/server/changes/feat_handle_send_batch_better new file mode 100644 index 00000000..6ee8688a --- /dev/null +++ b/server/changes/feat_handle_send_batch_better @@ -0,0 +1 @@ +o Added two methods to start and finish a batch on backend. They can be used to change database behaviour, allowing batch operations to be optimized. diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index db25c406..96f65912 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -112,6 +112,14 @@ class SyncExchange(sync.SyncExchange): doc = self._db.get_doc(changed_doc_id, include_deleted=True) return_doc_cb(doc, gen, trans_id) + def batched_insert_from_source(self, entries, sync_id): + self._db.batch_start() + for entry in entries: + doc, gen, trans_id, number_of_docs, doc_idx = entry + self.insert_doc_from_source(doc, gen, trans_id, number_of_docs, + doc_idx, sync_id) + self._db.batch_end() + def insert_doc_from_source( self, doc, source_gen, trans_id, number_of_docs=None, doc_idx=None, sync_id=None): @@ -198,6 +206,7 @@ class SyncResource(http_app.SyncResource): self.sync_exch = self.sync_exchange_class( db, self.source_replica_uid, last_known_generation, sync_id) self._sync_id = sync_id + self._staging = [] @http_app.http_method(content_as_args=True) def post_put( @@ -225,9 +234,7 @@ class SyncResource(http_app.SyncResource): :type doc_idx: int """ doc = Document(id, rev, content) - self.sync_exch.insert_doc_from_source( - doc, gen, trans_id, number_of_docs=number_of_docs, - doc_idx=doc_idx, sync_id=self._sync_id) + self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) @http_app.http_method(received=int, content_as_args=True) def post_get(self, received): @@ -266,6 +273,7 @@ class SyncResource(http_app.SyncResource): Return the current generation and transaction_id after inserting one incoming document. """ + self.sync_exch.batched_insert_from_source(self._staging, self._sync_id) self.responder.content_type = 'application/x-soledad-sync-response' self.responder.start_response(200) self.responder.start_stream(), -- cgit v1.2.3 From bcbb9ccd1d4a281b6922340c12ec01b09d636380 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 14 Nov 2015 03:38:20 -0300 Subject: [feat] put all docs at once Using _bulk_docs api from CouchDB we can put all docs at a single request. Also, prefetching all ids removes the need to HEAD requests during the batch. --- common/src/leap/soledad/common/backend.py | 6 +++ common/src/leap/soledad/common/couch/__init__.py | 51 +++++++++++++++++------- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py index b083163e..9f5950b2 100644 --- a/common/src/leap/soledad/common/backend.py +++ b/common/src/leap/soledad/common/backend.py @@ -60,9 +60,15 @@ class SoledadBackend(CommonBackend): def batch_start(self): self.batching = True self.after_batch_callbacks = {} + self._database.batch_start() + if not self._cache: + # batching needs cache + self._cache = {} + self._get_generation() # warm up gen info def batch_end(self): self.batching = False + self._database.batch_end() for name in self.after_batch_callbacks: self.after_batch_callbacks[name]() self.after_batch_callbacks = None diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index fb3d57af..f3437ae1 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -153,10 +153,22 @@ class CouchDatabase(object): self._url = url self._dbname = dbname self._database = self.get_couch_database(url, dbname) + self.batching = False + self.batch_docs = [] if ensure_ddocs: self.ensure_ddocs_on_db() self.ensure_security_ddoc(database_security) + def batch_start(self): + self.batching = True + ids = set(row.id for row in self._database.view('_all_docs')) + self.batched_ids = ids + + def batch_end(self): + self.batching = False + self._database.update(self.batch_docs) + self.batch_docs = [] + def get_couch_database(self, url, dbname): """ Generate a couchdb.Database instance given a url and dbname. @@ -339,6 +351,8 @@ class CouchDatabase(object): """ # get document with all attachments (u1db content and eventual # conflicts) + if self.batching and doc_id not in self.batched_ids: + return None if doc_id not in self._database: return None result = self.json_from_resource([doc_id], attachments=True) @@ -691,20 +705,29 @@ class CouchDatabase(object): if old_doc is not None and hasattr(old_doc, 'couch_rev'): couch_doc['_rev'] = old_doc.couch_rev # prepare the multipart PUT - buf = StringIO() - envelope = MultipartWriter(buf) - envelope.add('application/json', json.dumps(couch_doc)) - for part in parts: - envelope.add('application/octet-stream', part) - envelope.close() - # try to save and fail if there's a revision conflict - try: - resource = self._new_resource() - resource.put_json( - doc.doc_id, body=str(buf.getvalue()), - headers=envelope.headers) - except ResourceConflict: - raise RevisionConflict() + if not self.batching: + buf = StringIO() + envelope = MultipartWriter(buf) + envelope.add('application/json', json.dumps(couch_doc)) + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict + try: + resource = self._new_resource() + resource.put_json( + doc.doc_id, body=str(buf.getvalue()), + headers=envelope.headers) + except ResourceConflict: + raise RevisionConflict() + else: + for name, attachment in attachments.items(): + del attachment['follows'] + del attachment['length'] + index = 0 if name is 'u1db_content' else 1 + attachment['data'] = binascii.b2a_base64(parts[index]).strip() + couch_doc['attachments'] = attachments + self.batch_docs.append(couch_doc) return transactions[-1][1] def _new_resource(self, *path): -- cgit v1.2.3 From fa595231b5ea346dfd9e06c364854469397fca3f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 14 Nov 2015 21:53:14 -0300 Subject: [feat] checks staged docs inside batch This commit adds checking for consistency on batch. When a doc is needed during a batched sync and it doesnt exists on database, current code will make a partial batch to avoid processing like it doesnt exist. --- common/src/leap/soledad/common/couch/__init__.py | 50 ++++++++++++++++++++---- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index f3437ae1..a922fd48 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -154,7 +154,7 @@ class CouchDatabase(object): self._dbname = dbname self._database = self.get_couch_database(url, dbname) self.batching = False - self.batch_docs = [] + self.batch_docs = {} if ensure_ddocs: self.ensure_ddocs_on_db() self.ensure_security_ddoc(database_security) @@ -166,8 +166,7 @@ class CouchDatabase(object): def batch_end(self): self.batching = False - self._database.update(self.batch_docs) - self.batch_docs = [] + self.__perform_batch() def get_couch_database(self, url, dbname): """ @@ -197,7 +196,8 @@ class CouchDatabase(object): """ for ddoc_name in ['docs', 'syncs', 'transactions']: try: - self.json_from_resource(['_design', ddoc_name, '_info'], + self.json_from_resource(['_design'] + + ddoc_name.split('/') + ['_info'], check_missing_ddoc=False) except ResourceNotFound: ddoc = json.loads( @@ -349,15 +349,49 @@ class CouchDatabase(object): :return: The document. :rtype: ServerDocument """ - # get document with all attachments (u1db content and eventual - # conflicts) + doc_from_batch = self.__check_batch_before_get(doc_id) + if doc_from_batch: + return doc_from_batch if self.batching and doc_id not in self.batched_ids: return None if doc_id not in self._database: return None + # get document with all attachments (u1db content and eventual + # conflicts) result = self.json_from_resource([doc_id], attachments=True) return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts) + def __check_batch_before_get(self, doc_id): + """ + If doc_id is staged for batching, then we need to commit the batch + before going ahead. This avoids consistency problems, like trying to + get a document that isn't persisted and processing like it is missing. + + :param doc_id: The unique document identifier + :type doc_id: str + """ + if doc_id in self.batch_docs: + couch_doc = self.batch_docs[doc_id] + rev = self.__perform_batch(doc_id) + couch_doc['_rev'] = rev + self.batched_ids.add(doc_id) + return self.__parse_doc_from_couch(couch_doc, doc_id, True) + return None + + def __perform_batch(self, doc_id=None): + status = self._database.update(self.batch_docs.values()) + rev = None + for ok, stored_doc_id, rev_or_error in status: + if not ok: + error = rev_or_error + if type(error) is ResourceConflict: + raise RevisionConflict + raise error + elif doc_id == stored_doc_id: + rev = rev_or_error + self.batch_docs.clear() + return rev + def __parse_doc_from_couch(self, result, doc_id, check_for_conflicts=False): # restrict to u1db documents @@ -726,8 +760,8 @@ class CouchDatabase(object): del attachment['length'] index = 0 if name is 'u1db_content' else 1 attachment['data'] = binascii.b2a_base64(parts[index]).strip() - couch_doc['attachments'] = attachments - self.batch_docs.append(couch_doc) + couch_doc['_attachments'] = attachments + self.batch_docs[doc.doc_id] = couch_doc return transactions[-1][1] def _new_resource(self, *path): -- cgit v1.2.3 From f1497b92aff3b953eca572c08d85d8ddffb36391 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 17 Nov 2015 21:27:38 -0300 Subject: [feat] add configuration to disable batching Batch support is optional. This commit adds a 'batching' configuration option to disable it. --- common/src/leap/soledad/common/backend.py | 5 ++++ .../leap/soledad/common/tests/fixture_soledad.conf | 1 + .../src/leap/soledad/common/tests/test_server.py | 33 +++++++++++++--------- server/src/leap/soledad/server/__init__.py | 25 +++++++++------- 4 files changed, 41 insertions(+), 23 deletions(-) diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py index 9f5950b2..91f28fff 100644 --- a/common/src/leap/soledad/common/backend.py +++ b/common/src/leap/soledad/common/backend.py @@ -33,6 +33,7 @@ from leap.soledad.common.document import ServerDocument class SoledadBackend(CommonBackend): + BATCH_SUPPORT = True """ A U1DB backend implementation. @@ -58,6 +59,8 @@ class SoledadBackend(CommonBackend): self._set_replica_uid(replica_uid) def batch_start(self): + if not self.BATCH_SUPPORT: + return self.batching = True self.after_batch_callbacks = {} self._database.batch_start() @@ -67,6 +70,8 @@ class SoledadBackend(CommonBackend): self._get_generation() # warm up gen info def batch_end(self): + if not self.BATCH_SUPPORT: + return self.batching = False self._database.batch_end() for name in self.after_batch_callbacks: diff --git a/common/src/leap/soledad/common/tests/fixture_soledad.conf b/common/src/leap/soledad/common/tests/fixture_soledad.conf index c0ffacf6..8d8161c3 100644 --- a/common/src/leap/soledad/common/tests/fixture_soledad.conf +++ b/common/src/leap/soledad/common/tests/fixture_soledad.conf @@ -2,6 +2,7 @@ couch_url = http://soledad:passwd@localhost:5984 create_cmd = sudo -u soledad-admin /usr/bin/create-user-db admin_netrc = /etc/couchdb/couchdb-soledad-admin.netrc +batching = 0 [database-security] members = user1, user2 diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index e1129a9f..20fe8579 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -640,18 +640,25 @@ class ConfigurationParsingTest(unittest.TestCase): config = load_configuration(config_path) # then - expected = {'soledad-server': { - 'couch_url': - 'http://soledad:passwd@localhost:5984', - 'create_cmd': - 'sudo -u soledad-admin /usr/bin/create-user-db', - 'admin_netrc': - '/etc/couchdb/couchdb-soledad-admin.netrc', - }, - 'database-security': { - 'members': ['user1', 'user2'], + expected = {'members': ['user1', 'user2'], 'members_roles': ['role1', 'role2'], 'admins': ['user3', 'user4'], - 'admins_roles': ['role3', 'role3'], - }} - self.assertDictEqual(expected, config) + 'admins_roles': ['role3', 'role3']} + self.assertDictEqual(expected, config['database-security']) + + def test_server_values_configuration(self): + # given + config_path = resource_filename('leap.soledad.common.tests', + 'fixture_soledad.conf') + # when + config = load_configuration(config_path) + + # then + expected = {'couch_url': + 'http://soledad:passwd@localhost:5984', + 'create_cmd': + 'sudo -u soledad-admin /usr/bin/create-user-db', + 'admin_netrc': + '/etc/couchdb/couchdb-soledad-admin.netrc', + 'batching': False} + self.assertDictEqual(expected, config['soledad-server']) diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 00e1e9fb..7320c133 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -104,6 +104,7 @@ from leap.soledad.server.sync import ( ) from leap.soledad.common import SHARED_DB_NAME +from leap.soledad.common.backend import SoledadBackend from leap.soledad.common.couch.state import CouchServerState # ---------------------------------------------------------------------------- @@ -264,6 +265,7 @@ CONFIG_DEFAULTS = { 'couch_url': 'http://localhost:5984', 'create_cmd': None, 'admin_netrc': '/etc/couchdb/couchdb-admin.netrc', + 'batching': True }, 'database-security': { 'members': ['soledad'], @@ -285,18 +287,20 @@ def load_configuration(file_path): @rtype: dict """ defaults = dict(CONFIG_DEFAULTS) - config = configparser.ConfigParser() + config = configparser.SafeConfigParser() config.read(file_path) - for section in defaults.keys(): - if section in config: - for key in defaults[section]: - if key in config[section]: - defaults[section][key] = config[section][key] - for key, value in defaults['database-security'].iteritems(): - if type(value) is not unicode: + for section in defaults: + if not config.has_section(section): continue - defaults['database-security'][key] = \ - [item.strip() for item in value.split(',')] + for key, value in defaults[section].items(): + if type(value) == bool: + defaults[section][key] = config.getboolean(section, key) + elif type(value) == list: + values = config.get(section, key).split(',') + values = [v.strip() for v in values] + defaults[section][key] = values + else: + defaults[section][key] = config.get(section, key) # TODO: implement basic parsing/sanitization of options comming from # config file. return defaults @@ -310,6 +314,7 @@ def application(environ, start_response): conf = load_configuration('/etc/soledad/soledad-server.conf') conf = conf['soledad-server'] state = CouchServerState(conf['couch_url'], create_cmd=conf['create_cmd']) + SoledadBackend.BATCH_SUPPORT = conf['batching'] # WSGI application that may be used by `twistd -web` application = GzipMiddleware( SoledadTokenAuthMiddleware(SoledadApp(state))) -- cgit v1.2.3 From d103491cbc17e6e7422653e9b01101ff446e7391 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 3 Dec 2015 19:25:29 -0300 Subject: [feat] generation caching during a batch Generation cache was removed for simple processing and it should not got back, but during a batch the server wont change its generation. So a little trick to hold that temporary information until batch finishes is needed. --- common/src/leap/soledad/common/couch/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index a922fd48..dae460cb 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -154,6 +154,7 @@ class CouchDatabase(object): self._dbname = dbname self._database = self.get_couch_database(url, dbname) self.batching = False + self.batch_generation = None self.batch_docs = {} if ensure_ddocs: self.ensure_ddocs_on_db() @@ -161,11 +162,13 @@ class CouchDatabase(object): def batch_start(self): self.batching = True + self.batch_generation = self.get_generation_info() ids = set(row.id for row in self._database.view('_all_docs')) self.batched_ids = ids def batch_end(self): self.batching = False + self.batch_generation = None self.__perform_batch() def get_couch_database(self, url, dbname): @@ -619,6 +622,8 @@ class CouchDatabase(object): :return: A tuple containing the current generation and transaction id. :rtype: (int, str) """ + if self.batching and self.batch_generation: + return self.batch_generation # query a couch list function ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] info = self.json_from_resource(ddoc_path) @@ -762,6 +767,8 @@ class CouchDatabase(object): attachment['data'] = binascii.b2a_base64(parts[index]).strip() couch_doc['_attachments'] = attachments self.batch_docs[doc.doc_id] = couch_doc + last_gen, last_trans_id = self.batch_generation + self.batch_generation = (last_gen + 1, transaction_id) return transactions[-1][1] def _new_resource(self, *path): -- cgit v1.2.3 From 7208d8bc5e5f23d0773533b15763f64d236489b4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 3 Dec 2015 19:34:56 -0300 Subject: [feat] set default to False on batching for now All batching code has no effect by default with this commit. Since we know that this is a dangerous new feature we will enable them only on our test servers and check them manually before setting it as default or adding more configuration features. Use SyncTarget and server conf file to enable it for testing. --- client/src/leap/soledad/client/http_target/send.py | 2 +- common/src/leap/soledad/common/backend.py | 2 +- server/src/leap/soledad/server/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index c1252c13..e8abf35b 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -29,7 +29,7 @@ class HTTPDocSender(object): They need to be encrypted and metadata prepared before sending. """ - MAX_BATCH_SIZE = 500 * 1000 # 500kB by default + MAX_BATCH_SIZE = 0 # disabled by now, this is being tested yet @defer.inlineCallbacks def _send_docs(self, docs_by_generation, last_known_generation, diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py index 91f28fff..53426fb5 100644 --- a/common/src/leap/soledad/common/backend.py +++ b/common/src/leap/soledad/common/backend.py @@ -33,7 +33,7 @@ from leap.soledad.common.document import ServerDocument class SoledadBackend(CommonBackend): - BATCH_SUPPORT = True + BATCH_SUPPORT = False """ A U1DB backend implementation. diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 7320c133..39edcc1b 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -265,7 +265,7 @@ CONFIG_DEFAULTS = { 'couch_url': 'http://localhost:5984', 'create_cmd': None, 'admin_netrc': '/etc/couchdb/couchdb-admin.netrc', - 'batching': True + 'batching': False }, 'database-security': { 'members': ['soledad'], -- cgit v1.2.3 From 27bda0ac201e236e3a2c9671462a337f2970e993 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 4 Dec 2015 15:47:53 -0300 Subject: [bug] skip missing keys on existing sections While parsing the configuration file, if a key doesnt exist we need to skip it. --- server/src/leap/soledad/server/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 39edcc1b..22894dac 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -293,7 +293,9 @@ def load_configuration(file_path): if not config.has_section(section): continue for key, value in defaults[section].items(): - if type(value) == bool: + if not config.has_option(section, key): + continue + elif type(value) == bool: defaults[section][key] = config.getboolean(section, key) elif type(value) == list: values = config.get(section, key).split(',') -- cgit v1.2.3