diff options
| -rw-r--r-- | common/src/leap/soledad/common/couch/__init__.py | 121 | 
1 files changed, 65 insertions, 56 deletions
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index 21ffd036..6cad2b19 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -103,6 +103,10 @@ def couch_server(url):  THREAD_POOL = ThreadPool(20) +def _get_gen_doc_id(gen): +    return 'gen-%s' % str(gen).zfill(10) + +  class CouchDatabase(object):      """      Holds CouchDB related code. @@ -213,7 +217,7 @@ class CouchDatabase(object):          Ensure that the design documents used by the backend exist on the          couch database.          """ -        for ddoc_name in ['docs', 'syncs', 'transactions']: +        for ddoc_name in ['docs', 'syncs']:              try:                  self.json_from_resource(['_design'] +                                          ddoc_name.split('/') + ['_info'], @@ -437,8 +441,6 @@ class CouchDatabase(object):                          result['_attachments']['u1db_conflicts']['data']))))          # store couch revision          doc.couch_rev = result['_rev'] -        # store transactions -        doc.transactions = result['u1db_transactions']          return doc      def _build_conflicts(self, doc_id, attached_conflicts): @@ -474,14 +476,11 @@ class CouchDatabase(object):          """          if generation == 0:              return '' -        # query a couch list function -        ddoc_path = [ -            '_design', 'transactions', '_list', 'trans_id_for_gen', 'log' -        ] -        response = self.json_from_resource(ddoc_path, gen=generation) -        if response == {}: +        log = self._get_transaction_log(start=generation, end=generation) +        if not log:              raise InvalidGeneration -        return response['transaction_id'] +        _, _, trans_id = log[0] +        return trans_id      def get_replica_gen_and_trans_id(self, other_replica_uid):          """ @@ -512,8 +511,8 @@ class CouchDatabase(object):                  'transaction_id': '',              }              self._database.save(doc) -        result = doc['generation'], doc['transaction_id'] -        return result +        gen, trans_id = doc['generation'], doc['transaction_id'] +        return gen, trans_id      def get_doc_conflicts(self, doc_id, couch_rev=None):          """ @@ -581,12 +580,32 @@ class CouchDatabase(object):          :return: The complete transaction log.          :rtype: [(str, str)]          """ -        # query a couch view -        ddoc_path = ['_design', 'transactions', '_view', 'log'] -        response = self.json_from_resource(ddoc_path) -        return map( -            lambda row: (row['id'], row['value']), -            response['rows']) +        log = self._get_transaction_log() +        return map(lambda i: (i[1], i[2]), log) + +    def _get_gen_docs( +            self, start=0, end=9999999999, descending=None, limit=None): +        params = {} +        if descending: +            params['descending'] = 'true' +            # honor couch way of traversing the view tree in reverse order +            start, end = end, start +        params['startkey'] = _get_gen_doc_id(start) +        params['endkey'] = _get_gen_doc_id(end) +        params['include_docs'] = 'true' +        if limit: +            params['limit'] = limit +        view = self._database.view("_all_docs", **params) +        return view.rows + +    def _get_transaction_log(self, start=0, end=9999999999): +        # get current gen and trans_id +        rows = self._get_gen_docs(start=start, end=end) +        log = [] +        for row in rows: +            doc = row['doc'] +            log.append((doc['gen'], doc['doc_id'], doc['trans_id'])) +        return log      def whats_changed(self, old_generation=0):          """ @@ -605,32 +624,18 @@ class CouchDatabase(object):                   changes first)          :rtype: (int, str, [(str, int, str)])          """ -        # query a couch list function -        ddoc_path = [ -            '_design', 'transactions', '_list', 'whats_changed', 'log' -        ] -        response = self.json_from_resource(ddoc_path, old_gen=old_generation) -        results = map( -            lambda row: -                (row['generation'], row['doc_id'], row['transaction_id']), -            response['transactions']) -        results.reverse() -        cur_gen = old_generation -        seen = set()          changes = [] -        newest_trans_id = '' -        for generation, doc_id, trans_id in results: +        cur_generation, last_trans_id = self.get_generation_info() +        relevant_tail = self._get_transaction_log(start=old_generation + 1) +        seen = set() +        generation = cur_generation +        for _, doc_id, trans_id in reversed(relevant_tail):              if doc_id not in seen:                  changes.append((doc_id, generation, trans_id))                  seen.add(doc_id) -        if changes: -            cur_gen = changes[0][1]  # max generation -            newest_trans_id = changes[0][2] -            changes.reverse() -        else: -            cur_gen, newest_trans_id = self.get_generation_info() - -        return cur_gen, newest_trans_id, changes +            generation -= 1 +        changes.reverse() +        return (cur_generation, last_trans_id, changes)      def get_generation_info(self):          """ @@ -641,10 +646,11 @@ class CouchDatabase(object):          """          if self.batching and self.batch_generation:              return self.batch_generation -        # query a couch list function -        ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] -        info = self.json_from_resource(ddoc_path) -        return (info['generation'], info['transaction_id']) +        rows = self._get_gen_docs(descending=True, limit=1) +        if not rows: +            return 0, '' +        gen_doc = rows.pop()['doc'] +        return gen_doc['gen'], gen_doc['trans_id']      def json_from_resource(self, ddoc_path, check_missing_ddoc=True,                             **kwargs): @@ -740,21 +746,25 @@ class CouchDatabase(object):                  'length': len(conflicts),              }              parts.append(conflicts) -        # store old transactions, if any -        transactions = old_doc.transactions[:] if old_doc is not None else [] -        # create a new transaction id and timestamp it so the transaction log -        # is consistent when querying the database. -        transactions.append( -            # here we store milliseconds to keep consistent with javascript -            # Date.prototype.getTime() which was used before inside a couchdb -            # update handler. -            (int(time.time() * 1000), -             transaction_id)) +        # add the gen document +        while True:  # TODO: add a lock, remove this while +            try: +                gen, _ = self.get_generation_info() +                new_gen = gen + 1 +                gen_doc = { +                    '_id': _get_gen_doc_id(new_gen), +                    'gen': new_gen, +                    'doc_id': doc.doc_id, +                    'trans_id': transaction_id, +                } +                self._database.save(gen_doc) +                break +            except ResourceConflict: +                pass          # build the couch document          couch_doc = {              '_id': doc.doc_id,              'u1db_rev': doc.rev, -            'u1db_transactions': transactions,              '_attachments': attachments,          }          # if we are updating a doc we have to add the couch doc revision @@ -786,7 +796,6 @@ class CouchDatabase(object):              self.batch_docs[doc.doc_id] = couch_doc              last_gen, last_trans_id = self.batch_generation              self.batch_generation = (last_gen + 1, transaction_id) -        return transactions[-1][1]      def _new_resource(self, *path):          """  | 
