Diffstat (limited to 'common/src/leap/soledad/common/couch/__init__.py')
-rw-r--r--  common/src/leap/soledad/common/couch/__init__.py | 166
1 file changed, 95 insertions(+), 71 deletions(-)
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py
index d0c1a7ba..06c94c27 100644
--- a/common/src/leap/soledad/common/couch/__init__.py
+++ b/common/src/leap/soledad/common/couch/__init__.py
@@ -25,11 +25,9 @@ import uuid
import binascii
-from collections import defaultdict
from StringIO import StringIO
from urlparse import urljoin
from contextlib import contextmanager
-from threading import Lock
from couchdb.client import Server, Database
@@ -117,8 +115,6 @@ class CouchDatabase(object):
CouchDB details from backend code.
"""
- _put_doc_lock = defaultdict(Lock)
-
@classmethod
def open_database(cls, url, create, ensure_ddocs=False, replica_uid=None,
database_security=None):
@@ -670,6 +666,51 @@ class CouchDatabase(object):
_, _, data = resource.get_json(**kwargs)
return data
+ def _allocate_new_generation(self, doc_id, transaction_id):
+ """
+ Allocate a new generation number for a document modification.
+
+        We allocate a new generation for this document modification by
+        creating a new gen doc. In order to avoid concurrent database
+        updates from allocating the same generation, we try to create the
+        document until we succeed, meaning that no other piece of code
+        holds the same generation number as ours.
+
+ The loop below would only be executed more than once if:
+
+ 1. there's more than one thread trying to modify the user's database,
+ and
+
+        2. the execution of getting the current generation and saving the
+           gen doc by different threads gets interleaved (one of them will
+           succeed and the others will fail and try again).
+
+        Number 1 only happens when more than one user device is syncing at
+        the same time. Number 2 depends on a not-so-frequent coincidence of
+        code execution timing.
+
+        Also, in the race between threads for a generation number there's
+        always one thread that wins, so if there are N threads in the race,
+        the expected number of repetitions of the loop for each thread is
+        about N/2. If N is equal to the number of devices that the user
+        has, the number of repetitions of the loop should always be low.
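+
+        :param doc_id: The id of the document being modified.
+        :type doc_id: str
+        :param transaction_id: The transaction id of the modification.
+        :type transaction_id: str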
+ """
+ while True:
+ try:
+ # add the gen document
+ gen, _ = self.get_generation_info()
+ new_gen = gen + 1
+ gen_doc = {
+ '_id': _get_gen_doc_id(new_gen),
+ GENERATION_KEY: new_gen,
+ DOC_ID_KEY: doc_id,
+ TRANSACTION_ID_KEY: transaction_id,
+ }
+ self._database.save(gen_doc)
+ break # succeeded allocating a new generation, proceed
+ except ResourceConflict:
+ pass # try again!
+
def save_document(self, old_doc, doc, transaction_id):
"""
Put the document in the Couch backend database.
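
[editor's note: the create-until-success idiom added above can be exercised in
isolation. Below is a minimal sketch, assuming a CouchDB server at
localhost:5984 and the same `couchdb` package imported by this module; the
database name and the `current_generation()` helper are illustrative stand-ins
for `get_generation_info()` and `_get_gen_doc_id()`, not part of the real API.]

    import couchdb
    from couchdb.http import ResourceConflict

    server = couchdb.Server('http://localhost:5984/')
    db = (server.create('gen_example') if 'gen_example' not in server
          else server['gen_example'])

    def current_generation(db):
        # Illustrative stand-in for get_generation_info(): return the
        # highest generation encoded in existing gen doc ids (0 if none).
        gens = [int(row.id.split('-')[1]) for row in db.view('_all_docs')
                if row.id.startswith('gen-')]
        return max(gens) if gens else 0

    def allocate_generation(db, doc_id, transaction_id):
        # Retry until we create a gen doc whose _id encodes an unclaimed
        # generation number. A ResourceConflict means another writer won
        # the race for that number, so re-read and try the next one.
        while True:
            new_gen = current_generation(db) + 1
            gen_doc = {
                '_id': 'gen-%010d' % new_gen,
                'gen': new_gen,
                'doc_id': doc_id,
                'trans_id': transaction_id,
            }
            try:
                db.save(gen_doc)
                return new_gen
            except ResourceConflict:
                continue  # lost the race; retry with a fresh generation

    print allocate_generation(db, 'doc-1', 'trans-1')  # 1, 2, ... per call
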
@@ -710,73 +751,56 @@ class CouchDatabase(object):
}
parts.append(conflicts)
- # TODO: in u1db protocol, the increment of database generation should
- # be made in the same atomic transaction as the actual document save,
- # otherwise the same document might be concurrently updated by
- # concurrent syncs from other replicas. A simple lock based on the uuid
- # and doc_id would be enough to prevent that, if all entry points to
- # database update are made through the soledad api.
- with self._put_doc_lock[self._database.name]:
-
- # add the gen document
- gen, _ = self.get_generation_info()
- new_gen = gen + 1
- gen_doc = {
- '_id': _get_gen_doc_id(new_gen),
- GENERATION_KEY: new_gen,
- DOC_ID_KEY: doc.doc_id,
- TRANSACTION_ID_KEY: transaction_id,
- }
- self._database.save(gen_doc)
-
- # build the couch document
- couch_doc = {
- '_id': doc.doc_id,
- 'u1db_rev': doc.rev,
- '_attachments': attachments,
- }
- # if we are updating a doc we have to add the couch doc revision
- if old_doc is not None and hasattr(old_doc, 'couch_rev'):
- couch_doc['_rev'] = old_doc.couch_rev
- # prepare the multipart PUT
- if not self.batching:
- buf = StringIO()
- envelope = MultipartWriter(buf)
- # the order in which attachments are described inside the
- # serialization of the couch document must match the order in
- # which they are actually written in the multipart structure.
- # Because of that, we use `sorted_keys=True` in the json
- # serialization (so "u1db_conflicts" comes before
- # "u1db_content" on the couch document attachments
- # description), and also reverse the order of the parts before
- # writing them, so the "conflict" part is written before the
- # "content" part.
- envelope.add(
- 'application/json',
- json.dumps(couch_doc, sort_keys=True))
- parts.reverse()
- for part in parts:
- envelope.add('application/octet-stream', part)
- envelope.close()
- # try to save and fail if there's a revision conflict
- try:
- resource = self._new_resource()
- resource.put_json(
- doc.doc_id, body=str(buf.getvalue()),
- headers=envelope.headers)
- except ResourceConflict:
- raise RevisionConflict()
- else:
- for name, attachment in attachments.items():
- del attachment['follows']
- del attachment['length']
- index = 0 if name is 'u1db_content' else 1
- attachment['data'] = binascii.b2a_base64(
- parts[index]).strip()
- couch_doc['_attachments'] = attachments
- self.batch_docs[doc.doc_id] = couch_doc
- last_gen, last_trans_id = self.batch_generation
- self.batch_generation = (last_gen + 1, transaction_id)
+ self._allocate_new_generation(doc.doc_id, transaction_id)
+
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None and hasattr(old_doc, 'couch_rev'):
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ if not self.batching:
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ # the order in which attachments are described inside the
+ # serialization of the couch document must match the order in
+ # which they are actually written in the multipart structure.
+            # Because of that, we use `sort_keys=True` in the json
+ # serialization (so "u1db_conflicts" comes before
+ # "u1db_content" on the couch document attachments
+ # description), and also reverse the order of the parts before
+ # writing them, so the "conflict" part is written before the
+ # "content" part.
+ envelope.add(
+ 'application/json',
+ json.dumps(couch_doc, sort_keys=True))
+ parts.reverse()
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ resource = self._new_resource()
+ resource.put_json(
+ doc.doc_id, body=str(buf.getvalue()),
+ headers=envelope.headers)
+ except ResourceConflict:
+ raise RevisionConflict()
+ else:
+ for name, attachment in attachments.items():
+ del attachment['follows']
+ del attachment['length']
+                index = 0 if name == 'u1db_content' else 1
+ attachment['data'] = binascii.b2a_base64(
+ parts[index]).strip()
+ couch_doc['_attachments'] = attachments
+ self.batch_docs[doc.doc_id] = couch_doc
+ last_gen, last_trans_id = self.batch_generation
+ self.batch_generation = (last_gen + 1, transaction_id)
def _new_resource(self, *path):
"""