-rw-r--r--   scripts/migrate_dbs.py   288
1 file changed, 0 insertions(+), 288 deletions(-)
diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py
deleted file mode 100644
index f1c20d87..00000000
--- a/scripts/migrate_dbs.py
+++ /dev/null
@@ -1,288 +0,0 @@
-#!/usr/bin/python
-
-import sys
-import json
-import logging
-import argparse
-import re
-import threading
-from urlparse import urlparse
-from ConfigParser import ConfigParser
-from couchdb.client import Server
-from couchdb.http import ResourceNotFound, Resource, Session
-from datetime import datetime
-
-from leap.soledad.common.couch import CouchDatabase
-
-
-# parse command line for the log file name
-logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \
-               str(datetime.now()).replace(' ', '_')
-parser = argparse.ArgumentParser()
-parser.add_argument('--log', action='store', default=logger_fname, type=str,
-                    required=False, help='the name of the log file')
-args = parser.parse_args()
-
-
-# configure the logger
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
-print "Logging to %s." % args.log
-logging.basicConfig(
-    filename=args.log,
-    format="%(asctime)-15s %(message)s")
-
-
-# configure threads
-max_threads = 20
-semaphore_pool = threading.BoundedSemaphore(value=max_threads)
-
-# get couch url
-cp = ConfigParser()
-cp.read('/etc/leap/soledad-server.conf')
-url = cp.get('soledad-server', 'couch_url')
-
-resource = Resource(url, Session(retry_delays=[1, 2, 4, 8], timeout=10))
-server = Server(url=resource)
-
-hidden_url = re.sub(
-    'http://(.*):.*@',
-    'http://\\1:xxxxx@',
-    url)
-
-print """
-==========
-ATTENTION!
-==========
-
-This script will modify Soledad's shared and user databases in:
-
-  %s
-
-This script does not make a backup of the couch db data, so make sure you
-have a copy or you may lose data.
-""" % hidden_url
-confirm = raw_input("Proceed (type uppercase YES)? ")
-
-if confirm != "YES":
-    sys.exit(1)
-
-
-#
-# Thread
-#
-
-class DocWorkerThread(threading.Thread):
-
-    def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len,
-                 transaction_log, conflict_log, release_fun):
-        threading.Thread.__init__(self)
-        resource = Resource(url, Session(retry_delays=[1, 2, 4, 8], timeout=10))
-        server = Server(url=resource)
-        self._dbname = dbname
-        self._cdb = server[self._dbname]
-        self._doc_id = doc_id
-        self._db_idx = db_idx
-        self._db_len = db_len
-        self._doc_idx = doc_idx
-        self._doc_len = doc_len
-        self._transaction_log = transaction_log
-        self._conflict_log = conflict_log
-        self._release_fun = release_fun
-
-    def run(self):
-
-        old_doc = self._cdb[self._doc_id]
-
-        # skip non u1db docs
-        if 'u1db_rev' not in old_doc:
-            logger.debug('(%d/%d) (%d/%d) Skipping %s/%s.' %
-                         (self._db_idx, self._db_len, self._doc_idx,
-                          self._doc_len, self._dbname, self._doc_id))
-            self._release_fun()
-            return
-        else:
-            logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' %
-                         (self._db_idx, self._db_len, self._doc_idx,
-                          self._doc_len, self._dbname, self._doc_id))
-
-        doc = {
-            '_id': self._doc_id,
-            '_rev': old_doc['_rev'],
-            'u1db_rev': old_doc['u1db_rev']
-        }
-        attachments = []
-
-        # add transactions
-        doc['u1db_transactions'] = map(
-            lambda (gen, doc_id, trans_id): (gen, trans_id),
-            filter(
-                lambda (gen, doc_id, trans_id): doc_id == doc['_id'],
-                self._transaction_log))
-        if len(doc['u1db_transactions']) == 0:
-            del doc['u1db_transactions']
-
-        # add conflicts
-        if doc['_id'] in self._conflict_log:
-            attachments.append([
-                self._conflict_log[doc['_id']],
-                'u1db_conflicts',
-                "application/octet-stream"])
-
-        # move document's content to 'u1db_content' attachment
-        content = self._cdb.get_attachment(doc, 'u1db_json')
-        if content is not None:
-            attachments.append([
-                content,
-                'u1db_content',
-                "application/octet-stream"])
-        #self._cdb.delete_attachment(doc, 'u1db_json')
-
-        # save modified doc
-        self._cdb.save(doc)
-
-        # save all doc attachments
-        for content, att_name, content_type in attachments:
-            self._cdb.put_attachment(
-                doc,
-                content,
-                filename=att_name,
-                content_type=content_type)
-
-        # release the semaphore
-        self._release_fun()
-
-
-db_idx = 0
-db_len = len(server)
-for dbname in server:
-
-    db_idx += 1
-
-    if not (dbname.startswith('user-') or dbname == 'shared') \
-            or dbname == 'user-test-db':
-        logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname))
-        continue
-
-    logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname))
-
-    # get access to couch db
-    cdb = Server(url)[dbname]
-
-    # get access to soledad db
-    sdb = CouchDatabase(url, dbname)
-
-    # Migration table
-    # ---------------
-    #
-    # * Metadata that was previously stored in special documents is moved
-    #   into the documents themselves, to allow for atomic doc-and-metadata
-    #   updates.
-    # * The doc content attachment is renamed.
-    # * Indexes are removed, to be reimplemented in the future, possibly as
-    #   design doc view functions.
-    #
-    # +-----------------+-------------------------+-------------------------+
-    # | Data            | old storage             | new storage             |
-    # +-----------------+-------------------------+-------------------------+
-    # | doc content     | <doc_id>/u1db_json      | <doc_id>/u1db_content   |
-    # | doc conflicts   | u1db/_conflicts         | <doc_id>/u1db_conflicts |
-    # | transaction log | u1db/_transaction_log   | doc.u1db_transactions   |
-    # | sync log        | u1db/_other_generations | u1db_sync_log           |
-    # | indexes         | u1db/_indexes           | not implemented         |
-    # | replica uid     | u1db/_replica_uid       | u1db_config             |
-    # +-----------------+-------------------------+-------------------------+
-
-    def get_att_content(db, doc_id, att_name):
-        try:
-            return json.loads(
-                db.get_attachment(
-                    doc_id, att_name).read())['content']
-        except:
-            import ipdb
-            ipdb.set_trace()
-
-    # only migrate databases that have the 'u1db/_replica_uid' document
-    try:
-        metadoc = cdb.get('u1db/_replica_uid')
-        replica_uid = get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json')
-    except ResourceNotFound:
-        continue
-
-    #---------------------------------------------------------------------
-    # Step 1: Set replica uid.
-    #---------------------------------------------------------------------
-    sdb._set_replica_uid(replica_uid)
-
-    #---------------------------------------------------------------------
-    # Step 2: Obtain metadata.
-    #---------------------------------------------------------------------
-
-    # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...]
-    transaction_log = get_att_content(
-        cdb, 'u1db/_transaction_log', 'u1db_json')
-    new_transaction_log = []
-    gen = 1
-    for (doc_id, trans_id) in transaction_log:
-        new_transaction_log.append((gen, doc_id, trans_id))
-        gen += 1
-    transaction_log = new_transaction_log
-
-    # obtain the conflict log: {'<doc_id>': ['<rev>', '<content>'], ...}
-    conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json')
-
-    # obtain the sync log:
-    # {'<replica_uid>': ['<gen>', '<transaction_id>'], ...}
-    other_generations = get_att_content(
-        cdb, 'u1db/_other_generations', 'u1db_json')
-
-    #---------------------------------------------------------------------
-    # Step 3: Iterate over all documents in database.
-    #---------------------------------------------------------------------
-    doc_len = len(cdb)
-    logger.info("(%d/%d) Found %d documents." % (db_idx, db_len, doc_len))
-    doc_idx = 0
-    threads = []
-    for doc_id in cdb:
-        doc_idx = doc_idx + 1
-
-        semaphore_pool.acquire()
-        thread = DocWorkerThread(dbname, doc_id, db_idx, db_len,
-                                 doc_idx, doc_len, transaction_log,
-                                 conflict_log, semaphore_pool.release)
-        thread.daemon = True
-        thread.start()
-        threads.append(thread)
-
-    map(lambda thread: thread.join(), threads)
-
-    #---------------------------------------------------------------------
-    # Step 4: Move sync log.
-    #---------------------------------------------------------------------
-
-    # move sync log
-    sync_doc = {
-        '_id': 'u1db_sync_log',
-        'syncs': []
-    }
-
-    for replica_uid in other_generations:
-        gen, transaction_id = other_generations[replica_uid]
-        sync_doc['syncs'].append([replica_uid, gen, transaction_id])
-    cdb.save(sync_doc)
-
-    #---------------------------------------------------------------------
-    # Step 5: Delete old meta documents.
-    #---------------------------------------------------------------------
-
-    # remove unused docs
-    for doc_id in ['_transaction_log', '_conflicts', '_other_generations',
-                   '_indexes', '_replica_uid']:
-        for prefix in ['u1db/', 'u1db%2F']:
-            try:
-                doc = cdb['%s%s' % (prefix, doc_id)]
-                logger.info(
-                    "(%d/%d) Deleting %s/%s/%s." %
-                    (db_idx, db_len, dbname, 'u1db', doc_id))
-                cdb.delete(doc)
-            except ResourceNotFound:
-                pass
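
For reference, the per-document rewrite summarized in the migration table
above can be sketched on its own. This is a minimal illustration, not a
drop-in tool: it reuses the same python-couchdb calls the deleted script
itself used (Server, save, get_attachment, put_attachment); the URL and
database name below are placeholder assumptions.

    import json

    from couchdb.client import Server

    URL = 'http://user:xxxxx@localhost:5984'  # assumption: placeholder URL

    def migrate_doc(db, doc_id, transaction_log, conflict_log):
        old = db[doc_id]
        if 'u1db_rev' not in old:
            return  # not a u1db document, nothing to migrate
        doc = {'_id': doc_id, '_rev': old['_rev'], 'u1db_rev': old['u1db_rev']}
        # keep only this document's (generation, transaction_id) pairs
        txns = [(gen, t_id) for (gen, d_id, t_id) in transaction_log
                if d_id == doc_id]
        if txns:
            doc['u1db_transactions'] = txns
        # read the old content attachment *before* saving: saving a revision
        # without an _attachments stub makes CouchDB drop old attachments
        content = db.get_attachment(doc, 'u1db_json')
        db.save(doc)  # save() updates doc['_rev'] in place
        if content is not None:
            db.put_attachment(doc, content, filename='u1db_content',
                              content_type='application/octet-stream')
        # conflicts move from the u1db/_conflicts doc to a per-doc attachment;
        # json.dumps here is an assumption about the serialized form
        if doc_id in conflict_log:
            db.put_attachment(doc, json.dumps(conflict_log[doc_id]),
                              filename='u1db_conflicts',
                              content_type='application/octet-stream')

    db = Server(URL)['user-deadbeef']  # assumption: example database name

Note how reading 'u1db_json' before the save exploits, rather than fights,
CouchDB's attachment semantics; this is the same reason the original script
could leave its delete_attachment() call commented out.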
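The generation numbering in Step 2 deserves a note: the old
u1db/_transaction_log attachment stores an ordered list of
[doc_id, trans_id] pairs, and a document's generation is simply the 1-based
position of its entry in that list. A self-contained sketch with made-up
example data:

    # example data, purely illustrative
    old_log = [['doc-a', 't-1'], ['doc-b', 't-2'], ['doc-a', 't-3']]

    # generations are implicit in the ordering: entry i gets generation i + 1
    numbered = [(gen, doc_id, trans_id)
                for gen, (doc_id, trans_id) in enumerate(old_log, 1)]
    # -> [(1, 'doc-a', 't-1'), (2, 'doc-b', 't-2'), (3, 'doc-a', 't-3')]

    # each migrated document then carries only its own (gen, trans_id) pairs
    mine = [(gen, t_id) for gen, d_id, t_id in numbered if d_id == 'doc-a']
    # -> [(1, 't-1'), (3, 't-3')]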
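The concurrency scheme is also worth spelling out. A BoundedSemaphore caps
the number of live DocWorkerThread instances at max_threads: the main loop
acquires a slot before spawning each worker and hands it
semaphore_pool.release to call when done. A stripped-down sketch of the same
pattern, with an illustrative workload and names that are not from the
original:

    import threading

    MAX_THREADS = 20  # same cap the script used
    pool = threading.BoundedSemaphore(value=MAX_THREADS)

    def worker(item, release):
        try:
            pass  # per-item work goes here
        finally:
            release()  # free the slot on every exit path

    threads = []
    for item in range(100):  # illustrative workload
        pool.acquire()  # blocks while MAX_THREADS workers are in flight
        t = threading.Thread(target=worker, args=(item, pool.release))
        t.daemon = True
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

The try/finally is a deliberate hardening over the original run(), which
released only on its two explicit exit paths and would leak a semaphore slot
if an unexpected exception escaped a worker.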
