summaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/src/leap/soledad/common/backend.py51
-rw-r--r--common/src/leap/soledad/common/couch/__init__.py100
-rw-r--r--common/src/leap/soledad/common/couch/support.py115
-rw-r--r--common/src/leap/soledad/common/errors.py3
-rw-r--r--common/src/leap/soledad/common/tests/fixture_soledad.conf1
-rw-r--r--common/src/leap/soledad/common/tests/test_encdecpool.py26
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py33
7 files changed, 277 insertions, 52 deletions
diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py
index deed5ac2..53426fb5 100644
--- a/common/src/leap/soledad/common/backend.py
+++ b/common/src/leap/soledad/common/backend.py
@@ -33,6 +33,7 @@ from leap.soledad.common.document import ServerDocument
class SoledadBackend(CommonBackend):
+ BATCH_SUPPORT = False
"""
A U1DB backend implementation.
@@ -53,9 +54,30 @@ class SoledadBackend(CommonBackend):
self._cache = None
self._dbname = database._dbname
self._database = database
+ self.batching = False
if replica_uid is not None:
self._set_replica_uid(replica_uid)
+ def batch_start(self):
+ if not self.BATCH_SUPPORT:
+ return
+ self.batching = True
+ self.after_batch_callbacks = {}
+ self._database.batch_start()
+ if not self._cache:
+ # batching needs cache
+ self._cache = {}
+ self._get_generation() # warm up gen info
+
+ def batch_end(self):
+ if not self.BATCH_SUPPORT:
+ return
+ self.batching = False
+ self._database.batch_end()
+ for name in self.after_batch_callbacks:
+ self.after_batch_callbacks[name]()
+ self.after_batch_callbacks = None
+
@property
def cache(self):
if self._cache is not None:
@@ -154,11 +176,7 @@ class SoledadBackend(CommonBackend):
:raise SoledadError: Raised by database on operation failure
"""
- if self.replica_uid + '_gen' in self.cache:
- response = self.cache[self.replica_uid + '_gen']
- return response
cur_gen, newest_trans_id = self._database.get_generation_info()
- self.cache[self.replica_uid + '_gen'] = (cur_gen, newest_trans_id)
return (cur_gen, newest_trans_id)
def _get_trans_id_for_gen(self, generation):
@@ -253,14 +271,8 @@ class SoledadBackend(CommonBackend):
:param doc: The document to be put.
:type doc: ServerDocument
"""
- last_transaction =\
- self._database.save_document(old_doc, doc,
- self._allocate_transaction_id())
- if self.replica_uid + '_gen' in self.cache:
- gen, trans = self.cache[self.replica_uid + '_gen']
- gen += 1
- trans = last_transaction
- self.cache[self.replica_uid + '_gen'] = (gen, trans)
+ self._database.save_document(old_doc, doc,
+ self._allocate_transaction_id())
def put_doc(self, doc):
"""
@@ -383,7 +395,10 @@ class SoledadBackend(CommonBackend):
"""
if other_replica_uid in self.cache:
return self.cache[other_replica_uid]
- return self._database.get_replica_gen_and_trans_id(other_replica_uid)
+ gen, trans_id = \
+ self._database.get_replica_gen_and_trans_id(other_replica_uid)
+ self.cache[other_replica_uid] = (gen, trans_id)
+ return (gen, trans_id)
def _set_replica_gen_and_trans_id(self, other_replica_uid,
other_generation, other_transaction_id):
@@ -423,9 +438,13 @@ class SoledadBackend(CommonBackend):
generation.
:type other_transaction_id: str
"""
- self._set_replica_gen_and_trans_id(other_replica_uid,
- other_generation,
- other_transaction_id)
+ function = self._set_replica_gen_and_trans_id
+ args = [other_replica_uid, other_generation, other_transaction_id]
+ callback = lambda: function(*args)
+ if self.batching:
+ self.after_batch_callbacks['set_source_info'] = callback
+ else:
+ callback()
def _force_doc_sync_conflict(self, doc):
"""
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py
index bd8b08b7..dae460cb 100644
--- a/common/src/leap/soledad/common/couch/__init__.py
+++ b/common/src/leap/soledad/common/couch/__init__.py
@@ -33,7 +33,6 @@ from multiprocessing.pool import ThreadPool
from couchdb.client import Server, Database
-from couchdb.multipart import MultipartWriter
from couchdb.http import (
ResourceConflict,
ResourceNotFound,
@@ -53,6 +52,7 @@ from u1db.remote import http_app
from leap.soledad.common import ddocs
from .errors import raise_server_error
from .errors import raise_missing_design_doc_error
+from .support import MultipartWriter
from leap.soledad.common.errors import InvalidURLError
from leap.soledad.common.document import ServerDocument
from leap.soledad.common.backend import SoledadBackend
@@ -153,10 +153,24 @@ class CouchDatabase(object):
self._url = url
self._dbname = dbname
self._database = self.get_couch_database(url, dbname)
+ self.batching = False
+ self.batch_generation = None
+ self.batch_docs = {}
if ensure_ddocs:
self.ensure_ddocs_on_db()
self.ensure_security_ddoc(database_security)
+ def batch_start(self):
+ self.batching = True
+ self.batch_generation = self.get_generation_info()
+ ids = set(row.id for row in self._database.view('_all_docs'))
+ self.batched_ids = ids
+
+ def batch_end(self):
+ self.batching = False
+ self.batch_generation = None
+ self.__perform_batch()
+
def get_couch_database(self, url, dbname):
"""
Generate a couchdb.Database instance given a url and dbname.
@@ -185,7 +199,8 @@ class CouchDatabase(object):
"""
for ddoc_name in ['docs', 'syncs', 'transactions']:
try:
- self.json_from_resource(['_design', ddoc_name, '_info'],
+ self.json_from_resource(['_design'] +
+ ddoc_name.split('/') + ['_info'],
check_missing_ddoc=False)
except ResourceNotFound:
ddoc = json.loads(
@@ -337,13 +352,49 @@ class CouchDatabase(object):
:return: The document.
:rtype: ServerDocument
"""
- # get document with all attachments (u1db content and eventual
- # conflicts)
+ doc_from_batch = self.__check_batch_before_get(doc_id)
+ if doc_from_batch:
+ return doc_from_batch
+ if self.batching and doc_id not in self.batched_ids:
+ return None
if doc_id not in self._database:
return None
+ # get document with all attachments (u1db content and eventual
+ # conflicts)
result = self.json_from_resource([doc_id], attachments=True)
return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
+ def __check_batch_before_get(self, doc_id):
+ """
+ If doc_id is staged for batching, then we need to commit the batch
+ before going ahead. This avoids consistency problems, like trying to
+ get a document that isn't persisted and processing like it is missing.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ """
+ if doc_id in self.batch_docs:
+ couch_doc = self.batch_docs[doc_id]
+ rev = self.__perform_batch(doc_id)
+ couch_doc['_rev'] = rev
+ self.batched_ids.add(doc_id)
+ return self.__parse_doc_from_couch(couch_doc, doc_id, True)
+ return None
+
+ def __perform_batch(self, doc_id=None):
+ status = self._database.update(self.batch_docs.values())
+ rev = None
+ for ok, stored_doc_id, rev_or_error in status:
+ if not ok:
+ error = rev_or_error
+ if type(error) is ResourceConflict:
+ raise RevisionConflict
+ raise error
+ elif doc_id == stored_doc_id:
+ rev = rev_or_error
+ self.batch_docs.clear()
+ return rev
+
def __parse_doc_from_couch(self, result, doc_id,
check_for_conflicts=False):
# restrict to u1db documents
@@ -571,6 +622,8 @@ class CouchDatabase(object):
:return: A tuple containing the current generation and transaction id.
:rtype: (int, str)
"""
+ if self.batching and self.batch_generation:
+ return self.batch_generation
# query a couch list function
ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
info = self.json_from_resource(ddoc_path)
@@ -691,20 +744,31 @@ class CouchDatabase(object):
if old_doc is not None and hasattr(old_doc, 'couch_rev'):
couch_doc['_rev'] = old_doc.couch_rev
# prepare the multipart PUT
- buf = StringIO()
- headers = {}
- envelope = MultipartWriter(buf, headers=headers, subtype='related')
- envelope.add('application/json', json.dumps(couch_doc))
- for part in parts:
- envelope.add('application/octet-stream', part)
- envelope.close()
- # try to save and fail if there's a revision conflict
- try:
- resource = self._new_resource()
- resource.put_json(
- doc.doc_id, body=str(buf.getvalue()), headers=headers)
- except ResourceConflict:
- raise RevisionConflict()
+ if not self.batching:
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ resource = self._new_resource()
+ resource.put_json(
+ doc.doc_id, body=str(buf.getvalue()),
+ headers=envelope.headers)
+ except ResourceConflict:
+ raise RevisionConflict()
+ else:
+ for name, attachment in attachments.items():
+ del attachment['follows']
+ del attachment['length']
+ index = 0 if name is 'u1db_content' else 1
+ attachment['data'] = binascii.b2a_base64(parts[index]).strip()
+ couch_doc['_attachments'] = attachments
+ self.batch_docs[doc.doc_id] = couch_doc
+ last_gen, last_trans_id = self.batch_generation
+ self.batch_generation = (last_gen + 1, transaction_id)
return transactions[-1][1]
def _new_resource(self, *path):
diff --git a/common/src/leap/soledad/common/couch/support.py b/common/src/leap/soledad/common/couch/support.py
new file mode 100644
index 00000000..bfc4fef6
--- /dev/null
+++ b/common/src/leap/soledad/common/couch/support.py
@@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+# support.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import sys
+
+
+"""
+Monkey patches and temporary code that may be removed with version changes.
+"""
+
+
+# for bigcouch
+# TODO: Remove if bigcouch support is dropped
+class MultipartWriter(object):
+
+ """
+ A multipart writer adapted from python-couchdb's one so we can PUT
+ documents using couch's multipart PUT.
+
+ This stripped down version does not allow for nested structures, and
+ contains only the essential things we need to PUT SoledadDocuments to the
+ couch backend. Also, please note that this is a patch. The couchdb lib has
+ another implementation that works fine with CouchDB 1.6, but removing this
+ now will break compatibility with bigcouch.
+ """
+
+ CRLF = '\r\n'
+
+ def __init__(self, fileobj, headers=None, boundary=None):
+ """
+ Initialize the multipart writer.
+ """
+ self.fileobj = fileobj
+ if boundary is None:
+ boundary = self._make_boundary()
+ self._boundary = boundary
+ self._build_headers('related', headers)
+
+ def add(self, mimetype, content, headers={}):
+ """
+ Add a part to the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ self.fileobj.write(self.CRLF)
+ headers['Content-Type'] = mimetype
+ self._write_headers(headers)
+ if content:
+ # XXX: throw an exception if a boundary appears in the content??
+ self.fileobj.write(content)
+ self.fileobj.write(self.CRLF)
+
+ def close(self):
+ """
+ Close the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ # be careful not to have anything after '--', otherwise old couch
+ # versions (including bigcouch) will fail.
+ self.fileobj.write('--')
+
+ def _make_boundary(self):
+ """
+ Create a boundary to discern multi parts.
+ """
+ try:
+ from uuid import uuid4
+ return '==' + uuid4().hex + '=='
+ except ImportError:
+ from random import randrange
+ token = randrange(sys.maxint)
+ format = '%%0%dd' % len(repr(sys.maxint - 1))
+ return '===============' + (format % token) + '=='
+
+ def _write_headers(self, headers):
+ """
+ Write a part header in the buffer stream.
+ """
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.fileobj.write(name)
+ self.fileobj.write(': ')
+ self.fileobj.write(value)
+ self.fileobj.write(self.CRLF)
+ self.fileobj.write(self.CRLF)
+
+ def _build_headers(self, subtype, headers):
+ """
+ Build the main headers of the multipart stream.
+
+ This is here so we can send headers separete from content using
+ python-couchdb API.
+ """
+ self.headers = {}
+ self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \
+ (subtype, self._boundary)
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.headers[name] = value
diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py
index 7a8a8929..0b6bb4e6 100644
--- a/common/src/leap/soledad/common/errors.py
+++ b/common/src/leap/soledad/common/errors.py
@@ -145,9 +145,10 @@ class InvalidURLError(Exception):
"""
-@register_exception
class BackendNotReadyError(SoledadError):
"""
Generic exception raised when the backend is not ready to dispatch a client
request.
"""
+ wire_description = "backend not ready"
+ status = 500
diff --git a/common/src/leap/soledad/common/tests/fixture_soledad.conf b/common/src/leap/soledad/common/tests/fixture_soledad.conf
index c0ffacf6..8d8161c3 100644
--- a/common/src/leap/soledad/common/tests/fixture_soledad.conf
+++ b/common/src/leap/soledad/common/tests/fixture_soledad.conf
@@ -2,6 +2,7 @@
couch_url = http://soledad:passwd@localhost:5984
create_cmd = sudo -u soledad-admin /usr/bin/create-user-db
admin_netrc = /etc/couchdb/couchdb-soledad-admin.netrc
+batching = 0
[database-security]
members = user1, user2
diff --git a/common/src/leap/soledad/common/tests/test_encdecpool.py b/common/src/leap/soledad/common/tests/test_encdecpool.py
index 793bfa1a..694eb7ad 100644
--- a/common/src/leap/soledad/common/tests/test_encdecpool.py
+++ b/common/src/leap/soledad/common/tests/test_encdecpool.py
@@ -18,6 +18,7 @@
Tests for encryption and decryption pool.
"""
import json
+from random import shuffle
from twisted.internet.defer import inlineCallbacks
@@ -171,20 +172,21 @@ class TestSyncDecrypterPool(BaseSoledadTest):
DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1)
def _assert_doc_was_decrypted_and_inserted(_):
+ self.assertEqual(1, len(self._inserted_docs))
self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")])
self._pool.deferred.addCallback(
_assert_doc_was_decrypted_and_inserted)
return self._pool.deferred
- def test_insert_encrypted_received_doc_many(self):
+ def test_insert_encrypted_received_doc_many(self, many=100):
"""
Test that many encrypted documents added to the pool are decrypted and
inserted using the callback.
"""
crypto = self._soledad._crypto
- many = 100
self._pool.start(many)
+ docs = []
# insert many encrypted docs in the pool
for i in xrange(many):
@@ -198,9 +200,12 @@ class TestSyncDecrypterPool(BaseSoledadTest):
doc_id=doc_id, rev=rev, json=json.dumps(content))
encrypted_content = json.loads(crypto.encrypt_doc(doc))
+ docs.append((doc_id, rev, encrypted_content, gen,
+ trans_id, idx))
+ shuffle(docs)
- self._pool.insert_encrypted_received_doc(
- doc_id, rev, encrypted_content, gen, trans_id, idx)
+ for doc in docs:
+ self._pool.insert_encrypted_received_doc(*doc)
def _assert_docs_were_decrypted_and_inserted(_):
self.assertEqual(many, len(self._inserted_docs))
@@ -223,3 +228,16 @@ class TestSyncDecrypterPool(BaseSoledadTest):
self._pool.deferred.addCallback(
_assert_docs_were_decrypted_and_inserted)
return self._pool.deferred
+
+ @inlineCallbacks
+ def test_pool_reuse(self):
+ """
+ The pool is reused between syncs, this test verifies that
+ reusing is fine.
+ """
+ for i in xrange(3):
+ yield self.test_insert_encrypted_received_doc_many(5)
+ self._inserted_docs = []
+ decrypted_docs = yield self._pool._get_docs(encrypted=False)
+ # check that decrypted docs staging is clean
+ self.assertEquals([], decrypted_docs)
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index e1129a9f..20fe8579 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -640,18 +640,25 @@ class ConfigurationParsingTest(unittest.TestCase):
config = load_configuration(config_path)
# then
- expected = {'soledad-server': {
- 'couch_url':
- 'http://soledad:passwd@localhost:5984',
- 'create_cmd':
- 'sudo -u soledad-admin /usr/bin/create-user-db',
- 'admin_netrc':
- '/etc/couchdb/couchdb-soledad-admin.netrc',
- },
- 'database-security': {
- 'members': ['user1', 'user2'],
+ expected = {'members': ['user1', 'user2'],
'members_roles': ['role1', 'role2'],
'admins': ['user3', 'user4'],
- 'admins_roles': ['role3', 'role3'],
- }}
- self.assertDictEqual(expected, config)
+ 'admins_roles': ['role3', 'role3']}
+ self.assertDictEqual(expected, config['database-security'])
+
+ def test_server_values_configuration(self):
+ # given
+ config_path = resource_filename('leap.soledad.common.tests',
+ 'fixture_soledad.conf')
+ # when
+ config = load_configuration(config_path)
+
+ # then
+ expected = {'couch_url':
+ 'http://soledad:passwd@localhost:5984',
+ 'create_cmd':
+ 'sudo -u soledad-admin /usr/bin/create-user-db',
+ 'admin_netrc':
+ '/etc/couchdb/couchdb-soledad-admin.netrc',
+ 'batching': False}
+ self.assertDictEqual(expected, config['soledad-server'])