summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG66
-rw-r--r--client/changes/bug_reuse-http-connection2
-rw-r--r--client/changes/bug_unlock_shared_if_fails2
-rw-r--r--client/changes/feature_4616_sqlite_count_by_index1
-rw-r--r--client/src/leap/soledad/client/__init__.py68
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py112
-rw-r--r--common/changes/bug_4475_remodel-couch-backend2
-rw-r--r--common/src/leap/soledad/common/couch.py863
-rw-r--r--common/src/leap/soledad/common/ddocs/docs/updates/put.js64
-rw-r--r--common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js39
-rw-r--r--common/src/leap/soledad/common/errors.py132
-rw-r--r--common/src/leap/soledad/common/tests/couchdb.ini.template2
-rw-r--r--common/src/leap/soledad/common/tests/test_couch.py214
-rw-r--r--common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py50
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py127
-rw-r--r--docs/sphinx/client.rst44
-rw-r--r--docs/sphinx/common.rst38
-rw-r--r--docs/sphinx/conf.py266
-rw-r--r--docs/sphinx/index.rst31
-rw-r--r--docs/sphinx/server.rst27
-rw-r--r--scripts/README.rst13
-rwxr-xr-xscripts/backends_cpu_usage/log_cpu_usage.py46
-rw-r--r--scripts/backends_cpu_usage/movingaverage.py209
-rwxr-xr-xscripts/backends_cpu_usage/plot.py81
-rwxr-xr-xscripts/backends_cpu_usage/test_u1db_sync.py113
-rwxr-xr-xscripts/build_debian_package.sh32
-rw-r--r--scripts/client-side-db.py36
-rw-r--r--scripts/db_access/client_side_db.py154
-rw-r--r--scripts/db_access/reset_db.py79
-rw-r--r--scripts/db_access/server_side_db.py (renamed from scripts/server-side-db.py)4
-rwxr-xr-xscripts/doc_put_memory_usage/find_max_upload_size.py169
-rwxr-xr-xscripts/doc_put_memory_usage/get-mem.py16
-rwxr-xr-xscripts/doc_put_memory_usage/plot-mem.py73
-rw-r--r--scripts/migrate_dbs.py288
-rw-r--r--scripts/profiling/sync/sync-many.py125
-rw-r--r--scripts/update_design_docs.py147
-rw-r--r--server/changes/VERSION_COMPAT0
-rw-r--r--server/changes/feature_enable-gzip1
-rw-r--r--server/src/leap/soledad/server/__init__.py213
-rw-r--r--server/src/leap/soledad/server/auth.py63
-rw-r--r--server/src/leap/soledad/server/lock_resource.py232
41 files changed, 3266 insertions, 978 deletions
diff --git a/CHANGELOG b/CHANGELOG
index 83bb883e..a1876ef2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,69 @@
+0.4.5 Apr 4:
+Client:
+ o Catch lock timeout exception. Fixes #4435.
+ o Add lock for create_doc and update_indexes call, prevents
+ concurrent access to the db. Closes #5139.
+ o Back-compatibility for socket.create_connection interface in
+ 2.6. Closes #5208.
+ o Always return unicode in helper method, even on
+ UnicodeError. Related to #4998.
+ o Fix a bug in soledad.client.sqlcipher by which we were creating a
+ new connection for each sync.
+ o Unlock shared_db if anything fails in the bootstrap
+ sequence. Fixes #4702.
+ o Avoid concurrent syncs for the same account, but allow for
+ distinct accounts. Fixes #4451.
+ o Adds a get_count_by_index to sqlcipher u1db backend. Related to:
+ #4616.
+ o Do not autocreate remote user database when syncing. Tapicero
+ should make sure that that db is created when the user is
+ created. Closes #5302.
+ o Add a read-write lock for all client operations. Addresses: #4972
+ o Add sync=off and tem_store=mem to soledad client, for
+ optimization.
+
+Common:
+ o Add lock timeout HTTP error. Fixes #4435.
+ o Remodel couch backend to fix concurrency and scalability. Closes
+ #4475, #4682, #4683 and #4680.
+ o Remove check for design docs on couch server state initialization
+ Closes #5387.
+ o Renew HTTP session after multipart PUTs to avoid request hanging.
+ Fixes #5449.
+ o Preload time.strptime() to avoid multi-threaded problem on couch
+ backend get_docs() method. Fixes #5449.
+ o Improve error messages. Closes #5035.
+ o Add MissingTokenError and InvalidTokenError as sub exceptions
+ from Unauthorized.
+ o Allow sync of large files (~100MB). Closes #4836.
+ o Add exceptions to deal with missing design documents. Fixes #4994.
+ o Parallelize get_docs() on couch backend to accelerate sync.
+ Closes #5008.
+ o Use less memory when putting docs on couch. Fixes #5011.
+ o Prevent CouchServerState from creating or deleting databases. This
+ way, Soledad remote clients won't ever be able to do these
+ operations when syncing. Part of #5302.
+ o Avoid concurrent syncs problem by adding a lock for PUTting to the
+ sync log update handler. Fixes #5388.
+ o Remove check for couch permissions when CouchServerState is
+ instantiated. This is not necessary anymore because platform
+ takes care of giving the soledad user enough permissions and
+ tapicero takes care of uploading the needed design documents.
+
+Server:
+ o Send propper lock timeout response. Fixes #4435.
+ o Fix raising of auth token errors. Fixes #5191.
+ o Allow sync of large files (~100MB). Closes #4836.
+ o Use a temporary directory for server side locks. Fixes #4918.
+ o Catch couchdb.http.ResourceNotFound exceptions when accessing
+ design documents on couch backend, and raise appropriate missing
+ design documents exceptions. Fixes #4994.
+ o Do not try to create the shared database when running the Soledad
+ Server application. Fixes #5302.
+ o Enable Gzip compression on the soledad wsgi app.
+
+-- 2014 --
+
0.4.4 Dec 6:
Client:
o Add MAC verirication to the recovery document and
diff --git a/client/changes/bug_reuse-http-connection b/client/changes/bug_reuse-http-connection
deleted file mode 100644
index c6cdd9b4..00000000
--- a/client/changes/bug_reuse-http-connection
+++ /dev/null
@@ -1,2 +0,0 @@
- o Fix a bug in soledad.client.sqlcipher by which we were creating
- a new connection for each sync.
diff --git a/client/changes/bug_unlock_shared_if_fails b/client/changes/bug_unlock_shared_if_fails
deleted file mode 100644
index fc5716e4..00000000
--- a/client/changes/bug_unlock_shared_if_fails
+++ /dev/null
@@ -1,2 +0,0 @@
- o Unlock shared_db if anything fails in the bootstrap
- sequence. Fixes #4702. \ No newline at end of file
diff --git a/client/changes/feature_4616_sqlite_count_by_index b/client/changes/feature_4616_sqlite_count_by_index
deleted file mode 100644
index c7819d38..00000000
--- a/client/changes/feature_4616_sqlite_count_by_index
+++ /dev/null
@@ -1 +0,0 @@
- o Adds a get_count_by_index to sqlcipher u1db backend. Related to: #4616
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index a0b3f45a..46e3cd5f 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -34,6 +34,8 @@ import urlparse
import hmac
from hashlib import sha256
+from threading import Lock
+from collections import defaultdict
try:
import cchardet as chardet
@@ -52,6 +54,7 @@ from leap.soledad.common.errors import (
InvalidTokenError,
NotLockedError,
AlreadyLockedError,
+ LockTimedOutError,
)
from leap.soledad.common.crypto import (
MacMethods,
@@ -245,6 +248,12 @@ class Soledad(object):
Prefix for default values for path.
"""
+ syncing_lock = defaultdict(Lock)
+ """
+ A dictionary that hold locks which avoid multiple sync attempts from the
+ same database replica.
+ """
+
def __init__(self, uuid, passphrase, secrets_path, local_db_path,
server_url, cert_file, auth_token=None, secret_id=None):
"""
@@ -402,6 +411,8 @@ class Soledad(object):
token, timeout = self._shared_db.lock()
except AlreadyLockedError:
raise BootstrapSequenceError('Database is already locked.')
+ except LockTimedOutError:
+ raise BootstrapSequenceError('Lock operation timed out.')
try:
self._get_or_gen_crypto_secrets()
@@ -767,7 +778,7 @@ class Soledad(object):
============================== WARNING ==============================
This method converts the document's contents to unicode in-place. This
- meanse that after calling C{put_doc(doc)}, the contents of the
+ means that after calling C{put_doc(doc)}, the contents of the
document, i.e. C{doc.content}, might be different from before the
call.
============================== WARNING ==============================
@@ -824,9 +835,9 @@ class Soledad(object):
in matching doc_ids order.
:rtype: generator
"""
- return self._db.get_docs(doc_ids,
- check_for_conflicts=check_for_conflicts,
- include_deleted=include_deleted)
+ return self._db.get_docs(
+ doc_ids, check_for_conflicts=check_for_conflicts,
+ include_deleted=include_deleted)
def get_all_docs(self, include_deleted=False):
"""Get the JSON content for all documents in the database.
@@ -842,7 +853,7 @@ class Soledad(object):
def _convert_to_unicode(self, content):
"""
- Converts content to utf8 (or all the strings in content)
+ Converts content to unicode (or all the strings in content)
NOTE: Even though this method supports any type, it will
currently ignore contents of lists, tuple or any other
@@ -857,13 +868,14 @@ class Soledad(object):
if isinstance(content, unicode):
return content
elif isinstance(content, str):
+ result = chardet.detect(content)
+ default = "utf-8"
+ encoding = result["encoding"] or default
try:
- result = chardet.detect(content)
- default = "utf-8"
- encoding = result["encoding"] or default
content = content.decode(encoding)
- except UnicodeError:
- pass
+ except UnicodeError as e:
+ logger.error("Unicode error: {0!r}. Using 'replace'".format(e))
+ content = content.decode(encoding, 'replace')
return content
else:
if isinstance(content, dict):
@@ -928,7 +940,8 @@ class Soledad(object):
"number(fieldname, width)", "lower(fieldname)"
"""
if self._db:
- return self._db.create_index(index_name, *index_expressions)
+ return self._db.create_index(
+ index_name, *index_expressions)
def delete_index(self, index_name):
"""
@@ -1063,6 +1076,9 @@ class Soledad(object):
"""
Synchronize the local encrypted replica with a remote replica.
+ This method blocks until a syncing lock is acquired, so there are no
+ attempts of concurrent syncs from the same client replica.
+
:param url: the url of the target replica to sync with
:type url: str
@@ -1071,11 +1087,13 @@ class Soledad(object):
:rtype: str
"""
if self._db:
- local_gen = self._db.sync(
- urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
- creds=self._creds, autocreate=True)
- signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
- return local_gen
+ # acquire lock before attempt to sync
+ with Soledad.syncing_lock[self._db._get_replica_uid()]:
+ local_gen = self._db.sync(
+ urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
+ creds=self._creds, autocreate=False)
+ signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
+ return local_gen
def need_sync(self, url):
"""
@@ -1193,7 +1211,7 @@ class Soledad(object):
"""
soledad_assert(self.STORAGE_SECRETS_KEY in data)
# check mac of the recovery document
- mac_auth = False
+ #mac_auth = False # XXX ?
mac = None
if MAC_KEY in data:
soledad_assert(data[MAC_KEY] is not None)
@@ -1216,7 +1234,7 @@ class Soledad(object):
if mac != data[MAC_KEY]:
raise WrongMac('Could not authenticate recovery document\'s '
'contents.')
- mac_auth = True
+ #mac_auth = True # XXX ?
# include secrets in the secret pool.
secrets = 0
for secret_id, secret_data in data[self.STORAGE_SECRETS_KEY].items():
@@ -1293,9 +1311,17 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
# derived from httplib.py
def connect(self):
- "Connect to a host on a given (SSL) port."
- sock = socket.create_connection((self.host, self.port),
- SOLEDAD_TIMEOUT, self.source_address)
+ """
+ Connect to a host on a given (SSL) port.
+ """
+ try:
+ source = self.source_address
+ sock = socket.create_connection((self.host, self.port),
+ SOLEDAD_TIMEOUT, source)
+ except AttributeError:
+ # source_address was introduced in 2.7
+ sock = socket.create_connection((self.host, self.port),
+ SOLEDAD_TIMEOUT)
if self._tunnel_host:
self.sock = sock
self._tunnel()
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 43c871c3..3aea340d 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -91,10 +91,10 @@ def open(path, password, create=True, document_factory=None, crypto=None,
database does not already exist.
:param path: The filesystem path for the database to open.
- :param type: str
+ :type path: str
:param create: True/False, should the database be created if it doesn't
already exist?
- :param type: bool
+ :param create: bool
:param document_factory: A function that will be called with the same
parameters as Document.__init__.
:type document_factory: callable
@@ -147,26 +147,30 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
_index_storage_value = 'expand referenced encrypted'
k_lock = threading.Lock()
+ create_doc_lock = threading.Lock()
+ update_indexes_lock = threading.Lock()
_syncer = None
def __init__(self, sqlcipher_file, password, document_factory=None,
crypto=None, raw_key=False, cipher='aes-256-cbc',
kdf_iter=4000, cipher_page_size=1024):
"""
- Create a new sqlcipher file.
+ Connect to an existing SQLCipher database, creating a new sqlcipher
+ database file if needed.
:param sqlcipher_file: The path for the SQLCipher file.
:type sqlcipher_file: str
:param password: The password that protects the SQLCipher db.
:type password: str
:param document_factory: A function that will be called with the same
- parameters as Document.__init__.
+ parameters as Document.__init__.
:type document_factory: callable
:param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
+ document contents when syncing.
:type crypto: soledad.crypto.SoledadCrypto
- :param raw_key: Whether C{password} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the encyrption key.
+ :param raw_key: Whether password is a raw 64-char hex string or a
+ passphrase that should be hashed to obtain the
+ encyrption key.
:type raw_key: bool
:param cipher: The cipher and mode to use.
:type cipher: str
@@ -190,6 +194,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self._set_crypto_pragmas(
self._db_handle, password, raw_key, cipher, kdf_iter,
cipher_page_size)
+ if os.environ.get('LEAP_SQLITE_NOSYNC'):
+ self._pragma_synchronous_off(self._db_handle)
+ else:
+ self._pragma_synchronous_normal(self._db_handle)
+ if os.environ.get('LEAP_SQLITE_MEMSTORE'):
+ self._pragma_mem_temp_store(self._db_handle)
+ self._pragma_write_ahead_logging(self._db_handle)
self._real_replica_uid = None
self._ensure_schema()
self._crypto = crypto
@@ -396,6 +407,22 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
'ALTER TABLE document '
'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
+ def create_doc(self, content, doc_id=None):
+ """
+ Create a new document in the local encrypted database.
+
+ :param content: the contents of the new document
+ :type content: dict
+ :param doc_id: an optional identifier specifying the document id
+ :type doc_id: str
+
+ :return: the new document
+ :rtype: SoledadDocument
+ """
+ with self.create_doc_lock:
+ return sqlite_backend.SQLitePartialExpandDatabase.create_doc(
+ self, content, doc_id=doc_id)
+
def _put_and_update_indexes(self, old_doc, doc):
"""
Update a document and all indexes related to it.
@@ -405,12 +432,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:param doc: The new version of the document.
:type doc: u1db.Document
"""
- sqlite_backend.SQLitePartialExpandDatabase._put_and_update_indexes(
- self, old_doc, doc)
- c = self._db_handle.cursor()
- c.execute('UPDATE document SET syncable=? '
- 'WHERE doc_id=?',
- (doc.syncable, doc.doc_id))
+ with self.update_indexes_lock:
+ sqlite_backend.SQLitePartialExpandDatabase._put_and_update_indexes(
+ self, old_doc, doc)
+ c = self._db_handle.cursor()
+ c.execute('UPDATE document SET syncable=? '
+ 'WHERE doc_id=?',
+ (doc.syncable, doc.doc_id))
def _get_doc(self, doc_id, check_for_conflicts=False):
"""
@@ -734,6 +762,64 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
# XXX change passphrase param!
db_handle.cursor().execute('PRAGMA rekey = "x\'%s"' % passphrase)
+ @classmethod
+ def _pragma_synchronous_off(cls, db_handle):
+ """
+ Change the setting of the "synchronous" flag to OFF.
+ """
+ logger.debug("SQLCIPHER: SETTING SYNCHRONOUS OFF")
+ db_handle.cursor().execute('PRAGMA synchronous=OFF')
+
+ @classmethod
+ def _pragma_synchronous_normal(cls, db_handle):
+ """
+ Change the setting of the "synchronous" flag to NORMAL.
+ """
+ logger.debug("SQLCIPHER: SETTING SYNCHRONOUS NORMAL")
+ db_handle.cursor().execute('PRAGMA synchronous=NORMAL')
+
+ @classmethod
+ def _pragma_mem_temp_store(cls, db_handle):
+ """
+ Use a in-memory store for temporary tables.
+ """
+ logger.debug("SQLCIPHER: SETTING TEMP_STORE MEMORY")
+ db_handle.cursor().execute('PRAGMA temp_store=MEMORY')
+
+ @classmethod
+ def _pragma_write_ahead_logging(cls, db_handle):
+ """
+ Enable write-ahead logging, and set the autocheckpoint to 50 pages.
+
+ Setting the autocheckpoint to a small value, we make the reads not
+ suffer too much performance degradation.
+
+ From the sqlite docs:
+
+ "There is a tradeoff between average read performance and average write
+ performance. To maximize the read performance, one wants to keep the
+ WAL as small as possible and hence run checkpoints frequently, perhaps
+ as often as every COMMIT. To maximize write performance, one wants to
+ amortize the cost of each checkpoint over as many writes as possible,
+ meaning that one wants to run checkpoints infrequently and let the WAL
+ grow as large as possible before each checkpoint. The decision of how
+ often to run checkpoints may therefore vary from one application to
+ another depending on the relative read and write performance
+ requirements of the application. The default strategy is to run a
+ checkpoint once the WAL reaches 1000 pages"
+ """
+ logger.debug("SQLCIPHER: SETTING WRITE-AHEAD LOGGING")
+ db_handle.cursor().execute('PRAGMA journal_mode=WAL')
+ # The optimum value can still use a little bit of tuning, but we favor
+ # small sizes of the WAL file to get fast reads, since we assume that
+ # the writes will be quick enough to not block too much.
+
+ # TODO
+ # As a further improvement, we might want to set autocheckpoint to 0
+ # here and do the checkpoints manually in a separate thread, to avoid
+ # any blocks in the main thread (we should run a loopingcall from here)
+ db_handle.cursor().execute('PRAGMA wal_autocheckpoint=50')
+
# Extra query methods: extensions to the base sqlite implmentation.
def get_count_from_index(self, index_name, *key_values):
diff --git a/common/changes/bug_4475_remodel-couch-backend b/common/changes/bug_4475_remodel-couch-backend
deleted file mode 100644
index 13a1b121..00000000
--- a/common/changes/bug_4475_remodel-couch-backend
+++ /dev/null
@@ -1,2 +0,0 @@
- o Remodel couch backend to fix concurrency and scalability. Closes #4475,
- #4682, #4683 and #4680.
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index d2414477..8e8613a1 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -24,23 +24,48 @@ import uuid
import logging
import binascii
import socket
+import time
+import sys
+import threading
+
+
+from StringIO import StringIO
+from collections import defaultdict
from couchdb.client import Server
-from couchdb.http import ResourceNotFound, Unauthorized
-from u1db import errors, query_parser, vectorclock
+from couchdb.http import (
+ ResourceConflict,
+ ResourceNotFound,
+ ServerError,
+ Session,
+)
+from u1db import query_parser, vectorclock
+from u1db.errors import (
+ DatabaseDoesNotExist,
+ InvalidGeneration,
+ RevisionConflict,
+ InvalidDocId,
+ ConflictedDoc,
+ DocumentDoesNotExist,
+ DocumentAlreadyDeleted,
+ Unauthorized,
+)
from u1db.backends import CommonBackend, CommonSyncTarget
from u1db.remote import http_app
from u1db.remote.server_state import ServerState
-from leap.soledad.common import USER_DB_PREFIX, ddocs
+from leap.soledad.common import USER_DB_PREFIX, ddocs, errors
from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
+COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch
+
+
class InvalidURLError(Exception):
"""
Exception raised when Soledad encounters a malformed URL.
@@ -75,9 +100,9 @@ class CouchDocument(SoledadDocument):
SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
self._couch_rev = None
self._conflicts = None
- self._modified_conflicts = False
+ self._transactions = None
- def ensure_fetch_conflicts(self, get_conflicts_fun):
+ def _ensure_fetch_conflicts(self, get_conflicts_fun):
"""
Ensure conflict data has been fetched from the server.
@@ -100,6 +125,16 @@ class CouchDocument(SoledadDocument):
"""
return self._conflicts
+ def set_conflicts(self, conflicts):
+ """
+ Set the conflicted versions of the document.
+
+ :param conflicts: The conflicted versions of the document.
+ :type conflicts: list
+ """
+ self._conflicts = conflicts
+ self.has_conflicts = len(self._conflicts) > 0
+
def add_conflict(self, doc):
"""
Add a conflict to this document.
@@ -108,8 +143,7 @@ class CouchDocument(SoledadDocument):
:type doc: CouchDocument
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
- self._modified_conflicts = True
+ raise Exception("Run self._ensure_fetch_conflicts first!")
self._conflicts.append(doc)
self.has_conflicts = len(self._conflicts) > 0
@@ -121,25 +155,13 @@ class CouchDocument(SoledadDocument):
:type conflict_revs: [str]
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
+ raise Exception("Run self._ensure_fetch_conflicts first!")
conflicts_len = len(self._conflicts)
self._conflicts = filter(
lambda doc: doc.rev not in conflict_revs,
self._conflicts)
- if len(self._conflicts) < conflicts_len:
- self._modified_conflicts = True
self.has_conflicts = len(self._conflicts) > 0
- def modified_conflicts(self):
- """
- Return whether this document's conflicts have been modified.
-
- :return: Whether this document's conflicts have been modified.
- :rtype: bool
- """
- return self._conflicts is not None and \
- self._modified_conflicts is True
-
def _get_couch_rev(self):
return self._couch_rev
@@ -148,18 +170,217 @@ class CouchDocument(SoledadDocument):
couch_rev = property(_get_couch_rev, _set_couch_rev)
+ def _get_transactions(self):
+ return self._transactions
+
+ def _set_transactions(self, rev):
+ self._transactions = rev
+
+ transactions = property(_get_transactions, _set_transactions)
+
# monkey-patch the u1db http app to use CouchDocument
http_app.Document = CouchDocument
+def raise_missing_design_doc_error(exc, ddoc_path):
+ """
+ Raise an appropriate exception when catching a ResourceNotFound when
+ accessing a design document.
+
+ :param exc: The exception cought.
+ :type exc: ResourceNotFound
+ :param ddoc_path: A list representing the requested path.
+ :type ddoc_path: list
+
+ :raise MissingDesignDocError: Raised when tried to access a missing design
+ document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access a
+ missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a design
+ document for an yet unknown reason.
+ """
+ path = "".join(ddoc_path)
+ if exc.message[1] == 'missing':
+ raise errors.MissingDesignDocError(path)
+ elif exc.message[1] == 'missing function' or \
+ exc.message[1].startswith('missing lists function'):
+ raise errors.MissingDesignDocListFunctionError(path)
+ elif exc.message[1] == 'missing_named_view':
+ raise errors.MissingDesignDocNamedViewError(path)
+ elif exc.message[1] == 'deleted':
+ raise errors.MissingDesignDocDeletedError(path)
+ # other errors are unknown for now
+ raise errors.DesignDocUnknownError("%s: %s" % (path, str(exc.message)))
+
+
+def raise_server_error(exc, ddoc_path):
+ """
+ Raise an appropriate exception when catching a ServerError when
+ accessing a design document.
+
+ :param exc: The exception cought.
+ :type exc: ResourceNotFound
+ :param ddoc_path: A list representing the requested path.
+ :type ddoc_path: list
+
+ :raise MissingDesignDocListFunctionError: Raised when trying to access a
+ missing list function on a
+ design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a design
+ document for an yet unknown reason.
+ """
+ path = "".join(ddoc_path)
+ if exc.message[1][0] == 'unnamed_error':
+ raise errors.MissingDesignDocListFunctionError(path)
+ # other errors are unknown for now
+ raise errors.DesignDocUnknownError(path)
+
+
+class MultipartWriter(object):
+ """
+ A multipart writer adapted from python-couchdb's one so we can PUT
+ documents using couch's multipart PUT.
+
+ This stripped down version does not allow for nested structures, and
+ contains only the essential things we need to PUT SoledadDocuments to the
+ couch backend.
+ """
+
+ CRLF = '\r\n'
+
+ def __init__(self, fileobj, headers=None, boundary=None):
+ """
+ Initialize the multipart writer.
+ """
+ self.fileobj = fileobj
+ if boundary is None:
+ boundary = self._make_boundary()
+ self._boundary = boundary
+ self._build_headers('related', headers)
+
+ def add(self, mimetype, content, headers={}):
+ """
+ Add a part to the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ self.fileobj.write(self.CRLF)
+ headers['Content-Type'] = mimetype
+ self._write_headers(headers)
+ if content:
+ # XXX: throw an exception if a boundary appears in the content??
+ self.fileobj.write(content)
+ self.fileobj.write(self.CRLF)
+
+ def close(self):
+ """
+ Close the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ # be careful not to have anything after '--', otherwise old couch
+ # versions (including bigcouch) will fail.
+ self.fileobj.write('--')
+
+ def _make_boundary(self):
+ """
+ Create a boundary to discern multi parts.
+ """
+ try:
+ from uuid import uuid4
+ return '==' + uuid4().hex + '=='
+ except ImportError:
+ from random import randrange
+ token = randrange(sys.maxint)
+ format = '%%0%dd' % len(repr(sys.maxint - 1))
+ return '===============' + (format % token) + '=='
+
+ def _write_headers(self, headers):
+ """
+ Write a part header in the buffer stream.
+ """
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.fileobj.write(name)
+ self.fileobj.write(': ')
+ self.fileobj.write(value)
+ self.fileobj.write(self.CRLF)
+ self.fileobj.write(self.CRLF)
+
+ def _build_headers(self, subtype, headers):
+ """
+ Build the main headers of the multipart stream.
+
+ This is here so we can send headers separete from content using
+ python-couchdb API.
+ """
+ self.headers = {}
+ self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \
+ (subtype, self._boundary)
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.headers[name] = value
+
+
class CouchDatabase(CommonBackend):
"""
A U1DB implementation that uses CouchDB as its persistence layer.
"""
+ # We spawn threads to parallelize the CouchDatabase.get_docs() method
+ MAX_GET_DOCS_THREADS = 20
+
+ update_handler_lock = defaultdict(threading.Lock)
+
+ class _GetDocThread(threading.Thread):
+ """
+ A thread that gets a document from a database.
+
+ TODO: switch this for a twisted deferred to thread. This depends on
+ replacing python-couchdb for paisley in this module.
+ """
+
+ def __init__(self, db, doc_id, check_for_conflicts,
+ release_fun):
+ """
+ :param db: The database from where to get the document.
+ :type db: u1db.Database
+ :param doc_id: The doc_id of the document to be retrieved.
+ :type doc_id: str
+ :param check_for_conflicts: Whether the get_doc() method should
+ check for existing conflicts.
+ :type check_for_conflicts: bool
+ :param release_fun: A function that releases a semaphore, to be
+ called after the document is fetched.
+ :type release_fun: function
+ """
+ threading.Thread.__init__(self)
+ self._db = db
+ self._doc_id = doc_id
+ self._check_for_conflicts = check_for_conflicts
+ self._release_fun = release_fun
+ self._doc = None
+
+ def run(self):
+ """
+ Fetch the document, store it as a property, and call the release
+ function.
+ """
+ self._doc = self._db._get_doc(
+ self._doc_id, self._check_for_conflicts)
+ self._release_fun()
+
@classmethod
- def open_database(cls, url, create):
+ def open_database(cls, url, create, ensure_ddocs=False):
"""
Open a U1DB database using CouchDB as backend.
@@ -167,6 +388,8 @@ class CouchDatabase(CommonBackend):
:type url: str
:param create: should the replica be created if it does not exist?
:type create: bool
+ :param ensure_ddocs: Ensure that the design docs exist on server.
+ :type ensure_ddocs: bool
:return: the database instance
:rtype: CouchDatabase
@@ -182,8 +405,8 @@ class CouchDatabase(CommonBackend):
server[dbname]
except ResourceNotFound:
if not create:
- raise errors.DatabaseDoesNotExist()
- return cls(url, dbname)
+ raise DatabaseDoesNotExist()
+ return cls(url, dbname, ensure_ddocs=ensure_ddocs)
def __init__(self, url, dbname, replica_uid=None, full_commit=True,
session=None, ensure_ddocs=True):
@@ -206,6 +429,8 @@ class CouchDatabase(CommonBackend):
# save params
self._url = url
self._full_commit = full_commit
+ if session is None:
+ session = Session(timeout=COUCH_TIMEOUT)
self._session = session
self._factory = CouchDocument
self._real_replica_uid = None
@@ -223,6 +448,9 @@ class CouchDatabase(CommonBackend):
self._set_replica_uid(replica_uid)
if ensure_ddocs:
self.ensure_ddocs_on_db()
+ # initialize a thread pool for parallelizing get_docs()
+ self._sem_pool = threading.BoundedSemaphore(
+ value=self.MAX_GET_DOCS_THREADS)
def ensure_ddocs_on_db(self):
"""
@@ -318,12 +546,31 @@ class CouchDatabase(CommonBackend):
:return: The current generation.
:rtype: int
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'generation', 'log')
- response = res.get_json()
- return response[2]['generation']
+ ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return response[2]['generation']
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def _get_generation_info(self):
"""
@@ -331,12 +578,31 @@ class CouchDatabase(CommonBackend):
:return: A tuple containing the current generation and transaction id.
:rtype: (int, str)
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'generation', 'log')
- response = res.get_json()
- return (response[2]['generation'], response[2]['transaction_id'])
+ ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return (response[2]['generation'], response[2]['transaction_id'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def _get_trans_id_for_gen(self, generation):
"""
@@ -349,16 +615,36 @@ class CouchDatabase(CommonBackend):
:rtype: str
:raise InvalidGeneration: Raised when the generation does not exist.
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
if generation == 0:
return ''
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'trans_id_for_gen', 'log')
- response = res.get_json(gen=generation)
- if response[2] == {}:
- raise errors.InvalidGeneration
- return response[2]['transaction_id']
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'trans_id_for_gen', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(gen=generation)
+ if response[2] == {}:
+ raise InvalidGeneration
+ return response[2]['transaction_id']
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def _get_transaction_log(self):
"""
@@ -366,12 +652,31 @@ class CouchDatabase(CommonBackend):
:return: The complete transaction log.
:rtype: [(str, str)]
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch view
- res = self._database.resource(
- '_design', 'transactions', '_view', 'log')
- response = res.get_json()
- return map(lambda row: (row['id'], row['value']), response[2]['rows'])
+ ddoc_path = ['_design', 'transactions', '_view', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return map(
+ lambda row: (row['id'], row['value']),
+ response[2]['rows'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
def _get_doc(self, doc_id, check_for_conflicts=False):
"""
@@ -413,8 +718,15 @@ class CouchDatabase(CommonBackend):
and '_attachments' in result \
and 'u1db_conflicts' in result['_attachments']:
doc.has_conflicts = True
+ doc.set_conflicts(
+ self._build_conflicts(
+ doc.doc_id,
+ json.loads(binascii.a2b_base64(
+ result['_attachments']['u1db_conflicts']['data']))))
# store couch revision
doc.couch_rev = result['_rev']
+ # store transactions
+ doc.transactions = result['u1db_transactions']
return doc
def get_doc(self, doc_id, include_deleted=False):
@@ -465,6 +777,10 @@ class CouchDatabase(CommonBackend):
"""
Put the document in the Couch backend database.
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
:param old_doc: The old document version.
:type old_doc: CouchDocument
:param doc: The document to be put.
@@ -472,43 +788,76 @@ class CouchDatabase(CommonBackend):
:raise RevisionConflict: Raised when trying to update a document but
couch revisions mismatch.
- """
- trans_id = self._allocate_transaction_id()
- # encode content
- content = doc.get_json()
- if content is not None:
- content = binascii.b2a_base64(content)[:-1] # exclude trailing \n
- # encode conflicts
- conflicts = None
- update_conflicts = doc.modified_conflicts()
- if update_conflicts is True:
- if doc.has_conflicts:
- conflicts = binascii.b2a_base64(
- json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- )[:-1] # exclude \n
- # perform the request
- resource = self._database.resource(
- '_design', 'docs', '_update', 'put', doc.doc_id)
- response = resource.put_json(
- body={
- 'couch_rev': old_doc.couch_rev
- if old_doc is not None else None,
- 'u1db_rev': doc.rev,
- 'content': content,
- 'trans_id': trans_id,
- 'conflicts': conflicts,
- 'update_conflicts': update_conflicts,
- },
- headers={'content-type': 'application/json'})
- # the document might have been updated in between, so we check for the
- # return message
- msg = response[2].read()
- if msg == 'ok':
- return
- elif msg == 'revision conflict':
- raise errors.RevisionConflict()
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ attachments = {} # we save content and conflicts as attachments
+ parts = [] # and we put it using couch's multipart PUT
+ # save content as attachment
+ if doc.is_tombstone() is False:
+ content = doc.get_json()
+ attachments['u1db_content'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(content),
+ }
+ parts.append(content)
+ # save conflicts as attachment
+ if doc.has_conflicts is True:
+ conflicts = json.dumps(
+ map(lambda cdoc: (cdoc.rev, cdoc.content),
+ doc.get_conflicts()))
+ attachments['u1db_conflicts'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(conflicts),
+ }
+ parts.append(conflicts)
+ # store old transactions, if any
+ transactions = old_doc.transactions[:] if old_doc is not None else []
+ # create a new transaction id and timestamp it so the transaction log
+ # is consistent when querying the database.
+ transactions.append(
+ # here we store milliseconds to keep consistent with javascript
+ # Date.prototype.getTime() which was used before inside a couchdb
+ # update handler.
+ (int(time.time() * 1000),
+ self._allocate_transaction_id()))
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ 'u1db_transactions': transactions,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None:
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ self._database.resource.put_json(
+ doc.doc_id, body=buf.getvalue(), headers=envelope.headers)
+ self._renew_couch_session()
+ except ResourceConflict:
+ raise RevisionConflict()
def put_doc(self, doc):
"""
@@ -522,26 +871,26 @@ class CouchDatabase(CommonBackend):
:return: new_doc_rev - The new revision identifier for the document.
The Document object will also be updated.
- :raise errors.InvalidDocId: Raised if the document's id is invalid.
- :raise errors.DocumentTooBig: Raised if the document size is too big.
- :raise errors.ConflictedDoc: Raised if the document has conflicts.
+ :raise InvalidDocId: Raised if the document's id is invalid.
+ :raise DocumentTooBig: Raised if the document size is too big.
+ :raise ConflictedDoc: Raised if the document has conflicts.
"""
if doc.doc_id is None:
- raise errors.InvalidDocId()
+ raise InvalidDocId()
self._check_doc_id(doc.doc_id)
self._check_doc_size(doc)
old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
if old_doc and old_doc.has_conflicts:
- raise errors.ConflictedDoc()
+ raise ConflictedDoc()
if old_doc and doc.rev is None and old_doc.is_tombstone():
new_rev = self._allocate_doc_rev(old_doc.rev)
else:
if old_doc is not None:
if old_doc.rev != doc.rev:
- raise errors.RevisionConflict()
+ raise RevisionConflict()
else:
if doc.rev is not None:
- raise errors.RevisionConflict()
+ raise RevisionConflict()
new_rev = self._allocate_doc_rev(doc.rev)
doc.rev = new_rev
self._put_doc(old_doc, doc)
@@ -563,32 +912,53 @@ class CouchDatabase(CommonBackend):
to the last intervening change and sorted by generation (old
changes first)
:rtype: (int, str, [(str, int, str)])
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'whats_changed', 'log')
- response = res.get_json(old_gen=old_generation)
- results = map(
- lambda row:
- (row['generation'], row['doc_id'], row['transaction_id']),
- response[2]['transactions'])
- results.reverse()
- cur_gen = old_generation
- seen = set()
- changes = []
- newest_trans_id = ''
- for generation, doc_id, trans_id in results:
- if doc_id not in seen:
- changes.append((doc_id, generation, trans_id))
- seen.add(doc_id)
- if changes:
- cur_gen = changes[0][1] # max generation
- newest_trans_id = changes[0][2]
- changes.reverse()
- else:
- cur_gen, newest_trans_id = self._get_generation_info()
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'whats_changed', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(old_gen=old_generation)
+ results = map(
+ lambda row:
+ (row['generation'], row['doc_id'], row['transaction_id']),
+ response[2]['transactions'])
+ results.reverse()
+ cur_gen = old_generation
+ seen = set()
+ changes = []
+ newest_trans_id = ''
+ for generation, doc_id, trans_id in results:
+ if doc_id not in seen:
+ changes.append((doc_id, generation, trans_id))
+ seen.add(doc_id)
+ if changes:
+ cur_gen = changes[0][1] # max generation
+ newest_trans_id = changes[0][2]
+ changes.reverse()
+ else:
+ cur_gen, newest_trans_id = self._get_generation_info()
- return cur_gen, newest_trans_id, changes
+ return cur_gen, newest_trans_id, changes
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def delete_doc(self, doc):
"""
@@ -600,28 +970,47 @@ class CouchDatabase(CommonBackend):
:param doc: The document to mark as deleted.
:type doc: CouchDocument.
- :raise errors.DocumentDoesNotExist: Raised if the document does not
+ :raise DocumentDoesNotExist: Raised if the document does not
exist.
- :raise errors.RevisionConflict: Raised if the revisions do not match.
- :raise errors.DocumentAlreadyDeleted: Raised if the document is
+ :raise RevisionConflict: Raised if the revisions do not match.
+ :raise DocumentAlreadyDeleted: Raised if the document is
already deleted.
- :raise errors.ConflictedDoc: Raised if the doc has conflicts.
+ :raise ConflictedDoc: Raised if the doc has conflicts.
"""
old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
if old_doc is None:
- raise errors.DocumentDoesNotExist
+ raise DocumentDoesNotExist
if old_doc.rev != doc.rev:
- raise errors.RevisionConflict()
+ raise RevisionConflict()
if old_doc.is_tombstone():
- raise errors.DocumentAlreadyDeleted
+ raise DocumentAlreadyDeleted
if old_doc.has_conflicts:
- raise errors.ConflictedDoc()
+ raise ConflictedDoc()
new_rev = self._allocate_doc_rev(doc.rev)
doc.rev = new_rev
doc.make_tombstone()
self._put_doc(old_doc, doc)
return new_rev
+ def _build_conflicts(self, doc_id, attached_conflicts):
+ """
+ Build the conflicted documents list from the conflicts attachment
+ fetched from a couch document.
+
+ :param attached_conflicts: The document's conflicts as fetched from a
+ couch document attachment.
+ :type attached_conflicts: dict
+ """
+ conflicts = []
+ for doc_rev, content in attached_conflicts:
+ doc = self._factory(doc_id, doc_rev)
+ if content is None:
+ doc.make_tombstone()
+ else:
+ doc.content = content
+ conflicts.append(doc)
+ return conflicts
+
def _get_conflicts(self, doc_id, couch_rev=None):
"""
Get the conflicted versions of a document.
@@ -642,16 +1031,8 @@ class CouchDatabase(CommonBackend):
resource = self._database.resource(doc_id, 'u1db_conflicts')
try:
response = resource.get_json(**params)
- conflicts = []
- # build the conflicted versions
- for doc_rev, content in json.loads(response[2].read()):
- doc = self._factory(doc_id, doc_rev)
- if content is None:
- doc.make_tombstone()
- else:
- doc.content = content
- conflicts.append(doc)
- return conflicts
+ return self._build_conflicts(
+ doc_id, json.loads(response[2].read()))
except ResourceNotFound:
return []
@@ -737,17 +1118,35 @@ class CouchDatabase(CommonBackend):
:param other_transaction_id: The transaction id associated with the
generation.
:type other_transaction_id: str
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch update function
- res = self._database.resource(
- '_design', 'syncs', '_update', 'put', 'u1db_sync_log')
- res.put_json(
- body={
- 'other_replica_uid': other_replica_uid,
- 'other_generation': other_generation,
- 'other_transaction_id': other_transaction_id,
- },
- headers={'content-type': 'application/json'})
+ ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
+ res.put_json(
+ body={
+ 'other_replica_uid': other_replica_uid,
+ 'other_generation': other_generation,
+ 'other_transaction_id': other_transaction_id,
+ },
+ headers={'content-type': 'application/json'})
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
def _add_conflict(self, doc, my_doc_rev, my_content):
"""
@@ -765,7 +1164,7 @@ class CouchDatabase(CommonBackend):
serialized string.
:type my_content: str
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.add_conflict(
self._factory(doc_id=doc.doc_id, rev=my_doc_rev,
json=my_content))
@@ -774,7 +1173,7 @@ class CouchDatabase(CommonBackend):
"""
Delete the conflicted revisions from the list of conflicts of C{doc}.
- Note that thie method does not actually update the backed; rather, it
+ Note that this method does not actually update the backend; rather, it
updates the CouchDocument object which will provide the conflict data
when the atomic document update is made.
@@ -783,7 +1182,7 @@ class CouchDatabase(CommonBackend):
:param conflict_revs: A list of the revisions to be deleted.
:param conflict_revs: [str]
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.delete_conflicts(conflict_revs)
def _prune_conflicts(self, doc, doc_vcr):
@@ -842,34 +1241,44 @@ class CouchDatabase(CommonBackend):
:param conflicted_doc_revs: A list of revisions that the new content
supersedes.
:type conflicted_doc_revs: [str]
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
new_rev = self._ensure_maximal_rev(cur_doc.rev,
conflicted_doc_revs)
superseded_revs = set(conflicted_doc_revs)
doc.rev = new_rev
+ # this backend stores conflicts as properties of the documents, so we
+ # have to copy these conflicts over to the document being updated.
if cur_doc.rev in superseded_revs:
+ # the newer doc version will supersede the one in the database, so
+ # we copy conflicts before updating the backend.
+ doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
self._delete_conflicts(doc, superseded_revs)
self._put_doc(cur_doc, doc)
else:
- self._add_conflict(doc, new_rev, doc.get_json())
- self._delete_conflicts(doc, superseded_revs)
- # perform request to resolve document in server
- resource = self._database.resource(
- '_design', 'docs', '_update', 'resolve_doc', doc.doc_id)
- conflicts = None
- if doc.has_conflicts:
- conflicts = binascii.b2a_base64(
- json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- )[:-1] # exclude \n
- response = resource.put_json(
- body={
- 'couch_rev': cur_doc.couch_rev,
- 'conflicts': conflicts,
- },
- headers={'content-type': 'application/json'})
+ # the newer doc version does not supersede the one in the
+ # database, so we will add a conflict to the database and copy
+ # those over to the document the user has in her hands.
+ self._add_conflict(cur_doc, new_rev, doc.get_json())
+ self._delete_conflicts(cur_doc, superseded_revs)
+ self._put_doc(cur_doc, cur_doc) # just update conflicts
+ # backend has been updated with current conflicts, now copy them
+ # to the current document.
+ doc.set_conflicts(cur_doc.get_conflicts())
def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
replica_trans_id=''):
@@ -926,7 +1335,7 @@ class CouchDatabase(CommonBackend):
if cur_doc is not None:
doc.couch_rev = cur_doc.couch_rev
# fetch conflicts because we will eventually manipulate them
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
# from now on, it works just like u1db sqlite backend
doc_vcr = vectorclock.VectorClockRev(doc.rev)
if cur_doc is None:
@@ -963,7 +1372,7 @@ class CouchDatabase(CommonBackend):
if save_conflict:
self._force_doc_sync_conflict(doc)
if replica_uid is not None and replica_gen is not None:
- self._do_set_replica_gen_and_trans_id(
+ self._set_replica_gen_and_trans_id(
replica_uid, replica_gen, replica_trans_id)
# update info
old_doc.rev = doc.rev
@@ -974,6 +1383,59 @@ class CouchDatabase(CommonBackend):
old_doc.has_conflicts = doc.has_conflicts
return state, self._get_generation()
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+ :type check_for_conflictsa: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ # Workaround for:
+ #
+ # http://bugs.python.org/issue7980
+ # https://leap.se/code/issues/5449
+ #
+ # python-couchdb uses time.strptime, which is not thread safe. In
+ # order to avoid the problem described on the issues above, we preload
+ # strptime here by evaluating the conversion of an arbitrary date.
+ # This will not be needed when/if we switch from python-couchdb to
+ # paisley.
+ time.strptime('Mar 4 1917', '%b %d %Y')
+ # spawn threads to retrieve docs
+ threads = []
+ for doc_id in doc_ids:
+ self._sem_pool.acquire()
+ t = self._GetDocThread(self, doc_id, check_for_conflicts,
+ self._sem_pool.release)
+ t.start()
+ threads.append(t)
+ # join threads and yield docs
+ for t in threads:
+ t.join()
+ if t._doc.is_tombstone() and not include_deleted:
+ continue
+ yield t._doc
+
+ def _renew_couch_session(self):
+ """
+ Create a new couch connection session.
+
+ This is a workaround for #5448. Will not be needed once bigcouch is
+ merged with couchdb.
+ """
+ self._database.resource.session = Session(timeout=COUCH_TIMEOUT)
+
class CouchSyncTarget(CommonSyncTarget):
"""
@@ -997,14 +1459,6 @@ class CouchSyncTarget(CommonSyncTarget):
source_replica_transaction_id)
-class NotEnoughCouchPermissions(Exception):
- """
- Raised when failing to assert for enough permissions on underlying Couch
- Database.
- """
- pass
-
-
class CouchServerState(ServerState):
"""
Inteface of the WSGI server with the CouchDB backend.
@@ -1024,65 +1478,6 @@ class CouchServerState(ServerState):
self._couch_url = couch_url
self._shared_db_name = shared_db_name
self._tokens_db_name = tokens_db_name
- try:
- self._check_couch_permissions()
- except NotEnoughCouchPermissions:
- logger.error("Not enough permissions on underlying couch "
- "database (%s)." % self._couch_url)
- except (socket.error, socket.gaierror, socket.herror,
- socket.timeout), e:
- logger.error("Socket problem while trying to reach underlying "
- "couch database: (%s, %s)." %
- (self._couch_url, e))
-
- def _check_couch_permissions(self):
- """
- Assert that Soledad Server has enough permissions on the underlying
- couch database.
-
- Soledad Server has to be able to do the following in the couch server:
-
- * Create, read and write from/to 'shared' db.
- * Create, read and write from/to 'user-<anything>' dbs.
- * Read from 'tokens' db.
-
- This function tries to perform the actions above using the "low level"
- couch library to ensure that Soledad Server can do everything it needs
- on the underlying couch database.
-
- :param couch_url: The URL of the couch database.
- :type couch_url: str
-
- @raise NotEnoughCouchPermissions: Raised in case there are not enough
- permissions to read/write/create the needed couch databases.
- :rtype: bool
- """
-
- def _open_couch_db(dbname):
- server = Server(url=self._couch_url)
- try:
- server[dbname]
- except ResourceNotFound:
- server.create(dbname)
- return server[dbname]
-
- def _create_delete_test_doc(db):
- doc_id, _ = db.save({'test': 'document'})
- doc = db[doc_id]
- db.delete(doc)
-
- try:
- # test read/write auth for shared db
- _create_delete_test_doc(
- _open_couch_db(self._shared_db_name))
- # test read/write auth for user-<something> db
- _create_delete_test_doc(
- _open_couch_db('%stest-db' % USER_DB_PREFIX))
- # test read auth for tokens db
- tokensdb = _open_couch_db(self._tokens_db_name)
- tokensdb.info()
- except Unauthorized:
- raise NotEnoughCouchPermissions(self._couch_url)
def open_database(self, dbname):
"""
@@ -1094,25 +1489,29 @@ class CouchServerState(ServerState):
:return: The CouchDatabase object.
:rtype: CouchDatabase
"""
- # TODO: open couch
return CouchDatabase.open_database(
self._couch_url + '/' + dbname,
- create=False)
+ create=False,
+ ensure_ddocs=False)
def ensure_database(self, dbname):
"""
Ensure couch database exists.
+ Usually, this method is used by the server to ensure the existence of
+ a database. In our setup, the Soledad user that accesses the underlying
+ couch server should never have permission to create (or delete)
+ databases. But, in case it ever does, by raising an exception here we
+ have one more guarantee that no modified client will be able to
+ enforce creation of a database when syncing.
+
:param dbname: The name of the database to ensure.
:type dbname: str
:return: The CouchDatabase object and the replica uid.
:rtype: (CouchDatabase, str)
"""
- db = CouchDatabase.open_database(
- self._couch_url + '/' + dbname,
- create=True)
- return db, db._replica_uid
+ raise Unauthorized()
def delete_database(self, dbname):
"""
@@ -1121,7 +1520,7 @@ class CouchServerState(ServerState):
:param dbname: The name of the database to delete.
:type dbname: str
"""
- CouchDatabase.delete_database(self._couch_url + '/' + dbname)
+ raise Unauthorized()
def _set_couch_url(self, url):
"""
diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js
deleted file mode 100644
index 5a4647de..00000000
--- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js
+++ /dev/null
@@ -1,64 +0,0 @@
-function(doc, req){
- /* we expect to receive the following in `req.body`:
- * {
- * 'couch_rev': '<couch_rev>',
- * 'u1db_rev': '<u1db_rev>',
- * 'content': '<base64 encoded content>',
- * 'trans_id': '<reansaction_id>'
- * 'conflicts': '<base64 encoded conflicts>',
- * 'update_conflicts': <boolean>
- * }
- */
- var body = JSON.parse(req.body);
-
- // create a new document document
- if (!doc) {
- doc = {}
- doc['_id'] = req['id'];
- }
- // or fail if couch revisions do not match
- else if (doc['_rev'] != body['couch_rev']) {
- // of fail if revisions do not match
- return [null, 'revision conflict']
- }
-
- // store u1db rev
- doc.u1db_rev = body['u1db_rev'];
-
- // save content as attachment
- if (body['content'] != null) {
- // save u1db content as attachment
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_content = {
- content_type: "application/octet-stream",
- data: body['content'] // should be base64 encoded
- };
- }
- // or delete the attachment if document is tombstone
- else if (doc._attachments &&
- doc._attachments.u1db_content)
- delete doc._attachments.u1db_content;
-
- // store the transaction id
- if (!doc.u1db_transactions)
- doc.u1db_transactions = [];
- var d = new Date();
- doc.u1db_transactions.push([d.getTime(), body['trans_id']]);
-
- // save conflicts as attachment if they were sent
- if (body['update_conflicts'])
- if (body['conflicts'] != null) {
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_conflicts = {
- content_type: "application/octet-stream",
- data: body['conflicts'] // should be base64 encoded
- }
- } else {
- if(doc._attachments && doc._attachments.u1db_conflicts)
- delete doc._attachments.u1db_conflicts
- }
-
- return [doc, 'ok'];
-}
diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js
deleted file mode 100644
index 7ba66cf8..00000000
--- a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js
+++ /dev/null
@@ -1,39 +0,0 @@
-function(doc, req){
- /* we expect to receive the following in `req.body`:
- * {
- * 'couch_rev': '<couch_rev>',
- * 'conflicts': '<base64 encoded conflicts>',
- * }
- */
- var body = JSON.parse(req.body);
-
- // fail if no document was given
- if (!doc) {
- return [null, 'document does not exist']
- }
-
- // fail if couch revisions do not match
- if (body['couch_rev'] != null
- && doc['_rev'] != body['couch_rev']) {
- return [null, 'revision conflict']
- }
-
- // fail if conflicts were not sent
- if (body['conflicts'] == null)
- return [null, 'missing conflicts']
-
- // save conflicts as attachment if they were sent
- if (body['conflicts'] != null) {
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_conflicts = {
- content_type: "application/octet-stream",
- data: body['conflicts'] // should be base64 encoded
- }
- }
- // or delete attachment if there are no conflicts
- else if (doc._attachments && doc._attachments.u1db_conflicts)
- delete doc._attachments.u1db_conflicts;
-
- return [doc, 'ok'];
-}
diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py
index 7c2d7296..3a7eadd2 100644
--- a/common/src/leap/soledad/common/errors.py
+++ b/common/src/leap/soledad/common/errors.py
@@ -25,11 +25,49 @@ from u1db import errors
from u1db.remote import http_errors
+def register_exception(cls):
+ """
+ A small decorator that registers exceptions in u1db maps.
+ """
+ # update u1db "wire description to status" and "wire description to
+ # exception" maps.
+ http_errors.wire_description_to_status.update({
+ cls.wire_description: cls.status})
+ errors.wire_description_to_exc.update({
+ cls.wire_description: cls})
+ # do not modify the exception
+ return cls
+
+
+class SoledadError(errors.U1DBError):
+ """
+ Base Soledad HTTP errors.
+ """
+ pass
+
+
#
-# LockResource: a lock based on a document in the shared database.
+# Authorization errors
#
-class InvalidTokenError(errors.U1DBError):
+@register_exception
+class InvalidAuthTokenError(errors.Unauthorized):
+ """
+ Exception raised when failing to get authorization for some action because
+ the provided token either does not exist in the tokens database, has a
+ distinct structure from the expected one, or is associated with a user
+ with a distinct uuid than the one provided by the client.
+ """
+
+ wire_descrition = "invalid auth token"
+ status = 401
+
+#
+# LockResource errors
+#
+
+@register_exception
+class InvalidTokenError(SoledadError):
"""
Exception raised when trying to unlock shared database with invalid token.
"""
@@ -38,7 +76,8 @@ class InvalidTokenError(errors.U1DBError):
status = 401
-class NotLockedError(errors.U1DBError):
+@register_exception
+class NotLockedError(SoledadError):
"""
Exception raised when trying to unlock shared database when it is not
locked.
@@ -48,7 +87,8 @@ class NotLockedError(errors.U1DBError):
status = 404
-class AlreadyLockedError(errors.U1DBError):
+@register_exception
+class AlreadyLockedError(SoledadError):
"""
Exception raised when trying to lock shared database but it is already
locked.
@@ -57,13 +97,83 @@ class AlreadyLockedError(errors.U1DBError):
wire_description = "lock is locked"
status = 403
-# update u1db "wire description to status" and "wire description to exception"
-# maps.
-for e in [InvalidTokenError, NotLockedError, AlreadyLockedError]:
- http_errors.wire_description_to_status.update({
- e.wire_description: e.status})
- errors.wire_description_to_exc.update({
- e.wire_description: e})
+
+@register_exception
+class LockTimedOutError(SoledadError):
+ """
+ Exception raised when timing out while trying to lock the shared database.
+ """
+
+ wire_description = "lock timed out"
+ status = 408
+
+
+@register_exception
+class CouldNotObtainLockError(SoledadError):
+ """
+ Exception raised when timing out while trying to lock the shared database.
+ """
+
+ wire_description = "error obtaining lock"
+ status = 500
+
+
+#
+# CouchDatabase errors
+#
+
+@register_exception
+class MissingDesignDocError(SoledadError):
+ """
+ Raised when trying to access a missing couch design document.
+ """
+
+ wire_description = "missing design document"
+ status = 500
+
+
+@register_exception
+class MissingDesignDocNamedViewError(SoledadError):
+ """
+ Raised when trying to access a missing named view on a couch design
+ document.
+ """
+
+ wire_description = "missing design document named function"
+ status = 500
+
+
+@register_exception
+class MissingDesignDocListFunctionError(SoledadError):
+ """
+ Raised when trying to access a missing list function on a couch design
+ document.
+ """
+
+ wire_description = "missing design document list function"
+ status = 500
+
+
+@register_exception
+class MissingDesignDocDeletedError(SoledadError):
+ """
+ Raised when trying to access a deleted couch design document.
+ """
+
+ wire_description = "design document was deleted"
+ status = 500
+
+
+@register_exception
+class DesignDocUnknownError(SoledadError):
+ """
+ Raised when trying to access a couch design document and getting an
+ unknown error.
+ """
+
+ wire_description = "missing design document unknown error"
+ status = 500
+
# u1db error statuses also have to be updated
http_errors.ERROR_STATUSES = set(
diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template
index 217ae201..1fc2205b 100644
--- a/common/src/leap/soledad/common/tests/couchdb.ini.template
+++ b/common/src/leap/soledad/common/tests/couchdb.ini.template
@@ -6,7 +6,7 @@
database_dir = %(tempdir)s/lib
view_index_dir = %(tempdir)s/lib
max_document_size = 4294967296 ; 4 GB
-os_process_timeout = 5000 ; 5 seconds. for view and external servers.
+os_process_timeout = 120000 ; 120 seconds. for view and external servers.
max_dbs_open = 100
delayed_commits = true ; set this to false to ensure an fsync before 201 Created is returned
uri_file = %(tempdir)s/lib/couch.uri
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index 72346333..86bb4b93 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -24,16 +24,17 @@ import re
import copy
import shutil
from base64 import b64decode
+from mock import Mock
from couchdb.client import Server
-from u1db import errors
+from u1db import errors as u1db_errors
from leap.common.files import mkdir_p
from leap.soledad.common.tests import u1db_tests as tests
from leap.soledad.common.tests.u1db_tests import test_backends
from leap.soledad.common.tests.u1db_tests import test_sync
-from leap.soledad.common import couch
+from leap.soledad.common import couch, errors
import simplejson as json
@@ -80,9 +81,10 @@ class CouchDBWrapper(object):
mkdir_p(os.path.join(self.tempdir, 'lib'))
mkdir_p(os.path.join(self.tempdir, 'log'))
args = ['couchdb', '-n', '-a', confPath]
- #null = open('/dev/null', 'w')
+ null = open('/dev/null', 'w')
+
self.process = subprocess.Popen(
- args, env=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ args, env=None, stdout=null.fileno(), stderr=null.fileno(),
close_fds=True)
# find port
logPath = os.path.join(self.tempdir, 'log', 'couch.log')
@@ -125,21 +127,21 @@ class CouchDBTestCase(unittest.TestCase):
TestCase base class for tests against a real CouchDB server.
"""
- def setUp(self):
+ @classmethod
+ def setUpClass(cls):
"""
Make sure we have a CouchDB instance for a test.
"""
- self.wrapper = CouchDBWrapper()
- self.wrapper.start()
+ cls.wrapper = CouchDBWrapper()
+ cls.wrapper.start()
#self.db = self.wrapper.db
- unittest.TestCase.setUp(self)
- def tearDown(self):
+ @classmethod
+ def tearDownClass(cls):
"""
Stop CouchDB instance for test.
"""
- self.wrapper.stop()
- unittest.TestCase.tearDown(self)
+ cls.wrapper.stop()
#-----------------------------------------------------------------------------
@@ -356,7 +358,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
def __init__(self, url, dbname, replica_uid=None, full_commit=True,
session=None, ensure_ddocs=True):
old_class.__init__(self, url, dbname, replica_uid, full_commit,
- session, ensure_ddocs=True)
+ session, ensure_ddocs=ensure_ddocs)
self._indexes = {}
def _put_doc(self, old_doc, doc):
@@ -372,7 +374,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
if self._indexes[index_name]._definition == list(
index_expressions):
return
- raise errors.IndexNameTakenError
+ raise u1db_errors.IndexNameTakenError
index = InMemoryIndex(index_name, list(index_expressions))
_, all_docs = self.get_all_docs()
for doc in all_docs:
@@ -392,7 +394,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
try:
index = self._indexes[index_name]
except KeyError:
- raise errors.IndexDoesNotExist
+ raise u1db_errors.IndexDoesNotExist
doc_ids = index.lookup(key_values)
result = []
for doc_id in doc_ids:
@@ -405,7 +407,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
try:
index = self._indexes[index_name]
except KeyError:
- raise errors.IndexDoesNotExist
+ raise u1db_errors.IndexDoesNotExist
if isinstance(start_value, basestring):
start_value = (start_value,)
if isinstance(end_value, basestring):
@@ -420,7 +422,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
try:
index = self._indexes[index_name]
except KeyError:
- raise errors.IndexDoesNotExist
+ raise u1db_errors.IndexDoesNotExist
keys = index.keys()
# XXX inefficiency warning
return list(set([tuple(key.split('\x01')) for key in keys]))
@@ -461,4 +463,184 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase):
test_sync.DatabaseSyncTests.tearDown(self)
+class CouchDatabaseExceptionsTests(CouchDBTestCase):
+
+ def setUp(self):
+ CouchDBTestCase.setUp(self)
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=False) # note that we don't enforce ddocs here
+
+ def tearDown(self):
+ self.db.delete_database()
+
+ def test_missing_design_doc_raises(self):
+ """
+ Test that all methods that access design documents will raise if the
+ design docs are not present.
+ """
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._get_trans_id_for_gen, 1)
+ # _get_transaction_log()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._get_transaction_log)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db.whats_changed)
+ # _do_set_replica_gen_and_trans_id()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
+
+ def test_missing_design_doc_functions_raises(self):
+ """
+ Test that all methods that access design documents list functions
+ will raise if the functions are not present.
+ """
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=True)
+ # erase views from _design/transactions
+ transactions = self.db._database['_design/transactions']
+ transactions['lists'] = {}
+ self.db._database.save(transactions)
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_trans_id_for_gen, 1)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db.whats_changed)
+
+ def test_absent_design_doc_functions_raises(self):
+ """
+ Test that all methods that access design documents list functions
+ will raise if the functions are not present.
+ """
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=True)
+ # erase views from _design/transactions
+ transactions = self.db._database['_design/transactions']
+ del transactions['lists']
+ self.db._database.save(transactions)
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_trans_id_for_gen, 1)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db.whats_changed)
+
+ def test_missing_design_doc_named_views_raises(self):
+ """
+ Test that all methods that access design documents' named views will
+ raise if the views are not present.
+ """
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=True)
+ # erase views from _design/docs
+ docs = self.db._database['_design/docs']
+ del docs['views']
+ self.db._database.save(docs)
+ # erase views from _design/syncs
+ syncs = self.db._database['_design/syncs']
+ del syncs['views']
+ self.db._database.save(syncs)
+ # erase views from _design/transactions
+ transactions = self.db._database['_design/transactions']
+ del transactions['views']
+ self.db._database.save(transactions)
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db._get_trans_id_for_gen, 1)
+ # _get_transaction_log()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db._get_transaction_log)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db.whats_changed)
+
+ def test_deleted_design_doc_raises(self):
+ """
+ Test that all methods that access design documents will raise if the
+ design docs are not present.
+ """
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=True)
+ # delete _design/docs
+ del self.db._database['_design/docs']
+ # delete _design/syncs
+ del self.db._database['_design/syncs']
+ # delete _design/transactions
+ del self.db._database['_design/transactions']
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._get_trans_id_for_gen, 1)
+ # _get_transaction_log()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._get_transaction_log)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db.whats_changed)
+ # _do_set_replica_gen_and_trans_id()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
+
+
load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
index a0c473b1..3c457cc5 100644
--- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -33,11 +33,17 @@ from leap.soledad.common.tests.test_target import (
make_leap_document_for_test,
token_leap_sync_target,
)
+from leap.soledad.common.tests.test_server import _couch_ensure_database
REPEAT_TIMES = 20
+# monkey path CouchServerState so it can ensure databases.
+
+CouchServerState.ensure_database = _couch_ensure_database
+
+
class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
@staticmethod
@@ -100,6 +106,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
def tearDown(self):
+ self.db.delete_database()
CouchDBTestCase.tearDown(self)
TestCaseWithServer.tearDown(self)
@@ -337,3 +344,46 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
self.assertEqual(
1,
len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+ def test_concurrent_syncs_do_not_fail(self):
+ """
+ Assert that concurrent attempts to sync end up being executed
+ sequentially and do not fail.
+ """
+ threads = []
+ docs = []
+ pool = threading.BoundedSemaphore(value=1)
+ self.startServer()
+ sol = self._soledad_instance(
+ auth_token='auth-token',
+ server_url=self.getURL())
+
+ def _run_method(self):
+ # create a lot of documents
+ doc = self._params['sol'].create_doc({})
+ # do the sync!
+ sol.sync()
+ pool.acquire()
+ docs.append(doc.doc_id)
+ pool.release()
+
+ # launch threads to create documents in parallel
+ for i in range(0, REPEAT_TIMES):
+ thread = self._WorkerThread(
+ {'sol': sol, 'syncs': i},
+ _run_method)
+ thread.start()
+ threads.append(thread)
+
+ # wait for threads to finish
+ for thread in threads:
+ thread.join()
+
+ transaction_log = self.db._get_transaction_log()
+ self.assertEqual(REPEAT_TIMES, len(transaction_log))
+ # assert all documents are in the remote log
+ self.assertEqual(REPEAT_TIMES, len(docs))
+ for doc_id in docs:
+ self.assertEqual(
+ 1,
+ len(filter(lambda t: t[0] == doc_id, transaction_log)))
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 83df192b..f8d2a64f 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -25,6 +25,7 @@ import tempfile
import simplejson as json
import mock
import time
+import binascii
from leap.common.testing.basetest import BaseLeapTest
@@ -50,6 +51,17 @@ from leap.soledad.server import SoledadApp, LockResource
from leap.soledad.server.auth import URLToAuthorization
+# monkey path CouchServerState so it can ensure databases.
+
+def _couch_ensure_database(self, dbname):
+ db = CouchDatabase.open_database(
+ self._couch_url + '/' + dbname,
+ create=True)
+ return db, db._replica_uid
+
+CouchServerState.ensure_database = _couch_ensure_database
+
+
class ServerAuthorizationTestCase(BaseLeapTest):
"""
Tests related to Soledad server authorization.
@@ -339,15 +351,16 @@ class EncryptedSyncTestCase(
_, doclist = sol1.get_all_docs()
self.assertEqual([], doclist)
doc1 = sol1.create_doc(json.loads(simple_doc))
- # sync with server
- sol1._server_url = self.getURL()
- sol1.sync()
- # assert doc was sent to couch db
+ # ensure remote db exists before syncing
db = CouchDatabase(
self._couch_url,
# the name of the user database is "user-<uuid>".
'user-user-uuid',
)
+ # sync with server
+ sol1._server_url = self.getURL()
+ sol1.sync()
+ # assert doc was sent to couch db
_, doclist = db.get_all_docs()
self.assertEqual(1, len(doclist))
couchdoc = doclist[0]
@@ -376,6 +389,7 @@ class EncryptedSyncTestCase(
doc2 = doclist[0]
# assert incoming doc is equal to the first sent doc
self.assertEqual(doc1, doc2)
+ db.delete_database()
def test_encrypted_sym_sync_with_unicode_passphrase(self):
"""
@@ -393,15 +407,16 @@ class EncryptedSyncTestCase(
_, doclist = sol1.get_all_docs()
self.assertEqual([], doclist)
doc1 = sol1.create_doc(json.loads(simple_doc))
- # sync with server
- sol1._server_url = self.getURL()
- sol1.sync()
- # assert doc was sent to couch db
+ # ensure remote db exists before syncing
db = CouchDatabase(
self._couch_url,
# the name of the user database is "user-<uuid>".
'user-user-uuid',
)
+ # sync with server
+ sol1._server_url = self.getURL()
+ sol1.sync()
+ # assert doc was sent to couch db
_, doclist = db.get_all_docs()
self.assertEqual(1, len(doclist))
couchdoc = doclist[0]
@@ -434,7 +449,94 @@ class EncryptedSyncTestCase(
doc2 = doclist[0]
# assert incoming doc is equal to the first sent doc
self.assertEqual(doc1, doc2)
+ db.delete_database()
+ def test_sync_very_large_files(self):
+ """
+ Test if Soledad can sync very large files.
+ """
+ # define the size of the "very large file"
+ length = 100*(10**6) # 100 MB
+ self.startServer()
+ # instantiate soledad and create a document
+ sol1 = self._soledad_instance(
+ # token is verified in test_target.make_token_soledad_app
+ auth_token='auth-token'
+ )
+ _, doclist = sol1.get_all_docs()
+ self.assertEqual([], doclist)
+ content = binascii.hexlify(os.urandom(length/2)) # len() == length
+ doc1 = sol1.create_doc({'data': content})
+ # ensure remote db exists before syncing
+ db = CouchDatabase(
+ self._couch_url,
+ # the name of the user database is "user-<uuid>".
+ 'user-user-uuid',
+ )
+ # sync with server
+ sol1._server_url = self.getURL()
+ sol1.sync()
+ # instantiate soledad with empty db, but with same secrets path
+ sol2 = self._soledad_instance(prefix='x', auth_token='auth-token')
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual([], doclist)
+ sol2._secrets_path = sol1.secrets_path
+ sol2._load_secrets()
+ sol2._set_secret_id(sol1._secret_id)
+ # sync the new instance
+ sol2._server_url = self.getURL()
+ sol2.sync()
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual(1, len(doclist))
+ doc2 = doclist[0]
+ # assert incoming doc is equal to the first sent doc
+ self.assertEqual(doc1, doc2)
+ # delete remote database
+ db.delete_database()
+
+
+ def test_sync_many_small_files(self):
+ """
+ Test if Soledad can sync many smallfiles.
+ """
+ number_of_docs = 100
+ self.startServer()
+ # instantiate soledad and create a document
+ sol1 = self._soledad_instance(
+ # token is verified in test_target.make_token_soledad_app
+ auth_token='auth-token'
+ )
+ _, doclist = sol1.get_all_docs()
+ self.assertEqual([], doclist)
+ # create many small files
+ for i in range(0, number_of_docs):
+ sol1.create_doc(json.loads(simple_doc))
+ # ensure remote db exists before syncing
+ db = CouchDatabase(
+ self._couch_url,
+ # the name of the user database is "user-<uuid>".
+ 'user-user-uuid',
+ )
+ # sync with server
+ sol1._server_url = self.getURL()
+ sol1.sync()
+ # instantiate soledad with empty db, but with same secrets path
+ sol2 = self._soledad_instance(prefix='x', auth_token='auth-token')
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual([], doclist)
+ sol2._secrets_path = sol1.secrets_path
+ sol2._load_secrets()
+ sol2._set_secret_id(sol1._secret_id)
+ # sync the new instance
+ sol2._server_url = self.getURL()
+ sol2.sync()
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual(number_of_docs, len(doclist))
+ # assert incoming docs are equal to sent docs
+ for doc in doclist:
+ self.assertEqual(sol1.get_doc(doc.doc_id), doc)
+ # delete remote database
+ db.delete_database()
class LockResourceTestCase(
CouchDBTestCase, TestCaseWithServer):
@@ -455,12 +557,21 @@ class LockResourceTestCase(
CouchDBTestCase.setUp(self)
self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+ # create the databases
+ CouchDatabase(self._couch_url, 'shared')
+ CouchDatabase(self._couch_url, 'tokens')
self._state = CouchServerState(
self._couch_url, 'shared', 'tokens')
def tearDown(self):
CouchDBTestCase.tearDown(self)
TestCaseWithServer.tearDown(self)
+ # delete remote database
+ db = CouchDatabase(
+ self._couch_url,
+ 'shared',
+ )
+ db.delete_database()
def test__try_obtain_filesystem_lock(self):
responder = mock.Mock()
diff --git a/docs/sphinx/client.rst b/docs/sphinx/client.rst
new file mode 100644
index 00000000..0c608c31
--- /dev/null
+++ b/docs/sphinx/client.rst
@@ -0,0 +1,44 @@
+Soledad Client documentation
+============================
+
+.. automodule:: leap.soledad.client
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.client.auth
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.client.crypto
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.client.shared_db
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.client.soledad_db
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.client.sqlcipher
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.client.target
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
diff --git a/docs/sphinx/common.rst b/docs/sphinx/common.rst
new file mode 100644
index 00000000..8755b3bd
--- /dev/null
+++ b/docs/sphinx/common.rst
@@ -0,0 +1,38 @@
+Soledad Common documentation
+============================
+
+.. automodule:: leap.soledad.common
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.common.couch
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.common.crypto
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.common.ddocs
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.common.document
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.common.errors
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
diff --git a/docs/sphinx/conf.py b/docs/sphinx/conf.py
new file mode 100644
index 00000000..552e5f56
--- /dev/null
+++ b/docs/sphinx/conf.py
@@ -0,0 +1,266 @@
+# -*- coding: utf-8 -*-
+#
+# Soledad documentation build configuration file, created by
+# sphinx-quickstart on Mon Feb 17 18:20:43 2014.
+#
+# This file is execfile()d with the current directory set to its
+# containing dir.
+#
+# Note that not all possible configuration values are present in this
+# autogenerated file.
+#
+# All configuration values have a default; values that are commented out
+# serve to show the default.
+
+import sys
+import os
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+sys.path.insert(0, os.path.abspath('../../common/src'))
+sys.path.insert(0, os.path.abspath('../../client/src'))
+sys.path.insert(0, os.path.abspath('../../server/src'))
+
+# -- General configuration ------------------------------------------------
+
+# If your documentation needs a minimal Sphinx version, state it here.
+#needs_sphinx = '1.0'
+
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+# ones.
+extensions = [
+ 'sphinx.ext.autodoc',
+ 'sphinx.ext.todo',
+ 'sphinx.ext.coverage',
+ 'sphinx.ext.pngmath',
+ 'sphinx.ext.viewcode',
+]
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['_templates']
+
+# The suffix of source filenames.
+source_suffix = '.rst'
+
+# The encoding of source files.
+#source_encoding = 'utf-8-sig'
+
+# The master toctree document.
+master_doc = 'index'
+
+# General information about the project.
+project = u'Soledad'
+copyright = u'2014, LEAP Encryption Access Project'
+
+# The version info for the project you're documenting, acts as replacement for
+# |version| and |release|, also used in various other places throughout the
+# built documents.
+#
+# The short X.Y version.
+version = '0.4'
+# The full version, including alpha/beta/rc tags.
+release = '0.4.0'
+
+# The language for content autogenerated by Sphinx. Refer to documentation
+# for a list of supported languages.
+#language = None
+
+# There are two options for replacing |today|: either, you set today to some
+# non-false value, then it is used:
+#today = ''
+# Else, today_fmt is used as the format for a strftime call.
+#today_fmt = '%B %d, %Y'
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+exclude_patterns = ['_build']
+
+# The reST default role (used for this markup: `text`) to use for all
+# documents.
+#default_role = None
+
+# If true, '()' will be appended to :func: etc. cross-reference text.
+#add_function_parentheses = True
+
+# If true, the current module name will be prepended to all description
+# unit titles (such as .. function::).
+#add_module_names = True
+
+# If true, sectionauthor and moduleauthor directives will be shown in the
+# output. They are ignored by default.
+#show_authors = False
+
+# The name of the Pygments (syntax highlighting) style to use.
+pygments_style = 'sphinx'
+
+# A list of ignored prefixes for module index sorting.
+#modindex_common_prefix = []
+
+# If true, keep warnings as "system message" paragraphs in the built documents.
+#keep_warnings = False
+
+
+# -- Options for HTML output ----------------------------------------------
+
+# The theme to use for HTML and HTML Help pages. See the documentation for
+# a list of builtin themes.
+html_theme = 'default'
+
+# Theme options are theme-specific and customize the look and feel of a theme
+# further. For a list of options available for each theme, see the
+# documentation.
+#html_theme_options = {}
+
+# Add any paths that contain custom themes here, relative to this directory.
+#html_theme_path = []
+
+# The name for this set of Sphinx documents. If None, it defaults to
+# "<project> v<release> documentation".
+#html_title = None
+
+# A shorter title for the navigation bar. Default is the same as html_title.
+#html_short_title = None
+
+# The name of an image file (relative to this directory) to place at the top
+# of the sidebar.
+#html_logo = None
+
+# The name of an image file (within the static path) to use as favicon of the
+# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
+# pixels large.
+#html_favicon = None
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['_static']
+
+# Add any extra paths that contain custom files (such as robots.txt or
+# .htaccess) here, relative to this directory. These files are copied
+# directly to the root of the documentation.
+#html_extra_path = []
+
+# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
+# using the given strftime format.
+#html_last_updated_fmt = '%b %d, %Y'
+
+# If true, SmartyPants will be used to convert quotes and dashes to
+# typographically correct entities.
+#html_use_smartypants = True
+
+# Custom sidebar templates, maps document names to template names.
+#html_sidebars = {}
+
+# Additional templates that should be rendered to pages, maps page names to
+# template names.
+#html_additional_pages = {}
+
+# If false, no module index is generated.
+#html_domain_indices = True
+
+# If false, no index is generated.
+#html_use_index = True
+
+# If true, the index is split into individual pages for each letter.
+#html_split_index = False
+
+# If true, links to the reST sources are added to the pages.
+#html_show_sourcelink = True
+
+# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
+#html_show_sphinx = True
+
+# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
+#html_show_copyright = True
+
+# If true, an OpenSearch description file will be output, and all pages will
+# contain a <link> tag referring to it. The value of this option must be the
+# base URL from which the finished HTML is served.
+#html_use_opensearch = ''
+
+# This is the file name suffix for HTML files (e.g. ".xhtml").
+#html_file_suffix = None
+
+# Output file base name for HTML help builder.
+htmlhelp_basename = 'Soledaddoc'
+
+
+# -- Options for LaTeX output ---------------------------------------------
+
+latex_elements = {
+# The paper size ('letterpaper' or 'a4paper').
+#'papersize': 'letterpaper',
+
+# The font size ('10pt', '11pt' or '12pt').
+#'pointsize': '10pt',
+
+# Additional stuff for the LaTeX preamble.
+#'preamble': '',
+}
+
+# Grouping the document tree into LaTeX files. List of tuples
+# (source start file, target name, title,
+# author, documentclass [howto, manual, or own class]).
+latex_documents = [
+ ('index', 'Soledad.tex', u'Soledad Documentation',
+ u'LEAP Encryption Access Project', 'manual'),
+]
+
+# The name of an image file (relative to this directory) to place at the top of
+# the title page.
+#latex_logo = None
+
+# For "manual" documents, if this is true, then toplevel headings are parts,
+# not chapters.
+#latex_use_parts = False
+
+# If true, show page references after internal links.
+#latex_show_pagerefs = False
+
+# If true, show URL addresses after external links.
+#latex_show_urls = False
+
+# Documents to append as an appendix to all manuals.
+#latex_appendices = []
+
+# If false, no module index is generated.
+#latex_domain_indices = True
+
+
+# -- Options for manual page output ---------------------------------------
+
+# One entry per manual page. List of tuples
+# (source start file, name, description, authors, manual section).
+man_pages = [
+ ('index', 'soledad', u'Soledad Documentation',
+ [u'LEAP Encryption Access Project'], 1)
+]
+
+# If true, show URL addresses after external links.
+#man_show_urls = False
+
+
+# -- Options for Texinfo output -------------------------------------------
+
+# Grouping the document tree into Texinfo files. List of tuples
+# (source start file, target name, title, author,
+# dir menu entry, description, category)
+texinfo_documents = [
+ ('index', 'Soledad', u'Soledad Documentation',
+ u'LEAP Encryption Access Project', 'Soledad', 'One line description of project.',
+ 'Miscellaneous'),
+]
+
+# Documents to append as an appendix to all manuals.
+#texinfo_appendices = []
+
+# If false, no module index is generated.
+#texinfo_domain_indices = True
+
+# How to display URL addresses: 'footnote', 'no', or 'inline'.
+#texinfo_show_urls = 'footnote'
+
+# If true, do not generate a @detailmenu in the "Top" node's menu.
+#texinfo_no_detailmenu = False
diff --git a/docs/sphinx/index.rst b/docs/sphinx/index.rst
new file mode 100644
index 00000000..6298d034
--- /dev/null
+++ b/docs/sphinx/index.rst
@@ -0,0 +1,31 @@
+.. Soledad documentation master file, created by
+ sphinx-quickstart on Mon Feb 17 17:54:47 2014.
+ You can adapt this file completely to your liking, but it should at least
+ contain the root `toctree` directive.
+
+Soledad documentation
+=====================
+
+Contents:
+
+.. toctree::
+ :maxdepth: 2
+
+ common
+ client
+ server
+
+.. automodule:: leap.soledad.common
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
+
diff --git a/docs/sphinx/server.rst b/docs/sphinx/server.rst
new file mode 100644
index 00000000..f093adf4
--- /dev/null
+++ b/docs/sphinx/server.rst
@@ -0,0 +1,27 @@
+Soledad Server documentation
+============================
+
+.. automodule:: leap.soledad.server
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.server.auth
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.server.gzip_middleware
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
+.. automodule:: leap.soledad.server.lock_resource
+ :members:
+ :undoc-members:
+ :private-members:
+ :show-inheritance:
+
diff --git a/scripts/README.rst b/scripts/README.rst
index fdd1d642..37cf2c0e 100644
--- a/scripts/README.rst
+++ b/scripts/README.rst
@@ -2,16 +2,3 @@ Soledad Scripts
===============
The scripts in this directory are meant to be used for development purposes.
-
-Currently, the scripts are:
-
- * server-side-db.py: Gives access to server-side soledad user database,
- based on the configuration in /etc/leap/soledad-server.conf. One should
- use it as:
-
- python -i server-side-db.py <uuid>
-
- * client-side-db.py: Gives access to client-side soledad user database,
- based on data stored in ~/.config/leap/soledad. One should use it as:
-
- python -i client-side-db.py <uuid> <passphrase>
diff --git a/scripts/backends_cpu_usage/log_cpu_usage.py b/scripts/backends_cpu_usage/log_cpu_usage.py
new file mode 100755
index 00000000..2674e1ff
--- /dev/null
+++ b/scripts/backends_cpu_usage/log_cpu_usage.py
@@ -0,0 +1,46 @@
+#!/usr/bin/python
+
+
+# Get the CPU usage and print to file.
+
+
+import psutil
+import time
+import argparse
+import os
+import threading
+
+
+class LogCpuUsage(threading.Thread):
+
+ def __init__(self, fname):
+ threading.Thread.__init__(self)
+ self._stopped = True
+ self._fname = fname
+
+ def run(self):
+ self._stopped = False
+ with open(self._fname, 'w') as f:
+ start = time.time()
+ while self._stopped is False:
+ now = time.time()
+ f.write("%f %f\n" % ((now - start), psutil.cpu_percent()))
+ time.sleep(0.01)
+
+ def stop(self):
+ self._stopped = True
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument('file', help='where to save output')
+ args = parser.parse_args()
+
+ if os.path.isfile(args.file):
+ replace = raw_input('File %s exists, replace it (y/N)? ' % args.file)
+ if replace.lower() != 'y':
+ print 'Bailing out.'
+ exit(1)
+
+ log_cpu = LogCpuUsage(args.file)
+ log_cpu.run()
diff --git a/scripts/backends_cpu_usage/movingaverage.py b/scripts/backends_cpu_usage/movingaverage.py
new file mode 100644
index 00000000..bac1b3e1
--- /dev/null
+++ b/scripts/backends_cpu_usage/movingaverage.py
@@ -0,0 +1,209 @@
+#!/usr/bin/env python
+#
+# Sean Reifschneider, tummy.com, ltd. <jafo@tummy.com>
+# Released into the Public Domain, 2011-02-06
+
+import itertools
+from itertools import islice
+from collections import deque
+
+
+#########################################################
+def movingaverage(data, subset_size, data_is_list = None,
+ avoid_fp_drift = True):
+ '''Return the moving averages of the data, with a window size of
+ `subset_size`. `subset_size` must be an integer greater than 0 and
+ less than the length of the input data, or a ValueError will be raised.
+
+ `data_is_list` can be used to tune the algorithm for list or iteratable
+ as an input. The default value, `None` will auto-detect this.
+ The algorithm used if `data` is a list is almost twice as fast as if
+ it is an iteratable.
+
+ `avoid_fp_drift`, if True (the default) sums every sub-set rather than
+ keeping a "rolling sum" (which may be subject to floating-point drift).
+ While more correct, it is also dramatically slower for subset sizes
+ much larger than 20.
+
+ NOTE: You really should consider setting `avoid_fp_drift = False` unless
+ you are dealing with very small numbers (say, far smaller than 0.00001)
+ or require extreme accuracy at the cost of execution time. For
+ `subset_size` < 20, the performance difference is very small.
+ '''
+ if subset_size < 1:
+ raise ValueError('subset_size must be 1 or larger')
+
+ if data_is_list is None:
+ data_is_list = hasattr(data, '__getslice__')
+
+ divisor = float(subset_size)
+ if data_is_list:
+ # This only works if we can re-access old elements, but is much faster.
+ # In other words, it can't be just an iterable, it needs to be a list.
+
+ if subset_size > len(data):
+ raise ValueError('subset_size must be smaller than data set size')
+
+ if avoid_fp_drift:
+ for x in range(subset_size, len(data) + 1):
+ yield sum(data[x - subset_size:x]) / divisor
+ else:
+ cur = sum(data[0:subset_size])
+ yield cur / divisor
+ for x in range(subset_size, len(data)):
+ cur += data[x] - data[x - subset_size]
+ yield cur / divisor
+ else:
+ # Based on the recipe at:
+ # http://docs.python.org/library/collections.html#deque-recipes
+ it = iter(data)
+ d = deque(islice(it, subset_size))
+
+ if subset_size > len(d):
+ raise ValueError('subset_size must be smaller than data set size')
+
+ if avoid_fp_drift:
+ yield sum(d) / divisor
+ for elem in it:
+ d.popleft()
+ d.append(elem)
+ yield sum(d) / divisor
+ else:
+ s = sum(d)
+ yield s / divisor
+ for elem in it:
+ s += elem - d.popleft()
+ d.append(elem)
+ yield s / divisor
+
+
+##########################
+if __name__ == '__main__':
+ import unittest
+
+ class TestMovingAverage(unittest.TestCase):
+ ####################
+ def test_List(self):
+ try:
+ list(movingaverage([1,2,3], 0))
+ self.fail('Did not raise ValueError on subset_size=0')
+ except ValueError:
+ pass
+
+ try:
+ list(movingaverage([1,2,3,4,5,6], 7))
+ self.fail('Did not raise ValueError on subset_size > len(data)')
+ except ValueError:
+ pass
+
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 1)), [1,2,3,4,5,6])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 2)),
+ [1.5,2.5,3.5,4.5,5.5])
+ self.assertEqual(list(movingaverage(map(float, [1,2,3,4,5,6]), 2)),
+ [1.5,2.5,3.5,4.5,5.5])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 3)), [2,3,4,5])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 4)), [2.5,3.5,4.5])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 5)), [3,4])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 6)), [3.5])
+
+ self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44],
+ 3, False)), [40.0,42.0,45.0,43.0])
+ self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44],
+ 3, True)), [40.0,42.0,45.0,43.0])
+
+
+ ######################
+ def test_XRange(self):
+ try:
+ list(movingaverage(xrange(1, 4), 0))
+ self.fail('Did not raise ValueError on subset_size=0')
+ except ValueError:
+ pass
+
+ try:
+ list(movingaverage(xrange(1, 7), 7))
+ self.fail('Did not raise ValueError on subset_size > len(data)')
+ except ValueError:
+ pass
+
+ self.assertEqual(list(movingaverage(xrange(1, 7), 1)), [1,2,3,4,5,6])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 2)),
+ [1.5,2.5,3.5,4.5,5.5])
+ self.assertEqual(list(movingaverage(iter(map(float, xrange(1, 7))),
+ 2)), [1.5,2.5,3.5,4.5,5.5])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 3)), [2,3,4,5])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 4)), [2.5,3.5,4.5])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 5)), [3,4])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 6)), [3.5])
+
+
+ ###########################
+ def test_ListRolling(self):
+ try:
+ list(movingaverage([1,2,3], 0, avoid_fp_drift = False))
+ self.fail('Did not raise ValueError on subset_size=0')
+ except ValueError:
+ pass
+
+ try:
+ list(movingaverage([1,2,3,4,5,6], 7, avoid_fp_drift = False))
+ self.fail('Did not raise ValueError on subset_size > len(data)')
+ except ValueError:
+ pass
+
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 1,
+ avoid_fp_drift = False)), [1,2,3,4,5,6])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 2,
+ avoid_fp_drift = False)),
+ [1.5,2.5,3.5,4.5,5.5])
+ self.assertEqual(list(movingaverage(map(float, [1,2,3,4,5,6]), 2,
+ avoid_fp_drift = False)), [1.5,2.5,3.5,4.5,5.5])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 3,
+ avoid_fp_drift = False)), [2,3,4,5])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 4,
+ avoid_fp_drift = False)), [2.5,3.5,4.5])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 5,
+ avoid_fp_drift = False)), [3,4])
+ self.assertEqual(list(movingaverage([1,2,3,4,5,6], 6,
+ avoid_fp_drift = False)), [3.5])
+
+ self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44],
+ 3, False, avoid_fp_drift = False)), [40.0,42.0,45.0,43.0])
+ self.assertEqual(list(movingaverage([40, 30, 50, 46, 39, 44],
+ 3, True, avoid_fp_drift = False)), [40.0,42.0,45.0,43.0])
+
+
+ #############################
+ def test_XRangeRolling(self):
+ try:
+ list(movingaverage(xrange(1, 4), 0, avoid_fp_drift = False))
+ self.fail('Did not raise ValueError on subset_size=0')
+ except ValueError:
+ pass
+
+ try:
+ list(movingaverage(xrange(1, 7), 7, avoid_fp_drift = False))
+ self.fail('Did not raise ValueError on subset_size > len(data)')
+ except ValueError:
+ pass
+
+ self.assertEqual(list(movingaverage(xrange(1, 7), 1,
+ avoid_fp_drift = False)), [1,2,3,4,5,6])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 2,
+ avoid_fp_drift = False)), [1.5,2.5,3.5,4.5,5.5])
+ self.assertEqual(list(movingaverage(iter(map(float, xrange(1, 7))),
+ 2, avoid_fp_drift = False)), [1.5,2.5,3.5,4.5,5.5])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 3,
+ avoid_fp_drift = False)), [2,3,4,5])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 4,
+ avoid_fp_drift = False)), [2.5,3.5,4.5])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 5,
+ avoid_fp_drift = False)), [3,4])
+ self.assertEqual(list(movingaverage(xrange(1, 7), 6,
+ avoid_fp_drift = False)), [3.5])
+
+
+ ######################################################################
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestMovingAverage)
+ unittest.TextTestRunner(verbosity = 2).run(suite)
+
diff --git a/scripts/backends_cpu_usage/plot.py b/scripts/backends_cpu_usage/plot.py
new file mode 100755
index 00000000..4e5083ad
--- /dev/null
+++ b/scripts/backends_cpu_usage/plot.py
@@ -0,0 +1,81 @@
+#!/usr/bin/python
+
+
+from matplotlib import pyplot as plt
+from movingaverage import movingaverage
+
+
+def smooth(l):
+ return movingaverage(l, 10, data_is_list=True, avoid_fp_drift=False)
+
+
+files = [
+ ('sqlite', 'b'),
+ ('sqlcipher', 'r'),
+ ('u1dblite', 'g'),
+ ('u1dbcipher', 'm'),
+]
+
+
+# config the plot
+plt.xlabel('time (s)')
+plt.ylabel('cpu usage (%)')
+plt.title('u1db backends CPU usage')
+
+
+for fi in files:
+
+ backend = fi[0]
+ color = fi[1]
+ filename = '%s.txt' % backend
+
+ x = []
+ y = []
+
+ xmax = None
+ xmin = None
+ ymax = None
+ ymin = None
+
+ # read data from file
+ with open(filename, 'r') as f:
+ line = f.readline()
+ while line is not None:
+ time, cpu = tuple(line.strip().split(' '))
+ cpu = float(cpu)
+ x.append(float(time))
+ y.append(cpu)
+ if ymax == None or cpu > ymax:
+ ymax = cpu
+ xmax = time
+ if ymin == None or cpu < ymin:
+ ymin = cpu
+ xmin = time
+ line = f.readline()
+ if line == '':
+ break
+
+ kwargs = {
+ 'linewidth': 1.0,
+ 'linestyle': '-',
+ # 'marker': '.',
+ 'color': color,
+ }
+ plt.plot(
+ [n for n in smooth(x)],
+ [n for n in smooth(y)],
+ label=backend, **kwargs)
+
+ #plt.axes().get_xaxis().set_ticks(x)
+ #plt.axes().get_xaxis().set_ticklabels(x)
+
+ # annotate max and min values
+ #plt.axes().annotate("%.2f GB" % ymax, xy=(xmax, ymax))
+ #plt.axes().annotate("%.2f GB" % ymin, xy=(xmin, ymin))
+
+
+plt.ylim(0, 100)
+plt.grid()
+plt.legend()
+plt.show()
+
diff --git a/scripts/backends_cpu_usage/test_u1db_sync.py b/scripts/backends_cpu_usage/test_u1db_sync.py
new file mode 100755
index 00000000..26ef8f9f
--- /dev/null
+++ b/scripts/backends_cpu_usage/test_u1db_sync.py
@@ -0,0 +1,113 @@
+#!/usr/bin/python
+
+
+import u1db
+import tempfile
+import logging
+import shutil
+import os
+import argparse
+import time
+import binascii
+import random
+
+
+from leap.soledad.client.sqlcipher import open as sqlcipher_open
+from log_cpu_usage import LogCpuUsage
+from u1dblite import open as u1dblite_open
+from u1dbcipher import open as u1dbcipher_open
+
+
+DOCS_TO_SYNC = 1000
+SMALLEST_DOC_SIZE = 1 * 1024 # 1 KB
+BIGGEST_DOC_SIZE = 100 * 1024 # 100 KB
+
+
+def get_data(size):
+ return binascii.hexlify(os.urandom(size/2))
+
+
+def run_test(testname, open_fun, tempdir, docs, *args):
+ logger.info('Starting test \"%s\".' % testname)
+
+ # instantiate dbs
+ db1 = open_fun(os.path.join(tempdir, testname + '1.db'), *args)
+ db2 = open_fun(os.path.join(tempdir, testname + '2.db'), *args)
+
+ # get sync target and synchsonizer
+ target = db2.get_sync_target()
+ synchronizer = u1db.sync.Synchronizer(db1, target)
+
+
+ # generate lots of small documents
+ logger.info('Creating %d documents in source db...' % DOCS_TO_SYNC)
+ for content in docs:
+ db1.create_doc(content)
+ logger.info('%d documents created in source db.' % DOCS_TO_SYNC)
+
+ # run the test
+ filename = testname + '.txt'
+ logger.info('Logging CPU usage to %s.' % filename)
+ log_cpu = LogCpuUsage(filename)
+ tstart = time.time()
+
+ # start logging cpu
+ log_cpu.start()
+ logger.info('Sleeping for 5 seconds...')
+ time.sleep(5)
+
+ # sync
+ logger.info('Starting sync...')
+ sstart = time.time()
+ synchronizer.sync()
+ send = time.time()
+ logger.info('Sync finished.')
+
+ # stop logging cpu
+ logger.info('Sleeping for 5 seconds...')
+ time.sleep(5)
+ tend = time.time()
+ log_cpu.stop()
+
+ # report
+ logger.info('Total sync time: %f seconds' % (send - sstart))
+ logger.info('Total test time: %f seconds' % (tend - tstart))
+ logger.info('Finished test \"%s\".' % testname)
+
+ # close dbs
+ db1.close()
+ db2.close()
+
+
+if __name__ == '__main__':
+
+ # configure logger
+ logger = logging.getLogger(__name__)
+ LOG_FORMAT = '%(asctime)s %(message)s'
+ logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
+
+
+ # get a temporary dir
+ tempdir = tempfile.mkdtemp()
+ logger.info('Using temporary directory %s' % tempdir)
+
+
+ # create a lot of documents with random sizes
+ docs = []
+ for i in xrange(DOCS_TO_SYNC):
+ docs.append({
+ 'index': i,
+ #'data': get_data(
+ # random.randrange(
+ # SMALLEST_DOC_SIZE, BIGGEST_DOC_SIZE))
+ })
+
+ # run tests
+ run_test('sqlite', u1db.open, tempdir, docs, True)
+ run_test('sqlcipher', sqlcipher_open, tempdir, docs, '123456', True)
+ run_test('u1dblite', u1dblite_open, tempdir, docs)
+ run_test('u1dbcipher', u1dbcipher_open, tempdir, docs, '123456', True)
+
+ # remove temporary dir
+ logger.info('Removing temporary directory %s' % tempdir)
+ shutil.rmtree(tempdir)
diff --git a/scripts/build_debian_package.sh b/scripts/build_debian_package.sh
new file mode 100755
index 00000000..cc62c3ac
--- /dev/null
+++ b/scripts/build_debian_package.sh
@@ -0,0 +1,32 @@
+#!/bin/sh
+
+# This script generates Soledad Debian packages.
+#
+# When invoking this script, you should pass a git repository URL and the name
+# of the branch that contains the code you wish to build the packages from.
+#
+# The script will clone the given branch from the given repo, as well as the
+# main Soledad repo in github which contains the most up-to-date debian
+# branch. It will then merge the desired branch into the debian branch and
+# build the packages.
+
+if [ $# -ne 2 ]; then
+ echo "Usage: ${0} <url> <branch>"
+ exit 1
+fi
+
+SOLEDAD_MAIN_REPO=git://github.com/leapcode/soledad.git
+
+url=$1
+branch=$2
+workdir=`mktemp -d`
+
+git clone -b ${branch} ${url} ${workdir}/soledad
+export GIT_DIR=${workdir}/soledad/.git
+export GIT_WORK_TREE=${workdir}/soledad
+git remote add leapcode ${SOLEDAD_MAIN_REPO}
+git fetch leapcode
+git checkout debian
+git merge --no-edit ${branch}
+(cd ${workdir}/soledad && debuild -uc -us)
+echo "Packages generated in ${workdir}"
diff --git a/scripts/client-side-db.py b/scripts/client-side-db.py
deleted file mode 100644
index 0c3df7a4..00000000
--- a/scripts/client-side-db.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/usr/bin/python
-
-# This script gives client-side access to one Soledad user database by using
-# the data stored in ~/.config/leap/soledad/
-
-import sys
-import os
-
-from leap.common.config import get_path_prefix
-from leap.soledad.client import Soledad
-
-if len(sys.argv) != 3:
- print 'Usage: %s <uuid> <passphrase>' % sys.argv[0]
- exit(1)
-
-uuid = sys.argv[1]
-passphrase = unicode(sys.argv[2])
-
-secrets_path = os.path.join(get_path_prefix(), 'leap', 'soledad',
- '%s.secret' % uuid)
-local_db_path = os.path.join(get_path_prefix(), 'leap', 'soledad',
- '%s.db' % uuid)
-server_url = 'http://dummy-url'
-cert_file = 'cert'
-
-sol = Soledad(uuid, passphrase, secrets_path, local_db_path, server_url,
- cert_file)
-db = sol._db
-
-# get replica info
-replica_uid = db._replica_uid
-gen, docs = db.get_all_docs()
-print "replica_uid: %s" % replica_uid
-print "generation: %d" % gen
-gen, trans_id = db._get_generation_info()
-print "transaction_id: %s" % trans_id
diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py
new file mode 100644
index 00000000..2bf4ab5e
--- /dev/null
+++ b/scripts/db_access/client_side_db.py
@@ -0,0 +1,154 @@
+#!/usr/bin/python
+
+# This script gives client-side access to one Soledad user database.
+
+
+import sys
+import os
+import argparse
+import re
+import tempfile
+import getpass
+import requests
+import json
+import srp._pysrp as srp
+import binascii
+import logging
+
+from leap.common.config import get_path_prefix
+from leap.soledad.client import Soledad
+
+
+# create a logger
+logger = logging.getLogger(__name__)
+LOG_FORMAT = '%(asctime)s %(message)s'
+logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
+
+
+safe_unhexlify = lambda x: binascii.unhexlify(x) if (
+ len(x) % 2 == 0) else binascii.unhexlify('0' + x)
+
+
+def fail(reason):
+ logger.error('Fail: ' + reason)
+ exit(2)
+
+
+def get_api_info(provider):
+ info = requests.get(
+ 'https://'+provider+'/provider.json', verify=False).json()
+ return info['api_uri'], info['api_version']
+
+
+def login(username, passphrase, provider, api_uri, api_version):
+ usr = srp.User(username, passphrase, srp.SHA256, srp.NG_1024)
+ auth = None
+ try:
+ auth = authenticate(api_uri, api_version, usr).json()
+ except requests.exceptions.ConnectionError:
+ fail('Could not connect to server.')
+ if 'errors' in auth:
+ fail(str(auth['errors']))
+ return api_uri, api_version, auth
+
+
+def authenticate(api_uri, api_version, usr):
+ api_url = "%s/%s" % (api_uri, api_version)
+ session = requests.session()
+ uname, A = usr.start_authentication()
+ params = {'login': uname, 'A': binascii.hexlify(A)}
+ init = session.post(
+ api_url + '/sessions', data=params, verify=False).json()
+ if 'errors' in init:
+ fail('test user not found')
+ M = usr.process_challenge(
+ safe_unhexlify(init['salt']), safe_unhexlify(init['B']))
+ return session.put(api_url + '/sessions/' + uname, verify=False,
+ data={'client_auth': binascii.hexlify(M)})
+
+
+def get_soledad_info(username, provider, passphrase, basedir):
+ api_uri, api_version = get_api_info(provider)
+ auth = login(username, passphrase, provider, api_uri, api_version)
+ # get soledad server url
+ service_url = '%s/%s/config/soledad-service.json' % \
+ (api_uri, api_version)
+ soledad_hosts = requests.get(service_url, verify=False).json()['hosts']
+ hostnames = soledad_hosts.keys()
+ # allow for choosing the host
+ host = hostnames[0]
+ if len(hostnames) > 1:
+ i = 1
+ print "There are many available hosts:"
+ for h in hostnames:
+ print " (%d) %s.%s" % (i, h, provider)
+ i += 1
+ choice = raw_input("Choose a host to use (default: 1): ")
+ if choice != '':
+ host = hostnames[int(choice) - 1]
+ server_url = 'https://%s:%d/user-%s' % \
+ (soledad_hosts[host]['hostname'], soledad_hosts[host]['port'],
+ auth[2]['id'])
+ # get provider ca certificate
+ ca_cert = requests.get('https://%s/ca.crt' % provider, verify=False).text
+ cert_file = os.path.join(basedir, 'ca.crt')
+ with open(cert_file, 'w') as f:
+ f.write(ca_cert)
+ return auth[2]['id'], server_url, cert_file, auth[2]['token']
+
+
+def get_soledad_instance(username, provider, passphrase, basedir):
+ # setup soledad info
+ uuid, server_url, cert_file, token = \
+ get_soledad_info(username, provider, passphrase, basedir)
+ logger.info('UUID is %s' % uuid)
+ logger.info('Server URL is %s' % server_url)
+ secrets_path = os.path.join(
+ basedir, '%s.secret' % uuid)
+ local_db_path = os.path.join(
+ basedir, '%s.db' % uuid)
+ # instantiate soledad
+ return Soledad(
+ uuid,
+ unicode(passphrase),
+ secrets_path=secrets_path,
+ local_db_path=local_db_path,
+ server_url=server_url,
+ cert_file=cert_file,
+ auth_token=token)
+
+
+# main program
+
+if __name__ == '__main__':
+
+ class ValidateUserHandle(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ m = re.compile('^([^@]+)@([^@]+\.[^@]+)$')
+ res = m.match(values)
+ if res == None:
+ parser.error('User handle should have the form user@provider.')
+ setattr(namespace, 'username', res.groups()[0])
+ setattr(namespace, 'provider', res.groups()[1])
+
+ # parse command line
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ 'user@provider', action=ValidateUserHandle, help='the user handle')
+ parser.add_argument(
+ '-b', dest='basedir', required=False, default=None, help='the user handle')
+ args = parser.parse_args()
+
+ # get the password
+ passphrase = getpass.getpass(
+ 'Password for %s@%s: ' % (args.username, args.provider))
+
+ # get the basedir
+ basedir = args.basedir
+ if basedir is None:
+ basedir = tempfile.mkdtemp()
+ logger.info('Using %s as base directory.' % basedir)
+
+ # get the soledad instance
+ s = get_soledad_instance(
+ args.username, args.provider, passphrase, basedir)
diff --git a/scripts/db_access/reset_db.py b/scripts/db_access/reset_db.py
new file mode 100644
index 00000000..80871856
--- /dev/null
+++ b/scripts/db_access/reset_db.py
@@ -0,0 +1,79 @@
+#!/usr/bin/python
+
+# This script can be run on server side to completelly reset a user database.
+#
+# WARNING: running this script over a database will delete all documents but
+# the one with id u1db_config (which contains db metadata) and design docs
+# needed for couch backend.
+
+
+import sys
+from ConfigParser import ConfigParser
+import threading
+import logging
+from couchdb import Database as CouchDatabase
+
+
+if len(sys.argv) != 2:
+ print 'Usage: %s <uuid>' % sys.argv[0]
+ exit(1)
+
+uuid = sys.argv[1]
+
+
+# create a logger
+logger = logging.getLogger(__name__)
+LOG_FORMAT = '%(asctime)s %(message)s'
+logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
+
+
+# get couch url
+cp = ConfigParser()
+cp.read('/etc/leap/soledad-server.conf')
+url = cp.get('soledad-server', 'couch_url')
+
+
+# confirm
+yes = raw_input("Are you sure you want to reset the database for user %s "
+ "(type YES)? " % uuid)
+if yes != 'YES':
+ print 'Bailing out...'
+ exit(2)
+
+
+db = CouchDatabase('%s/user-%s' % (url, uuid))
+
+
+class _DeleterThread(threading.Thread):
+
+ def __init__(self, db, doc_id, release_fun):
+ threading.Thread.__init__(self)
+ self._db = db
+ self._doc_id = doc_id
+ self._release_fun = release_fun
+
+ def run(self):
+ logger.info('[%s] deleting doc...' % self._doc_id)
+ del self._db[self._doc_id]
+ logger.info('[%s] done.' % self._doc_id)
+ self._release_fun()
+
+
+semaphore_pool = threading.BoundedSemaphore(value=20)
+
+
+threads = []
+for doc_id in db:
+ if doc_id != 'u1db_config' and not doc_id.startswith('_design'):
+ semaphore_pool.acquire()
+ logger.info('[main] launching thread for doc: %s' % doc_id)
+ t = _DeleterThread(db, doc_id, semaphore_pool.release)
+ t.start()
+ threads.append(t)
+
+
+logger.info('[main] waiting for threads.')
+map(lambda thread: thread.join(), threads)
+
+
+logger.info('[main] done.')
diff --git a/scripts/server-side-db.py b/scripts/db_access/server_side_db.py
index 01a9aaac..18641a0f 100644
--- a/scripts/server-side-db.py
+++ b/scripts/db_access/server_side_db.py
@@ -2,6 +2,10 @@
# This script gives server-side access to one Soledad user database by using
# the configuration stored in /etc/leap/soledad-server.conf.
+#
+# Use it like this:
+#
+# python -i server-side-db.py <uuid>
import sys
from ConfigParser import ConfigParser
diff --git a/scripts/doc_put_memory_usage/find_max_upload_size.py b/scripts/doc_put_memory_usage/find_max_upload_size.py
new file mode 100755
index 00000000..02c68015
--- /dev/null
+++ b/scripts/doc_put_memory_usage/find_max_upload_size.py
@@ -0,0 +1,169 @@
+#!/usr/bin/python
+
+# This script finds the maximum upload size for a document in the current
+# server. It pulls couch URL from Soledad config file and attempts multiple
+# PUTs until it finds the maximum size supported by the server.
+#
+# As the Soledad couch user is not an admin, you have to pass a database into
+# which the test will be run. The database should already exist and be
+# initialized with soledad design documents.
+#
+# Use it like this:
+#
+# ./find_max_upload_size.py <dbname>
+# ./find_max_upload_size.py -h
+
+import os
+import configparser
+import logging
+import argparse
+import random
+import string
+import binascii
+import json
+import time
+import uuid
+
+
+from couchdb.client import Database
+from socket import error as socket_error
+from leap.soledad.common.couch import CouchDatabase
+
+
+SOLEDAD_CONFIG_FILE = '/etc/leap/soledad-server.conf'
+PREFIX = '/tmp/soledad_test'
+LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
+RETRIES = 3 # number of times to retry uploading a document of a certain
+ # size after a failure
+
+
+# configure logger
+logger = logging.getLogger(__name__)
+
+
+def config_log(level):
+ logging.basicConfig(format=LOG_FORMAT, level=level)
+
+
+def log_to_file(filename):
+ handler = logging.FileHandler(filename, mode='a')
+ handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT))
+ logger.addHandler(handler)
+
+
+# create test dir
+if not os.path.exists(PREFIX):
+ os.mkdir(PREFIX)
+
+
+def get_couch_url(config_file=SOLEDAD_CONFIG_FILE):
+ config = configparser.ConfigParser()
+ config.read(config_file)
+ return config['soledad-server']['couch_url']
+
+
+# generate or load an uploadable doc with the given size in mb
+def get_content(size):
+ fname = os.path.join(PREFIX, 'content-%d.json' % size)
+ if os.path.exists(fname):
+ logger.debug('Loading content with %d MB...' % size)
+ with open(fname, 'r') as f:
+ return f.read()
+ else:
+ length = int(size * 1024 ** 2)
+ logger.debug('Generating body with %d MB...' % size)
+ content = binascii.hexlify(os.urandom(length))[:length]
+ with open(fname, 'w') as f:
+ f.write(content)
+ return content
+
+
+def delete_doc(db):
+ doc = db.get('largedoc')
+ db.delete(doc)
+
+
+def upload(db, size, couch_db):
+ # try many times to be sure that size is infeasible
+ for i in range(RETRIES):
+ # wait until server is up to upload
+ while True:
+ try:
+ 'largedoc' in couch_db
+ break
+ except socket_error:
+ logger.debug('Waiting for server to come up...')
+ time.sleep(1)
+ # attempt to upload
+ try:
+ logger.debug(
+ 'Trying to upload %d MB document (attempt %d/%d)...' %
+ (size, (i+1), RETRIES))
+ content = get_content(size)
+ logger.debug('Starting upload of %d bytes.' % len(content))
+ doc = db.create_doc({'data': content}, doc_id='largedoc')
+ delete_doc(couch_db)
+ logger.debug('Success uploading %d MB doc.' % size)
+ return True
+ except Exception as e:
+ logger.debug('Failed to upload %d MB doc: %s' % (size, str(e)))
+ return False
+
+
+def find_max_upload_size(db_uri):
+ db = CouchDatabase.open_database(db_uri, False)
+ couch_db = Database(db_uri)
+ logger.debug('Database URI: %s' % db_uri)
+ # delete eventual leftover from last run
+ if 'largedoc' in couch_db:
+ delete_doc(couch_db)
+ # phase 1: increase upload size exponentially
+ logger.info('Starting phase 1: increasing size exponentially.')
+ size = 1
+ #import ipdb; ipdb.set_trace()
+ while True:
+ if upload(db, size, couch_db):
+ size *= 2
+ else:
+ break
+
+ # phase 2: binary search for maximum value
+ unable = size
+ able = size / 2
+ logger.info('Starting phase 2: binary search for maximum value.')
+ while unable - able > 1:
+ size = able + ((unable - able) / 2)
+ if upload(db, size, couch_db):
+ able = size
+ else:
+ unable = size
+ return able
+
+
+if __name__ == '__main__':
+ # parse command line
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '-d', action='store_true', dest='debug',
+ help='print debugging information')
+ parser.add_argument(
+ '-l', dest='logfile',
+ help='log output to file')
+ parser.add_argument(
+ 'db_uri', help='the couch database URI to test')
+ args = parser.parse_args()
+
+ # log to file
+ if args.logfile is not None:
+ log_to_file(args.logfile)
+
+ # set loglevel
+ if args.debug is True:
+ config_log(logging.DEBUG)
+ else:
+ config_log(logging.INFO)
+
+ # run test and report
+ logger.info('Will test using db at %s.' % args.db_uri)
+ maxsize = find_max_upload_size(args.db_uri)
+ logger.info('Max upload size is %d MB.' % maxsize)
diff --git a/scripts/doc_put_memory_usage/get-mem.py b/scripts/doc_put_memory_usage/get-mem.py
new file mode 100755
index 00000000..d64875fc
--- /dev/null
+++ b/scripts/doc_put_memory_usage/get-mem.py
@@ -0,0 +1,16 @@
+#!/usr/bin/python
+
+
+import psutil
+import time
+
+
+delta = 50 * 60
+start = time.time()
+
+while True:
+ now = time.time()
+ print "%s %s" % (now - start, psutil.phymem_usage().used)
+ time.sleep(0.1)
+ if now > start + delta:
+ break
diff --git a/scripts/doc_put_memory_usage/plot-mem.py b/scripts/doc_put_memory_usage/plot-mem.py
new file mode 100755
index 00000000..e24679a2
--- /dev/null
+++ b/scripts/doc_put_memory_usage/plot-mem.py
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+
+
+from matplotlib import pyplot as plt
+
+
+files = [
+ ('local', 'couchdb-json', 'b'),
+ ('local', 'bigcouch-json', 'r'),
+ ('local', 'couchdb-multipart', 'g'),
+ ('local', 'bigcouch-multipart', 'm'),
+]
+
+
+# config the plot
+plt.xlabel('time')
+plt.ylabel('memory usage')
+plt.title('bigcouch versus couch memory usage')
+
+
+for fi in files:
+
+ machine = fi[0]
+ database = fi[1]
+ color = fi[2]
+ filename = '%s-%s.txt' % (machine, database)
+
+ x = []
+ y = []
+
+ xmax = None
+ xmin = None
+ ymax = None
+ ymin = None
+
+ # read data from file
+ with open(filename, 'r') as f:
+ line = f.readline()
+ while line is not None:
+ time, mem = tuple(line.strip().split(' '))
+ mem = float(mem) / (10**9)
+ x.append(float(time))
+ y.append(mem)
+ if ymax == None or mem > ymax:
+ ymax = mem
+ xmax = time
+ if ymin == None or mem < ymin:
+ ymin = mem
+ xmin = time
+ line = f.readline()
+ if line == '':
+ break
+
+ kwargs = {
+ 'linewidth': 1.0,
+ 'linestyle': '-',
+ # 'marker': '.',
+ 'color': color,
+ }
+ plt.plot(x, y, label=database, **kwargs)
+
+ #plt.axes().get_xaxis().set_ticks(x)
+ #plt.axes().get_xaxis().set_ticklabels(x)
+
+ # annotate max and min values
+ #plt.axes().annotate("%.2f GB" % ymax, xy=(xmax, ymax))
+ #plt.axes().annotate("%.2f GB" % ymin, xy=(xmin, ymin))
+
+
+plt.grid()
+plt.legend()
+plt.show()
+
diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py
deleted file mode 100644
index f1c20d87..00000000
--- a/scripts/migrate_dbs.py
+++ /dev/null
@@ -1,288 +0,0 @@
-#!/usr/bin/python
-
-import sys
-import json
-import logging
-import argparse
-import re
-import threading
-from urlparse import urlparse
-from ConfigParser import ConfigParser
-from couchdb.client import Server
-from couchdb.http import ResourceNotFound, Resource, Session
-from datetime import datetime
-
-from leap.soledad.common.couch import CouchDatabase
-
-
-# parse command line for the log file name
-logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \
- str(datetime.now()).replace(' ', '_')
-parser = argparse.ArgumentParser()
-parser.add_argument('--log', action='store', default=logger_fname, type=str,
- required=False, help='the name of the log file', nargs=1)
-args = parser.parse_args()
-
-
-# configure the logger
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
-print "Logging to %s." % args.log
-logging.basicConfig(
- filename=args.log,
- format="%(asctime)-15s %(message)s")
-
-
-# configure threads
-max_threads = 20
-semaphore_pool = threading.BoundedSemaphore(value=max_threads)
-
-# get couch url
-cp = ConfigParser()
-cp.read('/etc/leap/soledad-server.conf')
-url = cp.get('soledad-server', 'couch_url')
-
-resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10))
-server = Server(url=resource)
-
-hidden_url = re.sub(
- 'http://(.*):.*@',
- 'http://\\1:xxxxx@',
- url)
-
-print """
-==========
-ATTENTION!
-==========
-
-This script will modify Soledad's shared and user databases in:
-
- %s
-
-This script does not make a backup of the couch db data, so make sure youj
-have a copy or you may loose data.
-""" % hidden_url
-confirm = raw_input("Proceed (type uppercase YES)? ")
-
-if confirm != "YES":
- exit(1)
-
-
-#
-# Thread
-#
-
-class DocWorkerThread(threading.Thread):
-
- def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len,
- transaction_log, conflict_log, release_fun):
- threading.Thread.__init__(self)
- resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10))
- server = Server(url=resource)
- self._dbname = dbname
- self._cdb = server[self._dbname]
- self._doc_id = doc_id
- self._db_idx = db_idx
- self._db_len = db_len
- self._doc_idx = doc_idx
- self._doc_len = doc_len
- self._transaction_log = transaction_log
- self._conflict_log = conflict_log
- self._release_fun = release_fun
-
- def run(self):
-
- old_doc = self._cdb[self._doc_id]
-
- # skip non u1db docs
- if 'u1db_rev' not in old_doc:
- logger.debug('(%d/%d) (%d/%d) Skipping %s/%s).' %
- (self._db_idx, self._db_len, self._doc_idx,
- self._doc_len, self._dbname, self._doc_id))
- self._release_fun()
- return
- else:
- logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' %
- (self._db_idx, self._db_len, self._doc_idx,
- self._doc_len, self._dbname, self._doc_id))
-
- doc = {
- '_id': self._doc_id,
- '_rev': old_doc['_rev'],
- 'u1db_rev': old_doc['u1db_rev']
- }
- attachments = []
-
- # add transactions
- doc['u1db_transactions'] = map(
- lambda (gen, doc_id, trans_id): (gen, trans_id),
- filter(
- lambda (gen, doc_id, trans_id): doc_id == doc['_id'],
- self._transaction_log))
- if len(doc['u1db_transactions']) == 0:
- del doc['u1db_transactions']
-
- # add conflicts
- if doc['_id'] in self._conflict_log:
- attachments.append([
- conflict_log[doc['_id']],
- 'u1db_conflicts',
- "application/octet-stream"])
-
- # move document's content to 'u1db_content' attachment
- content = self._cdb.get_attachment(doc, 'u1db_json')
- if content is not None:
- attachments.append([
- content,
- 'u1db_content',
- "application/octet-stream"])
- #self._cdb.delete_attachment(doc, 'u1db_json')
-
- # save modified doc
- self._cdb.save(doc)
-
- # save all doc attachments
- for content, att_name, content_type in attachments:
- self._cdb.put_attachment(
- doc,
- content,
- filename=att_name,
- content_type=content_type)
-
- # release the semaphore
- self._release_fun()
-
-
-db_idx = 0
-db_len = len(server)
-for dbname in server:
-
- db_idx += 1
-
- if not (dbname.startswith('user-') or dbname == 'shared') \
- or dbname == 'user-test-db':
- logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname))
- continue
-
- logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname))
-
- # get access to couch db
- cdb = Server(url)[dbname]
-
- # get access to soledad db
- sdb = CouchDatabase(url, dbname)
-
- # Migration table
- # ---------------
- #
- # * Metadata that was previously stored in special documents migrate to
- # inside documents, to allow for atomic doc-and-metadata updates.
- # * Doc content attachment name changes.
- # * Indexes are removed, to be implemented in the future possibly as
- # design docs view functions.
- #
- # +-----------------+-------------------------+-------------------------+
- # | Data | old storage | new storage |
- # |-----------------+-------------------------+-------------------------+
- # | doc content | <doc_id>/u1db_json | <doc_id>/u1db_content |
- # | doc conflicts | u1db/_conflicts | <doc_id>/u1db_conflicts |
- # | transaction log | u1db/_transaction_log | doc.u1db_transactions |
- # | sync log | u1db/_other_generations | u1db_sync_log |
- # | indexes | u1db/_indexes | not implemented |
- # | replica uid | u1db/_replica_uid | u1db_config |
- # +-----------------+-------------------------+-------------------------+
-
- def get_att_content(db, doc_id, att_name):
- try:
- return json.loads(
- db.get_attachment(
- doc_id, att_name).read())['content']
- except:
- import ipdb
- ipdb.set_trace()
-
- # only migrate databases that have the 'u1db/_replica_uid' document
- try:
- metadoc = cdb.get('u1db/_replica_uid')
- replica_uid = get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json')
- except ResourceNotFound:
- continue
-
- #---------------------------------------------------------------------
- # Step 1: Set replica uid.
- #---------------------------------------------------------------------
- sdb._set_replica_uid(replica_uid)
-
- #---------------------------------------------------------------------
- # Step 2: Obtain metadata.
- #---------------------------------------------------------------------
-
- # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...]
- transaction_log = get_att_content(
- cdb, 'u1db/_transaction_log', 'u1db_json')
- new_transaction_log = []
- gen = 1
- for (doc_id, trans_id) in transaction_log:
- new_transaction_log.append((gen, doc_id, trans_id))
- gen += 1
- transaction_log = new_transaction_log
-
- # obtain the conflict log: {'<doc_id>': ['<rev>', '<content>'], ...}
- conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json')
-
- # obtain the sync log:
- # {'<replica_uid>': ['<gen>', '<transaction_id>'], ...}
- other_generations = get_att_content(
- cdb, 'u1db/_other_generations', 'u1db_json')
-
- #---------------------------------------------------------------------
- # Step 3: Iterate over all documents in database.
- #---------------------------------------------------------------------
- doc_len = len(cdb)
- logger.info("(%d, %d) Found %d documents." % (db_idx, db_len, doc_len))
- doc_idx = 0
- threads = []
- for doc_id in cdb:
- doc_idx = doc_idx + 1
-
- semaphore_pool.acquire()
- thread = DocWorkerThread(dbname, doc_id, db_idx, db_len,
- doc_idx, doc_len, transaction_log,
- conflict_log, semaphore_pool.release)
- thread.daemon = True
- thread.start()
- threads.append(thread)
-
- map(lambda thread: thread.join(), threads)
-
- #---------------------------------------------------------------------
- # Step 4: Move sync log.
- #---------------------------------------------------------------------
-
- # move sync log
- sync_doc = {
- '_id': 'u1db_sync_log',
- 'syncs': []
- }
-
- for replica_uid in other_generations:
- gen, transaction_id = other_generations[replica_uid]
- sync_doc['syncs'].append([replica_uid, gen, transaction_id])
- cdb.save(sync_doc)
-
- #---------------------------------------------------------------------
- # Step 5: Delete old meta documents.
- #---------------------------------------------------------------------
-
- # remove unused docs
- for doc_id in ['_transaction_log', '_conflicts', '_other_generations',
- '_indexes', '_replica_uid']:
- for prefix in ['u1db/', 'u1db%2F']:
- try:
- doc = cdb['%s%s' % (prefix, doc_id)]
- logger.info(
- "(%d/%d) Deleting %s/%s/%s." %
- (db_idx, db_len, dbname, 'u1db', doc_id))
- cdb.delete(doc)
- except ResourceNotFound:
- pass
diff --git a/scripts/profiling/sync/sync-many.py b/scripts/profiling/sync/sync-many.py
new file mode 100644
index 00000000..83793b0a
--- /dev/null
+++ b/scripts/profiling/sync/sync-many.py
@@ -0,0 +1,125 @@
+#!/usr/bin/python
+
+# The purpose of this script is to stress a soledad server by:
+#
+# - Instantiating multiple clients.
+# - Creating many documents in each client.
+# - Syncing all at the same time with th server multiple times, until
+# they've all reached an agreement on the state of the databases and
+# there's nothing else to be synced.
+
+
+import threading
+import tempfile
+import argparse
+import logging
+import re
+import getpass
+import time
+import shutil
+
+
+from client_side_db import get_soledad_instance
+
+
+from leap.soledad.client import BootstrapSequenceError
+
+
+NUMBER_OF_REPLICAS = 1
+DOCUMENTS_PER_REPLICA = 10
+
+
+# create a logger
+logger = logging.getLogger(__name__)
+LOG_FORMAT = '%(asctime)s %(message)s'
+logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
+
+
+class WorkerThread(threading.Thread):
+
+ def __init__(self, thread_id, soledad, all_set):
+ threading.Thread.__init__(self)
+ self._id = thread_id
+ self._soledad = soledad
+ self._all_set = all_set
+ self._done_creating = threading.Event()
+
+ def run(self):
+ # create many documents
+ logger.info('[replica %d] creating documents...' % self._id)
+ for i in xrange(DOCUMENTS_PER_REPLICA):
+ self._soledad.create_doc({'a_doc': i})
+ # wait for others
+ self._done_creating.set()
+ logger.info('[replica %d] done creating documents.' % self._id)
+ self._all_set.wait()
+ # sync
+ successes = 0
+ while True:
+ logger.info('[replica %d] syncing.' % self._id)
+ if self._id == 1:
+ time.sleep(5)
+ old_gen = self._soledad.sync()
+ logger.info('[replica %d] synced.' % self._id)
+ new_gen = self._soledad._db._get_generation()
+ logger.info('[replica %d] old gen %d - new gen %d.' %
+ (self._id, old_gen, new_gen))
+ if old_gen == new_gen:
+ successes += 1
+ logger.info('[replica %d] sync not needed.' % self._id)
+ if successes == 3:
+ break
+
+
+def stress_test(username, provider, passphrase, basedir):
+ threads = []
+ all_set = threading.Event()
+ for i in xrange(NUMBER_OF_REPLICAS):
+ logging.info('[main] starting thread %d.' % i)
+ s = get_soledad_instance(
+ username,
+ provider,
+ passphrase,
+ tempfile.mkdtemp(dir=basedir))
+ t = WorkerThread(i, s, all_set)
+ t.start()
+ threads.append(t)
+ map(lambda t: t._done_creating.wait(), threads)
+ all_set.set()
+ map(lambda t: t.join(), threads)
+ logger.info('Removing dir %s' % basedir)
+ shutil.rmtree(basedir)
+
+
+# main program
+
+if __name__ == '__main__':
+
+ class ValidateUserHandle(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ m = re.compile('^([^@]+)@([^@]+\.[^@]+)$')
+ res = m.match(values)
+ if res == None:
+ parser.error('User handle should have the form user@provider.')
+ setattr(namespace, 'username', res.groups()[0])
+ setattr(namespace, 'provider', res.groups()[1])
+
+ # parse command line
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ 'user@provider', action=ValidateUserHandle, help='the user handle')
+ parser.add_argument(
+ '-b', dest='basedir', required=False, default=None, help='the user handle')
+ args = parser.parse_args()
+
+ # get the password
+ passphrase = getpass.getpass(
+ 'Password for %s@%s: ' % (args.username, args.provider))
+
+ # get the basedir
+ basedir = args.basedir
+ if basedir is None:
+ basedir = tempfile.mkdtemp()
+ logger.info('[main] using %s as base directory.' % basedir)
+
+ stress_test(args.username, args.provider, passphrase, basedir)
diff --git a/scripts/update_design_docs.py b/scripts/update_design_docs.py
new file mode 100644
index 00000000..e7b5a29c
--- /dev/null
+++ b/scripts/update_design_docs.py
@@ -0,0 +1,147 @@
+#!/usr/bin/python
+
+# This script updates Soledad's design documents in the session database and
+# all user databases with contents from the installed leap.soledad.common
+# package.
+
+import json
+import logging
+import argparse
+import re
+import threading
+import binascii
+
+
+from getpass import getpass
+from ConfigParser import ConfigParser
+from couchdb.client import Server
+from couchdb.http import Resource, Session
+from datetime import datetime
+from urlparse import urlparse
+
+
+from leap.soledad.common import ddocs
+
+
+# parse command line for the log file name
+logger_fname = "/tmp/update-design-docs_%s.log" % \
+ str(datetime.now()).replace(' ', '_')
+parser = argparse.ArgumentParser()
+parser.add_argument('--log', action='store', default=logger_fname, type=str,
+ required=False, help='the name of the log file', nargs=1)
+args = parser.parse_args()
+
+
+# configure the logger
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+print "Logging to %s." % args.log
+logging.basicConfig(
+ filename=args.log,
+ format="%(asctime)-15s %(message)s")
+
+
+# configure threads
+max_threads = 20
+semaphore_pool = threading.BoundedSemaphore(value=max_threads)
+threads = []
+
+# get couch url
+cp = ConfigParser()
+cp.read('/etc/leap/soledad-server.conf')
+url = urlparse(cp.get('soledad-server', 'couch_url'))
+
+# get admin password
+netloc = re.sub('^.*@', '', url.netloc)
+url = url._replace(netloc=netloc)
+password = getpass("Admin password for %s: " % url.geturl())
+url = url._replace(netloc='admin:%s@%s' % (password, netloc))
+
+resource = Resource(url.geturl(), Session(retry_delays=[1,2,4,8], timeout=10))
+server = Server(url=resource)
+
+hidden_url = re.sub(
+ 'http://(.*):.*@',
+ 'http://\\1:xxxxx@',
+ url.geturl())
+
+print """
+==========
+ATTENTION!
+==========
+
+This script will modify Soledad's shared and user databases in:
+
+ %s
+
+This script does not make a backup of the couch db data, so make sure you
+have a copy or you may loose data.
+""" % hidden_url
+confirm = raw_input("Proceed (type uppercase YES)? ")
+
+if confirm != "YES":
+ exit(1)
+
+# convert design doc content
+
+design_docs = {
+ '_design/docs': json.loads(binascii.a2b_base64(ddocs.docs)),
+ '_design/syncs': json.loads(binascii.a2b_base64(ddocs.syncs)),
+ '_design/transactions': json.loads(binascii.a2b_base64(ddocs.transactions)),
+}
+
+#
+# Thread
+#
+
+class DBWorkerThread(threading.Thread):
+
+ def __init__(self, server, dbname, db_idx, db_len, release_fun):
+ threading.Thread.__init__(self)
+ self._dbname = dbname
+ self._cdb = server[self._dbname]
+ self._db_idx = db_idx
+ self._db_len = db_len
+ self._release_fun = release_fun
+
+ def run(self):
+
+ logger.info("(%d/%d) Updating db %s." % (self._db_idx, self._db_len,
+ self._dbname))
+
+ for doc_id in design_docs:
+ doc = self._cdb[doc_id]
+ for key in ['lists', 'views', 'updates']:
+ if key in design_docs[doc_id]:
+ doc[key] = design_docs[doc_id][key]
+ self._cdb.save(doc)
+
+ # release the semaphore
+ self._release_fun()
+
+
+db_idx = 0
+db_len = len(server)
+for dbname in server:
+
+ db_idx += 1
+
+ if not (dbname.startswith('user-') or dbname == 'shared') \
+ or dbname == 'user-test-db':
+ logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname))
+ continue
+
+
+ # get access to couch db
+ cdb = Server(url.geturl())[dbname]
+
+ #---------------------------------------------------------------------
+ # Start DB worker thread
+ #---------------------------------------------------------------------
+ semaphore_pool.acquire()
+ thread = DBWorkerThread(server, dbname, db_idx, db_len, semaphore_pool.release)
+ thread.daemon = True
+ thread.start()
+ threads.append(thread)
+
+map(lambda thread: thread.join(), threads)
diff --git a/server/changes/VERSION_COMPAT b/server/changes/VERSION_COMPAT
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/server/changes/VERSION_COMPAT
diff --git a/server/changes/feature_enable-gzip b/server/changes/feature_enable-gzip
deleted file mode 100644
index 5cc1597c..00000000
--- a/server/changes/feature_enable-gzip
+++ /dev/null
@@ -1 +0,0 @@
- o Enable Gzip compression on the soledad wsgi app.
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index a4b25fe2..c170f230 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -87,9 +87,6 @@ and lock documents on the shared database is handled by
"""
import configparser
-import time
-import hashlib
-import os
from u1db.remote import http_app
@@ -98,10 +95,6 @@ from u1db.remote import http_app
from OpenSSL import tsafe
old_tsafe = tsafe
-from twisted.web.wsgi import WSGIResource
-from twisted.internet import reactor
-from twisted.internet.error import TimeoutError
-from twisted.python.lockfile import FilesystemLock
from twisted import version
if version.base() == "12.0.0":
# Put OpenSSL's tsafe back into place. This can probably be removed if we
@@ -111,23 +104,20 @@ if version.base() == "12.0.0":
from leap.soledad.server.auth import SoledadTokenAuthMiddleware
from leap.soledad.server.gzip_middleware import GzipMiddleware
+from leap.soledad.server.lock_resource import LockResource
-from leap.soledad.common import (
- SHARED_DB_NAME,
- SHARED_DB_LOCK_DOC_ID_PREFIX,
-)
+from leap.soledad.common import SHARED_DB_NAME
from leap.soledad.common.couch import CouchServerState
-from leap.soledad.common.errors import (
- InvalidTokenError,
- NotLockedError,
- AlreadyLockedError,
-)
#-----------------------------------------------------------------------------
# Soledad WSGI application
#-----------------------------------------------------------------------------
+MAX_REQUEST_SIZE = 200 # in Mb
+MAX_ENTRY_SIZE = 200 # in Mb
+
+
class SoledadApp(http_app.HTTPApp):
"""
Soledad WSGI application
@@ -138,6 +128,9 @@ class SoledadApp(http_app.HTTPApp):
The name of the shared database that holds user's encrypted secrets.
"""
+ max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
+ max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024
+
def __call__(self, environ, start_response):
"""
Handle a WSGI call to the Soledad application.
@@ -151,194 +144,12 @@ class SoledadApp(http_app.HTTPApp):
@return: HTTP application results.
@rtype: list
"""
- # ensure the shared database exists
- self.state.ensure_database(self.SHARED_DB_NAME)
return http_app.HTTPApp.__call__(self, environ, start_response)
-#
-# LockResource: a lock based on a document in the shared database.
-#
-
-@http_app.url_to_resource.register
-class LockResource(object):
- """
- Handle requests for locking documents.
-
- This class uses Twisted's Filesystem lock to manage a lock in the shared
- database.
- """
-
- url_pattern = '/%s/lock/{uuid}' % SoledadApp.SHARED_DB_NAME
- """
- """
-
- TIMEOUT = 300 # XXX is 5 minutes reasonable?
- """
- The timeout after which the lock expires.
- """
-
- # used for lock doc storage
- TIMESTAMP_KEY = '_timestamp'
- LOCK_TOKEN_KEY = '_token'
-
- FILESYSTEM_LOCK_TRIES = 5
- FILESYSTEM_LOCK_SLEEP_SECONDS = 1
-
- def __init__(self, uuid, state, responder):
- """
- Initialize the lock resource. Parameters to this constructor are
- automatically passed by u1db.
-
- :param uuid: The user unique id.
- :type uuid: str
- :param state: The backend database state.
- :type state: u1db.remote.ServerState
- :param responder: The infrastructure to send responses to client.
- :type responder: u1db.remote.HTTPResponder
- """
- self._shared_db = state.open_database(SoledadApp.SHARED_DB_NAME)
- self._lock_doc_id = '%s%s' % (SHARED_DB_LOCK_DOC_ID_PREFIX, uuid)
- self._lock = FilesystemLock(
- hashlib.sha512(self._lock_doc_id).hexdigest())
- self._state = state
- self._responder = responder
-
- @http_app.http_method(content=str)
- def put(self, content=None):
- """
- Handle a PUT request to the lock document.
-
- A lock is a document in the shared db with doc_id equal to
- 'lock-<uuid>' and the timestamp of its creation as content. This
- method obtains a threaded-lock and creates a lock document if it does
- not exist or if it has expired.
-
- It returns '201 Created' and a pair containing a token to unlock and
- the lock timeout, or '403 AlreadyLockedError' and the remaining amount
- of seconds the lock will still be valid.
-
- :param content: The content of the PUT request. It is only here
- because PUT requests with empty content are considered
- invalid requests by u1db.
- :type content: str
- """
- # obtain filesystem lock
- if not self._try_obtain_filesystem_lock():
- self._responder.send_response_json(408) # error: request timeout
- return
-
- created_lock = False
- now = time.time()
- token = hashlib.sha256(os.urandom(10)).hexdigest() # for releasing
- lock_doc = self._shared_db.get_doc(self._lock_doc_id)
- remaining = self._remaining(lock_doc, now)
-
- # if there's no lock, create one
- if lock_doc is None:
- lock_doc = self._shared_db.create_doc(
- {
- self.TIMESTAMP_KEY: now,
- self.LOCK_TOKEN_KEY: token,
- },
- doc_id=self._lock_doc_id)
- created_lock = True
- else:
- if remaining == 0:
- # lock expired, create new one
- lock_doc.content = {
- self.TIMESTAMP_KEY: now,
- self.LOCK_TOKEN_KEY: token,
- }
- self._shared_db.put_doc(lock_doc)
- created_lock = True
-
- self._try_release_filesystem_lock()
-
- # send response to client
- if created_lock is True:
- self._responder.send_response_json(
- 201, timeout=self.TIMEOUT, token=token) # success: created
- else:
- wire_descr = AlreadyLockedError.wire_description
- self._responder.send_response_json(
- AlreadyLockedError.status, # error: forbidden
- error=AlreadyLockedError.wire_description, remaining=remaining)
-
- @http_app.http_method(token=str)
- def delete(self, token=None):
- """
- Delete the lock if the C{token} is valid.
-
- Delete the lock document in case C{token} is equal to the token stored
- in the lock document.
-
- :param token: The token returned when locking.
- :type token: str
-
- :raise NotLockedError: Raised in case the lock is not locked.
- :raise InvalidTokenError: Raised in case the token is invalid for
- unlocking.
- """
- lock_doc = self._shared_db.get_doc(self._lock_doc_id)
- if lock_doc is None or self._remaining(lock_doc, time.time()) == 0:
- self._responder.send_response_json(
- NotLockedError.status, # error: not found
- error=NotLockedError.wire_description)
- elif token != lock_doc.content[self.LOCK_TOKEN_KEY]:
- self._responder.send_response_json(
- InvalidTokenError.status, # error: unauthorized
- error=InvalidTokenError.wire_description)
- else:
- self._shared_db.delete_doc(lock_doc)
- self._responder.send_response_json(200) # success: should use 204
- # but u1db does not
- # support it.
-
- def _remaining(self, lock_doc, now):
- """
- Return the number of seconds the lock contained in C{lock_doc} is
- still valid, when compared to C{now}.
-
- :param lock_doc: The document containing the lock.
- :type lock_doc: u1db.Document
- :param now: The time to which to compare the lock timestamp.
- :type now: float
-
- :return: The amount of seconds the lock is still valid.
- :rtype: float
- """
- if lock_doc is not None:
- lock_timestamp = lock_doc.content[self.TIMESTAMP_KEY]
- remaining = lock_timestamp + self.TIMEOUT - now
- return remaining if remaining > 0 else 0.0
- return 0.0
-
- def _try_obtain_filesystem_lock(self):
- """
- Try to obtain the file system lock.
-
- @return: Whether the lock was succesfully obtained.
- @rtype: bool
- """
- tries = self.FILESYSTEM_LOCK_TRIES
- while tries > 0:
- try:
- return self._lock.lock()
- except Exception as e:
- tries -= 1
- time.sleep(self.FILESYSTEM_LOCK_SLEEP_SECONDS)
- return False
-
- def _try_release_filesystem_lock(self):
- """
- Release the filesystem lock.
- """
- try:
- self._lock.unlock()
- return True
- except Exception:
- return False
+http_app.url_to_resource.register(LockResource)
+http_app.SyncResource.max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
+http_app.SyncResource.max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024
#-----------------------------------------------------------------------------
diff --git a/server/src/leap/soledad/server/auth.py b/server/src/leap/soledad/server/auth.py
index 0ae49576..e9d2b032 100644
--- a/server/src/leap/soledad/server/auth.py
+++ b/server/src/leap/soledad/server/auth.py
@@ -25,7 +25,7 @@ import httplib
import simplejson as json
-from u1db import DBNAME_CONSTRAINTS
+from u1db import DBNAME_CONSTRAINTS, errors as u1db_errors
from abc import ABCMeta, abstractmethod
from routes.mapper import Mapper
from couchdb.client import Server
@@ -37,16 +37,7 @@ from leap.soledad.common import (
SHARED_DB_LOCK_DOC_ID_PREFIX,
USER_DB_PREFIX,
)
-
-
-#-----------------------------------------------------------------------------
-# Authentication
-#-----------------------------------------------------------------------------
-
-class Unauthorized(Exception):
- """
- User authentication failed.
- """
+from leap.soledad.common.errors import InvalidAuthTokenError
class URLToAuthorization(object):
@@ -279,10 +270,16 @@ class SoledadAuthMiddleware(object):
return self._unauthorized_error("Wrong authentication scheme")
# verify if user is athenticated
- if not self._verify_authentication_data(uuid, auth_data):
- return self._unauthorized_error(
+ try:
+ if not self._verify_authentication_data(uuid, auth_data):
+ return self._unauthorized_error(
+ start_response,
+ self._get_auth_error_string())
+ except u1db_errors.Unauthorized as e:
+ return self._error(
start_response,
- self._get_auth_error_string())
+ 401,
+ e.wire_description)
# verify if user is authorized to perform action
if not self._verify_authorization(environ, uuid):
@@ -319,6 +316,9 @@ class SoledadAuthMiddleware(object):
@return: Whether the token is valid for authenticating the request.
@rtype: bool
+
+ @raise Unauthorized: Raised when C{auth_data} is not enough to
+ authenticate C{uuid}.
"""
return None
@@ -386,11 +386,20 @@ class SoledadTokenAuthMiddleware(SoledadAuthMiddleware):
@return: Whether the token is valid for authenticating the request.
@rtype: bool
+
+ @raise Unauthorized: Raised when C{auth_data} is not enough to
+ authenticate C{uuid}.
"""
token = auth_data # we expect a cleartext token at this point
- return self._verify_token_in_couchdb(uuid, token)
+ try:
+ return self._verify_token_in_couch(uuid, token)
+ except InvalidAuthTokenError:
+ raise
+ except Exception as e:
+ log.err(e)
+ return False
- def _verify_token_in_couchdb(self, uuid, token):
+ def _verify_token_in_couch(self, uuid, token):
"""
Query couchdb to decide if C{token} is valid for C{uuid}.
@@ -398,19 +407,19 @@ class SoledadTokenAuthMiddleware(SoledadAuthMiddleware):
@type uuid: str
@param token: The token.
@type token: str
+
+ @raise InvalidAuthTokenError: Raised when token received from user is
+ either missing in the tokens db or is
+ invalid.
"""
server = Server(url=self._app.state.couch_url)
- try:
- dbname = self.TOKENS_DB
- db = server[dbname]
- token = db.get(token)
- if token is None:
- return False
- return token[self.TOKENS_TYPE_KEY] == self.TOKENS_TYPE_DEF and \
- token[self.TOKENS_USER_ID_KEY] == uuid
- except Exception as e:
- log.err(e)
- return False
+ dbname = self.TOKENS_DB
+ db = server[dbname]
+ token = db.get(token)
+ if token is None or \
+ token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF or \
+ token[self.TOKENS_USER_ID_KEY] != uuid:
+ raise InvalidAuthTokenError()
return True
def _get_auth_error_string(self):
diff --git a/server/src/leap/soledad/server/lock_resource.py b/server/src/leap/soledad/server/lock_resource.py
new file mode 100644
index 00000000..a7870f77
--- /dev/null
+++ b/server/src/leap/soledad/server/lock_resource.py
@@ -0,0 +1,232 @@
+# -*- coding: utf-8 -*-
+# lock_resource.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+LockResource: a lock based on a document in the shared database.
+"""
+
+
+import hashlib
+import time
+import os
+import tempfile
+import errno
+
+
+from u1db.remote import http_app
+from twisted.python.lockfile import FilesystemLock
+
+
+from leap.soledad.common import (
+ SHARED_DB_NAME,
+ SHARED_DB_LOCK_DOC_ID_PREFIX,
+)
+from leap.soledad.common.errors import (
+ InvalidTokenError,
+ NotLockedError,
+ AlreadyLockedError,
+ LockTimedOutError,
+ CouldNotObtainLockError,
+)
+
+
+class LockResource(object):
+ """
+ Handle requests for locking documents.
+
+ This class uses Twisted's Filesystem lock to manage a lock in the shared
+ database.
+ """
+
+ url_pattern = '/%s/lock/{uuid}' % SHARED_DB_NAME
+ """
+ """
+
+ TIMEOUT = 300 # XXX is 5 minutes reasonable?
+ """
+ The timeout after which the lock expires.
+ """
+
+ # used for lock doc storage
+ TIMESTAMP_KEY = '_timestamp'
+ LOCK_TOKEN_KEY = '_token'
+
+ FILESYSTEM_LOCK_TRIES = 5
+ FILESYSTEM_LOCK_SLEEP_SECONDS = 1
+
+ def __init__(self, uuid, state, responder):
+ """
+ Initialize the lock resource. Parameters to this constructor are
+ automatically passed by u1db.
+
+ :param uuid: The user unique id.
+ :type uuid: str
+ :param state: The backend database state.
+ :type state: u1db.remote.ServerState
+ :param responder: The infrastructure to send responses to client.
+ :type responder: u1db.remote.HTTPResponder
+ """
+ self._shared_db = state.open_database(SHARED_DB_NAME)
+ self._lock_doc_id = '%s%s' % (SHARED_DB_LOCK_DOC_ID_PREFIX, uuid)
+ self._lock = FilesystemLock(
+ os.path.join(
+ tempfile.gettempdir(),
+ hashlib.sha512(self._lock_doc_id).hexdigest()))
+ self._state = state
+ self._responder = responder
+
+ @http_app.http_method(content=str)
+ def put(self, content=None):
+ """
+ Handle a PUT request to the lock document.
+
+ A lock is a document in the shared db with doc_id equal to
+ 'lock-<uuid>' and the timestamp of its creation as content. This
+ method obtains a threaded-lock and creates a lock document if it does
+ not exist or if it has expired.
+
+ It returns '201 Created' and a pair containing a token to unlock and
+ the lock timeout, or '403 AlreadyLockedError' and the remaining amount
+ of seconds the lock will still be valid.
+
+ :param content: The content of the PUT request. It is only here
+ because PUT requests with empty content are considered
+ invalid requests by u1db.
+ :type content: str
+ """
+ # obtain filesystem lock
+ if not self._try_obtain_filesystem_lock():
+ self._responder.send_response_json(
+ LockTimedOutError.status, # error: request timeout
+ error=LockTimedOutError.wire_description)
+ return
+
+ created_lock = False
+ now = time.time()
+ token = hashlib.sha256(os.urandom(10)).hexdigest() # for releasing
+ lock_doc = self._shared_db.get_doc(self._lock_doc_id)
+ remaining = self._remaining(lock_doc, now)
+
+ # if there's no lock, create one
+ if lock_doc is None:
+ lock_doc = self._shared_db.create_doc(
+ {
+ self.TIMESTAMP_KEY: now,
+ self.LOCK_TOKEN_KEY: token,
+ },
+ doc_id=self._lock_doc_id)
+ created_lock = True
+ else:
+ if remaining == 0:
+ # lock expired, create new one
+ lock_doc.content = {
+ self.TIMESTAMP_KEY: now,
+ self.LOCK_TOKEN_KEY: token,
+ }
+ self._shared_db.put_doc(lock_doc)
+ created_lock = True
+
+ self._try_release_filesystem_lock()
+
+ # send response to client
+ if created_lock is True:
+ self._responder.send_response_json(
+ 201, timeout=self.TIMEOUT, token=token) # success: created
+ else:
+ self._responder.send_response_json(
+ AlreadyLockedError.status, # error: forbidden
+ error=AlreadyLockedError.wire_description, remaining=remaining)
+
+ @http_app.http_method(token=str)
+ def delete(self, token=None):
+ """
+ Delete the lock if the C{token} is valid.
+
+ Delete the lock document in case C{token} is equal to the token stored
+ in the lock document.
+
+ :param token: The token returned when locking.
+ :type token: str
+
+ :raise NotLockedError: Raised in case the lock is not locked.
+ :raise InvalidTokenError: Raised in case the token is invalid for
+ unlocking.
+ """
+ lock_doc = self._shared_db.get_doc(self._lock_doc_id)
+ if lock_doc is None or self._remaining(lock_doc, time.time()) == 0:
+ self._responder.send_response_json(
+ NotLockedError.status, # error: not found
+ error=NotLockedError.wire_description)
+ elif token != lock_doc.content[self.LOCK_TOKEN_KEY]:
+ self._responder.send_response_json(
+ InvalidTokenError.status, # error: unauthorized
+ error=InvalidTokenError.wire_description)
+ else:
+ self._shared_db.delete_doc(lock_doc)
+ self._responder.send_response_json(200) # success: should use 204
+ # but u1db does not
+ # support it.
+
+ def _remaining(self, lock_doc, now):
+ """
+ Return the number of seconds the lock contained in C{lock_doc} is
+ still valid, when compared to C{now}.
+
+ :param lock_doc: The document containing the lock.
+ :type lock_doc: u1db.Document
+ :param now: The time to which to compare the lock timestamp.
+ :type now: float
+
+ :return: The amount of seconds the lock is still valid.
+ :rtype: float
+ """
+ if lock_doc is not None:
+ lock_timestamp = lock_doc.content[self.TIMESTAMP_KEY]
+ remaining = lock_timestamp + self.TIMEOUT - now
+ return remaining if remaining > 0 else 0.0
+ return 0.0
+
+ def _try_obtain_filesystem_lock(self):
+ """
+ Try to obtain the file system lock.
+
+ @return: Whether the lock was succesfully obtained.
+ @rtype: bool
+ """
+ tries = self.FILESYSTEM_LOCK_TRIES
+ while tries > 0:
+ try:
+ return self._lock.lock()
+ except OSError as e:
+ tries -= 1
+ if tries == 0:
+ raise CouldNotObtainLockError(e.message)
+ time.sleep(self.FILESYSTEM_LOCK_SLEEP_SECONDS)
+ return False
+
+ def _try_release_filesystem_lock(self):
+ """
+ Release the filesystem lock.
+ """
+ try:
+ self._lock.unlock()
+ return True
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ return True
+ return False