From 92b86079d34da5252b826d32afabafde84dab1af Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 4 Sep 2015 17:30:13 -0300 Subject: [refactor] couch.py should use superclass methods _put_doc_if_newer is implemented on CommonBackend already. This was copied over to CouchBackend just to add ensure conflicts. We can do this before calling the super method instead. --- common/src/leap/soledad/common/couch.py | 91 +++++---------------------------- 1 file changed, 13 insertions(+), 78 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1c762036..771db69e 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -103,6 +103,7 @@ class CouchDocument(SoledadDocument): SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) self.couch_rev = None self.transactions = None + self._conflicts = None def get_conflicts(self): """ @@ -111,7 +112,7 @@ class CouchDocument(SoledadDocument): :return: The conflicted versions of the document. :rtype: [CouchDocument] """ - return self._conflicts + return self._conflicts or [] def set_conflicts(self, conflicts): """ @@ -1203,10 +1204,10 @@ class CouchDatabase(CommonBackend): :param doc: The document to be put. :type doc: CouchDocument """ - my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - doc.prune_conflicts( - vectorclock.VectorClockRev(doc.rev), self._replica_uid) - doc.add_conflict(my_doc) + my_doc = self._get_doc(doc.doc_id) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + doc.add_conflict(self._factory(doc.doc_id, my_doc.rev, my_doc.get_json())) + doc.has_conflicts = True self._put_doc(my_doc, doc) def resolve_doc(self, doc, conflicted_doc_revs): @@ -1320,30 +1321,14 @@ class CouchDatabase(CommonBackend): """ if not isinstance(doc, CouchDocument): doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) - self._save_source_info(replica_uid, replica_gen, - replica_trans_id, number_of_docs, - doc_idx, sync_id) my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - if my_doc is not None: - my_doc.set_conflicts( - self.get_doc_conflicts(my_doc.doc_id, my_doc.couch_rev)) - state, save_doc = _process_incoming_doc( - my_doc, doc, save_conflict, self.replica_uid) - if save_doc: - self._put_doc(my_doc, save_doc) - doc.update(save_doc) - return state, self._get_generation() - - def _save_source_info(self, replica_uid, replica_gen, replica_trans_id, - number_of_docs, doc_idx, sync_id): - """ - Validate and save source information. - """ - self._validate_source(replica_uid, replica_gen, replica_trans_id) - self._set_replica_gen_and_trans_id( - replica_uid, replica_gen, replica_trans_id, - number_of_docs=number_of_docs, doc_idx=doc_idx, - sync_id=sync_id) + if my_doc: + doc.set_conflicts(my_doc.get_conflicts()) + return CommonBackend._put_doc_if_newer(self, doc, save_conflict, replica_uid, + replica_gen, replica_trans_id) + + def _put_and_update_indexes(self, cur_doc, doc): + self._put_doc(cur_doc, doc) def get_docs(self, doc_ids, check_for_conflicts=True, include_deleted=False): @@ -1494,53 +1479,3 @@ class CouchServerState(ServerState): delete databases. """ raise Unauthorized() - - -def _process_incoming_doc(my_doc, other_doc, save_conflict, replica_uid): - """ - Check document, save and return state. - """ - # at this point, `doc` has arrived from the other syncing party, and - # we will decide what to do with it. - # First, we prepare the arriving doc to update couch database. - new_doc = CouchDocument( - other_doc.doc_id, other_doc.rev, other_doc.get_json()) - if my_doc is None: - return 'inserted', new_doc - new_doc.couch_rev = my_doc.couch_rev - new_doc.set_conflicts(my_doc.get_conflicts()) - # fetch conflicts because we will eventually manipulate them - # from now on, it works just like u1db sqlite backend - doc_vcr = vectorclock.VectorClockRev(new_doc.rev) - cur_vcr = vectorclock.VectorClockRev(my_doc.rev) - if doc_vcr.is_newer(cur_vcr): - rev = new_doc.rev - new_doc.prune_conflicts(doc_vcr, replica_uid) - if new_doc.rev != rev: - # conflicts have been autoresolved - return 'superseded', new_doc - else: - return'inserted', new_doc - elif new_doc.rev == my_doc.rev: - # magical convergence - return 'converged', None - elif cur_vcr.is_newer(doc_vcr): - # Don't add this to seen_ids, because we have something newer, - # so we should send it back, and we should not generate a - # conflict - other_doc.update(new_doc) - return 'superseded', None - elif my_doc.same_content_as(new_doc): - # the documents have been edited to the same thing at both ends - doc_vcr.maximize(cur_vcr) - doc_vcr.increment(replica_uid) - new_doc.rev = doc_vcr.as_str() - return 'superseded', new_doc - else: - if save_conflict: - new_doc.prune_conflicts( - vectorclock.VectorClockRev(new_doc.rev), replica_uid) - new_doc.add_conflict(my_doc) - return 'conflicted', new_doc - other_doc.update(new_doc) - return 'conflicted', None -- cgit v1.2.3 From d39b408e92bb2fa6a2dac3835e2f5209d2eee94c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 4 Sep 2015 19:37:32 -0300 Subject: [refactor] uses ThreadPool instead of own implementation Python has a native ThreadPool implementation that fits our needs. Changing it to use this instead and making some calls simpler. --- common/src/leap/soledad/common/couch.py | 99 ++++++--------------------------- 1 file changed, 18 insertions(+), 81 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 771db69e..421fbac1 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -26,13 +26,12 @@ import logging import binascii import time import sys -import threading from StringIO import StringIO -from collections import defaultdict from urlparse import urljoin from contextlib import contextmanager +from multiprocessing.pool import ThreadPool from couchdb.client import Server, Database @@ -362,57 +361,15 @@ def couch_server(url): yield server +THREAD_POOL = ThreadPool(20) + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. """ - # We spawn threads to parallelize the CouchDatabase.get_docs() method - MAX_GET_DOCS_THREADS = 20 - - update_handler_lock = defaultdict(threading.Lock) - sync_info_lock = defaultdict(threading.Lock) - - class _GetDocThread(threading.Thread): - - """ - A thread that gets a document from a database. - - TODO: switch this for a twisted deferred to thread. This depends on - replacing python-couchdb for paisley in this module. - """ - - def __init__(self, db, doc_id, check_for_conflicts, - release_fun): - """ - :param db: The database from where to get the document. - :type db: CouchDatabase - :param doc_id: The doc_id of the document to be retrieved. - :type doc_id: str - :param check_for_conflicts: Whether the get_doc() method should - check for existing conflicts. - :type check_for_conflicts: bool - :param release_fun: A function that releases a semaphore, to be - called after the document is fetched. - :type release_fun: function - """ - threading.Thread.__init__(self) - self._db = db - self._doc_id = doc_id - self._check_for_conflicts = check_for_conflicts - self._release_fun = release_fun - self._doc = None - - def run(self): - """ - Fetch the document, store it as a property, and call the release - function. - """ - self._doc = self._db._get_doc( - self._doc_id, self._check_for_conflicts) - self._release_fun() - @classmethod def open_database(cls, url, create, replica_uid=None, ensure_ddocs=False): """ @@ -477,9 +434,6 @@ class CouchDatabase(CommonBackend): self._set_replica_uid(replica_uid) if ensure_ddocs: self.ensure_ddocs_on_db() - # initialize a thread pool for parallelizing get_docs() - self._sem_pool = threading.BoundedSemaphore( - value=self.MAX_GET_DOCS_THREADS) def ensure_ddocs_on_db(self): """ @@ -735,6 +689,9 @@ class CouchDatabase(CommonBackend): attachments=True)[2] except ResourceNotFound: return None + return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts) + + def __parse_doc_from_couch(self, result, doc_id, check_for_conflicts=False): # restrict to u1db documents if 'u1db_rev' not in result: return None @@ -799,11 +756,8 @@ class CouchDatabase(CommonBackend): """ generation = self._get_generation() - results = [] - for row in self._database.view('_all_docs'): - doc = self.get_doc(row.id, include_deleted=include_deleted) - if doc is not None: - results.append(doc) + results = list(self.get_docs(self._database, + include_deleted=include_deleted)) return (generation, results) def _put_doc(self, old_doc, doc): @@ -1335,12 +1289,12 @@ class CouchDatabase(CommonBackend): """ Get the JSON content for many documents. - :param doc_ids: A list of document identifiers. + :param doc_ids: A list of document identifiers or None for all. :type doc_ids: list :param check_for_conflicts: If set to False, then the conflict check will be skipped, and 'None' will be returned instead of True/False. - :type check_for_conflictsa: bool + :type check_for_conflicts: bool :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise deleted documents will not be included in the results. @@ -1348,31 +1302,14 @@ class CouchDatabase(CommonBackend): in matching doc_ids order. :rtype: iterable """ - # Workaround for: - # - # http://bugs.python.org/issue7980 - # https://leap.se/code/issues/5449 - # - # python-couchdb uses time.strptime, which is not thread safe. In - # order to avoid the problem described on the issues above, we preload - # strptime here by evaluating the conversion of an arbitrary date. - # This will not be needed when/if we switch from python-couchdb to - # paisley. - time.strptime('Mar 8 1917', '%b %d %Y') - # spawn threads to retrieve docs - threads = [] - for doc_id in doc_ids: - self._sem_pool.acquire() - t = self._GetDocThread(self, doc_id, check_for_conflicts, - self._sem_pool.release) - t.start() - threads.append(t) - # join threads and yield docs - for t in threads: - t.join() - if t._doc.is_tombstone() and not include_deleted: + get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts) + docs = [THREAD_POOL.apply_async(get_one, [doc_id]) + for doc_id in doc_ids] + for doc in docs: + doc = doc.get() + if not doc or not include_deleted and doc.is_tombstone(): continue - yield t._doc + yield doc def _prune_conflicts(self, doc, doc_vcr): """ -- cgit v1.2.3 From f9faa007bc0d5d681f85aa246ea05ec5fe35ccc5 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 6 Sep 2015 05:39:05 -0300 Subject: [feat] adds caching for other gen and trans id There are two functions in couch.py used to save and retrieve the last know gen and trans id for the syncing replica. The get function is called very often, but is only set on one point. Added a simple caching to avoid queying couch for a value that we already have. If cache is empty, it just query as usual and fills it. --- common/src/leap/soledad/common/couch.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 421fbac1..3b120cd1 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -434,6 +434,7 @@ class CouchDatabase(CommonBackend): self._set_replica_uid(replica_uid) if ensure_ddocs: self.ensure_ddocs_on_db() + self.cache = {} def ensure_ddocs_on_db(self): """ @@ -1047,6 +1048,8 @@ class CouchDatabase(CommonBackend): synchronized with the replica, this is (0, ''). :rtype: (int, str) """ + if other_replica_uid in self.cache: + return self.cache[other_replica_uid] # query a couch view result = self._database.view('syncs/log') if len(result[other_replica_uid].rows) == 0: @@ -1129,6 +1132,7 @@ class CouchDatabase(CommonBackend): design document for an yet unknown reason. """ + self.cache[other_replica_uid] = (other_generation, other_transaction_id) # query a couch update function ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] res = self._database.resource(*ddoc_path) @@ -1362,7 +1366,7 @@ class CouchServerState(ServerState): Inteface of the WSGI server with the CouchDB backend. """ - def __init__(self, couch_url): + def __init__(self, couch_url, cache=None): """ Initialize the couch server state. @@ -1370,6 +1374,7 @@ class CouchServerState(ServerState): :type couch_url: str """ self.couch_url = couch_url + self.cache = cache or {} def open_database(self, dbname): """ @@ -1381,10 +1386,12 @@ class CouchServerState(ServerState): :return: The CouchDatabase object. :rtype: CouchDatabase """ - return CouchDatabase( + db = CouchDatabase( self.couch_url, dbname, ensure_ddocs=False) + db.cache = self.cache + return db def ensure_database(self, dbname): """ -- cgit v1.2.3 From 14300959a12e4908df7b00281d2590e627a47ba9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 7 Sep 2015 03:56:02 -0300 Subject: [feat] full caching for each sync_id session The CouchDB backend implementation was accessing CouchDB too many times for the same values. Those values are known inside the same sync_id, which is the id of current sync session. This commit adds caching for all redundant calls to Couch inside the same sync_id for each replica. Refactoring is still needed, but for now couch.py works normally as if caching is not present, while sync.py injects the cache as a attribute to enable it. This needs a simpler implementation. --- common/src/leap/soledad/common/couch.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 3b120cd1..8c152353 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -434,7 +434,14 @@ class CouchDatabase(CommonBackend): self._set_replica_uid(replica_uid) if ensure_ddocs: self.ensure_ddocs_on_db() - self.cache = {} + self._cache = None + + @property + def cache(self): + if self._cache is not None: + return self._cache + else: + return {} def ensure_ddocs_on_db(self): """ @@ -513,7 +520,10 @@ class CouchDatabase(CommonBackend): :rtype: str """ if self._real_replica_uid is not None: + self.cache[self._url] = {'replica_uid': self._real_replica_uid} return self._real_replica_uid + if self._url in self.cache: + return self.cache[self._url]['replica_uid'] try: # grab replica_uid from server doc = self._database['u1db_config'] @@ -551,10 +561,13 @@ class CouchDatabase(CommonBackend): unknown reason. """ # query a couch list function + if self.replica_uid + '_gen' in self.cache: + return self.cache[self.replica_uid + '_gen']['generation'] ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] res = self._database.resource(*ddoc_path) try: response = res.get_json() + self.cache[self.replica_uid + '_gen'] = response[2] return response[2]['generation'] except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) @@ -582,11 +595,15 @@ class CouchDatabase(CommonBackend): design document for an yet unknown reason. """ + if self.replica_uid + '_gen' in self.cache: + response = self.cache[self.replica_uid + '_gen'] + return (response['generation'], response['transaction_id']) # query a couch list function ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] res = self._database.resource(*ddoc_path) try: response = res.get_json() + self.cache[self.replica_uid + '_gen'] = response[2] return (response[2]['generation'], response[2]['transaction_id']) except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) @@ -846,6 +863,10 @@ class CouchDatabase(CommonBackend): doc.doc_id, body=buf.getvalue(), headers=envelope.headers) except ResourceConflict: raise RevisionConflict() + if self.replica_uid + '_gen' in self.cache: + gen_info = self.cache[self.replica_uid + '_gen'] + gen_info['generation'] += 1 + gen_info['transaction_id'] = transactions[-1][1] def put_doc(self, doc): """ @@ -1366,7 +1387,7 @@ class CouchServerState(ServerState): Inteface of the WSGI server with the CouchDB backend. """ - def __init__(self, couch_url, cache=None): + def __init__(self, couch_url): """ Initialize the couch server state. @@ -1374,7 +1395,6 @@ class CouchServerState(ServerState): :type couch_url: str """ self.couch_url = couch_url - self.cache = cache or {} def open_database(self, dbname): """ @@ -1390,7 +1410,6 @@ class CouchServerState(ServerState): self.couch_url, dbname, ensure_ddocs=False) - db.cache = self.cache return db def ensure_database(self, dbname): -- cgit v1.2.3 From fc5f4ae965ca48946af9f6982b2719562168131c Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 28 Aug 2015 17:25:41 -0300 Subject: [feat] use one doc per remote replica in sync log Before this change, we used a complicated update handler for storing the sync state on the couchdb backend. That update handler was implemented as an attempt to make couchdb take care of some validation for the update of the sync log during the sync exchange, mainly to allow concurrent received documents insertion during a sync. Right now we rely on the remote sending one document at a time and do not support concurrent insertions in the remote database backed by couch. Because of that, the code removed by this commit was unneeded. And more: it was a bottleneck of the sync process because we were writing to an unique file and using unnecessary couch design docs processing for that. So this commit both simplifies the storage of remote sync and removes a bottleneck of the sync process. Conflicts: common/src/leap/soledad/common/couch.py common/src/leap/soledad/common/tests/test_couch.py --- common/src/leap/soledad/common/couch.py | 63 +++------ .../leap/soledad/common/ddocs/syncs/updates/put.js | 151 --------------------- .../soledad/common/ddocs/syncs/views/log/map.js | 12 -- common/src/leap/soledad/common/tests/test_couch.py | 8 -- .../tests/test_couch_operations_atomicity.py | 53 +++----- 5 files changed, 42 insertions(+), 245 deletions(-) delete mode 100644 common/src/leap/soledad/common/ddocs/syncs/updates/put.js delete mode 100644 common/src/leap/soledad/common/ddocs/syncs/views/log/map.js (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 8c152353..83f542ab 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1071,14 +1071,20 @@ class CouchDatabase(CommonBackend): """ if other_replica_uid in self.cache: return self.cache[other_replica_uid] - # query a couch view - result = self._database.view('syncs/log') - if len(result[other_replica_uid].rows) == 0: - return (0, '') - return ( - result[other_replica_uid].rows[0]['value']['known_generation'], - result[other_replica_uid].rows[0]['value']['known_transaction_id'] - ) + + doc_id = 'u1db_sync_%s' % other_replica_uid + try: + doc = self._database[doc_id] + except ResourceNotFound: + doc = { + '_id': doc_id, + 'generation': 0, + 'transaction_id': '', + } + self._database.save(doc) + result = doc['generation'], doc['transaction_id'] + self.cache[other_replica_uid] = result + return result def _set_replica_gen_and_trans_id(self, other_replica_uid, other_generation, other_transaction_id, @@ -1138,43 +1144,16 @@ class CouchDatabase(CommonBackend): :type doc_idx: int :param sync_id: The id of the current sync session. :type sync_id: str - - :raise MissingDesignDocError: Raised when tried to access a missing - design document. - :raise MissingDesignDocListFunctionError: Raised when trying to access - a missing list function on a - design document. - :raise MissingDesignDocNamedViewError: Raised when trying to access a - missing named view on a design - document. - :raise MissingDesignDocDeletedError: Raised when trying to access a - deleted design document. - :raise MissingDesignDocUnknownError: Raised when failed to access a - design document for an yet - unknown reason. """ self.cache[other_replica_uid] = (other_generation, other_transaction_id) - # query a couch update function - ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] - res = self._database.resource(*ddoc_path) + doc_id = 'u1db_sync_%s' % other_replica_uid try: - with CouchDatabase.update_handler_lock[self._get_replica_uid()]: - body = { - 'other_replica_uid': other_replica_uid, - 'other_generation': other_generation, - 'other_transaction_id': other_transaction_id, - } - if number_of_docs is not None: - body['number_of_docs'] = number_of_docs - if doc_idx is not None: - body['doc_idx'] = doc_idx - if sync_id is not None: - body['sync_id'] = sync_id - res.put_json( - body=body, - headers={'content-type': 'application/json'}) - except ResourceNotFound as e: - raise_missing_design_doc_error(e, ddoc_path) + doc = self._database[doc_id] + except ResourceNotFound: + doc = {'_id': doc_id} + doc['generation'] = other_generation + doc['transaction_id'] = other_transaction_id + self._database.save(doc) def _force_doc_sync_conflict(self, doc): """ diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js deleted file mode 100644 index b0ae2de6..00000000 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ /dev/null @@ -1,151 +0,0 @@ -/** - * The u1db_sync_log document stores both the actual sync log and a list of - * pending updates to the log, in case we receive incoming documents out of - * the correct order (i.e. if there are parallel PUTs during the sync - * process). - * - * The structure of the document is the following: - * - * { - * 'syncs': [ - * ['', , ''], - * ... - * ], - * 'pending': { - * 'other_replica_uid': { - * 'sync_id': '', - * 'log': [[, ''], ...] - * }, - * ... - * } - * } - * - * The update function below does the following: - * - * 0. If we do not receive a sync_id, we just update the 'syncs' list with - * the incoming info about the source replica state. - * - * 1. Otherwise, if the incoming sync_id differs from current stored - * sync_id, then we assume that the previous sync session for that source - * replica was interrupted and discard all pending data. - * - * 2. Then we append incoming info as pending data for that source replica - * and current sync_id, and sort the pending data by generation. - * - * 3. Then we go through pending data and find the most recent generation - * that we can use to update the actual sync log. - * - * 4. Finally, we insert the most up to date information into the sync log. - */ -function(doc, req){ - - // create the document if it doesn't exist - if (!doc) { - doc = {} - doc['_id'] = 'u1db_sync_log'; - doc['syncs'] = []; - } - - // get and validate incoming info - var body = JSON.parse(req.body); - var other_replica_uid = body['other_replica_uid']; - var other_generation = parseInt(body['other_generation']); - var other_transaction_id = body['other_transaction_id'] - var sync_id = body['sync_id']; - var number_of_docs = body['number_of_docs']; - var doc_idx = body['doc_idx']; - - // parse integers - if (number_of_docs != null) - number_of_docs = parseInt(number_of_docs); - if (doc_idx != null) - doc_idx = parseInt(doc_idx); - - if (other_replica_uid == null - || other_generation == null - || other_transaction_id == null) - return [null, 'invalid data']; - - // create slot for pending logs - if (doc['pending'] == null) - doc['pending'] = {}; - - // these are the values that will be actually inserted - var current_gen = other_generation; - var current_trans_id = other_transaction_id; - - /*------------- Wait for sequential values before storing -------------*/ - - // we just try to obtain pending log if we received a sync_id - if (sync_id != null) { - - // create slot for current source and sync_id pending log - if (doc['pending'][other_replica_uid] == null - || doc['pending'][other_replica_uid]['sync_id'] != sync_id) { - doc['pending'][other_replica_uid] = { - 'sync_id': sync_id, - 'log': [], - 'last_doc_idx': 0, - } - } - - // append incoming data to pending log - doc['pending'][other_replica_uid]['log'].push([ - other_generation, - other_transaction_id, - doc_idx, - ]) - - // sort pending log according to generation - doc['pending'][other_replica_uid]['log'].sort(function(a, b) { - return a[0] - b[0]; - }); - - // get most up-to-date information from pending log - var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx']; - var pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; - - current_gen = null; - current_trans_id = null; - - while (last_doc_idx + 1 == pending_idx) { - pending = doc['pending'][other_replica_uid]['log'].shift() - current_gen = pending[0]; - current_trans_id = pending[1]; - last_doc_idx = pending[2] - if (doc['pending'][other_replica_uid]['log'].length == 0) - break; - pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; - } - - // leave the sync log untouched if we still did not receive enough docs - if (current_gen == null) - return [doc, 'ok']; - - // update last index of received doc - doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx; - - // eventually remove all pending data from that replica - if (last_doc_idx == number_of_docs) - delete doc['pending'][other_replica_uid] - } - - /*--------------- Store source replica info on sync log ---------------*/ - - // remove outdated info - doc['syncs'] = doc['syncs'].filter( - function (entry) { - return entry[0] != other_replica_uid; - } - ); - - // store in log - doc['syncs'].push([ - other_replica_uid, - current_gen, - current_trans_id - ]); - - return [doc, 'ok']; -} - diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js deleted file mode 100644 index a63c7cf4..00000000 --- a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js +++ /dev/null @@ -1,12 +0,0 @@ -function(doc) { - if (doc._id == 'u1db_sync_log') { - if (doc.syncs) - doc.syncs.forEach(function (entry) { - emit(entry[0], - { - 'known_generation': entry[1], - 'known_transaction_id': entry[2] - }); - }); - } -} diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index a08ffd16..c8d13667 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -1359,10 +1359,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocError, self.db.whats_changed) - # _do_set_replica_gen_and_trans_id() - self.assertRaises( - errors.MissingDesignDocError, - self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) def test_missing_design_doc_functions_raises(self): """ @@ -1489,10 +1485,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocDeletedError, self.db.whats_changed) - # _do_set_replica_gen_and_trans_id() - self.assertRaises( - errors.MissingDesignDocDeletedError, - self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) def test_ensure_ddoc_independently(self): """ diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index 25f709ca..3e8e8cce 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -163,6 +163,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): 2, len(filter(lambda t: t[0] == doc_id, transaction_log))) + @defer.inlineCallbacks def test_correct_sync_log_after_sequential_syncs(self): """ Assert that the sync_log increases accordingly with sequential syncs. @@ -170,21 +171,21 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): sol = self._soledad_instance( auth_token='auth-token', server_url=self.getURL()) + source_replica_uid = sol._dbpool.replica_uid - def _create_docs(results): + def _create_docs(): deferreds = [] for i in xrange(0, REPEAT_TIMES): deferreds.append(sol.create_doc({})) - return defer.DeferredList(deferreds) + return defer.gatherResults(deferreds) def _assert_transaction_and_sync_logs(results, sync_idx): # assert sizes of transaction and sync logs self.assertEqual( sync_idx * REPEAT_TIMES, len(self.db._get_transaction_log())) - self.assertEqual( - 1 if sync_idx > 0 else 0, - len(self.db._database.view('syncs/log').rows)) + gen, _ = self.db._get_replica_gen_and_trans_id(source_replica_uid) + self.assertEqual(sync_idx * REPEAT_TIMES, gen) def _assert_sync(results, sync_idx): gen, docs = results @@ -193,40 +194,28 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): # assert sizes of transaction and sync logs self.assertEqual((sync_idx + 1) * REPEAT_TIMES, len(self.db._get_transaction_log())) - sync_log_rows = self.db._database.view('syncs/log').rows - sync_log = sync_log_rows[0].value - replica_uid = sync_log_rows[0].key - known_gen = sync_log['known_generation'] - known_trans_id = sync_log['known_transaction_id'] - # assert sync_log has exactly 1 row - self.assertEqual(1, len(sync_log_rows)) - # assert it has the correct replica_uid, gen and trans_id - self.assertEqual(sol._dbpool.replica_uid, replica_uid) + target_known_gen, target_known_trans_id = \ + self.db._get_replica_gen_and_trans_id(source_replica_uid) + # assert it has the correct gen and trans_id conn_key = sol._dbpool._u1dbconnections.keys().pop() conn = sol._dbpool._u1dbconnections[conn_key] sol_gen, sol_trans_id = conn._get_generation_info() - self.assertEqual(sol_gen, known_gen) - self.assertEqual(sol_trans_id, known_trans_id) - - # create some documents - d = _create_docs(None) + self.assertEqual(sol_gen, target_known_gen) + self.assertEqual(sol_trans_id, target_known_trans_id) # sync first time and assert success - d.addCallback(_assert_transaction_and_sync_logs, 0) - d.addCallback(lambda _: sol.sync()) - d.addCallback(lambda _: sol.get_all_docs()) - d.addCallback(_assert_sync, 0) + results = yield _create_docs() + _assert_transaction_and_sync_logs(results, 0) + yield sol.sync() + results = yield sol.get_all_docs() + _assert_sync(results, 0) # create more docs, sync second time and assert success - d.addCallback(_create_docs) - d.addCallback(_assert_transaction_and_sync_logs, 1) - d.addCallback(lambda _: sol.sync()) - d.addCallback(lambda _: sol.get_all_docs()) - d.addCallback(_assert_sync, 1) - - d.addCallback(lambda _: sol.close()) - - return d + results = yield _create_docs() + _assert_transaction_and_sync_logs(results, 1) + yield sol.sync() + results = yield sol.get_all_docs() + _assert_sync(results, 1) # # Concurrency tests -- cgit v1.2.3 From 0e954f3328b7b8c31c88e0bee796230e87bca829 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 20 Sep 2015 17:47:07 -0300 Subject: [feat] adds cache expiration Now each backend object will be retrieved from cache for sync.py and values will live for 3600 by default. That is changed via parameter if needed. --- common/src/leap/soledad/common/couch.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 83f542ab..809af05d 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -527,6 +527,7 @@ class CouchDatabase(CommonBackend): try: # grab replica_uid from server doc = self._database['u1db_config'] + self.cache[self._url] = doc self._real_replica_uid = doc['replica_uid'] return self._real_replica_uid except ResourceNotFound: @@ -1154,6 +1155,7 @@ class CouchDatabase(CommonBackend): doc['generation'] = other_generation doc['transaction_id'] = other_transaction_id self._database.save(doc) + self.cache[other_replica_uid] = (other_generation, other_transaction_id) def _force_doc_sync_conflict(self, doc): """ -- cgit v1.2.3 From cb7aa314ad4d47e9f32e9e111ec13976978ed02d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 20 Sep 2015 19:28:22 -0300 Subject: [feat] enable delayed commits We use CouchDB with single doc read/write. Following this documentation about performance, we should get more performance by enabling couch to delay and commit later. See: http://guide.couchdb.org/draft/performance.html#single --- common/src/leap/soledad/common/couch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 809af05d..014b3f38 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -357,7 +357,7 @@ def couch_server(url): :type url: str """ session = Session(timeout=COUCH_TIMEOUT) - server = Server(url=url, session=session) + server = Server(url=url, full_commit=False, session=session) yield server -- cgit v1.2.3 From df8c794395ee695ea1dfb30603a073b32b24ee58 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 21 Sep 2015 17:19:18 -0300 Subject: [refactor] init_caching instead of setting attr As meskio found commented, setting this attribute directly is ugly, CouchDatabase now has a init_caching method for setting up cache instance. --- common/src/leap/soledad/common/couch.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 014b3f38..00a17715 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -443,6 +443,15 @@ class CouchDatabase(CommonBackend): else: return {} + def init_caching(self, cache): + """ + Start using cache by setting internal _cache attribute. + + :param cache: the cache instance, anything that behaves like a dict + :type cache: dict + """ + self._cache = cache + def ensure_ddocs_on_db(self): """ Ensure that the design documents used by the backend exist on the -- cgit v1.2.3 From 5b20b469f22ab99533135beefd49129db49064e5 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 23 Sep 2015 15:43:06 -0300 Subject: [style] pep8 --- common/src/leap/soledad/common/couch.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 00a17715..38041c09 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -719,7 +719,8 @@ class CouchDatabase(CommonBackend): return None return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts) - def __parse_doc_from_couch(self, result, doc_id, check_for_conflicts=False): + def __parse_doc_from_couch(self, result, doc_id, + check_for_conflicts=False): # restrict to u1db documents if 'u1db_rev' not in result: return None @@ -1155,7 +1156,8 @@ class CouchDatabase(CommonBackend): :param sync_id: The id of the current sync session. :type sync_id: str """ - self.cache[other_replica_uid] = (other_generation, other_transaction_id) + self.cache[other_replica_uid] = (other_generation, + other_transaction_id) doc_id = 'u1db_sync_%s' % other_replica_uid try: doc = self._database[doc_id] @@ -1164,7 +1166,6 @@ class CouchDatabase(CommonBackend): doc['generation'] = other_generation doc['transaction_id'] = other_transaction_id self._database.save(doc) - self.cache[other_replica_uid] = (other_generation, other_transaction_id) def _force_doc_sync_conflict(self, doc): """ @@ -1175,7 +1176,8 @@ class CouchDatabase(CommonBackend): """ my_doc = self._get_doc(doc.doc_id) self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) - doc.add_conflict(self._factory(doc.doc_id, my_doc.rev, my_doc.get_json())) + doc.add_conflict(self._factory(doc.doc_id, my_doc.rev, + my_doc.get_json())) doc.has_conflicts = True self._put_doc(my_doc, doc) @@ -1293,8 +1295,9 @@ class CouchDatabase(CommonBackend): my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) if my_doc: doc.set_conflicts(my_doc.get_conflicts()) - return CommonBackend._put_doc_if_newer(self, doc, save_conflict, replica_uid, - replica_gen, replica_trans_id) + return CommonBackend._put_doc_if_newer(self, doc, save_conflict, + replica_uid, replica_gen, + replica_trans_id) def _put_and_update_indexes(self, cur_doc, doc): self._put_doc(cur_doc, doc) -- cgit v1.2.3