summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/couch.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/leap/soledad/common/couch.py')
-rw-r--r--common/src/leap/soledad/common/couch.py1109
1 files changed, 676 insertions, 433 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 4f569559..d4f67696 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -113,7 +113,7 @@ def couch_server(url):
THREAD_POOL = ThreadPool(20)
-class CouchDatabase(CommonBackend):
+class SoledadBackend(CommonBackend):
"""
A U1DB implementation that uses CouchDB as its persistence layer.
@@ -135,27 +135,13 @@ class CouchDatabase(CommonBackend):
:type ensure_ddocs: bool
:return: the database instance
- :rtype: CouchDatabase
+ :rtype: SoledadBackend
"""
- # get database from url
- m = re.match('(^https?://[^/]+)/(.+)$', url)
- if not m:
- raise InvalidURLError
- url = m.group(1)
- dbname = m.group(2)
- with couch_server(url) as server:
- try:
- server[dbname]
- except ResourceNotFound:
- if not create:
- raise DatabaseDoesNotExist()
- server.create(dbname)
+ db = CouchDatabase.open_database(url, create, ensure_ddocs)
return cls(
- url, dbname, replica_uid=replica_uid,
- ensure_ddocs=ensure_ddocs, database_security=database_security)
+ db, replica_uid=replica_uid)
- def __init__(self, url, dbname, replica_uid=None, ensure_ddocs=False,
- database_security=None):
+ def __init__(self, database, replica_uid=None):
"""
Create a new Couch data container.
@@ -169,25 +155,14 @@ class CouchDatabase(CommonBackend):
:type ensure_ddocs: bool
"""
# save params
- self._url = url
- self._session = Session(timeout=COUCH_TIMEOUT)
self._factory = CouchDocument
self._real_replica_uid = None
# configure couch
- self._dbname = dbname
- self._database = Database(
- urljoin(self._url, self._dbname),
- self._session)
- try:
- self._database.info()
- except ResourceNotFound:
- raise DatabaseDoesNotExist()
+ self._cache = None
+ self._dbname = database._dbname
+ self._database = database
if replica_uid is not None:
self._set_replica_uid(replica_uid)
- if ensure_ddocs:
- self.ensure_ddocs_on_db()
- self.ensure_security_ddoc(database_security)
- self._cache = None
@property
def cache(self):
@@ -205,45 +180,6 @@ class CouchDatabase(CommonBackend):
"""
self._cache = cache
- def ensure_ddocs_on_db(self):
- """
- Ensure that the design documents used by the backend exist on the
- couch database.
- """
- for ddoc_name in ['docs', 'syncs', 'transactions']:
- try:
- self._database.resource('_design',
- ddoc_name, '_info').get_json()
- except ResourceNotFound:
- ddoc = json.loads(
- binascii.a2b_base64(
- getattr(ddocs, ddoc_name)))
- self._database.save(ddoc)
-
- def ensure_security_ddoc(self, security_config=None):
- """
- Make sure that only soledad user is able to access this database as
- an unprivileged member, meaning that administration access will
- be forbidden even inside an user database.
- The goal is to make sure that only the lowest access level is given
- to the unprivileged CouchDB user set on the server process.
- This is achieved by creating a _security design document, see:
- http://docs.couchdb.org/en/latest/api/database/security.html
-
- :param database_security: security configuration parsed from conf file
- :type cache: dict
- """
- security_config = security_config or {}
- security = self._database.resource.get_json('_security')[2]
- security['members'] = {'names': [], 'roles': []}
- security['members']['names'] = security_config.get('members',
- ['soledad'])
- security['members']['roles'] = security_config.get('members_roles', [])
- security['admins'] = {'names': [], 'roles': []}
- security['admins']['names'] = security_config.get('admins', [])
- security['admins']['roles'] = security_config.get('admins_roles', [])
- self._database.resource.put_json('_security', body=security)
-
def get_sync_target(self):
"""
Return a SyncTarget object, for another u1db to synchronize with.
@@ -257,8 +193,7 @@ class CouchDatabase(CommonBackend):
"""
Delete a U1DB CouchDB database.
"""
- with couch_server(self._url) as server:
- del(server[self._dbname])
+ self._database.delete_database()
def close(self):
"""
@@ -267,10 +202,7 @@ class CouchDatabase(CommonBackend):
:return: True if db was succesfully closed.
:rtype: bool
"""
- self._url = None
- self._full_commit = None
- self._session = None
- self._database = None
+ self._database.close()
return True
def __del__(self):
@@ -286,18 +218,9 @@ class CouchDatabase(CommonBackend):
:param replica_uid: The new replica uid.
:type replica_uid: str
"""
- try:
- # set on existent config document
- doc = self._database['u1db_config']
- doc['replica_uid'] = replica_uid
- except ResourceNotFound:
- # or create the config document
- doc = {
- '_id': 'u1db_config',
- 'replica_uid': replica_uid,
- }
- self._database.save(doc)
+ self._database.set_replica_uid(replica_uid)
self._real_replica_uid = replica_uid
+ self.cache['replica_uid'] = self._real_replica_uid
def _get_replica_uid(self):
"""
@@ -307,21 +230,13 @@ class CouchDatabase(CommonBackend):
:rtype: str
"""
if self._real_replica_uid is not None:
- self.cache[self._url] = {'replica_uid': self._real_replica_uid}
- return self._real_replica_uid
- if self._url in self.cache:
- return self.cache[self._url]['replica_uid']
- try:
- # grab replica_uid from server
- doc = self._database['u1db_config']
- self.cache[self._url] = doc
- self._real_replica_uid = doc['replica_uid']
- return self._real_replica_uid
- except ResourceNotFound:
- # create a unique replica_uid
- self._real_replica_uid = uuid.uuid4().hex
- self._set_replica_uid(self._real_replica_uid)
+ self.cache['replica_uid'] = self._real_replica_uid
return self._real_replica_uid
+ if 'replica_uid' in self.cache:
+ return self.cache['replica_uid']
+ self._real_replica_uid = self._database.get_replica_uid()
+ self._set_replica_uid(self._real_replica_uid)
+ return self._real_replica_uid
_replica_uid = property(_get_replica_uid, _set_replica_uid)
@@ -348,19 +263,7 @@ class CouchDatabase(CommonBackend):
design document for an yet
unknown reason.
"""
- # query a couch list function
- if self.replica_uid + '_gen' in self.cache:
- return self.cache[self.replica_uid + '_gen']['generation']
- ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json()
- self.cache[self.replica_uid + '_gen'] = response[2]
- return response[2]['generation']
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
+ return self._get_generation_info()[0]
def _get_generation_info(self):
"""
@@ -385,18 +288,10 @@ class CouchDatabase(CommonBackend):
"""
if self.replica_uid + '_gen' in self.cache:
response = self.cache[self.replica_uid + '_gen']
- return (response['generation'], response['transaction_id'])
- # query a couch list function
- ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json()
- self.cache[self.replica_uid + '_gen'] = response[2]
- return (response[2]['generation'], response[2]['transaction_id'])
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
+ return response
+ cur_gen, newest_trans_id = self._database._get_generation_info()
+ self.cache[self.replica_uid + '_gen'] = (cur_gen, newest_trans_id)
+ return (cur_gen, newest_trans_id)
def _get_trans_id_for_gen(self, generation):
"""
@@ -408,37 +303,8 @@ class CouchDatabase(CommonBackend):
:return: The transaction id for C{generation}.
:rtype: str
- :raise InvalidGeneration: Raised when the generation does not exist.
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
"""
- if generation == 0:
- return ''
- # query a couch list function
- ddoc_path = [
- '_design', 'transactions', '_list', 'trans_id_for_gen', 'log'
- ]
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json(gen=generation)
- if response[2] == {}:
- raise InvalidGeneration
- return response[2]['transaction_id']
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
+ return self._database._get_trans_id_for_gen(generation)
def _get_transaction_log(self):
"""
@@ -447,30 +313,8 @@ class CouchDatabase(CommonBackend):
:return: The complete transaction log.
:rtype: [(str, str)]
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
"""
- # query a couch view
- ddoc_path = ['_design', 'transactions', '_view', 'log']
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json()
- return map(
- lambda row: (row['id'], row['value']),
- response[2]['rows'])
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
+ return self._database._get_transaction_log()
def _get_doc(self, doc_id, check_for_conflicts=False):
"""
@@ -487,44 +331,7 @@ class CouchDatabase(CommonBackend):
:return: The document.
:rtype: CouchDocument
"""
- # get document with all attachments (u1db content and eventual
- # conflicts)
- try:
- result = \
- self._database.resource(doc_id).get_json(
- attachments=True)[2]
- except ResourceNotFound:
- return None
- return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
-
- def __parse_doc_from_couch(self, result, doc_id,
- check_for_conflicts=False):
- # restrict to u1db documents
- if 'u1db_rev' not in result:
- return None
- doc = self._factory(doc_id, result['u1db_rev'])
- # set contents or make tombstone
- if '_attachments' not in result \
- or 'u1db_content' not in result['_attachments']:
- doc.make_tombstone()
- else:
- doc.content = json.loads(
- binascii.a2b_base64(
- result['_attachments']['u1db_content']['data']))
- # determine if there are conflicts
- if check_for_conflicts \
- and '_attachments' in result \
- and 'u1db_conflicts' in result['_attachments']:
- doc.set_conflicts(
- self._build_conflicts(
- doc.doc_id,
- json.loads(binascii.a2b_base64(
- result['_attachments']['u1db_conflicts']['data']))))
- # store couch revision
- doc.couch_rev = result['_rev']
- # store transactions
- doc.transactions = result['u1db_transactions']
- return doc
+ return self._database._get_doc(doc_id, check_for_conflicts)
def get_doc(self, doc_id, include_deleted=False):
"""
@@ -561,11 +368,7 @@ class CouchDatabase(CommonBackend):
the documents in the database.
:rtype: (int, [CouchDocument])
"""
-
- generation = self._get_generation()
- results = list(self.get_docs(self._database,
- include_deleted=include_deleted))
- return (generation, results)
+ return self._database.get_all_docs(include_deleted)
def _put_doc(self, old_doc, doc):
"""
@@ -579,84 +382,15 @@ class CouchDatabase(CommonBackend):
:type old_doc: CouchDocument
:param doc: The document to be put.
:type doc: CouchDocument
-
- :raise RevisionConflict: Raised when trying to update a document but
- couch revisions mismatch.
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
"""
- attachments = {} # we save content and conflicts as attachments
- parts = [] # and we put it using couch's multipart PUT
- # save content as attachment
- if doc.is_tombstone() is False:
- content = doc.get_json()
- attachments['u1db_content'] = {
- 'follows': True,
- 'content_type': 'application/octet-stream',
- 'length': len(content),
- }
- parts.append(content)
- # save conflicts as attachment
- if doc.has_conflicts is True:
- conflicts = json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- attachments['u1db_conflicts'] = {
- 'follows': True,
- 'content_type': 'application/octet-stream',
- 'length': len(conflicts),
- }
- parts.append(conflicts)
- # store old transactions, if any
- transactions = old_doc.transactions[:] if old_doc is not None else []
- # create a new transaction id and timestamp it so the transaction log
- # is consistent when querying the database.
- transactions.append(
- # here we store milliseconds to keep consistent with javascript
- # Date.prototype.getTime() which was used before inside a couchdb
- # update handler.
- (int(time.time() * 1000),
- self._allocate_transaction_id()))
- # build the couch document
- couch_doc = {
- '_id': doc.doc_id,
- 'u1db_rev': doc.rev,
- 'u1db_transactions': transactions,
- '_attachments': attachments,
- }
- # if we are updating a doc we have to add the couch doc revision
- if old_doc is not None:
- couch_doc['_rev'] = old_doc.couch_rev
- # prepare the multipart PUT
- buf = StringIO()
- headers = {}
- envelope = MultipartWriter(buf, headers=headers, subtype='related')
- envelope.add('application/json', json.dumps(couch_doc))
- for part in parts:
- envelope.add('application/octet-stream', part)
- envelope.close()
- # try to save and fail if there's a revision conflict
- try:
- resource = self._new_resource()
- resource.put_json(
- doc.doc_id, body=str(buf.getvalue()), headers=envelope.headers)
- except ResourceConflict:
- raise RevisionConflict()
+ last_transaction =\
+ self._database.save_document(old_doc, doc,
+ self._allocate_transaction_id())
if self.replica_uid + '_gen' in self.cache:
- gen_info = self.cache[self.replica_uid + '_gen']
- gen_info['generation'] += 1
- gen_info['transaction_id'] = transactions[-1][1]
+ gen, trans = self.cache[self.replica_uid + '_gen']
+ gen += 1
+ trans = last_transaction
+ self.cache[self.replica_uid + '_gen'] = (gen, trans)
def put_doc(self, doc):
"""
@@ -711,53 +445,8 @@ class CouchDatabase(CommonBackend):
to the last intervening change and sorted by generation (old
changes first)
:rtype: (int, str, [(str, int, str)])
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
"""
- # query a couch list function
- ddoc_path = [
- '_design', 'transactions', '_list', 'whats_changed', 'log'
- ]
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json(old_gen=old_generation)
- results = map(
- lambda row:
- (row['generation'], row['doc_id'], row['transaction_id']),
- response[2]['transactions'])
- results.reverse()
- cur_gen = old_generation
- seen = set()
- changes = []
- newest_trans_id = ''
- for generation, doc_id, trans_id in results:
- if doc_id not in seen:
- changes.append((doc_id, generation, trans_id))
- seen.add(doc_id)
- if changes:
- cur_gen = changes[0][1] # max generation
- newest_trans_id = changes[0][2]
- changes.reverse()
- else:
- cur_gen, newest_trans_id = self._get_generation_info()
-
- return cur_gen, newest_trans_id, changes
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
+ return self._database.whats_changed(old_generation)
def delete_doc(self, doc):
"""
@@ -791,25 +480,6 @@ class CouchDatabase(CommonBackend):
self._put_doc(old_doc, doc)
return new_rev
- def _build_conflicts(self, doc_id, attached_conflicts):
- """
- Build the conflicted documents list from the conflicts attachment
- fetched from a couch document.
-
- :param attached_conflicts: The document's conflicts as fetched from a
- couch document attachment.
- :type attached_conflicts: dict
- """
- conflicts = []
- for doc_rev, content in attached_conflicts:
- doc = self._factory(doc_id, doc_rev)
- if content is None:
- doc.make_tombstone()
- else:
- doc.content = content
- conflicts.append(doc)
- return conflicts
-
def get_doc_conflicts(self, doc_id, couch_rev=None):
"""
Get the conflicted versions of a document.
@@ -823,22 +493,7 @@ class CouchDatabase(CommonBackend):
:return: A list of conflicted versions of the document.
:rtype: list
"""
- # request conflicts attachment from server
- params = {}
- conflicts = []
- if couch_rev is not None:
- params['rev'] = couch_rev # restric document's couch revision
- else:
- # TODO: move into resource logic!
- first_entry = self._get_doc(doc_id, check_for_conflicts=True)
- conflicts.append(first_entry)
- resource = self._database.resource(doc_id, 'u1db_conflicts')
- try:
- response = resource.get_json(**params)
- return conflicts + self._build_conflicts(
- doc_id, json.loads(response[2].read()))
- except ResourceNotFound:
- return []
+ return self._database.get_doc_conflicts(doc_id, couch_rev)
def _get_replica_gen_and_trans_id(self, other_replica_uid):
"""
@@ -859,22 +514,7 @@ class CouchDatabase(CommonBackend):
synchronized with the replica, this is (0, '').
:rtype: (int, str)
"""
- if other_replica_uid in self.cache:
- return self.cache[other_replica_uid]
-
- doc_id = 'u1db_sync_%s' % other_replica_uid
- try:
- doc = self._database[doc_id]
- except ResourceNotFound:
- doc = {
- '_id': doc_id,
- 'generation': 0,
- 'transaction_id': '',
- }
- self._database.save(doc)
- result = doc['generation'], doc['transaction_id']
- self.cache[other_replica_uid] = result
- return result
+ return self._database._get_replica_gen_and_trans_id(other_replica_uid)
def _set_replica_gen_and_trans_id(self, other_replica_uid,
other_generation, other_transaction_id,
@@ -937,14 +577,9 @@ class CouchDatabase(CommonBackend):
"""
self.cache[other_replica_uid] = (other_generation,
other_transaction_id)
- doc_id = 'u1db_sync_%s' % other_replica_uid
- try:
- doc = self._database[doc_id]
- except ResourceNotFound:
- doc = {'_id': doc_id}
- doc['generation'] = other_generation
- doc['transaction_id'] = other_transaction_id
- self._database.save(doc)
+ self._database._do_set_replica_gen_and_trans_id(other_replica_uid,
+ other_generation,
+ other_transaction_id)
def _force_doc_sync_conflict(self, doc):
"""
@@ -1099,25 +734,8 @@ class CouchDatabase(CommonBackend):
in matching doc_ids order.
:rtype: iterable
"""
- # Workaround for:
- #
- # http://bugs.python.org/issue7980
- # https://leap.se/code/issues/5449
- #
- # python-couchdb uses time.strptime, which is not thread safe. In
- # order to avoid the problem described on the issues above, we preload
- # strptime here by evaluating the conversion of an arbitrary date.
- # This will not be needed when/if we switch from python-couchdb to
- # paisley.
- time.strptime('Mar 8 1917', '%b %d %Y')
- get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts)
- docs = [THREAD_POOL.apply_async(get_one, [doc_id])
- for doc_id in doc_ids]
- for doc in docs:
- doc = doc.get()
- if not doc or not include_deleted and doc.is_tombstone():
- continue
- yield doc
+ return self._database.get_docs(doc_ids, check_for_conflicts,
+ include_deleted)
def _prune_conflicts(self, doc, doc_vcr):
"""
@@ -1147,6 +765,632 @@ class CouchDatabase(CommonBackend):
doc.rev = doc_vcr.as_str()
doc.delete_conflicts(c_revs_to_prune)
+
+class CouchDatabase(object):
+ """
+ Holds CouchDB related code.
+ This class gives methods to encapsulate database operations and hide
+ CouchDB details from backend code.
+ """
+
+ @classmethod
+ def open_database(cls, url, create, ensure_ddocs=False):
+ """
+ Open a U1DB database using CouchDB as backend.
+
+ :param url: the url of the database replica
+ :type url: str
+ :param create: should the replica be created if it does not exist?
+ :type create: bool
+ :param replica_uid: an optional unique replica identifier
+ :type replica_uid: str
+ :param ensure_ddocs: Ensure that the design docs exist on server.
+ :type ensure_ddocs: bool
+
+ :return: the database instance
+ :rtype: SoledadBackend
+ """
+ # get database from url
+ m = re.match('(^https?://[^/]+)/(.+)$', url)
+ if not m:
+ raise InvalidURLError
+ url = m.group(1)
+ dbname = m.group(2)
+ with couch_server(url) as server:
+ try:
+ server[dbname]
+ except ResourceNotFound:
+ if not create:
+ raise DatabaseDoesNotExist()
+ server.create(dbname)
+ return cls(
+ url, dbname, ensure_ddocs=ensure_ddocs)
+
+ def __init__(self, url, dbname, ensure_ddocs=True,
+ database_security=None):
+ self._session = Session(timeout=COUCH_TIMEOUT)
+ self._url = url
+ self._dbname = dbname
+ self._database = Database(
+ urljoin(url, dbname),
+ self._session)
+ self._database.info()
+ if ensure_ddocs:
+ self.ensure_ddocs_on_db()
+ self.ensure_security_ddoc(database_security)
+
+ def ensure_ddocs_on_db(self):
+ """
+ Ensure that the design documents used by the backend exist on the
+ couch database.
+ """
+ for ddoc_name in ['docs', 'syncs', 'transactions']:
+ try:
+ self._database.resource('_design',
+ ddoc_name, '_info').get_json()
+ except ResourceNotFound:
+ ddoc = json.loads(
+ binascii.a2b_base64(
+ getattr(ddocs, ddoc_name)))
+ self._database.save(ddoc)
+
+ def ensure_security_ddoc(self, security_config=None):
+ """
+ Make sure that only soledad user is able to access this database as
+ an unprivileged member, meaning that administration access will
+ be forbidden even inside an user database.
+ The goal is to make sure that only the lowest access level is given
+ to the unprivileged CouchDB user set on the server process.
+ This is achieved by creating a _security design document, see:
+ http://docs.couchdb.org/en/latest/api/database/security.html
+
+ :param database_security: security configuration parsed from conf file
+ :type cache: dict
+ """
+ security_config = security_config or {}
+ security = self._database.resource.get_json('_security')[2]
+ security['members'] = {'names': [], 'roles': []}
+ security['members']['names'] = security_config.get('members',
+ ['soledad'])
+ security['members']['roles'] = security_config.get('members_roles', [])
+ security['admins'] = {'names': [], 'roles': []}
+ security['admins']['names'] = security_config.get('admins', [])
+ security['admins']['roles'] = security_config.get('admins_roles', [])
+ self._database.resource.put_json('_security', body=security)
+
+ def delete_database(self):
+ """
+ Delete a U1DB CouchDB database.
+ """
+ with couch_server(self._url) as server:
+ del(server[self._dbname])
+
+ def set_replica_uid(self, replica_uid):
+ """
+ Force the replica uid to be set.
+
+ :param replica_uid: The new replica uid.
+ :type replica_uid: str
+ """
+ try:
+ # set on existent config document
+ doc = self._database['u1db_config']
+ doc['replica_uid'] = replica_uid
+ except ResourceNotFound:
+ # or create the config document
+ doc = {
+ '_id': 'u1db_config',
+ 'replica_uid': replica_uid,
+ }
+ self._database.save(doc)
+
+ def get_replica_uid(self):
+ """
+ Get the replica uid.
+
+ :return: The replica uid.
+ :rtype: str
+ """
+ try:
+ # grab replica_uid from server
+ doc = self._database['u1db_config']
+ replica_uid = doc['replica_uid']
+ return replica_uid
+ except ResourceNotFound:
+ # create a unique replica_uid
+ replica_uid = uuid.uuid4().hex
+ self.set_replica_uid(replica_uid)
+ return replica_uid
+
+ def close(self):
+ self._database = None
+
+ def get_all_docs(self, include_deleted=False):
+ """
+ Get the JSON content for all documents in the database.
+
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :type include_deleted: bool
+
+ :return: (generation, [CouchDocument])
+ The current generation of the database, followed by a list of all
+ the documents in the database.
+ :rtype: (int, [CouchDocument])
+ """
+
+ generation, _ = self._get_generation_info()
+ results = list(self.get_docs(self._database,
+ include_deleted=include_deleted))
+ return (generation, results)
+
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers or None for all.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+ :type check_for_conflicts: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ # Workaround for:
+ #
+ # http://bugs.python.org/issue7980
+ # https://leap.se/code/issues/5449
+ #
+ # python-couchdb uses time.strptime, which is not thread safe. In
+ # order to avoid the problem described on the issues above, we preload
+ # strptime here by evaluating the conversion of an arbitrary date.
+ # This will not be needed when/if we switch from python-couchdb to
+ # paisley.
+ time.strptime('Mar 8 1917', '%b %d %Y')
+ get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts)
+ docs = [THREAD_POOL.apply_async(get_one, [doc_id])
+ for doc_id in doc_ids]
+ for doc in docs:
+ doc = doc.get()
+ if not doc or not include_deleted and doc.is_tombstone():
+ continue
+ yield doc
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """
+ Extract the document from storage.
+
+ This can return None if the document doesn't exist.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped.
+ :type check_for_conflicts: bool
+
+ :return: The document.
+ :rtype: CouchDocument
+ """
+ # get document with all attachments (u1db content and eventual
+ # conflicts)
+ try:
+ result = \
+ self._database.resource(doc_id).get_json(
+ attachments=True)[2]
+ except ResourceNotFound:
+ return None
+ return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
+
+ def __parse_doc_from_couch(self, result, doc_id,
+ check_for_conflicts=False):
+ # restrict to u1db documents
+ if 'u1db_rev' not in result:
+ return None
+ doc = CouchDocument(doc_id, result['u1db_rev'])
+ # set contents or make tombstone
+ if '_attachments' not in result \
+ or 'u1db_content' not in result['_attachments']:
+ doc.make_tombstone()
+ else:
+ doc.content = json.loads(
+ binascii.a2b_base64(
+ result['_attachments']['u1db_content']['data']))
+ # determine if there are conflicts
+ if check_for_conflicts \
+ and '_attachments' in result \
+ and 'u1db_conflicts' in result['_attachments']:
+ doc.set_conflicts(
+ self._build_conflicts(
+ doc.doc_id,
+ json.loads(binascii.a2b_base64(
+ result['_attachments']['u1db_conflicts']['data']))))
+ # store couch revision
+ doc.couch_rev = result['_rev']
+ # store transactions
+ doc.transactions = result['u1db_transactions']
+ return doc
+
+ def _build_conflicts(self, doc_id, attached_conflicts):
+ """
+ Build the conflicted documents list from the conflicts attachment
+ fetched from a couch document.
+
+ :param attached_conflicts: The document's conflicts as fetched from a
+ couch document attachment.
+ :type attached_conflicts: dict
+ """
+ conflicts = []
+ for doc_rev, content in attached_conflicts:
+ doc = CouchDocument(doc_id, doc_rev)
+ if content is None:
+ doc.make_tombstone()
+ else:
+ doc.content = content
+ conflicts.append(doc)
+ return conflicts
+
+ def _get_trans_id_for_gen(self, generation):
+ """
+ Get the transaction id corresponding to a particular generation.
+
+ :param generation: The generation for which to get the transaction id.
+ :type generation: int
+
+ :return: The transaction id for C{generation}.
+ :rtype: str
+
+ :raise InvalidGeneration: Raised when the generation does not exist.
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ if generation == 0:
+ return ''
+ # query a couch list function
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'trans_id_for_gen', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(gen=generation)
+ if response[2] == {}:
+ raise InvalidGeneration
+ return response[2]['transaction_id']
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
+
+ def _get_replica_gen_and_trans_id(self, other_replica_uid):
+ """
+ Return the last known generation and transaction id for the other db
+ replica.
+
+ When you do a synchronization with another replica, the Database keeps
+ track of what generation the other database replica was at, and what
+ the associated transaction id was. This is used to determine what data
+ needs to be sent, and if two databases are claiming to be the same
+ replica.
+
+ :param other_replica_uid: The identifier for the other replica.
+ :type other_replica_uid: str
+
+ :return: A tuple containing the generation and transaction id we
+ encountered during synchronization. If we've never
+ synchronized with the replica, this is (0, '').
+ :rtype: (int, str)
+ """
+ doc_id = 'u1db_sync_%s' % other_replica_uid
+ try:
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {
+ '_id': doc_id,
+ 'generation': 0,
+ 'transaction_id': '',
+ }
+ self._database.save(doc)
+ result = doc['generation'], doc['transaction_id']
+ return result
+
+ def get_doc_conflicts(self, doc_id, couch_rev=None):
+ """
+ Get the conflicted versions of a document.
+
+ If the C{couch_rev} parameter is not None, conflicts for a specific
+ document's couch revision are returned.
+
+ :param couch_rev: The couch document revision.
+ :type couch_rev: str
+
+ :return: A list of conflicted versions of the document.
+ :rtype: list
+ """
+ # request conflicts attachment from server
+ params = {}
+ conflicts = []
+ if couch_rev is not None:
+ params['rev'] = couch_rev # restric document's couch revision
+ else:
+ # TODO: move into resource logic!
+ first_entry = self._get_doc(doc_id, check_for_conflicts=True)
+ conflicts.append(first_entry)
+ resource = self._database.resource(doc_id, 'u1db_conflicts')
+ try:
+ response = resource.get_json(**params)
+ return conflicts + self._build_conflicts(
+ doc_id, json.loads(response[2].read()))
+ except ResourceNotFound:
+ return []
+
+ def _do_set_replica_gen_and_trans_id(
+ self, other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=None, doc_idx=None, sync_id=None):
+ """
+ Set the last-known generation and transaction id for the other
+ database replica.
+
+ We have just performed some synchronization, and we want to track what
+ generation the other replica was at. See also
+ _get_replica_gen_and_trans_id.
+
+ :param other_replica_uid: The U1DB identifier for the other replica.
+ :type other_replica_uid: str
+ :param other_generation: The generation number for the other replica.
+ :type other_generation: int
+ :param other_transaction_id: The transaction id associated with the
+ generation.
+ :type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+ """
+ doc_id = 'u1db_sync_%s' % other_replica_uid
+ try:
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {'_id': doc_id}
+ doc['generation'] = other_generation
+ doc['transaction_id'] = other_transaction_id
+ self._database.save(doc)
+
+ def _get_transaction_log(self):
+ """
+ This is only for the test suite, it is not part of the api.
+
+ :return: The complete transaction log.
+ :rtype: [(str, str)]
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ # query a couch view
+ ddoc_path = ['_design', 'transactions', '_view', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return map(
+ lambda row: (row['id'], row['value']),
+ response[2]['rows'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+
+ def whats_changed(self, old_generation=0):
+ """
+ Return a list of documents that have changed since old_generation.
+
+ :param old_generation: The generation of the database in the old
+ state.
+ :type old_generation: int
+
+ :return: (generation, trans_id, [(doc_id, generation, trans_id),...])
+ The current generation of the database, its associated
+ transaction id, and a list of of changed documents since
+ old_generation, represented by tuples with for each document
+ its doc_id and the generation and transaction id corresponding
+ to the last intervening change and sorted by generation (old
+ changes first)
+ :rtype: (int, str, [(str, int, str)])
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ # query a couch list function
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'whats_changed', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(old_gen=old_generation)
+ results = map(
+ lambda row:
+ (row['generation'], row['doc_id'], row['transaction_id']),
+ response[2]['transactions'])
+ results.reverse()
+ cur_gen = old_generation
+ seen = set()
+ changes = []
+ newest_trans_id = ''
+ for generation, doc_id, trans_id in results:
+ if doc_id not in seen:
+ changes.append((doc_id, generation, trans_id))
+ seen.add(doc_id)
+ if changes:
+ cur_gen = changes[0][1] # max generation
+ newest_trans_id = changes[0][2]
+ changes.reverse()
+ else:
+ cur_gen, newest_trans_id = self._get_generation_info()
+
+ return cur_gen, newest_trans_id, changes
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
+
+ def _get_generation_info(self):
+ """
+ Return the current generation.
+
+ :return: A tuple containing the current generation and transaction id.
+ :rtype: (int, str)
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ # query a couch list function
+ ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return (response[2]['generation'], response[2]['transaction_id'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
+
+ def save_document(self, old_doc, doc, transaction_id):
+ """
+ Put the document in the Couch backend database.
+
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
+ :param old_doc: The old document version.
+ :type old_doc: CouchDocument
+ :param doc: The document to be put.
+ :type doc: CouchDocument
+
+ :raise RevisionConflict: Raised when trying to update a document but
+ couch revisions mismatch.
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ attachments = {} # we save content and conflicts as attachments
+ parts = [] # and we put it using couch's multipart PUT
+ # save content as attachment
+ if doc.is_tombstone() is False:
+ content = doc.get_json()
+ attachments['u1db_content'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(content),
+ }
+ parts.append(content)
+ # save conflicts as attachment
+ if doc.has_conflicts is True:
+ conflicts = json.dumps(
+ map(lambda cdoc: (cdoc.rev, cdoc.content),
+ doc.get_conflicts()))
+ attachments['u1db_conflicts'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(conflicts),
+ }
+ parts.append(conflicts)
+ # store old transactions, if any
+ transactions = old_doc.transactions[:] if old_doc is not None else []
+ # create a new transaction id and timestamp it so the transaction log
+ # is consistent when querying the database.
+ transactions.append(
+ # here we store milliseconds to keep consistent with javascript
+ # Date.prototype.getTime() which was used before inside a couchdb
+ # update handler.
+ (int(time.time() * 1000),
+ transaction_id))
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ 'u1db_transactions': transactions,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None:
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ buf = StringIO()
+ headers = {}
+ envelope = MultipartWriter(buf, headers=headers, subtype='related')
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ resource = self._new_resource()
+ resource.put_json(
+ doc.doc_id, body=str(buf.getvalue()), headers=headers)
+ except ResourceConflict:
+ raise RevisionConflict()
+ return transactions[-1][1]
+
def _new_resource(self, *path):
"""
Return a new resource for accessing a couch database.
@@ -1165,7 +1409,7 @@ class CouchDatabase(CommonBackend):
class CouchSyncTarget(CommonSyncTarget):
"""
- Functionality for using a CouchDatabase as a synchronization target.
+ Functionality for using a SoledadBackend as a synchronization target.
"""
def get_sync_info(self, source_replica_uid):
@@ -1222,13 +1466,12 @@ class CouchServerState(ServerState):
:param dbname: The name of the database to open.
:type dbname: str
- :return: The CouchDatabase object.
- :rtype: CouchDatabase
+ :return: The SoledadBackend object.
+ :rtype: SoledadBackend
"""
- db = CouchDatabase(
- self.couch_url,
- dbname,
- ensure_ddocs=False)
+ url = urljoin(self.couch_url, dbname)
+ db = SoledadBackend.open_database(url, create=False,
+ ensure_ddocs=False)
return db
def ensure_database(self, dbname):
@@ -1240,8 +1483,8 @@ class CouchServerState(ServerState):
:raise Unauthorized: If disabled or other error was raised.
- :return: The CouchDatabase object and its replica_uid.
- :rtype: (CouchDatabase, str)
+ :return: The SoledadBackend object and its replica_uid.
+ :rtype: (SoledadBackend, str)
"""
if not self.create_cmd:
raise Unauthorized()