summaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2016-07-31 18:25:48 -0300
committerdrebs <drebs@leap.se>2016-08-01 21:09:05 -0300
commit49cd07b909f2185b116bda5b30cfcfe0095291e0 (patch)
tree1686ada5eaa342db28f2f9384cfda7258ae65d1d /common
parent3b237bb46743a93feed4bb6f3c839d72fc28df48 (diff)
[bug] retry allocation of gen instead of using a lock
The use of a lock to allocate the next generation of a change in couch backend suffers from at least 2 problems: 1. all modification to the couch database would have to be made through a soledad server entrypoint, otherwise the lock would have no effect. 2. introducing a lock makes code uglier, harder to debug, and prone to undesired blocks. The solution implemented by this commit is not so elegant, but works for what we need right now. Now, concurrent threads updating the couch database will race for the allocation of a new generation, and retry when they fail to do so. There's no high risk of getting blocked for too much time in the while loop because (1) there's always one thread that wins (what makes the expected number of retries to be N/2 if N is the number of concurrent threads), and (2) the number of concurrent attempts to update the user database is limited by the number of devices syncing at the same time.
Diffstat (limited to 'common')
-rw-r--r--common/src/leap/soledad/common/couch/__init__.py166
1 files changed, 95 insertions, 71 deletions
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py
index d0c1a7ba..06c94c27 100644
--- a/common/src/leap/soledad/common/couch/__init__.py
+++ b/common/src/leap/soledad/common/couch/__init__.py
@@ -25,11 +25,9 @@ import uuid
import binascii
-from collections import defaultdict
from StringIO import StringIO
from urlparse import urljoin
from contextlib import contextmanager
-from threading import Lock
from couchdb.client import Server, Database
@@ -117,8 +115,6 @@ class CouchDatabase(object):
CouchDB details from backend code.
"""
- _put_doc_lock = defaultdict(Lock)
-
@classmethod
def open_database(cls, url, create, ensure_ddocs=False, replica_uid=None,
database_security=None):
@@ -670,6 +666,51 @@ class CouchDatabase(object):
_, _, data = resource.get_json(**kwargs)
return data
+ def _allocate_new_generation(self, doc_id, transaction_id):
+ """
+ Allocate a new generation number for a document modification.
+
+ We need to allocate a new generation to this document modification by
+ creating a new gen doc. In order to avoid concurrent database updates
+ from allocating the same new generation, we will try to create the
+ document until we succeed, meaning that no other piece of code holds
+ the same generation number as ours.
+
+ The loop below would only be executed more than once if:
+
+ 1. there's more than one thread trying to modify the user's database,
+ and
+
+ 2. the execution of getting the current generation and saving the gen
+ doc different threads get interleaved (one of them will succeed
+ and the others will fail and try again).
+
+ Number 1 only happens when more than one user device is syncing at the
+ same time. Number 2 depends on not-so-frequent coincidence of
+ code execution.
+
+ Also, in the race between threads for a generation number there's
+ always one thread that wins. so if there are N threads in the race, the
+ expected number of repetitions of the loop for each thread would be
+ N/2. If N is equal to the number of devices that the user has, the
+ number of possible repetitions of the loop should always be low.
+ """
+ while True:
+ try:
+ # add the gen document
+ gen, _ = self.get_generation_info()
+ new_gen = gen + 1
+ gen_doc = {
+ '_id': _get_gen_doc_id(new_gen),
+ GENERATION_KEY: new_gen,
+ DOC_ID_KEY: doc_id,
+ TRANSACTION_ID_KEY: transaction_id,
+ }
+ self._database.save(gen_doc)
+ break # succeeded allocating a new generation, proceed
+ except ResourceConflict:
+ pass # try again!
+
def save_document(self, old_doc, doc, transaction_id):
"""
Put the document in the Couch backend database.
@@ -710,73 +751,56 @@ class CouchDatabase(object):
}
parts.append(conflicts)
- # TODO: in u1db protocol, the increment of database generation should
- # be made in the same atomic transaction as the actual document save,
- # otherwise the same document might be concurrently updated by
- # concurrent syncs from other replicas. A simple lock based on the uuid
- # and doc_id would be enough to prevent that, if all entry points to
- # database update are made through the soledad api.
- with self._put_doc_lock[self._database.name]:
-
- # add the gen document
- gen, _ = self.get_generation_info()
- new_gen = gen + 1
- gen_doc = {
- '_id': _get_gen_doc_id(new_gen),
- GENERATION_KEY: new_gen,
- DOC_ID_KEY: doc.doc_id,
- TRANSACTION_ID_KEY: transaction_id,
- }
- self._database.save(gen_doc)
-
- # build the couch document
- couch_doc = {
- '_id': doc.doc_id,
- 'u1db_rev': doc.rev,
- '_attachments': attachments,
- }
- # if we are updating a doc we have to add the couch doc revision
- if old_doc is not None and hasattr(old_doc, 'couch_rev'):
- couch_doc['_rev'] = old_doc.couch_rev
- # prepare the multipart PUT
- if not self.batching:
- buf = StringIO()
- envelope = MultipartWriter(buf)
- # the order in which attachments are described inside the
- # serialization of the couch document must match the order in
- # which they are actually written in the multipart structure.
- # Because of that, we use `sorted_keys=True` in the json
- # serialization (so "u1db_conflicts" comes before
- # "u1db_content" on the couch document attachments
- # description), and also reverse the order of the parts before
- # writing them, so the "conflict" part is written before the
- # "content" part.
- envelope.add(
- 'application/json',
- json.dumps(couch_doc, sort_keys=True))
- parts.reverse()
- for part in parts:
- envelope.add('application/octet-stream', part)
- envelope.close()
- # try to save and fail if there's a revision conflict
- try:
- resource = self._new_resource()
- resource.put_json(
- doc.doc_id, body=str(buf.getvalue()),
- headers=envelope.headers)
- except ResourceConflict:
- raise RevisionConflict()
- else:
- for name, attachment in attachments.items():
- del attachment['follows']
- del attachment['length']
- index = 0 if name is 'u1db_content' else 1
- attachment['data'] = binascii.b2a_base64(
- parts[index]).strip()
- couch_doc['_attachments'] = attachments
- self.batch_docs[doc.doc_id] = couch_doc
- last_gen, last_trans_id = self.batch_generation
- self.batch_generation = (last_gen + 1, transaction_id)
+ self._allocate_new_generation(doc.doc_id, transaction_id)
+
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None and hasattr(old_doc, 'couch_rev'):
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ if not self.batching:
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ # the order in which attachments are described inside the
+ # serialization of the couch document must match the order in
+ # which they are actually written in the multipart structure.
+ # Because of that, we use `sorted_keys=True` in the json
+ # serialization (so "u1db_conflicts" comes before
+ # "u1db_content" on the couch document attachments
+ # description), and also reverse the order of the parts before
+ # writing them, so the "conflict" part is written before the
+ # "content" part.
+ envelope.add(
+ 'application/json',
+ json.dumps(couch_doc, sort_keys=True))
+ parts.reverse()
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ resource = self._new_resource()
+ resource.put_json(
+ doc.doc_id, body=str(buf.getvalue()),
+ headers=envelope.headers)
+ except ResourceConflict:
+ raise RevisionConflict()
+ else:
+ for name, attachment in attachments.items():
+ del attachment['follows']
+ del attachment['length']
+ index = 0 if name is 'u1db_content' else 1
+ attachment['data'] = binascii.b2a_base64(
+ parts[index]).strip()
+ couch_doc['_attachments'] = attachments
+ self.batch_docs[doc.doc_id] = couch_doc
+ last_gen, last_trans_id = self.batch_generation
+ self.batch_generation = (last_gen + 1, transaction_id)
def _new_resource(self, *path):
"""