From 1106d871e9a7e09cedac436a0488fc87af177b67 Mon Sep 17 00:00:00 2001
From: drebs
Date: Fri, 29 Jul 2016 20:34:19 -0300
Subject: [bug] use couch lock to atomize saving of document

---
 common/src/leap/soledad/common/couch/__init__.py | 98 ++++++++++++------------
 1 file changed, 50 insertions(+), 48 deletions(-)

diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py
index 032f230b..2d57635a 100644
--- a/common/src/leap/soledad/common/couch/__init__.py
+++ b/common/src/leap/soledad/common/couch/__init__.py
@@ -705,8 +705,6 @@ class CouchDatabase(object):
             }
             parts.append(conflicts)
 
-        # add the gen document
-
         # TODO: in u1db protocol, the increment of database generation should
         # be made in the same atomic transaction as the actual document save,
         # otherwise the same document might be concurrently updated by
@@ -714,6 +712,8 @@ class CouchDatabase(object):
         # and doc_id would be enough to prevent that, if all entry points to
         # database update are made through the soledad api.
         with self._put_doc_lock[self._database.name]:
+
+            # add the gen document
             gen, _ = self.get_generation_info()
             new_gen = gen + 1
             gen_doc = {
@@ -724,52 +724,54 @@ class CouchDatabase(object):
             }
             self._database.save(gen_doc)
 
-        # build the couch document
-        couch_doc = {
-            '_id': doc.doc_id,
-            'u1db_rev': doc.rev,
-            '_attachments': attachments,
-        }
-        # if we are updating a doc we have to add the couch doc revision
-        if old_doc is not None and hasattr(old_doc, 'couch_rev'):
-            couch_doc['_rev'] = old_doc.couch_rev
-        # prepare the multipart PUT
-        if not self.batching:
-            buf = StringIO()
-            envelope = MultipartWriter(buf)
-            # the order in which attachments are described inside the
-            # serialization of the couch document must match the order in which
-            # they are actually written in the multipart structure. Because of
-            # that, we use `sorted_keys=True` in the json serialization (so
-            # "u1db_conflicts" comes before "u1db_content" on the couch
-            # document attachments description), and also reverse the order of
-            # the parts before writing them, so the "conflict" part is written
-            # before the "content" part.
-            envelope.add(
-                'application/json',
-                json.dumps(couch_doc, sort_keys=True))
-            parts.reverse()
-            for part in parts:
-                envelope.add('application/octet-stream', part)
-            envelope.close()
-            # try to save and fail if there's a revision conflict
-            try:
-                resource = self._new_resource()
-                resource.put_json(
-                    doc.doc_id, body=str(buf.getvalue()),
-                    headers=envelope.headers)
-            except ResourceConflict:
-                raise RevisionConflict()
-        else:
-            for name, attachment in attachments.items():
-                del attachment['follows']
-                del attachment['length']
-                index = 0 if name is 'u1db_content' else 1
-                attachment['data'] = binascii.b2a_base64(parts[index]).strip()
-            couch_doc['_attachments'] = attachments
-            self.batch_docs[doc.doc_id] = couch_doc
-            last_gen, last_trans_id = self.batch_generation
-            self.batch_generation = (last_gen + 1, transaction_id)
+            # build the couch document
+            couch_doc = {
+                '_id': doc.doc_id,
+                'u1db_rev': doc.rev,
+                '_attachments': attachments,
+            }
+            # if we are updating a doc we have to add the couch doc revision
+            if old_doc is not None and hasattr(old_doc, 'couch_rev'):
+                couch_doc['_rev'] = old_doc.couch_rev
+            # prepare the multipart PUT
+            if not self.batching:
+                buf = StringIO()
+                envelope = MultipartWriter(buf)
+                # the order in which attachments are described inside the
+                # serialization of the couch document must match the order in
+                # which they are actually written in the multipart structure.
+                # Because of that, we use `sorted_keys=True` in the json
+                # serialization (so "u1db_conflicts" comes before
+                # "u1db_content" on the couch document attachments
+                # description), and also reverse the order of the parts before
+                # writing them, so the "conflict" part is written before the
+                # "content" part.
+                envelope.add(
+                    'application/json',
+                    json.dumps(couch_doc, sort_keys=True))
+                parts.reverse()
+                for part in parts:
+                    envelope.add('application/octet-stream', part)
+                envelope.close()
+                # try to save and fail if there's a revision conflict
+                try:
+                    resource = self._new_resource()
+                    resource.put_json(
+                        doc.doc_id, body=str(buf.getvalue()),
+                        headers=envelope.headers)
+                except ResourceConflict:
+                    raise RevisionConflict()
+            else:
+                for name, attachment in attachments.items():
+                    del attachment['follows']
+                    del attachment['length']
+                    index = 0 if name is 'u1db_content' else 1
+                    attachment['data'] = binascii.b2a_base64(
+                        parts[index]).strip()
+                couch_doc['_attachments'] = attachments
+                self.batch_docs[doc.doc_id] = couch_doc
+                last_gen, last_trans_id = self.batch_generation
+                self.batch_generation = (last_gen + 1, transaction_id)
 
     def _new_resource(self, *path):
         """
--
cgit v1.2.3
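Note: the pattern this patch relies on is a per-database, in-process lock that turns the generation increment and the document write into one critical section. The sketch below is illustrative only and is not taken from the Soledad codebase; `put_doc_atomically`, `save_gen_doc` and `save_doc` are hypothetical stand-ins for the real `CouchDatabase` methods shown in the diff above.

    import threading
    from collections import defaultdict

    # One lock per database name, mirroring the patch's
    # `self._put_doc_lock[self._database.name]`. defaultdict(threading.Lock)
    # creates each database's lock lazily on first access.
    _put_doc_lock = defaultdict(threading.Lock)


    def put_doc_atomically(database, doc, transaction_id, save_gen_doc, save_doc):
        """Bump the generation and save the document in one critical section.

        `database` is assumed to expose a `.name` attribute; `save_gen_doc`
        and `save_doc` are hypothetical callables standing in for the CouchDB
        writes performed by the real code.
        """
        with _put_doc_lock[database.name]:
            # Holding the lock across both writes keeps two concurrent callers
            # from reading the same generation and both claiming generation + 1.
            save_gen_doc(database, doc, transaction_id)
            save_doc(database, doc)

As the patch's own TODO comment points out, a lock like this only serializes writers that go through the Soledad API; it does not make the generation bump and the document save a single transaction on the CouchDB side.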