diff options
author | drebs <drebs@leap.se> | 2013-12-15 02:04:14 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2013-12-18 14:36:59 -0200 |
commit | 42b02476ff70326e2d52fa70a94f1f7035cb185a (patch) | |
tree | db20c64fdbf732ecb595f742f253f6147e7887b0 /scripts/migrate_dbs.py | |
parent | 96cf0af3880aa9bec8fcad422526297ef7f40964 (diff) |
Add database migration script.
Diffstat (limited to 'scripts/migrate_dbs.py')
-rw-r--r-- | scripts/migrate_dbs.py | 288 |
1 files changed, 288 insertions, 0 deletions
diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py new file mode 100644 index 00000000..f1c20d87 --- /dev/null +++ b/scripts/migrate_dbs.py @@ -0,0 +1,288 @@ +#!/usr/bin/python + +import sys +import json +import logging +import argparse +import re +import threading +from urlparse import urlparse +from ConfigParser import ConfigParser +from couchdb.client import Server +from couchdb.http import ResourceNotFound, Resource, Session +from datetime import datetime + +from leap.soledad.common.couch import CouchDatabase + + +# parse command line for the log file name +logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \ + str(datetime.now()).replace(' ', '_') +parser = argparse.ArgumentParser() +parser.add_argument('--log', action='store', default=logger_fname, type=str, + required=False, help='the name of the log file', nargs=1) +args = parser.parse_args() + + +# configure the logger +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +print "Logging to %s." % args.log +logging.basicConfig( + filename=args.log, + format="%(asctime)-15s %(message)s") + + +# configure threads +max_threads = 20 +semaphore_pool = threading.BoundedSemaphore(value=max_threads) + +# get couch url +cp = ConfigParser() +cp.read('/etc/leap/soledad-server.conf') +url = cp.get('soledad-server', 'couch_url') + +resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) +server = Server(url=resource) + +hidden_url = re.sub( + 'http://(.*):.*@', + 'http://\\1:xxxxx@', + url) + +print """ +========== +ATTENTION! +========== + +This script will modify Soledad's shared and user databases in: + + %s + +This script does not make a backup of the couch db data, so make sure youj +have a copy or you may loose data. +""" % hidden_url +confirm = raw_input("Proceed (type uppercase YES)? ") + +if confirm != "YES": + exit(1) + + +# +# Thread +# + +class DocWorkerThread(threading.Thread): + + def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len, + transaction_log, conflict_log, release_fun): + threading.Thread.__init__(self) + resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) + server = Server(url=resource) + self._dbname = dbname + self._cdb = server[self._dbname] + self._doc_id = doc_id + self._db_idx = db_idx + self._db_len = db_len + self._doc_idx = doc_idx + self._doc_len = doc_len + self._transaction_log = transaction_log + self._conflict_log = conflict_log + self._release_fun = release_fun + + def run(self): + + old_doc = self._cdb[self._doc_id] + + # skip non u1db docs + if 'u1db_rev' not in old_doc: + logger.debug('(%d/%d) (%d/%d) Skipping %s/%s).' % + (self._db_idx, self._db_len, self._doc_idx, + self._doc_len, self._dbname, self._doc_id)) + self._release_fun() + return + else: + logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' % + (self._db_idx, self._db_len, self._doc_idx, + self._doc_len, self._dbname, self._doc_id)) + + doc = { + '_id': self._doc_id, + '_rev': old_doc['_rev'], + 'u1db_rev': old_doc['u1db_rev'] + } + attachments = [] + + # add transactions + doc['u1db_transactions'] = map( + lambda (gen, doc_id, trans_id): (gen, trans_id), + filter( + lambda (gen, doc_id, trans_id): doc_id == doc['_id'], + self._transaction_log)) + if len(doc['u1db_transactions']) == 0: + del doc['u1db_transactions'] + + # add conflicts + if doc['_id'] in self._conflict_log: + attachments.append([ + conflict_log[doc['_id']], + 'u1db_conflicts', + "application/octet-stream"]) + + # move document's content to 'u1db_content' attachment + content = self._cdb.get_attachment(doc, 'u1db_json') + if content is not None: + attachments.append([ + content, + 'u1db_content', + "application/octet-stream"]) + #self._cdb.delete_attachment(doc, 'u1db_json') + + # save modified doc + self._cdb.save(doc) + + # save all doc attachments + for content, att_name, content_type in attachments: + self._cdb.put_attachment( + doc, + content, + filename=att_name, + content_type=content_type) + + # release the semaphore + self._release_fun() + + +db_idx = 0 +db_len = len(server) +for dbname in server: + + db_idx += 1 + + if not (dbname.startswith('user-') or dbname == 'shared') \ + or dbname == 'user-test-db': + logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname)) + continue + + logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname)) + + # get access to couch db + cdb = Server(url)[dbname] + + # get access to soledad db + sdb = CouchDatabase(url, dbname) + + # Migration table + # --------------- + # + # * Metadata that was previously stored in special documents migrate to + # inside documents, to allow for atomic doc-and-metadata updates. + # * Doc content attachment name changes. + # * Indexes are removed, to be implemented in the future possibly as + # design docs view functions. + # + # +-----------------+-------------------------+-------------------------+ + # | Data | old storage | new storage | + # |-----------------+-------------------------+-------------------------+ + # | doc content | <doc_id>/u1db_json | <doc_id>/u1db_content | + # | doc conflicts | u1db/_conflicts | <doc_id>/u1db_conflicts | + # | transaction log | u1db/_transaction_log | doc.u1db_transactions | + # | sync log | u1db/_other_generations | u1db_sync_log | + # | indexes | u1db/_indexes | not implemented | + # | replica uid | u1db/_replica_uid | u1db_config | + # +-----------------+-------------------------+-------------------------+ + + def get_att_content(db, doc_id, att_name): + try: + return json.loads( + db.get_attachment( + doc_id, att_name).read())['content'] + except: + import ipdb + ipdb.set_trace() + + # only migrate databases that have the 'u1db/_replica_uid' document + try: + metadoc = cdb.get('u1db/_replica_uid') + replica_uid = get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json') + except ResourceNotFound: + continue + + #--------------------------------------------------------------------- + # Step 1: Set replica uid. + #--------------------------------------------------------------------- + sdb._set_replica_uid(replica_uid) + + #--------------------------------------------------------------------- + # Step 2: Obtain metadata. + #--------------------------------------------------------------------- + + # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...] + transaction_log = get_att_content( + cdb, 'u1db/_transaction_log', 'u1db_json') + new_transaction_log = [] + gen = 1 + for (doc_id, trans_id) in transaction_log: + new_transaction_log.append((gen, doc_id, trans_id)) + gen += 1 + transaction_log = new_transaction_log + + # obtain the conflict log: {'<doc_id>': ['<rev>', '<content>'], ...} + conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json') + + # obtain the sync log: + # {'<replica_uid>': ['<gen>', '<transaction_id>'], ...} + other_generations = get_att_content( + cdb, 'u1db/_other_generations', 'u1db_json') + + #--------------------------------------------------------------------- + # Step 3: Iterate over all documents in database. + #--------------------------------------------------------------------- + doc_len = len(cdb) + logger.info("(%d, %d) Found %d documents." % (db_idx, db_len, doc_len)) + doc_idx = 0 + threads = [] + for doc_id in cdb: + doc_idx = doc_idx + 1 + + semaphore_pool.acquire() + thread = DocWorkerThread(dbname, doc_id, db_idx, db_len, + doc_idx, doc_len, transaction_log, + conflict_log, semaphore_pool.release) + thread.daemon = True + thread.start() + threads.append(thread) + + map(lambda thread: thread.join(), threads) + + #--------------------------------------------------------------------- + # Step 4: Move sync log. + #--------------------------------------------------------------------- + + # move sync log + sync_doc = { + '_id': 'u1db_sync_log', + 'syncs': [] + } + + for replica_uid in other_generations: + gen, transaction_id = other_generations[replica_uid] + sync_doc['syncs'].append([replica_uid, gen, transaction_id]) + cdb.save(sync_doc) + + #--------------------------------------------------------------------- + # Step 5: Delete old meta documents. + #--------------------------------------------------------------------- + + # remove unused docs + for doc_id in ['_transaction_log', '_conflicts', '_other_generations', + '_indexes', '_replica_uid']: + for prefix in ['u1db/', 'u1db%2F']: + try: + doc = cdb['%s%s' % (prefix, doc_id)] + logger.info( + "(%d/%d) Deleting %s/%s/%s." % + (db_idx, db_len, dbname, 'u1db', doc_id)) + cdb.delete(doc) + except ResourceNotFound: + pass |