diff options
author | Tomás Touceda <chiiph@leap.se> | 2014-03-11 09:56:28 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2014-03-11 09:56:28 -0300 |
commit | a8d002714e4ce2ff487785357cc01d082ffad537 (patch) | |
tree | 00bb495df23d7f826bebe85fb500501ca8f460b3 /scripts/migrate_dbs.py | |
parent | 66c93f4515139fcf666948b31e09c5bfae91e3dc (diff) | |
parent | e5664fb1046e71589d0c41cd605761cc642ff28b (diff) |
Merge remote-tracking branch 'refs/remotes/drebs/feature/5011_use-less-memory-when-putting-docs-on-couch-3' into develop
Diffstat (limited to 'scripts/migrate_dbs.py')
-rw-r--r-- | scripts/migrate_dbs.py | 288 |
1 files changed, 0 insertions, 288 deletions
diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py deleted file mode 100644 index f1c20d87..00000000 --- a/scripts/migrate_dbs.py +++ /dev/null @@ -1,288 +0,0 @@ -#!/usr/bin/python - -import sys -import json -import logging -import argparse -import re -import threading -from urlparse import urlparse -from ConfigParser import ConfigParser -from couchdb.client import Server -from couchdb.http import ResourceNotFound, Resource, Session -from datetime import datetime - -from leap.soledad.common.couch import CouchDatabase - - -# parse command line for the log file name -logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \ - str(datetime.now()).replace(' ', '_') -parser = argparse.ArgumentParser() -parser.add_argument('--log', action='store', default=logger_fname, type=str, - required=False, help='the name of the log file', nargs=1) -args = parser.parse_args() - - -# configure the logger -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) -print "Logging to %s." % args.log -logging.basicConfig( - filename=args.log, - format="%(asctime)-15s %(message)s") - - -# configure threads -max_threads = 20 -semaphore_pool = threading.BoundedSemaphore(value=max_threads) - -# get couch url -cp = ConfigParser() -cp.read('/etc/leap/soledad-server.conf') -url = cp.get('soledad-server', 'couch_url') - -resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) -server = Server(url=resource) - -hidden_url = re.sub( - 'http://(.*):.*@', - 'http://\\1:xxxxx@', - url) - -print """ -========== -ATTENTION! -========== - -This script will modify Soledad's shared and user databases in: - - %s - -This script does not make a backup of the couch db data, so make sure youj -have a copy or you may loose data. -""" % hidden_url -confirm = raw_input("Proceed (type uppercase YES)? ") - -if confirm != "YES": - exit(1) - - -# -# Thread -# - -class DocWorkerThread(threading.Thread): - - def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len, - transaction_log, conflict_log, release_fun): - threading.Thread.__init__(self) - resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) - server = Server(url=resource) - self._dbname = dbname - self._cdb = server[self._dbname] - self._doc_id = doc_id - self._db_idx = db_idx - self._db_len = db_len - self._doc_idx = doc_idx - self._doc_len = doc_len - self._transaction_log = transaction_log - self._conflict_log = conflict_log - self._release_fun = release_fun - - def run(self): - - old_doc = self._cdb[self._doc_id] - - # skip non u1db docs - if 'u1db_rev' not in old_doc: - logger.debug('(%d/%d) (%d/%d) Skipping %s/%s).' % - (self._db_idx, self._db_len, self._doc_idx, - self._doc_len, self._dbname, self._doc_id)) - self._release_fun() - return - else: - logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' % - (self._db_idx, self._db_len, self._doc_idx, - self._doc_len, self._dbname, self._doc_id)) - - doc = { - '_id': self._doc_id, - '_rev': old_doc['_rev'], - 'u1db_rev': old_doc['u1db_rev'] - } - attachments = [] - - # add transactions - doc['u1db_transactions'] = map( - lambda (gen, doc_id, trans_id): (gen, trans_id), - filter( - lambda (gen, doc_id, trans_id): doc_id == doc['_id'], - self._transaction_log)) - if len(doc['u1db_transactions']) == 0: - del doc['u1db_transactions'] - - # add conflicts - if doc['_id'] in self._conflict_log: - attachments.append([ - conflict_log[doc['_id']], - 'u1db_conflicts', - "application/octet-stream"]) - - # move document's content to 'u1db_content' attachment - content = self._cdb.get_attachment(doc, 'u1db_json') - if content is not None: - attachments.append([ - content, - 'u1db_content', - "application/octet-stream"]) - #self._cdb.delete_attachment(doc, 'u1db_json') - - # save modified doc - self._cdb.save(doc) - - # save all doc attachments - for content, att_name, content_type in attachments: - self._cdb.put_attachment( - doc, - content, - filename=att_name, - content_type=content_type) - - # release the semaphore - self._release_fun() - - -db_idx = 0 -db_len = len(server) -for dbname in server: - - db_idx += 1 - - if not (dbname.startswith('user-') or dbname == 'shared') \ - or dbname == 'user-test-db': - logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname)) - continue - - logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname)) - - # get access to couch db - cdb = Server(url)[dbname] - - # get access to soledad db - sdb = CouchDatabase(url, dbname) - - # Migration table - # --------------- - # - # * Metadata that was previously stored in special documents migrate to - # inside documents, to allow for atomic doc-and-metadata updates. - # * Doc content attachment name changes. - # * Indexes are removed, to be implemented in the future possibly as - # design docs view functions. - # - # +-----------------+-------------------------+-------------------------+ - # | Data | old storage | new storage | - # |-----------------+-------------------------+-------------------------+ - # | doc content | <doc_id>/u1db_json | <doc_id>/u1db_content | - # | doc conflicts | u1db/_conflicts | <doc_id>/u1db_conflicts | - # | transaction log | u1db/_transaction_log | doc.u1db_transactions | - # | sync log | u1db/_other_generations | u1db_sync_log | - # | indexes | u1db/_indexes | not implemented | - # | replica uid | u1db/_replica_uid | u1db_config | - # +-----------------+-------------------------+-------------------------+ - - def get_att_content(db, doc_id, att_name): - try: - return json.loads( - db.get_attachment( - doc_id, att_name).read())['content'] - except: - import ipdb - ipdb.set_trace() - - # only migrate databases that have the 'u1db/_replica_uid' document - try: - metadoc = cdb.get('u1db/_replica_uid') - replica_uid = get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json') - except ResourceNotFound: - continue - - #--------------------------------------------------------------------- - # Step 1: Set replica uid. - #--------------------------------------------------------------------- - sdb._set_replica_uid(replica_uid) - - #--------------------------------------------------------------------- - # Step 2: Obtain metadata. - #--------------------------------------------------------------------- - - # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...] - transaction_log = get_att_content( - cdb, 'u1db/_transaction_log', 'u1db_json') - new_transaction_log = [] - gen = 1 - for (doc_id, trans_id) in transaction_log: - new_transaction_log.append((gen, doc_id, trans_id)) - gen += 1 - transaction_log = new_transaction_log - - # obtain the conflict log: {'<doc_id>': ['<rev>', '<content>'], ...} - conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json') - - # obtain the sync log: - # {'<replica_uid>': ['<gen>', '<transaction_id>'], ...} - other_generations = get_att_content( - cdb, 'u1db/_other_generations', 'u1db_json') - - #--------------------------------------------------------------------- - # Step 3: Iterate over all documents in database. - #--------------------------------------------------------------------- - doc_len = len(cdb) - logger.info("(%d, %d) Found %d documents." % (db_idx, db_len, doc_len)) - doc_idx = 0 - threads = [] - for doc_id in cdb: - doc_idx = doc_idx + 1 - - semaphore_pool.acquire() - thread = DocWorkerThread(dbname, doc_id, db_idx, db_len, - doc_idx, doc_len, transaction_log, - conflict_log, semaphore_pool.release) - thread.daemon = True - thread.start() - threads.append(thread) - - map(lambda thread: thread.join(), threads) - - #--------------------------------------------------------------------- - # Step 4: Move sync log. - #--------------------------------------------------------------------- - - # move sync log - sync_doc = { - '_id': 'u1db_sync_log', - 'syncs': [] - } - - for replica_uid in other_generations: - gen, transaction_id = other_generations[replica_uid] - sync_doc['syncs'].append([replica_uid, gen, transaction_id]) - cdb.save(sync_doc) - - #--------------------------------------------------------------------- - # Step 5: Delete old meta documents. - #--------------------------------------------------------------------- - - # remove unused docs - for doc_id in ['_transaction_log', '_conflicts', '_other_generations', - '_indexes', '_replica_uid']: - for prefix in ['u1db/', 'u1db%2F']: - try: - doc = cdb['%s%s' % (prefix, doc_id)] - logger.info( - "(%d/%d) Deleting %s/%s/%s." % - (db_idx, db_len, dbname, 'u1db', doc_id)) - cdb.delete(doc) - except ResourceNotFound: - pass |