diff options
| -rw-r--r-- | scripts/db_access/reset_db.py | 132 | ||||
| -rw-r--r-- | scripts/ddocs/update_design_docs.py | 191 | ||||
| -rwxr-xr-x | scripts/profiling/spam.py | 123 | 
3 files changed, 325 insertions, 121 deletions
| diff --git a/scripts/db_access/reset_db.py b/scripts/db_access/reset_db.py index 80871856..7c6d281b 100644 --- a/scripts/db_access/reset_db.py +++ b/scripts/db_access/reset_db.py @@ -5,20 +5,21 @@  # WARNING: running this script over a database will delete all documents but  # the one with id u1db_config (which contains db metadata) and design docs  # needed for couch backend. +# +# Run it like this to get some help: +# +#     ./reset_db.py --help -import sys -from ConfigParser import ConfigParser  import threading  import logging -from couchdb import Database as CouchDatabase - +import argparse +import re -if len(sys.argv) != 2: -    print 'Usage: %s <uuid>' % sys.argv[0] -    exit(1) -uuid = sys.argv[1] +from ConfigParser import ConfigParser +from couchdb import Database as CouchDatabase +from couchdb import Server as CouchServer  # create a logger @@ -27,23 +28,6 @@ LOG_FORMAT = '%(asctime)s %(message)s'  logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) -# get couch url -cp = ConfigParser() -cp.read('/etc/leap/soledad-server.conf') -url = cp.get('soledad-server', 'couch_url') - - -# confirm -yes = raw_input("Are you sure you want to reset the database for user %s " -                "(type YES)? " % uuid) -if yes != 'YES': -    print 'Bailing out...' -    exit(2) - - -db = CouchDatabase('%s/user-%s' % (url, uuid)) - -  class _DeleterThread(threading.Thread):      def __init__(self, db, doc_id, release_fun): @@ -59,21 +43,95 @@ class _DeleterThread(threading.Thread):          self._release_fun() -semaphore_pool = threading.BoundedSemaphore(value=20) - - -threads = [] -for doc_id in db: -    if doc_id != 'u1db_config' and not doc_id.startswith('_design'): +def get_confirmation(noconfirm, uuid, shared): +    msg = "Are you sure you want to reset %s (type YES)? " +    if shared: +        msg = msg % "the shared database" +    elif uuid: +        msg = msg % ("the database for user %s" % uuid) +    else: +        msg = msg % "all databases" +    if noconfirm is False: +        yes = raw_input(msg) +        if yes != 'YES': +            print 'Bailing out...' +            exit(2) + + +def get_url(empty): +    url = None +    if empty is False: +        # get couch url +        cp = ConfigParser() +        cp.read('/etc/leap/soledad-server.conf') +        url = cp.get('soledad-server', 'couch_url') +    else: +        with open('/etc/couchdb/couchdb.netrc') as f: +            netrc = f.read() +            admin_password = re.match('^.* password (.*)$', netrc).groups()[0] +            url = 'http://admin:%s@127.0.0.1:5984' % admin_password +    return url + + +def reset_all_dbs(url, empty): +    server = CouchServer('%s' % (url)) +    for dbname in server: +        if dbname.startswith('user-') or dbname == 'shared': +            reset_db(url, dbname, empty) + + +def reset_db(url, dbname, empty): +    db = CouchDatabase('%s/%s' % (url, dbname)) +    semaphore_pool = threading.BoundedSemaphore(value=20) + +    # launch threads for deleting docs +    threads = [] +    for doc_id in db: +        if empty is False: +            if doc_id == 'u1db_config' or doc_id.startswith('_design'): +                continue          semaphore_pool.acquire()          logger.info('[main] launching thread for doc: %s' % doc_id)          t = _DeleterThread(db, doc_id, semaphore_pool.release)          t.start()          threads.append(t) - -logger.info('[main] waiting for threads.') -map(lambda thread: thread.join(), threads) - - -logger.info('[main] done.') +    # wait for threads to finish +    logger.info('[main] waiting for threads.') +    map(lambda thread: thread.join(), threads) +    logger.info('[main] done.') + + +def _parse_args(): +    parser = argparse.ArgumentParser() +    group = parser.add_mutually_exclusive_group() +    group.add_argument('-u', dest='uuid', default=False, +        help='Reset database of given user.') +    group.add_argument('-s', dest='shared', action='store_true', default=False, +        help='Reset the shared database.') +    group.add_argument('-a', dest='all', action='store_true', default=False, +        help='Reset all user databases.') +    parser.add_argument( +        '-e', dest='empty', action='store_true', required=False, default=False, +        help='Empty database (do not preserve minimal set of u1db documents).') +    parser.add_argument( +        '-y', dest='noconfirm', action='store_true', required=False, +        default=False, +        help='Do not ask for confirmation.') +    return parser.parse_args(), parser + + +if __name__ == '__main__': +    args, parser = _parse_args() +    if not (args.uuid or args.shared or args.all): +        parser.print_help() +        exit(1) + +    url = get_url(args.empty) +    get_confirmation(args.noconfirm, args.uuid, args.shared) +    if args.uuid: +        reset_db(url, "user-%s" % args.uuid, args.empty) +    elif args.shared: +        reset_db(url, "shared", args.empty) +    elif args.all: +        reset_all_dbs(url, args.empty) diff --git a/scripts/ddocs/update_design_docs.py b/scripts/ddocs/update_design_docs.py index e7b5a29c..2e2fa8f0 100644 --- a/scripts/ddocs/update_design_docs.py +++ b/scripts/ddocs/update_design_docs.py @@ -11,84 +11,83 @@ import re  import threading  import binascii - +from urlparse import urlparse  from getpass import getpass  from ConfigParser import ConfigParser -from couchdb.client import Server -from couchdb.http import Resource, Session -from datetime import datetime -from urlparse import urlparse +from couchdb.client import Server +from couchdb.http import Resource +from couchdb.http import Session +from couchdb.http import ResourceNotFound  from leap.soledad.common import ddocs -# parse command line for the log file name -logger_fname = "/tmp/update-design-docs_%s.log" % \ -               str(datetime.now()).replace(' ', '_') -parser = argparse.ArgumentParser() -parser.add_argument('--log', action='store', default=logger_fname, type=str, -                    required=False, help='the name of the log file', nargs=1) -args = parser.parse_args() +MAX_THREADS = 20 +DESIGN_DOCS = { +    '_design/docs': json.loads(binascii.a2b_base64(ddocs.docs)), +    '_design/syncs': json.loads(binascii.a2b_base64(ddocs.syncs)), +    '_design/transactions': json.loads( +        binascii.a2b_base64(ddocs.transactions)), +} -# configure the logger +# create a logger  logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) -print "Logging to %s." % args.log -logging.basicConfig( -    filename=args.log, -    format="%(asctime)-15s %(message)s") +LOG_FORMAT = '%(asctime)s %(message)s' +logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) -# configure threads -max_threads = 20 -semaphore_pool = threading.BoundedSemaphore(value=max_threads) -threads = [] +def _parse_args(): +    parser = argparse.ArgumentParser() +    parser.add_argument('-u', dest='uuid', default=None, type=str, +                        help='the UUID of the user') +    parser.add_argument('-t', dest='threads', default=MAX_THREADS, type=int, +                        help='the number of parallel threads') +    return parser.parse_args() -# get couch url -cp = ConfigParser() -cp.read('/etc/leap/soledad-server.conf') -url = urlparse(cp.get('soledad-server', 'couch_url')) -# get admin password -netloc = re.sub('^.*@', '', url.netloc) -url = url._replace(netloc=netloc) -password = getpass("Admin password for %s: " % url.geturl()) -url = url._replace(netloc='admin:%s@%s' % (password, netloc)) +def _get_url(): +    # get couch url +    cp = ConfigParser() +    cp.read('/etc/leap/soledad-server.conf') +    url = urlparse(cp.get('soledad-server', 'couch_url')) +    # get admin password +    netloc = re.sub('^.*@', '', url.netloc) +    url = url._replace(netloc=netloc) +    password = getpass("Admin password for %s: " % url.geturl()) +    return url._replace(netloc='admin:%s@%s' % (password, netloc)) -resource = Resource(url.geturl(), Session(retry_delays=[1,2,4,8], timeout=10)) -server = Server(url=resource) -hidden_url = re.sub( -    'http://(.*):.*@', -    'http://\\1:xxxxx@', -    url.geturl()) +def _get_server(url): +    resource = Resource( +        url.geturl(), Session(retry_delays=[1, 2, 4, 8], timeout=10)) +    return Server(url=resource) -print """ -========== -ATTENTION! -========== -This script will modify Soledad's shared and user databases in: +def _confirm(url): +    hidden_url = re.sub( +        'http://(.*):.*@', +        'http://\\1:xxxxx@', +        url.geturl()) -  %s +    print """ +    ========== +    ATTENTION! +    ========== -This script does not make a backup of the couch db data, so make sure you -have a copy or you may loose data. -""" % hidden_url -confirm = raw_input("Proceed (type uppercase YES)? ") +    This script will modify Soledad's shared and user databases in: -if confirm != "YES": -    exit(1) +      %s -# convert design doc content +    This script does not make a backup of the couch db data, so make sure you +    have a copy or you may loose data. +    """ % hidden_url +    confirm = raw_input("Proceed (type uppercase YES)? ") + +    if confirm != "YES": +        exit(1) -design_docs = { -    '_design/docs': json.loads(binascii.a2b_base64(ddocs.docs)), -    '_design/syncs': json.loads(binascii.a2b_base64(ddocs.syncs)), -    '_design/transactions': json.loads(binascii.a2b_base64(ddocs.transactions)), -}  #  # Thread @@ -106,42 +105,66 @@ class DBWorkerThread(threading.Thread):      def run(self): -        logger.info("(%d/%d) Updating db %s." % (self._db_idx, self._db_len, -                    self._dbname)) +        logger.info( +            "(%d/%d) Updating db %s." +            % (self._db_idx, self._db_len, self._dbname)) -        for doc_id in design_docs: -            doc = self._cdb[doc_id] +        for doc_id in DESIGN_DOCS: +            try: +                doc = self._cdb[doc_id] +            except ResourceNotFound: +                doc = {'_id': doc_id}              for key in ['lists', 'views', 'updates']: -                if key in design_docs[doc_id]: -                    doc[key] = design_docs[doc_id][key] +                if key in DESIGN_DOCS[doc_id]: +                    doc[key] = DESIGN_DOCS[doc_id][key]              self._cdb.save(doc)          # release the semaphore          self._release_fun() -db_idx = 0 -db_len = len(server) -for dbname in server: - -    db_idx += 1 - -    if not (dbname.startswith('user-') or dbname == 'shared') \ -            or dbname == 'user-test-db': -        logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname)) -        continue - - -    # get access to couch db -    cdb = Server(url.geturl())[dbname] - -    #--------------------------------------------------------------------- -    # Start DB worker thread -    #--------------------------------------------------------------------- -    semaphore_pool.acquire() -    thread = DBWorkerThread(server, dbname, db_idx, db_len, semaphore_pool.release) +def _launch_update_design_docs_thread( +        server, dbname, db_idx, db_len, semaphore_pool): +    semaphore_pool.acquire()  # wait for an available working slot +    thread = DBWorkerThread( +        server, dbname, db_idx, db_len, semaphore_pool.release)      thread.daemon = True      thread.start() -    threads.append(thread) - -map(lambda thread: thread.join(), threads) +    return thread + + +def _update_design_docs(args, server): + +    # find the actual databases to be updated +    dbs = [] +    if args.uuid: +        dbs.append('user-%s' % args.uuid) +    else: +        for dbname in server: +            if dbname.startswith('user-') or dbname == 'shared': +                dbs.append(dbname) +            else: +                logger.info("Skipping db %s." % dbname) + +    db_idx = 0 +    db_len = len(dbs) +    semaphore_pool = threading.BoundedSemaphore(value=args.threads) +    threads = [] + +    # launch the update +    for db in dbs: +        db_idx += 1 +        threads.append( +            _launch_update_design_docs_thread( +                server, db, db_idx, db_len, semaphore_pool)) + +    # wait for all threads to finish +    map(lambda thread: thread.join(), threads) + + +if __name__ == "__main__": +    args = _parse_args() +    url = _get_url() +    _confirm(url) +    server = _get_server(url) +    _update_design_docs(args, server) diff --git a/scripts/profiling/spam.py b/scripts/profiling/spam.py new file mode 100755 index 00000000..091a8c48 --- /dev/null +++ b/scripts/profiling/spam.py @@ -0,0 +1,123 @@ +#!/usr/bin/python + +# Send a lot of messages in parallel. + + +import string +import smtplib +import threading +import logging + +from argparse import ArgumentParser + + +SMTP_HOST = 'chipmonk.cdev.bitmask.net' +NUMBER_OF_THREADS = 20 + + +logger = logging.getLogger(__name__) +LOG_FORMAT = '%(asctime)s %(message)s' +logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + + +def _send_email(host, subject, to_addr, from_addr, body_text): +    """ +    Send an email +    """ +    body = string.join(( +            "From: %s" % from_addr, +            "To: %s" % to_addr, +            "Subject: %s" % subject, +            "", +            body_text +            ), "\r\n") +    server = smtplib.SMTP(host) +    server.sendmail(from_addr, [to_addr], body) +    server.quit() + + +def _parse_args(): +    parser = ArgumentParser() +    parser.add_argument( +        'target_address', +        help='The target email address to spam') +    parser.add_argument( +        'number_of_messages', type=int, +        help='The amount of messages email address to spam') +    parser.add_argument( +        '-s', dest='server', default=SMTP_HOST, +        help='The SMTP server to use') +    parser.add_argument( +        '-t', dest='threads', default=NUMBER_OF_THREADS, +        help='The maximum number of parallel threads to launch') +    return parser.parse_args() + + +class EmailSenderThread(threading.Thread): + +    def __init__(self, host, subject, to_addr, from_addr, body_text, +            finished_fun): +        threading.Thread.__init__(self) +        self._host = host +        self._subject = subject +        self._to_addr = to_addr +        self._from_addr = from_addr +        self._body_text = body_text +        self._finished_fun = finished_fun + +    def run(self): +        _send_email( +            self._host, self._subject, self._to_addr, self._from_addr, +            self._body_text) +        self._finished_fun() + + +def _launch_email_thread(host, subject, to_addr, from_addr, body_text, +        finished_fun): +    thread = EmailSenderThread( +        host, subject, to_addr, from_addr, body_text, finished_fun) +    thread.start() +    return thread + + +class FinishedThreads(object): + +    def __init__(self): +        self._finished = 0 +        self._lock = threading.Lock() + +    def signal(self): +        with self._lock: +            self._finished = self._finished + 1 +            logger.info('Number of messages sent: %d.' % self._finished) + + +def _send_messages(args): +    host = args.server +    subject = "Message from Soledad script" +    to_addr = args.target_address +    from_addr = args.target_address +    body_text = "Test message" + +    semaphore = threading.Semaphore(args.threads) +    threads = [] +    finished_threads = FinishedThreads() + +    def _finished_fun(): +        semaphore.release() +        finished_threads.signal() + +    for i in xrange(args.number_of_messages): +        semaphore.acquire() +        threads.append( +            _launch_email_thread( +               host, subject, to_addr, from_addr, body_text, +               _finished_fun)) + +    for t in threads: +        t.join() + + +if __name__ == "__main__": +    args = _parse_args() +    _send_messages(args) | 
