summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.rst2
-rw-r--r--client/changes/feat_send_batch1
-rw-r--r--client/src/leap/soledad/client/encdecpool.py20
-rw-r--r--client/src/leap/soledad/client/http_target/send.py46
-rw-r--r--client/src/leap/soledad/client/http_target/support.py13
-rw-r--r--common/src/leap/soledad/common/backend.py51
-rw-r--r--common/src/leap/soledad/common/couch/__init__.py100
-rw-r--r--common/src/leap/soledad/common/couch/support.py115
-rw-r--r--common/src/leap/soledad/common/errors.py3
-rw-r--r--common/src/leap/soledad/common/tests/fixture_soledad.conf1
-rw-r--r--common/src/leap/soledad/common/tests/test_encdecpool.py26
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py33
-rw-r--r--server/changes/feat_handle_send_batch_better1
-rw-r--r--server/src/leap/soledad/server/__init__.py27
-rw-r--r--server/src/leap/soledad/server/sync.py14
15 files changed, 357 insertions, 96 deletions
diff --git a/README.rst b/README.rst
index 815db808..74a49d12 100644
--- a/README.rst
+++ b/README.rst
@@ -51,7 +51,7 @@ Client and server tests are both included in leap.soledad.common. If you want
to run tests in development mode you must do the following::
scripts/develop_mode.sh
- ./run_tests.sh
+ scripts/run_tests.sh
Note that to run CouchDB tests, be sure you have `CouchDB`_ installed on your
system.
diff --git a/client/changes/feat_send_batch b/client/changes/feat_send_batch
new file mode 100644
index 00000000..fbfce519
--- /dev/null
+++ b/client/changes/feat_send_batch
@@ -0,0 +1 @@
+o Client will now send documents at a limited size batch due to changes on SyncTarget. The default limit is 500kB.
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 6d3c11b9..0954c1df 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -70,11 +70,13 @@ class SyncEncryptDecryptPool(object):
self._started = False
def start(self):
+ if self.running:
+ return
self._create_pool()
self._started = True
def stop(self):
- if not self._started:
+ if not self.running:
return
self._started = False
self._destroy_pool()
@@ -650,14 +652,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
last_idx = self._last_inserted_idx
for doc_id, rev, content, gen, trans_id, encrypted, idx in \
decrypted_docs:
- # XXX for some reason, a document might not have been deleted from
- # the database. This is a bug. In this point, already
- # processed documents should have been removed from the sync
- # database and we should not have to skip them here. We need
- # to find out why this is happening, fix, and remove the
- # skipping below.
- if (idx < last_idx + 1):
- continue
if (idx != last_idx + 1):
break
insertable.append((doc_id, rev, content, gen, trans_id, idx))
@@ -763,6 +757,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
return self._runOperation(query)
+ @defer.inlineCallbacks
def _collect_async_decryption_results(self):
"""
Collect the results of the asynchronous doc decryptions and re-raise
@@ -773,7 +768,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
async_results = self._async_results[:]
for res in async_results:
if res.ready():
- self._decrypt_doc_cb(res.get()) # might raise an exception!
+ # XXX: might raise an exception!
+ yield self._decrypt_doc_cb(res.get())
self._async_results.remove(res)
@defer.inlineCallbacks
@@ -796,7 +792,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if processed < pending:
yield self._async_decrypt_received_docs()
- self._collect_async_decryption_results()
+ yield self._collect_async_decryption_results()
docs = yield self._process_decrypted_docs()
yield self._delete_processed_docs(docs)
# recurse
@@ -807,6 +803,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._finish()
def _finish(self):
- self._deferred.callback(None)
self._processed_docs = 0
self._last_inserted_idx = 0
+ self._deferred.callback(None)
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
index 80483f0d..e8abf35b 100644
--- a/client/src/leap/soledad/client/http_target/send.py
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -29,6 +29,8 @@ class HTTPDocSender(object):
They need to be encrypted and metadata prepared before sending.
"""
+ MAX_BATCH_SIZE = 0 # disabled by now, this is being tested yet
+
@defer.inlineCallbacks
def _send_docs(self, docs_by_generation, last_known_generation,
last_known_trans_id, sync_id):
@@ -43,25 +45,41 @@ class HTTPDocSender(object):
sync_id=sync_id,
ensure=self._ensure_callback is not None)
total = len(docs_by_generation)
- for idx, entry in enumerate(docs_by_generation, 1):
- yield self._prepare_one_doc(entry, body, idx, total)
- result = yield self._http_request(
- self._url,
- method='POST',
- body=body.pop(1),
- content_type='application/x-soledad-sync-put')
- if self._defer_encryption:
- self._delete_sent(idx, docs_by_generation)
- _emit_send_status(idx, total)
+ while body.consumed < total:
+ result = yield self._send_batch(total, body, docs_by_generation)
response_dict = json.loads(result)[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
defer.returnValue([gen_after_send, trans_id_after_send])
- def _delete_sent(self, idx, docs_by_generation):
- doc = docs_by_generation[idx - 1][0]
- self._sync_enc_pool.delete_encrypted_doc(
- doc.doc_id, doc.rev)
+ def _delete_sent(self, docs):
+ for doc, gen, trans_id in docs:
+ self._sync_enc_pool.delete_encrypted_doc(
+ doc.doc_id, doc.rev)
+
+ @defer.inlineCallbacks
+ def _send_batch(self, total, body, docs):
+ sent = []
+ missing = total - body.consumed
+ for i in xrange(1, missing + 1):
+ if body.pending_size > self.MAX_BATCH_SIZE:
+ break
+ idx = body.consumed + i
+ entry = docs[idx - 1]
+ sent.append(entry)
+ yield self._prepare_one_doc(entry, body, idx, total)
+ result = yield self._send_request(body.pop())
+ if self._defer_encryption:
+ self._delete_sent(sent)
+ _emit_send_status(body.consumed, total)
+ defer.returnValue(result)
+
+ def _send_request(self, body):
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=body,
+ content_type='application/x-soledad-sync-put')
@defer.inlineCallbacks
def _prepare_one_doc(self, entry, body, idx, total):
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index 44cd7089..2625744c 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -152,6 +152,8 @@ class RequestBody(object):
"""
self.headers = header_dict
self.entries = []
+ self.consumed = 0
+ self.pending_size = 0
def insert_info(self, **entry_dict):
"""
@@ -165,11 +167,11 @@ class RequestBody(object):
"""
entry = json.dumps(entry_dict)
self.entries.append(entry)
- return len(entry)
+ self.pending_size += len(entry)
- def pop(self, number=1):
+ def pop(self):
"""
- Removes an amount of entries and returns it formatted and ready
+ Removes all entries and returns it formatted and ready
to be sent.
:param number: number of entries to pop and format
@@ -178,7 +180,10 @@ class RequestBody(object):
:return: formatted body ready to be sent
:rtype: str
"""
- entries = [self.entries.pop(0) for i in xrange(number)]
+ entries = self.entries[:]
+ self.entries = []
+ self.pending_size = 0
+ self.consumed += len(entries)
return self.entries_to_str(entries)
def __str__(self):
diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py
index deed5ac2..53426fb5 100644
--- a/common/src/leap/soledad/common/backend.py
+++ b/common/src/leap/soledad/common/backend.py
@@ -33,6 +33,7 @@ from leap.soledad.common.document import ServerDocument
class SoledadBackend(CommonBackend):
+ BATCH_SUPPORT = False
"""
A U1DB backend implementation.
@@ -53,9 +54,30 @@ class SoledadBackend(CommonBackend):
self._cache = None
self._dbname = database._dbname
self._database = database
+ self.batching = False
if replica_uid is not None:
self._set_replica_uid(replica_uid)
+ def batch_start(self):
+ if not self.BATCH_SUPPORT:
+ return
+ self.batching = True
+ self.after_batch_callbacks = {}
+ self._database.batch_start()
+ if not self._cache:
+ # batching needs cache
+ self._cache = {}
+ self._get_generation() # warm up gen info
+
+ def batch_end(self):
+ if not self.BATCH_SUPPORT:
+ return
+ self.batching = False
+ self._database.batch_end()
+ for name in self.after_batch_callbacks:
+ self.after_batch_callbacks[name]()
+ self.after_batch_callbacks = None
+
@property
def cache(self):
if self._cache is not None:
@@ -154,11 +176,7 @@ class SoledadBackend(CommonBackend):
:raise SoledadError: Raised by database on operation failure
"""
- if self.replica_uid + '_gen' in self.cache:
- response = self.cache[self.replica_uid + '_gen']
- return response
cur_gen, newest_trans_id = self._database.get_generation_info()
- self.cache[self.replica_uid + '_gen'] = (cur_gen, newest_trans_id)
return (cur_gen, newest_trans_id)
def _get_trans_id_for_gen(self, generation):
@@ -253,14 +271,8 @@ class SoledadBackend(CommonBackend):
:param doc: The document to be put.
:type doc: ServerDocument
"""
- last_transaction =\
- self._database.save_document(old_doc, doc,
- self._allocate_transaction_id())
- if self.replica_uid + '_gen' in self.cache:
- gen, trans = self.cache[self.replica_uid + '_gen']
- gen += 1
- trans = last_transaction
- self.cache[self.replica_uid + '_gen'] = (gen, trans)
+ self._database.save_document(old_doc, doc,
+ self._allocate_transaction_id())
def put_doc(self, doc):
"""
@@ -383,7 +395,10 @@ class SoledadBackend(CommonBackend):
"""
if other_replica_uid in self.cache:
return self.cache[other_replica_uid]
- return self._database.get_replica_gen_and_trans_id(other_replica_uid)
+ gen, trans_id = \
+ self._database.get_replica_gen_and_trans_id(other_replica_uid)
+ self.cache[other_replica_uid] = (gen, trans_id)
+ return (gen, trans_id)
def _set_replica_gen_and_trans_id(self, other_replica_uid,
other_generation, other_transaction_id):
@@ -423,9 +438,13 @@ class SoledadBackend(CommonBackend):
generation.
:type other_transaction_id: str
"""
- self._set_replica_gen_and_trans_id(other_replica_uid,
- other_generation,
- other_transaction_id)
+ function = self._set_replica_gen_and_trans_id
+ args = [other_replica_uid, other_generation, other_transaction_id]
+ callback = lambda: function(*args)
+ if self.batching:
+ self.after_batch_callbacks['set_source_info'] = callback
+ else:
+ callback()
def _force_doc_sync_conflict(self, doc):
"""
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py
index bd8b08b7..dae460cb 100644
--- a/common/src/leap/soledad/common/couch/__init__.py
+++ b/common/src/leap/soledad/common/couch/__init__.py
@@ -33,7 +33,6 @@ from multiprocessing.pool import ThreadPool
from couchdb.client import Server, Database
-from couchdb.multipart import MultipartWriter
from couchdb.http import (
ResourceConflict,
ResourceNotFound,
@@ -53,6 +52,7 @@ from u1db.remote import http_app
from leap.soledad.common import ddocs
from .errors import raise_server_error
from .errors import raise_missing_design_doc_error
+from .support import MultipartWriter
from leap.soledad.common.errors import InvalidURLError
from leap.soledad.common.document import ServerDocument
from leap.soledad.common.backend import SoledadBackend
@@ -153,10 +153,24 @@ class CouchDatabase(object):
self._url = url
self._dbname = dbname
self._database = self.get_couch_database(url, dbname)
+ self.batching = False
+ self.batch_generation = None
+ self.batch_docs = {}
if ensure_ddocs:
self.ensure_ddocs_on_db()
self.ensure_security_ddoc(database_security)
+ def batch_start(self):
+ self.batching = True
+ self.batch_generation = self.get_generation_info()
+ ids = set(row.id for row in self._database.view('_all_docs'))
+ self.batched_ids = ids
+
+ def batch_end(self):
+ self.batching = False
+ self.batch_generation = None
+ self.__perform_batch()
+
def get_couch_database(self, url, dbname):
"""
Generate a couchdb.Database instance given a url and dbname.
@@ -185,7 +199,8 @@ class CouchDatabase(object):
"""
for ddoc_name in ['docs', 'syncs', 'transactions']:
try:
- self.json_from_resource(['_design', ddoc_name, '_info'],
+ self.json_from_resource(['_design'] +
+ ddoc_name.split('/') + ['_info'],
check_missing_ddoc=False)
except ResourceNotFound:
ddoc = json.loads(
@@ -337,13 +352,49 @@ class CouchDatabase(object):
:return: The document.
:rtype: ServerDocument
"""
- # get document with all attachments (u1db content and eventual
- # conflicts)
+ doc_from_batch = self.__check_batch_before_get(doc_id)
+ if doc_from_batch:
+ return doc_from_batch
+ if self.batching and doc_id not in self.batched_ids:
+ return None
if doc_id not in self._database:
return None
+ # get document with all attachments (u1db content and eventual
+ # conflicts)
result = self.json_from_resource([doc_id], attachments=True)
return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
+ def __check_batch_before_get(self, doc_id):
+ """
+ If doc_id is staged for batching, then we need to commit the batch
+ before going ahead. This avoids consistency problems, like trying to
+ get a document that isn't persisted and processing like it is missing.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ """
+ if doc_id in self.batch_docs:
+ couch_doc = self.batch_docs[doc_id]
+ rev = self.__perform_batch(doc_id)
+ couch_doc['_rev'] = rev
+ self.batched_ids.add(doc_id)
+ return self.__parse_doc_from_couch(couch_doc, doc_id, True)
+ return None
+
+ def __perform_batch(self, doc_id=None):
+ status = self._database.update(self.batch_docs.values())
+ rev = None
+ for ok, stored_doc_id, rev_or_error in status:
+ if not ok:
+ error = rev_or_error
+ if type(error) is ResourceConflict:
+ raise RevisionConflict
+ raise error
+ elif doc_id == stored_doc_id:
+ rev = rev_or_error
+ self.batch_docs.clear()
+ return rev
+
def __parse_doc_from_couch(self, result, doc_id,
check_for_conflicts=False):
# restrict to u1db documents
@@ -571,6 +622,8 @@ class CouchDatabase(object):
:return: A tuple containing the current generation and transaction id.
:rtype: (int, str)
"""
+ if self.batching and self.batch_generation:
+ return self.batch_generation
# query a couch list function
ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
info = self.json_from_resource(ddoc_path)
@@ -691,20 +744,31 @@ class CouchDatabase(object):
if old_doc is not None and hasattr(old_doc, 'couch_rev'):
couch_doc['_rev'] = old_doc.couch_rev
# prepare the multipart PUT
- buf = StringIO()
- headers = {}
- envelope = MultipartWriter(buf, headers=headers, subtype='related')
- envelope.add('application/json', json.dumps(couch_doc))
- for part in parts:
- envelope.add('application/octet-stream', part)
- envelope.close()
- # try to save and fail if there's a revision conflict
- try:
- resource = self._new_resource()
- resource.put_json(
- doc.doc_id, body=str(buf.getvalue()), headers=headers)
- except ResourceConflict:
- raise RevisionConflict()
+ if not self.batching:
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ resource = self._new_resource()
+ resource.put_json(
+ doc.doc_id, body=str(buf.getvalue()),
+ headers=envelope.headers)
+ except ResourceConflict:
+ raise RevisionConflict()
+ else:
+ for name, attachment in attachments.items():
+ del attachment['follows']
+ del attachment['length']
+ index = 0 if name is 'u1db_content' else 1
+ attachment['data'] = binascii.b2a_base64(parts[index]).strip()
+ couch_doc['_attachments'] = attachments
+ self.batch_docs[doc.doc_id] = couch_doc
+ last_gen, last_trans_id = self.batch_generation
+ self.batch_generation = (last_gen + 1, transaction_id)
return transactions[-1][1]
def _new_resource(self, *path):
diff --git a/common/src/leap/soledad/common/couch/support.py b/common/src/leap/soledad/common/couch/support.py
new file mode 100644
index 00000000..bfc4fef6
--- /dev/null
+++ b/common/src/leap/soledad/common/couch/support.py
@@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+# support.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import sys
+
+
+"""
+Monkey patches and temporary code that may be removed with version changes.
+"""
+
+
+# for bigcouch
+# TODO: Remove if bigcouch support is dropped
+class MultipartWriter(object):
+
+ """
+ A multipart writer adapted from python-couchdb's one so we can PUT
+ documents using couch's multipart PUT.
+
+ This stripped down version does not allow for nested structures, and
+ contains only the essential things we need to PUT SoledadDocuments to the
+ couch backend. Also, please note that this is a patch. The couchdb lib has
+ another implementation that works fine with CouchDB 1.6, but removing this
+ now will break compatibility with bigcouch.
+ """
+
+ CRLF = '\r\n'
+
+ def __init__(self, fileobj, headers=None, boundary=None):
+ """
+ Initialize the multipart writer.
+ """
+ self.fileobj = fileobj
+ if boundary is None:
+ boundary = self._make_boundary()
+ self._boundary = boundary
+ self._build_headers('related', headers)
+
+ def add(self, mimetype, content, headers={}):
+ """
+ Add a part to the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ self.fileobj.write(self.CRLF)
+ headers['Content-Type'] = mimetype
+ self._write_headers(headers)
+ if content:
+ # XXX: throw an exception if a boundary appears in the content??
+ self.fileobj.write(content)
+ self.fileobj.write(self.CRLF)
+
+ def close(self):
+ """
+ Close the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ # be careful not to have anything after '--', otherwise old couch
+ # versions (including bigcouch) will fail.
+ self.fileobj.write('--')
+
+ def _make_boundary(self):
+ """
+ Create a boundary to discern multi parts.
+ """
+ try:
+ from uuid import uuid4
+ return '==' + uuid4().hex + '=='
+ except ImportError:
+ from random import randrange
+ token = randrange(sys.maxint)
+ format = '%%0%dd' % len(repr(sys.maxint - 1))
+ return '===============' + (format % token) + '=='
+
+ def _write_headers(self, headers):
+ """
+ Write a part header in the buffer stream.
+ """
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.fileobj.write(name)
+ self.fileobj.write(': ')
+ self.fileobj.write(value)
+ self.fileobj.write(self.CRLF)
+ self.fileobj.write(self.CRLF)
+
+ def _build_headers(self, subtype, headers):
+ """
+ Build the main headers of the multipart stream.
+
+ This is here so we can send headers separete from content using
+ python-couchdb API.
+ """
+ self.headers = {}
+ self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \
+ (subtype, self._boundary)
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.headers[name] = value
diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py
index 7a8a8929..0b6bb4e6 100644
--- a/common/src/leap/soledad/common/errors.py
+++ b/common/src/leap/soledad/common/errors.py
@@ -145,9 +145,10 @@ class InvalidURLError(Exception):
"""
-@register_exception
class BackendNotReadyError(SoledadError):
"""
Generic exception raised when the backend is not ready to dispatch a client
request.
"""
+ wire_description = "backend not ready"
+ status = 500
diff --git a/common/src/leap/soledad/common/tests/fixture_soledad.conf b/common/src/leap/soledad/common/tests/fixture_soledad.conf
index c0ffacf6..8d8161c3 100644
--- a/common/src/leap/soledad/common/tests/fixture_soledad.conf
+++ b/common/src/leap/soledad/common/tests/fixture_soledad.conf
@@ -2,6 +2,7 @@
couch_url = http://soledad:passwd@localhost:5984
create_cmd = sudo -u soledad-admin /usr/bin/create-user-db
admin_netrc = /etc/couchdb/couchdb-soledad-admin.netrc
+batching = 0
[database-security]
members = user1, user2
diff --git a/common/src/leap/soledad/common/tests/test_encdecpool.py b/common/src/leap/soledad/common/tests/test_encdecpool.py
index 793bfa1a..694eb7ad 100644
--- a/common/src/leap/soledad/common/tests/test_encdecpool.py
+++ b/common/src/leap/soledad/common/tests/test_encdecpool.py
@@ -18,6 +18,7 @@
Tests for encryption and decryption pool.
"""
import json
+from random import shuffle
from twisted.internet.defer import inlineCallbacks
@@ -171,20 +172,21 @@ class TestSyncDecrypterPool(BaseSoledadTest):
DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1)
def _assert_doc_was_decrypted_and_inserted(_):
+ self.assertEqual(1, len(self._inserted_docs))
self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")])
self._pool.deferred.addCallback(
_assert_doc_was_decrypted_and_inserted)
return self._pool.deferred
- def test_insert_encrypted_received_doc_many(self):
+ def test_insert_encrypted_received_doc_many(self, many=100):
"""
Test that many encrypted documents added to the pool are decrypted and
inserted using the callback.
"""
crypto = self._soledad._crypto
- many = 100
self._pool.start(many)
+ docs = []
# insert many encrypted docs in the pool
for i in xrange(many):
@@ -198,9 +200,12 @@ class TestSyncDecrypterPool(BaseSoledadTest):
doc_id=doc_id, rev=rev, json=json.dumps(content))
encrypted_content = json.loads(crypto.encrypt_doc(doc))
+ docs.append((doc_id, rev, encrypted_content, gen,
+ trans_id, idx))
+ shuffle(docs)
- self._pool.insert_encrypted_received_doc(
- doc_id, rev, encrypted_content, gen, trans_id, idx)
+ for doc in docs:
+ self._pool.insert_encrypted_received_doc(*doc)
def _assert_docs_were_decrypted_and_inserted(_):
self.assertEqual(many, len(self._inserted_docs))
@@ -223,3 +228,16 @@ class TestSyncDecrypterPool(BaseSoledadTest):
self._pool.deferred.addCallback(
_assert_docs_were_decrypted_and_inserted)
return self._pool.deferred
+
+ @inlineCallbacks
+ def test_pool_reuse(self):
+ """
+ The pool is reused between syncs, this test verifies that
+ reusing is fine.
+ """
+ for i in xrange(3):
+ yield self.test_insert_encrypted_received_doc_many(5)
+ self._inserted_docs = []
+ decrypted_docs = yield self._pool._get_docs(encrypted=False)
+ # check that decrypted docs staging is clean
+ self.assertEquals([], decrypted_docs)
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index e1129a9f..20fe8579 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -640,18 +640,25 @@ class ConfigurationParsingTest(unittest.TestCase):
config = load_configuration(config_path)
# then
- expected = {'soledad-server': {
- 'couch_url':
- 'http://soledad:passwd@localhost:5984',
- 'create_cmd':
- 'sudo -u soledad-admin /usr/bin/create-user-db',
- 'admin_netrc':
- '/etc/couchdb/couchdb-soledad-admin.netrc',
- },
- 'database-security': {
- 'members': ['user1', 'user2'],
+ expected = {'members': ['user1', 'user2'],
'members_roles': ['role1', 'role2'],
'admins': ['user3', 'user4'],
- 'admins_roles': ['role3', 'role3'],
- }}
- self.assertDictEqual(expected, config)
+ 'admins_roles': ['role3', 'role3']}
+ self.assertDictEqual(expected, config['database-security'])
+
+ def test_server_values_configuration(self):
+ # given
+ config_path = resource_filename('leap.soledad.common.tests',
+ 'fixture_soledad.conf')
+ # when
+ config = load_configuration(config_path)
+
+ # then
+ expected = {'couch_url':
+ 'http://soledad:passwd@localhost:5984',
+ 'create_cmd':
+ 'sudo -u soledad-admin /usr/bin/create-user-db',
+ 'admin_netrc':
+ '/etc/couchdb/couchdb-soledad-admin.netrc',
+ 'batching': False}
+ self.assertDictEqual(expected, config['soledad-server'])
diff --git a/server/changes/feat_handle_send_batch_better b/server/changes/feat_handle_send_batch_better
new file mode 100644
index 00000000..6ee8688a
--- /dev/null
+++ b/server/changes/feat_handle_send_batch_better
@@ -0,0 +1 @@
+o Added two methods to start and finish a batch on backend. They can be used to change database behaviour, allowing batch operations to be optimized.
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index 00e1e9fb..22894dac 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -104,6 +104,7 @@ from leap.soledad.server.sync import (
)
from leap.soledad.common import SHARED_DB_NAME
+from leap.soledad.common.backend import SoledadBackend
from leap.soledad.common.couch.state import CouchServerState
# ----------------------------------------------------------------------------
@@ -264,6 +265,7 @@ CONFIG_DEFAULTS = {
'couch_url': 'http://localhost:5984',
'create_cmd': None,
'admin_netrc': '/etc/couchdb/couchdb-admin.netrc',
+ 'batching': False
},
'database-security': {
'members': ['soledad'],
@@ -285,18 +287,22 @@ def load_configuration(file_path):
@rtype: dict
"""
defaults = dict(CONFIG_DEFAULTS)
- config = configparser.ConfigParser()
+ config = configparser.SafeConfigParser()
config.read(file_path)
- for section in defaults.keys():
- if section in config:
- for key in defaults[section]:
- if key in config[section]:
- defaults[section][key] = config[section][key]
- for key, value in defaults['database-security'].iteritems():
- if type(value) is not unicode:
+ for section in defaults:
+ if not config.has_section(section):
continue
- defaults['database-security'][key] = \
- [item.strip() for item in value.split(',')]
+ for key, value in defaults[section].items():
+ if not config.has_option(section, key):
+ continue
+ elif type(value) == bool:
+ defaults[section][key] = config.getboolean(section, key)
+ elif type(value) == list:
+ values = config.get(section, key).split(',')
+ values = [v.strip() for v in values]
+ defaults[section][key] = values
+ else:
+ defaults[section][key] = config.get(section, key)
# TODO: implement basic parsing/sanitization of options comming from
# config file.
return defaults
@@ -310,6 +316,7 @@ def application(environ, start_response):
conf = load_configuration('/etc/soledad/soledad-server.conf')
conf = conf['soledad-server']
state = CouchServerState(conf['couch_url'], create_cmd=conf['create_cmd'])
+ SoledadBackend.BATCH_SUPPORT = conf['batching']
# WSGI application that may be used by `twistd -web`
application = GzipMiddleware(
SoledadTokenAuthMiddleware(SoledadApp(state)))
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index db25c406..96f65912 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -112,6 +112,14 @@ class SyncExchange(sync.SyncExchange):
doc = self._db.get_doc(changed_doc_id, include_deleted=True)
return_doc_cb(doc, gen, trans_id)
+ def batched_insert_from_source(self, entries, sync_id):
+ self._db.batch_start()
+ for entry in entries:
+ doc, gen, trans_id, number_of_docs, doc_idx = entry
+ self.insert_doc_from_source(doc, gen, trans_id, number_of_docs,
+ doc_idx, sync_id)
+ self._db.batch_end()
+
def insert_doc_from_source(
self, doc, source_gen, trans_id,
number_of_docs=None, doc_idx=None, sync_id=None):
@@ -198,6 +206,7 @@ class SyncResource(http_app.SyncResource):
self.sync_exch = self.sync_exchange_class(
db, self.source_replica_uid, last_known_generation, sync_id)
self._sync_id = sync_id
+ self._staging = []
@http_app.http_method(content_as_args=True)
def post_put(
@@ -225,9 +234,7 @@ class SyncResource(http_app.SyncResource):
:type doc_idx: int
"""
doc = Document(id, rev, content)
- self.sync_exch.insert_doc_from_source(
- doc, gen, trans_id, number_of_docs=number_of_docs,
- doc_idx=doc_idx, sync_id=self._sync_id)
+ self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
@http_app.http_method(received=int, content_as_args=True)
def post_get(self, received):
@@ -266,6 +273,7 @@ class SyncResource(http_app.SyncResource):
Return the current generation and transaction_id after inserting one
incoming document.
"""
+ self.sync_exch.batched_insert_from_source(self._staging, self._sync_id)
self.responder.content_type = 'application/x-soledad-sync-response'
self.responder.start_response(200)
self.responder.start_stream(),