From 086f9fad3ebb1bdcc0ec3fa6161f801068a0c641 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 17 Nov 2015 16:58:15 -0300 Subject: [bug] put a monkeypatch back for bigcouch Current code was tested on couch 1.6 and a monkeypatch got removed during refactor. This commit re-adds it, but in a separate module that is intended to hold temporary code for compatibility that can be removed on version upgrades. --- common/src/leap/soledad/common/couch/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'common/src/leap/soledad/common/couch/__init__.py') diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index bd8b08b7..fb3d57af 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -33,7 +33,6 @@ from multiprocessing.pool import ThreadPool from couchdb.client import Server, Database -from couchdb.multipart import MultipartWriter from couchdb.http import ( ResourceConflict, ResourceNotFound, @@ -53,6 +52,7 @@ from u1db.remote import http_app from leap.soledad.common import ddocs from .errors import raise_server_error from .errors import raise_missing_design_doc_error +from .support import MultipartWriter from leap.soledad.common.errors import InvalidURLError from leap.soledad.common.document import ServerDocument from leap.soledad.common.backend import SoledadBackend @@ -692,8 +692,7 @@ class CouchDatabase(object): couch_doc['_rev'] = old_doc.couch_rev # prepare the multipart PUT buf = StringIO() - headers = {} - envelope = MultipartWriter(buf, headers=headers, subtype='related') + envelope = MultipartWriter(buf) envelope.add('application/json', json.dumps(couch_doc)) for part in parts: envelope.add('application/octet-stream', part) @@ -702,7 +701,8 @@ class CouchDatabase(object): try: resource = self._new_resource() resource.put_json( - doc.doc_id, body=str(buf.getvalue()), headers=headers) + doc.doc_id, body=str(buf.getvalue()), + headers=envelope.headers) except ResourceConflict: raise RevisionConflict() return transactions[-1][1] -- cgit v1.2.3 From bcbb9ccd1d4a281b6922340c12ec01b09d636380 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 14 Nov 2015 03:38:20 -0300 Subject: [feat] put all docs at once Using _bulk_docs api from CouchDB we can put all docs at a single request. Also, prefetching all ids removes the need to HEAD requests during the batch. --- common/src/leap/soledad/common/couch/__init__.py | 51 +++++++++++++++++------- 1 file changed, 37 insertions(+), 14 deletions(-) (limited to 'common/src/leap/soledad/common/couch/__init__.py') diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index fb3d57af..f3437ae1 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -153,10 +153,22 @@ class CouchDatabase(object): self._url = url self._dbname = dbname self._database = self.get_couch_database(url, dbname) + self.batching = False + self.batch_docs = [] if ensure_ddocs: self.ensure_ddocs_on_db() self.ensure_security_ddoc(database_security) + def batch_start(self): + self.batching = True + ids = set(row.id for row in self._database.view('_all_docs')) + self.batched_ids = ids + + def batch_end(self): + self.batching = False + self._database.update(self.batch_docs) + self.batch_docs = [] + def get_couch_database(self, url, dbname): """ Generate a couchdb.Database instance given a url and dbname. @@ -339,6 +351,8 @@ class CouchDatabase(object): """ # get document with all attachments (u1db content and eventual # conflicts) + if self.batching and doc_id not in self.batched_ids: + return None if doc_id not in self._database: return None result = self.json_from_resource([doc_id], attachments=True) @@ -691,20 +705,29 @@ class CouchDatabase(object): if old_doc is not None and hasattr(old_doc, 'couch_rev'): couch_doc['_rev'] = old_doc.couch_rev # prepare the multipart PUT - buf = StringIO() - envelope = MultipartWriter(buf) - envelope.add('application/json', json.dumps(couch_doc)) - for part in parts: - envelope.add('application/octet-stream', part) - envelope.close() - # try to save and fail if there's a revision conflict - try: - resource = self._new_resource() - resource.put_json( - doc.doc_id, body=str(buf.getvalue()), - headers=envelope.headers) - except ResourceConflict: - raise RevisionConflict() + if not self.batching: + buf = StringIO() + envelope = MultipartWriter(buf) + envelope.add('application/json', json.dumps(couch_doc)) + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict + try: + resource = self._new_resource() + resource.put_json( + doc.doc_id, body=str(buf.getvalue()), + headers=envelope.headers) + except ResourceConflict: + raise RevisionConflict() + else: + for name, attachment in attachments.items(): + del attachment['follows'] + del attachment['length'] + index = 0 if name is 'u1db_content' else 1 + attachment['data'] = binascii.b2a_base64(parts[index]).strip() + couch_doc['attachments'] = attachments + self.batch_docs.append(couch_doc) return transactions[-1][1] def _new_resource(self, *path): -- cgit v1.2.3 From fa595231b5ea346dfd9e06c364854469397fca3f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 14 Nov 2015 21:53:14 -0300 Subject: [feat] checks staged docs inside batch This commit adds checking for consistency on batch. When a doc is needed during a batched sync and it doesnt exists on database, current code will make a partial batch to avoid processing like it doesnt exist. --- common/src/leap/soledad/common/couch/__init__.py | 50 ++++++++++++++++++++---- 1 file changed, 42 insertions(+), 8 deletions(-) (limited to 'common/src/leap/soledad/common/couch/__init__.py') diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index f3437ae1..a922fd48 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -154,7 +154,7 @@ class CouchDatabase(object): self._dbname = dbname self._database = self.get_couch_database(url, dbname) self.batching = False - self.batch_docs = [] + self.batch_docs = {} if ensure_ddocs: self.ensure_ddocs_on_db() self.ensure_security_ddoc(database_security) @@ -166,8 +166,7 @@ class CouchDatabase(object): def batch_end(self): self.batching = False - self._database.update(self.batch_docs) - self.batch_docs = [] + self.__perform_batch() def get_couch_database(self, url, dbname): """ @@ -197,7 +196,8 @@ class CouchDatabase(object): """ for ddoc_name in ['docs', 'syncs', 'transactions']: try: - self.json_from_resource(['_design', ddoc_name, '_info'], + self.json_from_resource(['_design'] + + ddoc_name.split('/') + ['_info'], check_missing_ddoc=False) except ResourceNotFound: ddoc = json.loads( @@ -349,15 +349,49 @@ class CouchDatabase(object): :return: The document. :rtype: ServerDocument """ - # get document with all attachments (u1db content and eventual - # conflicts) + doc_from_batch = self.__check_batch_before_get(doc_id) + if doc_from_batch: + return doc_from_batch if self.batching and doc_id not in self.batched_ids: return None if doc_id not in self._database: return None + # get document with all attachments (u1db content and eventual + # conflicts) result = self.json_from_resource([doc_id], attachments=True) return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts) + def __check_batch_before_get(self, doc_id): + """ + If doc_id is staged for batching, then we need to commit the batch + before going ahead. This avoids consistency problems, like trying to + get a document that isn't persisted and processing like it is missing. + + :param doc_id: The unique document identifier + :type doc_id: str + """ + if doc_id in self.batch_docs: + couch_doc = self.batch_docs[doc_id] + rev = self.__perform_batch(doc_id) + couch_doc['_rev'] = rev + self.batched_ids.add(doc_id) + return self.__parse_doc_from_couch(couch_doc, doc_id, True) + return None + + def __perform_batch(self, doc_id=None): + status = self._database.update(self.batch_docs.values()) + rev = None + for ok, stored_doc_id, rev_or_error in status: + if not ok: + error = rev_or_error + if type(error) is ResourceConflict: + raise RevisionConflict + raise error + elif doc_id == stored_doc_id: + rev = rev_or_error + self.batch_docs.clear() + return rev + def __parse_doc_from_couch(self, result, doc_id, check_for_conflicts=False): # restrict to u1db documents @@ -726,8 +760,8 @@ class CouchDatabase(object): del attachment['length'] index = 0 if name is 'u1db_content' else 1 attachment['data'] = binascii.b2a_base64(parts[index]).strip() - couch_doc['attachments'] = attachments - self.batch_docs.append(couch_doc) + couch_doc['_attachments'] = attachments + self.batch_docs[doc.doc_id] = couch_doc return transactions[-1][1] def _new_resource(self, *path): -- cgit v1.2.3 From d103491cbc17e6e7422653e9b01101ff446e7391 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 3 Dec 2015 19:25:29 -0300 Subject: [feat] generation caching during a batch Generation cache was removed for simple processing and it should not got back, but during a batch the server wont change its generation. So a little trick to hold that temporary information until batch finishes is needed. --- common/src/leap/soledad/common/couch/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'common/src/leap/soledad/common/couch/__init__.py') diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index a922fd48..dae460cb 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -154,6 +154,7 @@ class CouchDatabase(object): self._dbname = dbname self._database = self.get_couch_database(url, dbname) self.batching = False + self.batch_generation = None self.batch_docs = {} if ensure_ddocs: self.ensure_ddocs_on_db() @@ -161,11 +162,13 @@ class CouchDatabase(object): def batch_start(self): self.batching = True + self.batch_generation = self.get_generation_info() ids = set(row.id for row in self._database.view('_all_docs')) self.batched_ids = ids def batch_end(self): self.batching = False + self.batch_generation = None self.__perform_batch() def get_couch_database(self, url, dbname): @@ -619,6 +622,8 @@ class CouchDatabase(object): :return: A tuple containing the current generation and transaction id. :rtype: (int, str) """ + if self.batching and self.batch_generation: + return self.batch_generation # query a couch list function ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] info = self.json_from_resource(ddoc_path) @@ -762,6 +767,8 @@ class CouchDatabase(object): attachment['data'] = binascii.b2a_base64(parts[index]).strip() couch_doc['_attachments'] = attachments self.batch_docs[doc.doc_id] = couch_doc + last_gen, last_trans_id = self.batch_generation + self.batch_generation = (last_gen + 1, transaction_id) return transactions[-1][1] def _new_resource(self, *path): -- cgit v1.2.3