diff options
Diffstat (limited to 'common/src/leap/soledad')
-rw-r--r-- | common/src/leap/soledad/common/couch/__init__.py | 166 |
1 files changed, 95 insertions, 71 deletions
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index d0c1a7ba..06c94c27 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -25,11 +25,9 @@ import uuid import binascii -from collections import defaultdict from StringIO import StringIO from urlparse import urljoin from contextlib import contextmanager -from threading import Lock from couchdb.client import Server, Database @@ -117,8 +115,6 @@ class CouchDatabase(object): CouchDB details from backend code. """ - _put_doc_lock = defaultdict(Lock) - @classmethod def open_database(cls, url, create, ensure_ddocs=False, replica_uid=None, database_security=None): @@ -670,6 +666,51 @@ class CouchDatabase(object): _, _, data = resource.get_json(**kwargs) return data + def _allocate_new_generation(self, doc_id, transaction_id): + """ + Allocate a new generation number for a document modification. + + We need to allocate a new generation to this document modification by + creating a new gen doc. In order to avoid concurrent database updates + from allocating the same new generation, we will try to create the + document until we succeed, meaning that no other piece of code holds + the same generation number as ours. + + The loop below would only be executed more than once if: + + 1. there's more than one thread trying to modify the user's database, + and + + 2. the execution of getting the current generation and saving the gen + doc different threads get interleaved (one of them will succeed + and the others will fail and try again). + + Number 1 only happens when more than one user device is syncing at the + same time. Number 2 depends on not-so-frequent coincidence of + code execution. + + Also, in the race between threads for a generation number there's + always one thread that wins. so if there are N threads in the race, the + expected number of repetitions of the loop for each thread would be + N/2. If N is equal to the number of devices that the user has, the + number of possible repetitions of the loop should always be low. + """ + while True: + try: + # add the gen document + gen, _ = self.get_generation_info() + new_gen = gen + 1 + gen_doc = { + '_id': _get_gen_doc_id(new_gen), + GENERATION_KEY: new_gen, + DOC_ID_KEY: doc_id, + TRANSACTION_ID_KEY: transaction_id, + } + self._database.save(gen_doc) + break # succeeded allocating a new generation, proceed + except ResourceConflict: + pass # try again! + def save_document(self, old_doc, doc, transaction_id): """ Put the document in the Couch backend database. @@ -710,73 +751,56 @@ class CouchDatabase(object): } parts.append(conflicts) - # TODO: in u1db protocol, the increment of database generation should - # be made in the same atomic transaction as the actual document save, - # otherwise the same document might be concurrently updated by - # concurrent syncs from other replicas. A simple lock based on the uuid - # and doc_id would be enough to prevent that, if all entry points to - # database update are made through the soledad api. - with self._put_doc_lock[self._database.name]: - - # add the gen document - gen, _ = self.get_generation_info() - new_gen = gen + 1 - gen_doc = { - '_id': _get_gen_doc_id(new_gen), - GENERATION_KEY: new_gen, - DOC_ID_KEY: doc.doc_id, - TRANSACTION_ID_KEY: transaction_id, - } - self._database.save(gen_doc) - - # build the couch document - couch_doc = { - '_id': doc.doc_id, - 'u1db_rev': doc.rev, - '_attachments': attachments, - } - # if we are updating a doc we have to add the couch doc revision - if old_doc is not None and hasattr(old_doc, 'couch_rev'): - couch_doc['_rev'] = old_doc.couch_rev - # prepare the multipart PUT - if not self.batching: - buf = StringIO() - envelope = MultipartWriter(buf) - # the order in which attachments are described inside the - # serialization of the couch document must match the order in - # which they are actually written in the multipart structure. - # Because of that, we use `sorted_keys=True` in the json - # serialization (so "u1db_conflicts" comes before - # "u1db_content" on the couch document attachments - # description), and also reverse the order of the parts before - # writing them, so the "conflict" part is written before the - # "content" part. - envelope.add( - 'application/json', - json.dumps(couch_doc, sort_keys=True)) - parts.reverse() - for part in parts: - envelope.add('application/octet-stream', part) - envelope.close() - # try to save and fail if there's a revision conflict - try: - resource = self._new_resource() - resource.put_json( - doc.doc_id, body=str(buf.getvalue()), - headers=envelope.headers) - except ResourceConflict: - raise RevisionConflict() - else: - for name, attachment in attachments.items(): - del attachment['follows'] - del attachment['length'] - index = 0 if name is 'u1db_content' else 1 - attachment['data'] = binascii.b2a_base64( - parts[index]).strip() - couch_doc['_attachments'] = attachments - self.batch_docs[doc.doc_id] = couch_doc - last_gen, last_trans_id = self.batch_generation - self.batch_generation = (last_gen + 1, transaction_id) + self._allocate_new_generation(doc.doc_id, transaction_id) + + # build the couch document + couch_doc = { + '_id': doc.doc_id, + 'u1db_rev': doc.rev, + '_attachments': attachments, + } + # if we are updating a doc we have to add the couch doc revision + if old_doc is not None and hasattr(old_doc, 'couch_rev'): + couch_doc['_rev'] = old_doc.couch_rev + # prepare the multipart PUT + if not self.batching: + buf = StringIO() + envelope = MultipartWriter(buf) + # the order in which attachments are described inside the + # serialization of the couch document must match the order in + # which they are actually written in the multipart structure. + # Because of that, we use `sorted_keys=True` in the json + # serialization (so "u1db_conflicts" comes before + # "u1db_content" on the couch document attachments + # description), and also reverse the order of the parts before + # writing them, so the "conflict" part is written before the + # "content" part. + envelope.add( + 'application/json', + json.dumps(couch_doc, sort_keys=True)) + parts.reverse() + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict + try: + resource = self._new_resource() + resource.put_json( + doc.doc_id, body=str(buf.getvalue()), + headers=envelope.headers) + except ResourceConflict: + raise RevisionConflict() + else: + for name, attachment in attachments.items(): + del attachment['follows'] + del attachment['length'] + index = 0 if name is 'u1db_content' else 1 + attachment['data'] = binascii.b2a_base64( + parts[index]).strip() + couch_doc['_attachments'] = attachments + self.batch_docs[doc.doc_id] = couch_doc + last_gen, last_trans_id = self.batch_generation + self.batch_generation = (last_gen + 1, transaction_id) def _new_resource(self, *path): """ |