diff options
-rw-r--r-- | scripts/db_access/reset_db.py | 132 | ||||
-rw-r--r-- | scripts/ddocs/update_design_docs.py | 191 | ||||
-rwxr-xr-x | scripts/profiling/spam.py | 123 |
3 files changed, 325 insertions, 121 deletions
diff --git a/scripts/db_access/reset_db.py b/scripts/db_access/reset_db.py index 80871856..7c6d281b 100644 --- a/scripts/db_access/reset_db.py +++ b/scripts/db_access/reset_db.py @@ -5,20 +5,21 @@ # WARNING: running this script over a database will delete all documents but # the one with id u1db_config (which contains db metadata) and design docs # needed for couch backend. +# +# Run it like this to get some help: +# +# ./reset_db.py --help -import sys -from ConfigParser import ConfigParser import threading import logging -from couchdb import Database as CouchDatabase - +import argparse +import re -if len(sys.argv) != 2: - print 'Usage: %s <uuid>' % sys.argv[0] - exit(1) -uuid = sys.argv[1] +from ConfigParser import ConfigParser +from couchdb import Database as CouchDatabase +from couchdb import Server as CouchServer # create a logger @@ -27,23 +28,6 @@ LOG_FORMAT = '%(asctime)s %(message)s' logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) -# get couch url -cp = ConfigParser() -cp.read('/etc/leap/soledad-server.conf') -url = cp.get('soledad-server', 'couch_url') - - -# confirm -yes = raw_input("Are you sure you want to reset the database for user %s " - "(type YES)? " % uuid) -if yes != 'YES': - print 'Bailing out...' - exit(2) - - -db = CouchDatabase('%s/user-%s' % (url, uuid)) - - class _DeleterThread(threading.Thread): def __init__(self, db, doc_id, release_fun): @@ -59,21 +43,95 @@ class _DeleterThread(threading.Thread): self._release_fun() -semaphore_pool = threading.BoundedSemaphore(value=20) - - -threads = [] -for doc_id in db: - if doc_id != 'u1db_config' and not doc_id.startswith('_design'): +def get_confirmation(noconfirm, uuid, shared): + msg = "Are you sure you want to reset %s (type YES)? " + if shared: + msg = msg % "the shared database" + elif uuid: + msg = msg % ("the database for user %s" % uuid) + else: + msg = msg % "all databases" + if noconfirm is False: + yes = raw_input(msg) + if yes != 'YES': + print 'Bailing out...' + exit(2) + + +def get_url(empty): + url = None + if empty is False: + # get couch url + cp = ConfigParser() + cp.read('/etc/leap/soledad-server.conf') + url = cp.get('soledad-server', 'couch_url') + else: + with open('/etc/couchdb/couchdb.netrc') as f: + netrc = f.read() + admin_password = re.match('^.* password (.*)$', netrc).groups()[0] + url = 'http://admin:%s@127.0.0.1:5984' % admin_password + return url + + +def reset_all_dbs(url, empty): + server = CouchServer('%s' % (url)) + for dbname in server: + if dbname.startswith('user-') or dbname == 'shared': + reset_db(url, dbname, empty) + + +def reset_db(url, dbname, empty): + db = CouchDatabase('%s/%s' % (url, dbname)) + semaphore_pool = threading.BoundedSemaphore(value=20) + + # launch threads for deleting docs + threads = [] + for doc_id in db: + if empty is False: + if doc_id == 'u1db_config' or doc_id.startswith('_design'): + continue semaphore_pool.acquire() logger.info('[main] launching thread for doc: %s' % doc_id) t = _DeleterThread(db, doc_id, semaphore_pool.release) t.start() threads.append(t) - -logger.info('[main] waiting for threads.') -map(lambda thread: thread.join(), threads) - - -logger.info('[main] done.') + # wait for threads to finish + logger.info('[main] waiting for threads.') + map(lambda thread: thread.join(), threads) + logger.info('[main] done.') + + +def _parse_args(): + parser = argparse.ArgumentParser() + group = parser.add_mutually_exclusive_group() + group.add_argument('-u', dest='uuid', default=False, + help='Reset database of given user.') + group.add_argument('-s', dest='shared', action='store_true', default=False, + help='Reset the shared database.') + group.add_argument('-a', dest='all', action='store_true', default=False, + help='Reset all user databases.') + parser.add_argument( + '-e', dest='empty', action='store_true', required=False, default=False, + help='Empty database (do not preserve minimal set of u1db documents).') + parser.add_argument( + '-y', dest='noconfirm', action='store_true', required=False, + default=False, + help='Do not ask for confirmation.') + return parser.parse_args(), parser + + +if __name__ == '__main__': + args, parser = _parse_args() + if not (args.uuid or args.shared or args.all): + parser.print_help() + exit(1) + + url = get_url(args.empty) + get_confirmation(args.noconfirm, args.uuid, args.shared) + if args.uuid: + reset_db(url, "user-%s" % args.uuid, args.empty) + elif args.shared: + reset_db(url, "shared", args.empty) + elif args.all: + reset_all_dbs(url, args.empty) diff --git a/scripts/ddocs/update_design_docs.py b/scripts/ddocs/update_design_docs.py index e7b5a29c..2e2fa8f0 100644 --- a/scripts/ddocs/update_design_docs.py +++ b/scripts/ddocs/update_design_docs.py @@ -11,84 +11,83 @@ import re import threading import binascii - +from urlparse import urlparse from getpass import getpass from ConfigParser import ConfigParser -from couchdb.client import Server -from couchdb.http import Resource, Session -from datetime import datetime -from urlparse import urlparse +from couchdb.client import Server +from couchdb.http import Resource +from couchdb.http import Session +from couchdb.http import ResourceNotFound from leap.soledad.common import ddocs -# parse command line for the log file name -logger_fname = "/tmp/update-design-docs_%s.log" % \ - str(datetime.now()).replace(' ', '_') -parser = argparse.ArgumentParser() -parser.add_argument('--log', action='store', default=logger_fname, type=str, - required=False, help='the name of the log file', nargs=1) -args = parser.parse_args() +MAX_THREADS = 20 +DESIGN_DOCS = { + '_design/docs': json.loads(binascii.a2b_base64(ddocs.docs)), + '_design/syncs': json.loads(binascii.a2b_base64(ddocs.syncs)), + '_design/transactions': json.loads( + binascii.a2b_base64(ddocs.transactions)), +} -# configure the logger +# create a logger logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) -print "Logging to %s." % args.log -logging.basicConfig( - filename=args.log, - format="%(asctime)-15s %(message)s") +LOG_FORMAT = '%(asctime)s %(message)s' +logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) -# configure threads -max_threads = 20 -semaphore_pool = threading.BoundedSemaphore(value=max_threads) -threads = [] +def _parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument('-u', dest='uuid', default=None, type=str, + help='the UUID of the user') + parser.add_argument('-t', dest='threads', default=MAX_THREADS, type=int, + help='the number of parallel threads') + return parser.parse_args() -# get couch url -cp = ConfigParser() -cp.read('/etc/leap/soledad-server.conf') -url = urlparse(cp.get('soledad-server', 'couch_url')) -# get admin password -netloc = re.sub('^.*@', '', url.netloc) -url = url._replace(netloc=netloc) -password = getpass("Admin password for %s: " % url.geturl()) -url = url._replace(netloc='admin:%s@%s' % (password, netloc)) +def _get_url(): + # get couch url + cp = ConfigParser() + cp.read('/etc/leap/soledad-server.conf') + url = urlparse(cp.get('soledad-server', 'couch_url')) + # get admin password + netloc = re.sub('^.*@', '', url.netloc) + url = url._replace(netloc=netloc) + password = getpass("Admin password for %s: " % url.geturl()) + return url._replace(netloc='admin:%s@%s' % (password, netloc)) -resource = Resource(url.geturl(), Session(retry_delays=[1,2,4,8], timeout=10)) -server = Server(url=resource) -hidden_url = re.sub( - 'http://(.*):.*@', - 'http://\\1:xxxxx@', - url.geturl()) +def _get_server(url): + resource = Resource( + url.geturl(), Session(retry_delays=[1, 2, 4, 8], timeout=10)) + return Server(url=resource) -print """ -========== -ATTENTION! -========== -This script will modify Soledad's shared and user databases in: +def _confirm(url): + hidden_url = re.sub( + 'http://(.*):.*@', + 'http://\\1:xxxxx@', + url.geturl()) - %s + print """ + ========== + ATTENTION! + ========== -This script does not make a backup of the couch db data, so make sure you -have a copy or you may loose data. -""" % hidden_url -confirm = raw_input("Proceed (type uppercase YES)? ") + This script will modify Soledad's shared and user databases in: -if confirm != "YES": - exit(1) + %s -# convert design doc content + This script does not make a backup of the couch db data, so make sure you + have a copy or you may loose data. + """ % hidden_url + confirm = raw_input("Proceed (type uppercase YES)? ") + + if confirm != "YES": + exit(1) -design_docs = { - '_design/docs': json.loads(binascii.a2b_base64(ddocs.docs)), - '_design/syncs': json.loads(binascii.a2b_base64(ddocs.syncs)), - '_design/transactions': json.loads(binascii.a2b_base64(ddocs.transactions)), -} # # Thread @@ -106,42 +105,66 @@ class DBWorkerThread(threading.Thread): def run(self): - logger.info("(%d/%d) Updating db %s." % (self._db_idx, self._db_len, - self._dbname)) + logger.info( + "(%d/%d) Updating db %s." + % (self._db_idx, self._db_len, self._dbname)) - for doc_id in design_docs: - doc = self._cdb[doc_id] + for doc_id in DESIGN_DOCS: + try: + doc = self._cdb[doc_id] + except ResourceNotFound: + doc = {'_id': doc_id} for key in ['lists', 'views', 'updates']: - if key in design_docs[doc_id]: - doc[key] = design_docs[doc_id][key] + if key in DESIGN_DOCS[doc_id]: + doc[key] = DESIGN_DOCS[doc_id][key] self._cdb.save(doc) # release the semaphore self._release_fun() -db_idx = 0 -db_len = len(server) -for dbname in server: - - db_idx += 1 - - if not (dbname.startswith('user-') or dbname == 'shared') \ - or dbname == 'user-test-db': - logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname)) - continue - - - # get access to couch db - cdb = Server(url.geturl())[dbname] - - #--------------------------------------------------------------------- - # Start DB worker thread - #--------------------------------------------------------------------- - semaphore_pool.acquire() - thread = DBWorkerThread(server, dbname, db_idx, db_len, semaphore_pool.release) +def _launch_update_design_docs_thread( + server, dbname, db_idx, db_len, semaphore_pool): + semaphore_pool.acquire() # wait for an available working slot + thread = DBWorkerThread( + server, dbname, db_idx, db_len, semaphore_pool.release) thread.daemon = True thread.start() - threads.append(thread) - -map(lambda thread: thread.join(), threads) + return thread + + +def _update_design_docs(args, server): + + # find the actual databases to be updated + dbs = [] + if args.uuid: + dbs.append('user-%s' % args.uuid) + else: + for dbname in server: + if dbname.startswith('user-') or dbname == 'shared': + dbs.append(dbname) + else: + logger.info("Skipping db %s." % dbname) + + db_idx = 0 + db_len = len(dbs) + semaphore_pool = threading.BoundedSemaphore(value=args.threads) + threads = [] + + # launch the update + for db in dbs: + db_idx += 1 + threads.append( + _launch_update_design_docs_thread( + server, db, db_idx, db_len, semaphore_pool)) + + # wait for all threads to finish + map(lambda thread: thread.join(), threads) + + +if __name__ == "__main__": + args = _parse_args() + url = _get_url() + _confirm(url) + server = _get_server(url) + _update_design_docs(args, server) diff --git a/scripts/profiling/spam.py b/scripts/profiling/spam.py new file mode 100755 index 00000000..091a8c48 --- /dev/null +++ b/scripts/profiling/spam.py @@ -0,0 +1,123 @@ +#!/usr/bin/python + +# Send a lot of messages in parallel. + + +import string +import smtplib +import threading +import logging + +from argparse import ArgumentParser + + +SMTP_HOST = 'chipmonk.cdev.bitmask.net' +NUMBER_OF_THREADS = 20 + + +logger = logging.getLogger(__name__) +LOG_FORMAT = '%(asctime)s %(message)s' +logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + + +def _send_email(host, subject, to_addr, from_addr, body_text): + """ + Send an email + """ + body = string.join(( + "From: %s" % from_addr, + "To: %s" % to_addr, + "Subject: %s" % subject, + "", + body_text + ), "\r\n") + server = smtplib.SMTP(host) + server.sendmail(from_addr, [to_addr], body) + server.quit() + + +def _parse_args(): + parser = ArgumentParser() + parser.add_argument( + 'target_address', + help='The target email address to spam') + parser.add_argument( + 'number_of_messages', type=int, + help='The amount of messages email address to spam') + parser.add_argument( + '-s', dest='server', default=SMTP_HOST, + help='The SMTP server to use') + parser.add_argument( + '-t', dest='threads', default=NUMBER_OF_THREADS, + help='The maximum number of parallel threads to launch') + return parser.parse_args() + + +class EmailSenderThread(threading.Thread): + + def __init__(self, host, subject, to_addr, from_addr, body_text, + finished_fun): + threading.Thread.__init__(self) + self._host = host + self._subject = subject + self._to_addr = to_addr + self._from_addr = from_addr + self._body_text = body_text + self._finished_fun = finished_fun + + def run(self): + _send_email( + self._host, self._subject, self._to_addr, self._from_addr, + self._body_text) + self._finished_fun() + + +def _launch_email_thread(host, subject, to_addr, from_addr, body_text, + finished_fun): + thread = EmailSenderThread( + host, subject, to_addr, from_addr, body_text, finished_fun) + thread.start() + return thread + + +class FinishedThreads(object): + + def __init__(self): + self._finished = 0 + self._lock = threading.Lock() + + def signal(self): + with self._lock: + self._finished = self._finished + 1 + logger.info('Number of messages sent: %d.' % self._finished) + + +def _send_messages(args): + host = args.server + subject = "Message from Soledad script" + to_addr = args.target_address + from_addr = args.target_address + body_text = "Test message" + + semaphore = threading.Semaphore(args.threads) + threads = [] + finished_threads = FinishedThreads() + + def _finished_fun(): + semaphore.release() + finished_threads.signal() + + for i in xrange(args.number_of_messages): + semaphore.acquire() + threads.append( + _launch_email_thread( + host, subject, to_addr, from_addr, body_text, + _finished_fun)) + + for t in threads: + t.join() + + +if __name__ == "__main__": + args = _parse_args() + _send_messages(args) |