summaryrefslogtreecommitdiff
path: root/scripts/migrate_dbs.py
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2014-03-11 09:56:28 -0300
committerTomás Touceda <chiiph@leap.se>2014-03-11 09:56:28 -0300
commita8d002714e4ce2ff487785357cc01d082ffad537 (patch)
tree00bb495df23d7f826bebe85fb500501ca8f460b3 /scripts/migrate_dbs.py
parent66c93f4515139fcf666948b31e09c5bfae91e3dc (diff)
parente5664fb1046e71589d0c41cd605761cc642ff28b (diff)
Merge remote-tracking branch 'refs/remotes/drebs/feature/5011_use-less-memory-when-putting-docs-on-couch-3' into develop
Diffstat (limited to 'scripts/migrate_dbs.py')
-rw-r--r--scripts/migrate_dbs.py288
1 files changed, 0 insertions, 288 deletions
diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py
deleted file mode 100644
index f1c20d87..00000000
--- a/scripts/migrate_dbs.py
+++ /dev/null
@@ -1,288 +0,0 @@
-#!/usr/bin/python
-
-import sys
-import json
-import logging
-import argparse
-import re
-import threading
-from urlparse import urlparse
-from ConfigParser import ConfigParser
-from couchdb.client import Server
-from couchdb.http import ResourceNotFound, Resource, Session
-from datetime import datetime
-
-from leap.soledad.common.couch import CouchDatabase
-
-
-# parse command line for the log file name
-logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \
- str(datetime.now()).replace(' ', '_')
-parser = argparse.ArgumentParser()
-parser.add_argument('--log', action='store', default=logger_fname, type=str,
- required=False, help='the name of the log file', nargs=1)
-args = parser.parse_args()
-
-
-# configure the logger
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
-print "Logging to %s." % args.log
-logging.basicConfig(
- filename=args.log,
- format="%(asctime)-15s %(message)s")
-
-
-# configure threads
-max_threads = 20
-semaphore_pool = threading.BoundedSemaphore(value=max_threads)
-
-# get couch url
-cp = ConfigParser()
-cp.read('/etc/leap/soledad-server.conf')
-url = cp.get('soledad-server', 'couch_url')
-
-resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10))
-server = Server(url=resource)
-
-hidden_url = re.sub(
- 'http://(.*):.*@',
- 'http://\\1:xxxxx@',
- url)
-
-print """
-==========
-ATTENTION!
-==========
-
-This script will modify Soledad's shared and user databases in:
-
- %s
-
-This script does not make a backup of the couch db data, so make sure youj
-have a copy or you may loose data.
-""" % hidden_url
-confirm = raw_input("Proceed (type uppercase YES)? ")
-
-if confirm != "YES":
- exit(1)
-
-
-#
-# Thread
-#
-
-class DocWorkerThread(threading.Thread):
-
- def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len,
- transaction_log, conflict_log, release_fun):
- threading.Thread.__init__(self)
- resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10))
- server = Server(url=resource)
- self._dbname = dbname
- self._cdb = server[self._dbname]
- self._doc_id = doc_id
- self._db_idx = db_idx
- self._db_len = db_len
- self._doc_idx = doc_idx
- self._doc_len = doc_len
- self._transaction_log = transaction_log
- self._conflict_log = conflict_log
- self._release_fun = release_fun
-
- def run(self):
-
- old_doc = self._cdb[self._doc_id]
-
- # skip non u1db docs
- if 'u1db_rev' not in old_doc:
- logger.debug('(%d/%d) (%d/%d) Skipping %s/%s).' %
- (self._db_idx, self._db_len, self._doc_idx,
- self._doc_len, self._dbname, self._doc_id))
- self._release_fun()
- return
- else:
- logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' %
- (self._db_idx, self._db_len, self._doc_idx,
- self._doc_len, self._dbname, self._doc_id))
-
- doc = {
- '_id': self._doc_id,
- '_rev': old_doc['_rev'],
- 'u1db_rev': old_doc['u1db_rev']
- }
- attachments = []
-
- # add transactions
- doc['u1db_transactions'] = map(
- lambda (gen, doc_id, trans_id): (gen, trans_id),
- filter(
- lambda (gen, doc_id, trans_id): doc_id == doc['_id'],
- self._transaction_log))
- if len(doc['u1db_transactions']) == 0:
- del doc['u1db_transactions']
-
- # add conflicts
- if doc['_id'] in self._conflict_log:
- attachments.append([
- conflict_log[doc['_id']],
- 'u1db_conflicts',
- "application/octet-stream"])
-
- # move document's content to 'u1db_content' attachment
- content = self._cdb.get_attachment(doc, 'u1db_json')
- if content is not None:
- attachments.append([
- content,
- 'u1db_content',
- "application/octet-stream"])
- #self._cdb.delete_attachment(doc, 'u1db_json')
-
- # save modified doc
- self._cdb.save(doc)
-
- # save all doc attachments
- for content, att_name, content_type in attachments:
- self._cdb.put_attachment(
- doc,
- content,
- filename=att_name,
- content_type=content_type)
-
- # release the semaphore
- self._release_fun()
-
-
-db_idx = 0
-db_len = len(server)
-for dbname in server:
-
- db_idx += 1
-
- if not (dbname.startswith('user-') or dbname == 'shared') \
- or dbname == 'user-test-db':
- logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname))
- continue
-
- logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname))
-
- # get access to couch db
- cdb = Server(url)[dbname]
-
- # get access to soledad db
- sdb = CouchDatabase(url, dbname)
-
- # Migration table
- # ---------------
- #
- # * Metadata that was previously stored in special documents migrate to
- # inside documents, to allow for atomic doc-and-metadata updates.
- # * Doc content attachment name changes.
- # * Indexes are removed, to be implemented in the future possibly as
- # design docs view functions.
- #
- # +-----------------+-------------------------+-------------------------+
- # | Data | old storage | new storage |
- # |-----------------+-------------------------+-------------------------+
- # | doc content | <doc_id>/u1db_json | <doc_id>/u1db_content |
- # | doc conflicts | u1db/_conflicts | <doc_id>/u1db_conflicts |
- # | transaction log | u1db/_transaction_log | doc.u1db_transactions |
- # | sync log | u1db/_other_generations | u1db_sync_log |
- # | indexes | u1db/_indexes | not implemented |
- # | replica uid | u1db/_replica_uid | u1db_config |
- # +-----------------+-------------------------+-------------------------+
-
- def get_att_content(db, doc_id, att_name):
- try:
- return json.loads(
- db.get_attachment(
- doc_id, att_name).read())['content']
- except:
- import ipdb
- ipdb.set_trace()
-
- # only migrate databases that have the 'u1db/_replica_uid' document
- try:
- metadoc = cdb.get('u1db/_replica_uid')
- replica_uid = get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json')
- except ResourceNotFound:
- continue
-
- #---------------------------------------------------------------------
- # Step 1: Set replica uid.
- #---------------------------------------------------------------------
- sdb._set_replica_uid(replica_uid)
-
- #---------------------------------------------------------------------
- # Step 2: Obtain metadata.
- #---------------------------------------------------------------------
-
- # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...]
- transaction_log = get_att_content(
- cdb, 'u1db/_transaction_log', 'u1db_json')
- new_transaction_log = []
- gen = 1
- for (doc_id, trans_id) in transaction_log:
- new_transaction_log.append((gen, doc_id, trans_id))
- gen += 1
- transaction_log = new_transaction_log
-
- # obtain the conflict log: {'<doc_id>': ['<rev>', '<content>'], ...}
- conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json')
-
- # obtain the sync log:
- # {'<replica_uid>': ['<gen>', '<transaction_id>'], ...}
- other_generations = get_att_content(
- cdb, 'u1db/_other_generations', 'u1db_json')
-
- #---------------------------------------------------------------------
- # Step 3: Iterate over all documents in database.
- #---------------------------------------------------------------------
- doc_len = len(cdb)
- logger.info("(%d, %d) Found %d documents." % (db_idx, db_len, doc_len))
- doc_idx = 0
- threads = []
- for doc_id in cdb:
- doc_idx = doc_idx + 1
-
- semaphore_pool.acquire()
- thread = DocWorkerThread(dbname, doc_id, db_idx, db_len,
- doc_idx, doc_len, transaction_log,
- conflict_log, semaphore_pool.release)
- thread.daemon = True
- thread.start()
- threads.append(thread)
-
- map(lambda thread: thread.join(), threads)
-
- #---------------------------------------------------------------------
- # Step 4: Move sync log.
- #---------------------------------------------------------------------
-
- # move sync log
- sync_doc = {
- '_id': 'u1db_sync_log',
- 'syncs': []
- }
-
- for replica_uid in other_generations:
- gen, transaction_id = other_generations[replica_uid]
- sync_doc['syncs'].append([replica_uid, gen, transaction_id])
- cdb.save(sync_doc)
-
- #---------------------------------------------------------------------
- # Step 5: Delete old meta documents.
- #---------------------------------------------------------------------
-
- # remove unused docs
- for doc_id in ['_transaction_log', '_conflicts', '_other_generations',
- '_indexes', '_replica_uid']:
- for prefix in ['u1db/', 'u1db%2F']:
- try:
- doc = cdb['%s%s' % (prefix, doc_id)]
- logger.info(
- "(%d/%d) Deleting %s/%s/%s." %
- (db_idx, db_len, dbname, 'u1db', doc_id))
- cdb.delete(doc)
- except ResourceNotFound:
- pass