diff options
-rw-r--r-- | common/src/leap/soledad/common/couch/__init__.py | 121 |
1 files changed, 65 insertions, 56 deletions
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index 21ffd036..6cad2b19 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -103,6 +103,10 @@ def couch_server(url): THREAD_POOL = ThreadPool(20) +def _get_gen_doc_id(gen): + return 'gen-%s' % str(gen).zfill(10) + + class CouchDatabase(object): """ Holds CouchDB related code. @@ -213,7 +217,7 @@ class CouchDatabase(object): Ensure that the design documents used by the backend exist on the couch database. """ - for ddoc_name in ['docs', 'syncs', 'transactions']: + for ddoc_name in ['docs', 'syncs']: try: self.json_from_resource(['_design'] + ddoc_name.split('/') + ['_info'], @@ -437,8 +441,6 @@ class CouchDatabase(object): result['_attachments']['u1db_conflicts']['data'])))) # store couch revision doc.couch_rev = result['_rev'] - # store transactions - doc.transactions = result['u1db_transactions'] return doc def _build_conflicts(self, doc_id, attached_conflicts): @@ -474,14 +476,11 @@ class CouchDatabase(object): """ if generation == 0: return '' - # query a couch list function - ddoc_path = [ - '_design', 'transactions', '_list', 'trans_id_for_gen', 'log' - ] - response = self.json_from_resource(ddoc_path, gen=generation) - if response == {}: + log = self._get_transaction_log(start=generation, end=generation) + if not log: raise InvalidGeneration - return response['transaction_id'] + _, _, trans_id = log[0] + return trans_id def get_replica_gen_and_trans_id(self, other_replica_uid): """ @@ -512,8 +511,8 @@ class CouchDatabase(object): 'transaction_id': '', } self._database.save(doc) - result = doc['generation'], doc['transaction_id'] - return result + gen, trans_id = doc['generation'], doc['transaction_id'] + return gen, trans_id def get_doc_conflicts(self, doc_id, couch_rev=None): """ @@ -581,12 +580,32 @@ class CouchDatabase(object): :return: The complete transaction log. :rtype: [(str, str)] """ - # query a couch view - ddoc_path = ['_design', 'transactions', '_view', 'log'] - response = self.json_from_resource(ddoc_path) - return map( - lambda row: (row['id'], row['value']), - response['rows']) + log = self._get_transaction_log() + return map(lambda i: (i[1], i[2]), log) + + def _get_gen_docs( + self, start=0, end=9999999999, descending=None, limit=None): + params = {} + if descending: + params['descending'] = 'true' + # honor couch way of traversing the view tree in reverse order + start, end = end, start + params['startkey'] = _get_gen_doc_id(start) + params['endkey'] = _get_gen_doc_id(end) + params['include_docs'] = 'true' + if limit: + params['limit'] = limit + view = self._database.view("_all_docs", **params) + return view.rows + + def _get_transaction_log(self, start=0, end=9999999999): + # get current gen and trans_id + rows = self._get_gen_docs(start=start, end=end) + log = [] + for row in rows: + doc = row['doc'] + log.append((doc['gen'], doc['doc_id'], doc['trans_id'])) + return log def whats_changed(self, old_generation=0): """ @@ -605,32 +624,18 @@ class CouchDatabase(object): changes first) :rtype: (int, str, [(str, int, str)]) """ - # query a couch list function - ddoc_path = [ - '_design', 'transactions', '_list', 'whats_changed', 'log' - ] - response = self.json_from_resource(ddoc_path, old_gen=old_generation) - results = map( - lambda row: - (row['generation'], row['doc_id'], row['transaction_id']), - response['transactions']) - results.reverse() - cur_gen = old_generation - seen = set() changes = [] - newest_trans_id = '' - for generation, doc_id, trans_id in results: + cur_generation, last_trans_id = self.get_generation_info() + relevant_tail = self._get_transaction_log(start=old_generation + 1) + seen = set() + generation = cur_generation + for _, doc_id, trans_id in reversed(relevant_tail): if doc_id not in seen: changes.append((doc_id, generation, trans_id)) seen.add(doc_id) - if changes: - cur_gen = changes[0][1] # max generation - newest_trans_id = changes[0][2] - changes.reverse() - else: - cur_gen, newest_trans_id = self.get_generation_info() - - return cur_gen, newest_trans_id, changes + generation -= 1 + changes.reverse() + return (cur_generation, last_trans_id, changes) def get_generation_info(self): """ @@ -641,10 +646,11 @@ class CouchDatabase(object): """ if self.batching and self.batch_generation: return self.batch_generation - # query a couch list function - ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] - info = self.json_from_resource(ddoc_path) - return (info['generation'], info['transaction_id']) + rows = self._get_gen_docs(descending=True, limit=1) + if not rows: + return 0, '' + gen_doc = rows.pop()['doc'] + return gen_doc['gen'], gen_doc['trans_id'] def json_from_resource(self, ddoc_path, check_missing_ddoc=True, **kwargs): @@ -740,21 +746,25 @@ class CouchDatabase(object): 'length': len(conflicts), } parts.append(conflicts) - # store old transactions, if any - transactions = old_doc.transactions[:] if old_doc is not None else [] - # create a new transaction id and timestamp it so the transaction log - # is consistent when querying the database. - transactions.append( - # here we store milliseconds to keep consistent with javascript - # Date.prototype.getTime() which was used before inside a couchdb - # update handler. - (int(time.time() * 1000), - transaction_id)) + # add the gen document + while True: # TODO: add a lock, remove this while + try: + gen, _ = self.get_generation_info() + new_gen = gen + 1 + gen_doc = { + '_id': _get_gen_doc_id(new_gen), + 'gen': new_gen, + 'doc_id': doc.doc_id, + 'trans_id': transaction_id, + } + self._database.save(gen_doc) + break + except ResourceConflict: + pass # build the couch document couch_doc = { '_id': doc.doc_id, 'u1db_rev': doc.rev, - 'u1db_transactions': transactions, '_attachments': attachments, } # if we are updating a doc we have to add the couch doc revision @@ -786,7 +796,6 @@ class CouchDatabase(object): self.batch_docs[doc.doc_id] = couch_doc last_gen, last_trans_id = self.batch_generation self.batch_generation = (last_gen + 1, transaction_id) - return transactions[-1][1] def _new_resource(self, *path): """ |