diff options
| -rw-r--r-- | client/src/leap/soledad/client/__init__.py | 2 | ||||
| -rw-r--r-- | common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch | 1 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/couch.py | 317 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/ddocs/docs/updates/put.js | 64 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js | 39 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/tests/test_couch.py | 24 | ||||
| -rw-r--r-- | scripts/migrate_dbs.py | 288 | ||||
| -rw-r--r-- | scripts/update_design_docs.py | 147 | 
8 files changed, 374 insertions, 508 deletions
| diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index b5ce7c32..a8d68c88 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -778,7 +778,7 @@ class Soledad(object):          ============================== WARNING ==============================          This method converts the document's contents to unicode in-place. This -        meanse that after calling C{put_doc(doc)}, the contents of the +        means that after calling C{put_doc(doc)}, the contents of the          document, i.e. C{doc.content}, might be different from before the          call.          ============================== WARNING ============================== diff --git a/common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch b/common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch new file mode 100644 index 00000000..7d3f6e4f --- /dev/null +++ b/common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch @@ -0,0 +1 @@ +  o Use less memory when putting docs on couch (#5011). 
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 40d64370..456d4fdf 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -24,10 +24,21 @@ import uuid  import logging  import binascii  import socket +import time +import sys + + +from StringIO import StringIO  from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized, ServerError, Session +from couchdb.http import ( +    ResourceConflict, +    ResourceNotFound, +    Unauthorized, +    ServerError, +    Session, +)  from u1db import query_parser, vectorclock  from u1db.errors import (      DatabaseDoesNotExist, @@ -87,9 +98,9 @@ class CouchDocument(SoledadDocument):          SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)          self._couch_rev = None          self._conflicts = None -        self._modified_conflicts = False +        self._transactions = None -    def ensure_fetch_conflicts(self, get_conflicts_fun): +    def _ensure_fetch_conflicts(self, get_conflicts_fun):          """          Ensure conflict data has been fetched from the server. @@ -112,6 +123,16 @@ class CouchDocument(SoledadDocument):          """          return self._conflicts +    def set_conflicts(self, conflicts): +        """ +        Set the conflicted versions of the document. + +        :param conflicts: The conflicted versions of the document. +        :type conflicts: list +        """ +        self._conflicts = conflicts +        self.has_conflicts = len(self._conflicts) > 0 +      def add_conflict(self, doc):          """          Add a conflict to this document. 
@@ -120,8 +141,7 @@ class CouchDocument(SoledadDocument):          :type doc: CouchDocument          """          if self._conflicts is None: -            raise Exception("Run self.ensure_fetch_conflicts first!") -        self._modified_conflicts = True +            raise Exception("Run self._ensure_fetch_conflicts first!")          self._conflicts.append(doc)          self.has_conflicts = len(self._conflicts) > 0 @@ -133,25 +153,13 @@ class CouchDocument(SoledadDocument):          :type conflict_revs: [str]          """          if self._conflicts is None: -            raise Exception("Run self.ensure_fetch_conflicts first!") +            raise Exception("Run self._ensure_fetch_conflicts first!")          conflicts_len = len(self._conflicts)          self._conflicts = filter(              lambda doc: doc.rev not in conflict_revs,              self._conflicts) -        if len(self._conflicts) < conflicts_len: -            self._modified_conflicts = True          self.has_conflicts = len(self._conflicts) > 0 -    def modified_conflicts(self): -        """ -        Return whether this document's conflicts have been modified. - -        :return: Whether this document's conflicts have been modified. 
-        :rtype: bool -        """ -        return self._conflicts is not None and \ -            self._modified_conflicts is True -      def _get_couch_rev(self):          return self._couch_rev @@ -160,6 +168,14 @@ class CouchDocument(SoledadDocument):      couch_rev = property(_get_couch_rev, _set_couch_rev) +    def _get_transactions(self): +        return self._transactions + +    def _set_transactions(self, rev): +        self._transactions = rev + +    transactions = property(_get_transactions, _set_transactions) +  # monkey-patch the u1db http app to use CouchDocument  http_app.Document = CouchDocument @@ -225,6 +241,94 @@ def raise_server_error(exc, ddoc_path):      raise errors.DesignDocUnknownError(path) +class MultipartWriter(object): +    """ +    A multipart writer adapted from python-couchdb's one so we can PUT +    documents using couch's multipart PUT. + +    This stripped down version does not allow for nested structures, and +    contains only the essential things we need to PUT SoledadDocuments to the +    couch backend. +    """ + +    CRLF = '\r\n' + +    def __init__(self, fileobj, headers=None, boundary=None): +        """ +        Initialize the multipart writer. +        """ +        self.fileobj = fileobj +        if boundary is None: +            boundary = self._make_boundary() +        self._boundary = boundary +        self._build_headers('related', headers) + +    def add(self, mimetype, content, headers={}): +        """ +        Add a part to the multipart stream. +        """ +        self.fileobj.write('--') +        self.fileobj.write(self._boundary) +        self.fileobj.write(self.CRLF) +        headers['Content-Type'] = mimetype +        self._write_headers(headers) +        if content: +            # XXX: throw an exception if a boundary appears in the content?? +            self.fileobj.write(content) +            self.fileobj.write(self.CRLF) + +    def close(self): +        """ +        Close the multipart stream. 
+        """ +        self.fileobj.write('--') +        self.fileobj.write(self._boundary) +        # be careful not to have anything after '--', otherwise old couch +        # versions (including bigcouch) will fail. +        self.fileobj.write('--') + +    def _make_boundary(self): +        """ +        Create a boundary to discern multi parts. +        """ +        try: +            from uuid import uuid4 +            return '==' + uuid4().hex + '==' +        except ImportError: +            from random import randrange +            token = randrange(sys.maxint) +            format = '%%0%dd' % len(repr(sys.maxint - 1)) +            return '===============' + (format % token) + '==' + +    def _write_headers(self, headers): +        """ +        Write a part header in the buffer stream. +        """ +        if headers: +            for name in sorted(headers.keys()): +                value = headers[name] +                self.fileobj.write(name) +                self.fileobj.write(': ') +                self.fileobj.write(value) +                self.fileobj.write(self.CRLF) +        self.fileobj.write(self.CRLF) + +    def _build_headers(self, subtype, headers): +        """ +        Build the main headers of the multipart stream. + +        This is here so we can send headers separete from content using +        python-couchdb API. +        """ +        self.headers = {} +        self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ +                                       (subtype, self._boundary) +        if headers: +            for name in sorted(headers.keys()): +                value = headers[name] +                self.headers[name] = value + +  class CouchDatabase(CommonBackend):      """      A U1DB implementation that uses CouchDB as its persistence layer. 
@@ -564,8 +668,15 @@ class CouchDatabase(CommonBackend):                  and '_attachments' in result \                  and 'u1db_conflicts' in result['_attachments']:              doc.has_conflicts = True +            doc.set_conflicts( +                self._build_conflicts( +                    doc.doc_id, +                    json.loads(binascii.a2b_base64( +                        result['_attachments']['u1db_conflicts']['data']))))          # store couch revision          doc.couch_rev = result['_rev'] +        # store transactions +        doc.transactions = result['u1db_transactions']          return doc      def get_doc(self, doc_id, include_deleted=False): @@ -616,6 +727,10 @@ class CouchDatabase(CommonBackend):          """          Put the document in the Couch backend database. +        Note that C{old_doc} must have been fetched with the parameter +        C{check_for_conflicts} equal to True, so we can properly update the +        new document using the conflict information from the old one. +          :param old_doc: The old document version.          :type old_doc: CouchDocument          :param doc: The document to be put. @@ -637,45 +752,61 @@ class CouchDatabase(CommonBackend):                                               design document for an yet                                               unknown reason.          
""" -        trans_id = self._allocate_transaction_id() -        # encode content -        content = doc.get_json() -        if content is not None: -            content = binascii.b2a_base64(content)[:-1]  # exclude trailing \n -        # encode conflicts -        conflicts = None -        update_conflicts = doc.modified_conflicts() -        if update_conflicts is True: -            if doc.has_conflicts: -                conflicts = binascii.b2a_base64( -                    json.dumps( -                        map(lambda cdoc: (cdoc.rev, cdoc.content), -                            doc.get_conflicts())) -            )[:-1]  # exclude \n -        # perform the request -        ddoc_path = ['_design', 'docs', '_update', 'put', doc.doc_id] -        resource = self._database.resource(*ddoc_path) +        attachments = {}  # we save content and conflicts as attachments +        parts = []  # and we put it using couch's multipart PUT +        # save content as attachment +        if doc.is_tombstone() is False: +            content = doc.get_json() +            attachments['u1db_content'] = { +                'follows': True, +                'content_type': 'application/octet-stream', +                'length': len(content), +            } +            parts.append(content) +        # save conflicts as attachment +        if doc.has_conflicts is True: +            conflicts = json.dumps( +                map(lambda cdoc: (cdoc.rev, cdoc.content), +                    doc.get_conflicts())) +            attachments['u1db_conflicts'] = { +                'follows': True, +                'content_type': 'application/octet-stream', +                'length': len(conflicts), +            } +            parts.append(conflicts) +        # store old transactions, if any +        transactions = old_doc.transactions[:] if old_doc is not None else [] +        # create a new transaction id and timestamp it so the transaction log +        # is consistent when querying the database. 
+        transactions.append( +            # here we store milliseconds to keep consistent with javascript +            # Date.prototype.getTime() which was used before inside a couchdb +            # update handler. +            (int(time.time() * 1000), +            self._allocate_transaction_id())) +        # build the couch document +        couch_doc = { +            '_id': doc.doc_id, +            'u1db_rev': doc.rev, +            'u1db_transactions': transactions, +            '_attachments': attachments, +        } +        # if we are updating a doc we have to add the couch doc revision +        if old_doc is not None: +            couch_doc['_rev'] = old_doc.couch_rev +        # prepare the multipart PUT +        buf = StringIO() +        envelope = MultipartWriter(buf) +        envelope.add('application/json', json.dumps(couch_doc)) +        for part in parts: +            envelope.add('application/octet-stream', part) +        envelope.close() +        # try to save and fail if there's a revision conflict          try: -            response = resource.put_json( -                body={ -                    'couch_rev': old_doc.couch_rev -                        if old_doc is not None else None, -                    'u1db_rev': doc.rev, -                    'content': content, -                    'trans_id': trans_id, -                    'conflicts': conflicts, -                    'update_conflicts': update_conflicts, -                }, -                headers={'content-type': 'application/json'}) -            # the document might have been updated in between, so we check for -            # the return message -            msg = response[2].read() -            if msg == 'ok': -                return -            elif msg == 'revision conflict': -                raise RevisionConflict() -        except ResourceNotFound as e: -            raise_missing_design_doc_error(e, ddoc_path) +            self._database.resource.put_json( +                
doc.doc_id, body=buf.getvalue(), headers=envelope.headers) +        except ResourceConflict: +            raise RevisionConflict()      def put_doc(self, doc):          """ @@ -810,6 +941,25 @@ class CouchDatabase(CommonBackend):          self._put_doc(old_doc, doc)          return new_rev +    def _build_conflicts(self, doc_id, attached_conflicts): +        """ +        Build the conflicted documents list from the conflicts attachment +        fetched from a couch document. + +        :param attached_conflicts: The document's conflicts as fetched from a +                                   couch document attachment. +        :type attached_conflicts: dict +        """ +        conflicts = [] +        for doc_rev, content in attached_conflicts: +            doc = self._factory(doc_id, doc_rev) +            if content is None: +                doc.make_tombstone() +            else: +                doc.content = content +            conflicts.append(doc) +        return conflicts +      def _get_conflicts(self, doc_id, couch_rev=None):          """          Get the conflicted versions of a document. @@ -830,16 +980,8 @@ class CouchDatabase(CommonBackend):          resource = self._database.resource(doc_id, 'u1db_conflicts')          try:              response = resource.get_json(**params) -            conflicts = [] -            # build the conflicted versions -            for doc_rev, content in json.loads(response[2].read()): -                doc = self._factory(doc_id, doc_rev) -                if content is None: -                    doc.make_tombstone() -                else: -                    doc.content = content -                conflicts.append(doc) -            return conflicts +            return self._build_conflicts( +                doc_id, json.loads(response[2].read()))          except ResourceNotFound:              return [] @@ -970,7 +1112,7 @@ class CouchDatabase(CommonBackend):                             serialized string.          
:type my_content: str          """ -        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc._ensure_fetch_conflicts(self._get_conflicts)          doc.add_conflict(              self._factory(doc_id=doc.doc_id, rev=my_doc_rev,                            json=my_content)) @@ -988,7 +1130,7 @@ class CouchDatabase(CommonBackend):          :param conflict_revs: A list of the revisions to be deleted.          :param conflict_revs: [str]          """ -        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc._ensure_fetch_conflicts(self._get_conflicts)          doc.delete_conflicts(conflict_revs)      def _prune_conflicts(self, doc, doc_vcr): @@ -1067,33 +1209,24 @@ class CouchDatabase(CommonBackend):                                             conflicted_doc_revs)          superseded_revs = set(conflicted_doc_revs)          doc.rev = new_rev +        # this backend stores conflicts as properties of the documents, so we +        # have to copy these conflicts over to the document being updated.          if cur_doc.rev in superseded_revs: +            # the newer doc version will supersede the one in the database, so +            # we copy conflicts before updating the backend. +            doc.set_conflicts(cur_doc.get_conflicts())  # copy conflicts over.              
self._delete_conflicts(doc, superseded_revs)              self._put_doc(cur_doc, doc)          else: -            self._add_conflict(doc, new_rev, doc.get_json()) -            self._delete_conflicts(doc, superseded_revs) -            # perform request to resolve document in server -            ddoc_path = [ -                '_design', 'docs', '_update', 'resolve_doc', doc.doc_id -            ] -            resource = self._database.resource(*ddoc_path) -            conflicts = None -            if doc.has_conflicts: -                conflicts = binascii.b2a_base64( -                    json.dumps( -                        map(lambda cdoc: (cdoc.rev, cdoc.content), -                            doc.get_conflicts())) -                )[:-1]  # exclude \n -            try: -                response = resource.put_json( -                    body={ -                        'couch_rev': cur_doc.couch_rev, -                        'conflicts': conflicts, -                    }, -                    headers={'content-type': 'application/json'}) -            except ResourceNotFound as e: -                raise_missing_design_doc_error(e, ddoc_path) +            # the newer doc version does not supersede the one in the +            # database, so we will add a conflict to the database and copy +            # those over to the document the user has in her hands. +            self._add_conflict(cur_doc, new_rev, doc.get_json()) +            self._delete_conflicts(cur_doc, superseded_revs) +            self._put_doc(cur_doc, cur_doc)  # just update conflicts +            # backend has been updated with current conflicts, now copy them +            # to the current document. 
+            doc.set_conflicts(cur_doc.get_conflicts())      def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,                            replica_trans_id=''): @@ -1150,7 +1283,7 @@ class CouchDatabase(CommonBackend):          if cur_doc is not None:              doc.couch_rev = cur_doc.couch_rev          # fetch conflicts because we will eventually manipulate them -        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc._ensure_fetch_conflicts(self._get_conflicts)          # from now on, it works just like u1db sqlite backend          doc_vcr = vectorclock.VectorClockRev(doc.rev)          if cur_doc is None: diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js deleted file mode 100644 index 5a4647de..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js +++ /dev/null @@ -1,64 +0,0 @@ -function(doc, req){ -    /* we expect to receive the following in `req.body`: -     * { -     *     'couch_rev': '<couch_rev>', -     *     'u1db_rev': '<u1db_rev>', -     *     'content': '<base64 encoded content>', -     *     'trans_id': '<reansaction_id>' -     *     'conflicts': '<base64 encoded conflicts>', -     *     'update_conflicts': <boolean> -     * } -     */ -    var body = JSON.parse(req.body); - -    // create a new document document -    if (!doc) { -        doc = {} -        doc['_id'] = req['id']; -    } -    // or fail if couch revisions do not match -    else if (doc['_rev'] != body['couch_rev']) { -        // of fail if revisions do not match -        return [null, 'revision conflict'] -    } - -    // store u1db rev -    doc.u1db_rev = body['u1db_rev']; - -    // save content as attachment -    if (body['content'] != null) { -        // save u1db content as attachment -        if (!doc._attachments) -            doc._attachments = {}; -        doc._attachments.u1db_content =  { -            content_type: "application/octet-stream", 
-            data: body['content']  // should be base64 encoded -        }; -    } -    // or delete the attachment if document is tombstone -    else if (doc._attachments && -             doc._attachments.u1db_content) -        delete doc._attachments.u1db_content; - -    // store the transaction id -    if (!doc.u1db_transactions) -        doc.u1db_transactions = []; -    var d = new Date(); -    doc.u1db_transactions.push([d.getTime(), body['trans_id']]); - -    // save conflicts as attachment if they were sent -    if (body['update_conflicts']) -        if (body['conflicts'] != null) { -            if (!doc._attachments) -                doc._attachments = {}; -            doc._attachments.u1db_conflicts = { -                content_type: "application/octet-stream", -                data: body['conflicts']  // should be base64 encoded -            } -        } else { -            if(doc._attachments && doc._attachments.u1db_conflicts) -                delete doc._attachments.u1db_conflicts -        } - -    return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js deleted file mode 100644 index 7ba66cf8..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js +++ /dev/null @@ -1,39 +0,0 @@ -function(doc, req){ -    /* we expect to receive the following in `req.body`: -     * { -     *     'couch_rev': '<couch_rev>', -     *     'conflicts': '<base64 encoded conflicts>', -     * } -     */ -    var body = JSON.parse(req.body); - -    // fail if no document was given -    if (!doc) { -        return [null, 'document does not exist'] -    }  - -    // fail if couch revisions do not match -    if (body['couch_rev'] != null -        && doc['_rev'] != body['couch_rev']) { -        return [null, 'revision conflict'] -    } - -    // fail if conflicts were not sent -    if (body['conflicts'] == null) -        return [null, 'missing 
conflicts'] - -    // save conflicts as attachment if they were sent -    if (body['conflicts'] != null) { -        if (!doc._attachments) -            doc._attachments = {}; -        doc._attachments.u1db_conflicts = { -            content_type: "application/octet-stream", -            data: body['conflicts']  // should be base64 encoded -        } -    } -    // or delete attachment if there are no conflicts -    else if (doc._attachments && doc._attachments.u1db_conflicts) -        delete doc._attachments.u1db_conflicts; - -    return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index dc0ea906..86bb4b93 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -495,10 +495,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          self.assertRaises(              errors.MissingDesignDocError,              self.db._get_transaction_log) -        # create_doc() -        self.assertRaises( -            errors.MissingDesignDocError, -            self.db.create_doc, {})          # whats_changed()          self.assertRaises(              errors.MissingDesignDocError, @@ -507,14 +503,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          self.assertRaises(              errors.MissingDesignDocError,              self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) -        # fake a conflict so we can test resolve_doc() -        first_rev = self.db._allocate_doc_rev(None) -        doc = couch.CouchDocument( -            doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev)) -        self.db._get_doc = Mock(return_value=doc) -        self.assertRaises( -            errors.MissingDesignDocError, -            self.db.resolve_doc, doc, [first_rev])      def test_missing_design_doc_functions_raises(self):          """ @@ -645,10 +633,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          
self.assertRaises(              errors.MissingDesignDocDeletedError,              self.db._get_transaction_log) -        # create_doc() -        self.assertRaises( -            errors.MissingDesignDocDeletedError, -            self.db.create_doc, {})          # whats_changed()          self.assertRaises(              errors.MissingDesignDocDeletedError, @@ -657,14 +641,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          self.assertRaises(              errors.MissingDesignDocDeletedError,              self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) -        # fake a conflict so we can test resolve_doc() -        first_rev = self.db._allocate_doc_rev(None) -        doc = couch.CouchDocument( -            doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev)) -        self.db._get_doc = Mock(return_value=doc) -        self.assertRaises( -            errors.MissingDesignDocDeletedError, -            self.db.resolve_doc, doc, [first_rev])  load_tests = tests.load_with_scenarios diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py deleted file mode 100644 index f1c20d87..00000000 --- a/scripts/migrate_dbs.py +++ /dev/null @@ -1,288 +0,0 @@ -#!/usr/bin/python - -import sys -import json -import logging -import argparse -import re -import threading -from urlparse import urlparse -from ConfigParser import ConfigParser -from couchdb.client import Server -from couchdb.http import ResourceNotFound, Resource, Session -from datetime import datetime - -from leap.soledad.common.couch import CouchDatabase - - -# parse command line for the log file name -logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \ -               str(datetime.now()).replace(' ', '_') -parser = argparse.ArgumentParser() -parser.add_argument('--log', action='store', default=logger_fname, type=str, -                    required=False, help='the name of the log file', nargs=1) -args = parser.parse_args() - - -# configure the logger -logger = logging.getLogger(__name__) 
-logger.setLevel(logging.DEBUG) -print "Logging to %s." % args.log -logging.basicConfig( -    filename=args.log, -    format="%(asctime)-15s %(message)s") - - -# configure threads -max_threads = 20 -semaphore_pool = threading.BoundedSemaphore(value=max_threads) - -# get couch url -cp = ConfigParser() -cp.read('/etc/leap/soledad-server.conf') -url = cp.get('soledad-server', 'couch_url') - -resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) -server = Server(url=resource) - -hidden_url = re.sub( -    'http://(.*):.*@', -    'http://\\1:xxxxx@', -    url) - -print """ -========== -ATTENTION! -========== - -This script will modify Soledad's shared and user databases in: - -  %s - -This script does not make a backup of the couch db data, so make sure youj -have a copy or you may loose data. -""" % hidden_url -confirm = raw_input("Proceed (type uppercase YES)? ") - -if confirm != "YES": -    exit(1) - - -# -# Thread -# - -class DocWorkerThread(threading.Thread): - -    def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len, -                 transaction_log, conflict_log, release_fun): -        threading.Thread.__init__(self) -        resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) -        server = Server(url=resource) -        self._dbname = dbname -        self._cdb = server[self._dbname] -        self._doc_id = doc_id -        self._db_idx = db_idx -        self._db_len = db_len -        self._doc_idx = doc_idx -        self._doc_len = doc_len -        self._transaction_log = transaction_log -        self._conflict_log = conflict_log -        self._release_fun = release_fun - -    def run(self): - -        old_doc = self._cdb[self._doc_id] - -        # skip non u1db docs -        if 'u1db_rev' not in old_doc: -            logger.debug('(%d/%d) (%d/%d) Skipping %s/%s).' 
% -                         (self._db_idx, self._db_len, self._doc_idx, -                          self._doc_len, self._dbname, self._doc_id)) -            self._release_fun() -            return -        else: -            logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' % -                         (self._db_idx, self._db_len, self._doc_idx, -                          self._doc_len, self._dbname, self._doc_id)) - -        doc = { -            '_id': self._doc_id, -            '_rev': old_doc['_rev'], -            'u1db_rev': old_doc['u1db_rev'] -        } -        attachments = [] - -        # add transactions -        doc['u1db_transactions'] = map( -            lambda (gen, doc_id, trans_id): (gen, trans_id), -            filter( -                lambda (gen, doc_id, trans_id): doc_id == doc['_id'], -                self._transaction_log)) -        if len(doc['u1db_transactions']) == 0: -            del doc['u1db_transactions'] - -        # add conflicts -        if doc['_id'] in self._conflict_log: -            attachments.append([ -                conflict_log[doc['_id']], -                'u1db_conflicts', -                "application/octet-stream"]) - -        # move document's content to 'u1db_content' attachment -        content = self._cdb.get_attachment(doc, 'u1db_json') -        if content is not None: -            attachments.append([ -                content, -                'u1db_content', -                "application/octet-stream"]) -        #self._cdb.delete_attachment(doc, 'u1db_json') - -        # save modified doc -        self._cdb.save(doc) - -        # save all doc attachments -        for content, att_name, content_type in attachments: -            self._cdb.put_attachment( -                doc, -                content, -                filename=att_name, -                content_type=content_type) - -        # release the semaphore -        self._release_fun() - - -db_idx = 0 -db_len = len(server) -for dbname in server: - -    db_idx 
+= 1 - -    if not (dbname.startswith('user-') or dbname == 'shared') \ -            or dbname == 'user-test-db': -        logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname)) -        continue - -    logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname)) - -    # get access to couch db -    cdb = Server(url)[dbname] - -    # get access to soledad db -    sdb = CouchDatabase(url, dbname) - -    # Migration table -    # --------------- -    # -    # * Metadata that was previously stored in special documents migrate to -    #   inside documents, to allow for atomic doc-and-metadata updates. -    # * Doc content attachment name changes. -    # * Indexes are removed, to be implemented in the future possibly as -    #   design docs view functions. -    # -    # +-----------------+-------------------------+-------------------------+ -    # | Data            | old storage             | new storage             | -    # |-----------------+-------------------------+-------------------------+ -    # | doc content     | <doc_id>/u1db_json      | <doc_id>/u1db_content   | -    # | doc conflicts   | u1db/_conflicts         | <doc_id>/u1db_conflicts | -    # | transaction log | u1db/_transaction_log   | doc.u1db_transactions   | -    # | sync log        | u1db/_other_generations | u1db_sync_log           | -    # | indexes         | u1db/_indexes           | not implemented         | -    # | replica uid     | u1db/_replica_uid       | u1db_config             | -    # +-----------------+-------------------------+-------------------------+ - -    def get_att_content(db, doc_id, att_name): -        try: -            return json.loads( -                db.get_attachment( -                    doc_id, att_name).read())['content'] -        except: -            import ipdb -            ipdb.set_trace() - -    # only migrate databases that have the 'u1db/_replica_uid' document -    try: -        metadoc = cdb.get('u1db/_replica_uid') -        replica_uid = 
get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json') -    except ResourceNotFound: -        continue - -    #--------------------------------------------------------------------- -    # Step 1: Set replica uid. -    #--------------------------------------------------------------------- -    sdb._set_replica_uid(replica_uid) - -    #--------------------------------------------------------------------- -    # Step 2: Obtain metadata. -    #--------------------------------------------------------------------- - -    # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...] -    transaction_log = get_att_content( -        cdb, 'u1db/_transaction_log', 'u1db_json') -    new_transaction_log = [] -    gen = 1 -    for (doc_id, trans_id) in transaction_log: -        new_transaction_log.append((gen, doc_id, trans_id)) -        gen += 1 -    transaction_log = new_transaction_log - -    # obtain the conflict log: {'<doc_id>': ['<rev>', '<content>'], ...} -    conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json') - -    # obtain the sync log: -    # {'<replica_uid>': ['<gen>', '<transaction_id>'], ...} -    other_generations = get_att_content( -        cdb, 'u1db/_other_generations', 'u1db_json') - -    #--------------------------------------------------------------------- -    # Step 3: Iterate over all documents in database. -    #--------------------------------------------------------------------- -    doc_len = len(cdb) -    logger.info("(%d, %d) Found %d documents." 
% (db_idx, db_len, doc_len)) -    doc_idx = 0 -    threads = [] -    for doc_id in cdb: -        doc_idx = doc_idx + 1 - -        semaphore_pool.acquire() -        thread = DocWorkerThread(dbname, doc_id, db_idx, db_len, -                                 doc_idx, doc_len, transaction_log, -                                 conflict_log, semaphore_pool.release) -        thread.daemon = True -        thread.start() -        threads.append(thread) - -    map(lambda thread: thread.join(), threads) - -    #--------------------------------------------------------------------- -    # Step 4: Move sync log. -    #--------------------------------------------------------------------- - -    # move sync log -    sync_doc = { -        '_id': 'u1db_sync_log', -        'syncs': [] -    } - -    for replica_uid in other_generations: -        gen, transaction_id = other_generations[replica_uid] -        sync_doc['syncs'].append([replica_uid, gen, transaction_id]) -    cdb.save(sync_doc) - -    #--------------------------------------------------------------------- -    # Step 5: Delete old meta documents. -    #--------------------------------------------------------------------- - -    # remove unused docs -    for doc_id in ['_transaction_log', '_conflicts', '_other_generations', -            '_indexes', '_replica_uid']: -        for prefix in ['u1db/', 'u1db%2F']: -            try: -                doc = cdb['%s%s' % (prefix, doc_id)] -                logger.info( -                    "(%d/%d) Deleting %s/%s/%s." 
% -                    (db_idx, db_len, dbname, 'u1db', doc_id)) -                cdb.delete(doc) -            except ResourceNotFound: -                pass diff --git a/scripts/update_design_docs.py b/scripts/update_design_docs.py new file mode 100644 index 00000000..e7b5a29c --- /dev/null +++ b/scripts/update_design_docs.py @@ -0,0 +1,147 @@ +#!/usr/bin/python + +# This script updates Soledad's design documents in the session database and +# all user databases with contents from the installed leap.soledad.common +# package. + +import json +import logging +import argparse +import re +import threading +import binascii + + +from getpass import getpass +from ConfigParser import ConfigParser +from couchdb.client import Server +from couchdb.http import Resource, Session +from datetime import datetime +from urlparse import urlparse + + +from leap.soledad.common import ddocs + + +# parse command line for the log file name +logger_fname = "/tmp/update-design-docs_%s.log" % \ +               str(datetime.now()).replace(' ', '_') +parser = argparse.ArgumentParser() +parser.add_argument('--log', action='store', default=logger_fname, type=str, +                    required=False, help='the name of the log file', nargs=1) +args = parser.parse_args() + + +# configure the logger +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +print "Logging to %s." 
% args.log
+logging.basicConfig(
+    filename=args.log,
+    format="%(asctime)-15s %(message)s")
+
+
+# configure threads
+max_threads = 20
+semaphore_pool = threading.BoundedSemaphore(value=max_threads)
+threads = []
+
+# get couch url
+cp = ConfigParser()
+cp.read('/etc/leap/soledad-server.conf')
+url = urlparse(cp.get('soledad-server', 'couch_url'))
+
+# get admin password
+netloc = re.sub('^.*@', '', url.netloc)
+url = url._replace(netloc=netloc)
+password = getpass("Admin password for %s: " % url.geturl())
+url = url._replace(netloc='admin:%s@%s' % (password, netloc))
+
+resource = Resource(url.geturl(), Session(retry_delays=[1,2,4,8], timeout=10))
+server = Server(url=resource)
+
+hidden_url = re.sub(
+    'http://(.*):.*@',
+    'http://\\1:xxxxx@',
+    url.geturl())
+
+print """
+==========
+ATTENTION!
+==========
+
+This script will modify Soledad's shared and user databases in:
+
+  %s
+
+This script does not make a backup of the couch db data, so make sure you
+have a copy or you may lose data.
+""" % hidden_url
+confirm = raw_input("Proceed (type uppercase YES)? ")
+
+if confirm != "YES":
+    exit(1)
+
+# convert design doc content
+
+design_docs = {
+    '_design/docs': json.loads(binascii.a2b_base64(ddocs.docs)),
+    '_design/syncs': json.loads(binascii.a2b_base64(ddocs.syncs)),
+    '_design/transactions': json.loads(binascii.a2b_base64(ddocs.transactions)),
+}
+
+#
+# Thread
+#
+
+class DBWorkerThread(threading.Thread):
+
+    def __init__(self, server, dbname, db_idx, db_len, release_fun):
+        threading.Thread.__init__(self)
+        self._dbname = dbname
+        self._cdb = server[self._dbname]
+        self._db_idx = db_idx
+        self._db_len = db_len
+        self._release_fun = release_fun
+
+    def run(self):
+
+        logger.info("(%d/%d) Updating db %s."
% (self._db_idx, self._db_len, +                    self._dbname)) + +        for doc_id in design_docs: +            doc = self._cdb[doc_id] +            for key in ['lists', 'views', 'updates']: +                if key in design_docs[doc_id]: +                    doc[key] = design_docs[doc_id][key] +            self._cdb.save(doc) + +        # release the semaphore +        self._release_fun() + + +db_idx = 0 +db_len = len(server) +for dbname in server: + +    db_idx += 1 + +    if not (dbname.startswith('user-') or dbname == 'shared') \ +            or dbname == 'user-test-db': +        logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname)) +        continue + + +    # get access to couch db +    cdb = Server(url.geturl())[dbname] + +    #--------------------------------------------------------------------- +    # Start DB worker thread +    #--------------------------------------------------------------------- +    semaphore_pool.acquire() +    thread = DBWorkerThread(server, dbname, db_idx, db_len, semaphore_pool.release) +    thread.daemon = True +    thread.start() +    threads.append(thread) + +map(lambda thread: thread.join(), threads) | 
