diff options
-rw-r--r-- | client/src/leap/soledad/client/__init__.py | 2 | ||||
-rw-r--r-- | common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch | 1 | ||||
-rw-r--r-- | common/src/leap/soledad/common/couch.py | 317 | ||||
-rw-r--r-- | common/src/leap/soledad/common/ddocs/docs/updates/put.js | 64 | ||||
-rw-r--r-- | common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js | 39 | ||||
-rw-r--r-- | common/src/leap/soledad/common/tests/test_couch.py | 24 | ||||
-rw-r--r-- | scripts/migrate_dbs.py | 288 | ||||
-rw-r--r-- | scripts/update_design_docs.py | 147 |
8 files changed, 374 insertions, 508 deletions
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index b5ce7c32..a8d68c88 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -778,7 +778,7 @@ class Soledad(object): ============================== WARNING ============================== This method converts the document's contents to unicode in-place. This - meanse that after calling C{put_doc(doc)}, the contents of the + means that after calling C{put_doc(doc)}, the contents of the document, i.e. C{doc.content}, might be different from before the call. ============================== WARNING ============================== diff --git a/common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch b/common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch new file mode 100644 index 00000000..7d3f6e4f --- /dev/null +++ b/common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch @@ -0,0 +1 @@ + o Use less memory when putting docs on couch (#5011). diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 40d64370..456d4fdf 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -24,10 +24,21 @@ import uuid import logging import binascii import socket +import time +import sys + + +from StringIO import StringIO from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized, ServerError, Session +from couchdb.http import ( + ResourceConflict, + ResourceNotFound, + Unauthorized, + ServerError, + Session, +) from u1db import query_parser, vectorclock from u1db.errors import ( DatabaseDoesNotExist, @@ -87,9 +98,9 @@ class CouchDocument(SoledadDocument): SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) self._couch_rev = None self._conflicts = None - self._modified_conflicts = False + self._transactions = None - def ensure_fetch_conflicts(self, get_conflicts_fun): + def _ensure_fetch_conflicts(self, get_conflicts_fun): """ Ensure conflict data has been fetched from the server. @@ -112,6 +123,16 @@ class CouchDocument(SoledadDocument): """ return self._conflicts + def set_conflicts(self, conflicts): + """ + Set the conflicted versions of the document. + + :param conflicts: The conflicted versions of the document. + :type conflicts: list + """ + self._conflicts = conflicts + self.has_conflicts = len(self._conflicts) > 0 + def add_conflict(self, doc): """ Add a conflict to this document. @@ -120,8 +141,7 @@ class CouchDocument(SoledadDocument): :type doc: CouchDocument """ if self._conflicts is None: - raise Exception("Run self.ensure_fetch_conflicts first!") - self._modified_conflicts = True + raise Exception("Run self._ensure_fetch_conflicts first!") self._conflicts.append(doc) self.has_conflicts = len(self._conflicts) > 0 @@ -133,25 +153,13 @@ class CouchDocument(SoledadDocument): :type conflict_revs: [str] """ if self._conflicts is None: - raise Exception("Run self.ensure_fetch_conflicts first!") + raise Exception("Run self._ensure_fetch_conflicts first!") conflicts_len = len(self._conflicts) self._conflicts = filter( lambda doc: doc.rev not in conflict_revs, self._conflicts) - if len(self._conflicts) < conflicts_len: - self._modified_conflicts = True self.has_conflicts = len(self._conflicts) > 0 - def modified_conflicts(self): - """ - Return whether this document's conflicts have been modified. - - :return: Whether this document's conflicts have been modified. - :rtype: bool - """ - return self._conflicts is not None and \ - self._modified_conflicts is True - def _get_couch_rev(self): return self._couch_rev @@ -160,6 +168,14 @@ class CouchDocument(SoledadDocument): couch_rev = property(_get_couch_rev, _set_couch_rev) + def _get_transactions(self): + return self._transactions + + def _set_transactions(self, rev): + self._transactions = rev + + transactions = property(_get_transactions, _set_transactions) + # monkey-patch the u1db http app to use CouchDocument http_app.Document = CouchDocument @@ -225,6 +241,94 @@ def raise_server_error(exc, ddoc_path): raise errors.DesignDocUnknownError(path) +class MultipartWriter(object): + """ + A multipart writer adapted from python-couchdb's one so we can PUT + documents using couch's multipart PUT. + + This stripped down version does not allow for nested structures, and + contains only the essential things we need to PUT SoledadDocuments to the + couch backend. + """ + + CRLF = '\r\n' + + def __init__(self, fileobj, headers=None, boundary=None): + """ + Initialize the multipart writer. + """ + self.fileobj = fileobj + if boundary is None: + boundary = self._make_boundary() + self._boundary = boundary + self._build_headers('related', headers) + + def add(self, mimetype, content, headers={}): + """ + Add a part to the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + self.fileobj.write(self.CRLF) + headers['Content-Type'] = mimetype + self._write_headers(headers) + if content: + # XXX: throw an exception if a boundary appears in the content?? + self.fileobj.write(content) + self.fileobj.write(self.CRLF) + + def close(self): + """ + Close the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + # be careful not to have anything after '--', otherwise old couch + # versions (including bigcouch) will fail. + self.fileobj.write('--') + + def _make_boundary(self): + """ + Create a boundary to discern multi parts. + """ + try: + from uuid import uuid4 + return '==' + uuid4().hex + '==' + except ImportError: + from random import randrange + token = randrange(sys.maxint) + format = '%%0%dd' % len(repr(sys.maxint - 1)) + return '===============' + (format % token) + '==' + + def _write_headers(self, headers): + """ + Write a part header in the buffer stream. + """ + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.fileobj.write(name) + self.fileobj.write(': ') + self.fileobj.write(value) + self.fileobj.write(self.CRLF) + self.fileobj.write(self.CRLF) + + def _build_headers(self, subtype, headers): + """ + Build the main headers of the multipart stream. + + This is here so we can send headers separete from content using + python-couchdb API. + """ + self.headers = {} + self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ + (subtype, self._boundary) + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.headers[name] = value + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. @@ -564,8 +668,15 @@ class CouchDatabase(CommonBackend): and '_attachments' in result \ and 'u1db_conflicts' in result['_attachments']: doc.has_conflicts = True + doc.set_conflicts( + self._build_conflicts( + doc.doc_id, + json.loads(binascii.a2b_base64( + result['_attachments']['u1db_conflicts']['data'])))) # store couch revision doc.couch_rev = result['_rev'] + # store transactions + doc.transactions = result['u1db_transactions'] return doc def get_doc(self, doc_id, include_deleted=False): @@ -616,6 +727,10 @@ class CouchDatabase(CommonBackend): """ Put the document in the Couch backend database. + Note that C{old_doc} must have been fetched with the parameter + C{check_for_conflicts} equal to True, so we can properly update the + new document using the conflict information from the old one. + :param old_doc: The old document version. :type old_doc: CouchDocument :param doc: The document to be put. @@ -637,45 +752,61 @@ class CouchDatabase(CommonBackend): design document for an yet unknown reason. """ - trans_id = self._allocate_transaction_id() - # encode content - content = doc.get_json() - if content is not None: - content = binascii.b2a_base64(content)[:-1] # exclude trailing \n - # encode conflicts - conflicts = None - update_conflicts = doc.modified_conflicts() - if update_conflicts is True: - if doc.has_conflicts: - conflicts = binascii.b2a_base64( - json.dumps( - map(lambda cdoc: (cdoc.rev, cdoc.content), - doc.get_conflicts())) - )[:-1] # exclude \n - # perform the request - ddoc_path = ['_design', 'docs', '_update', 'put', doc.doc_id] - resource = self._database.resource(*ddoc_path) + attachments = {} # we save content and conflicts as attachments + parts = [] # and we put it using couch's multipart PUT + # save content as attachment + if doc.is_tombstone() is False: + content = doc.get_json() + attachments['u1db_content'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(content), + } + parts.append(content) + # save conflicts as attachment + if doc.has_conflicts is True: + conflicts = json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + attachments['u1db_conflicts'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(conflicts), + } + parts.append(conflicts) + # store old transactions, if any + transactions = old_doc.transactions[:] if old_doc is not None else [] + # create a new transaction id and timestamp it so the transaction log + # is consistent when querying the database. + transactions.append( + # here we store milliseconds to keep consistent with javascript + # Date.prototype.getTime() which was used before inside a couchdb + # update handler. + (int(time.time() * 1000), + self._allocate_transaction_id())) + # build the couch document + couch_doc = { + '_id': doc.doc_id, + 'u1db_rev': doc.rev, + 'u1db_transactions': transactions, + '_attachments': attachments, + } + # if we are updating a doc we have to add the couch doc revision + if old_doc is not None: + couch_doc['_rev'] = old_doc.couch_rev + # prepare the multipart PUT + buf = StringIO() + envelope = MultipartWriter(buf) + envelope.add('application/json', json.dumps(couch_doc)) + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict try: - response = resource.put_json( - body={ - 'couch_rev': old_doc.couch_rev - if old_doc is not None else None, - 'u1db_rev': doc.rev, - 'content': content, - 'trans_id': trans_id, - 'conflicts': conflicts, - 'update_conflicts': update_conflicts, - }, - headers={'content-type': 'application/json'}) - # the document might have been updated in between, so we check for - # the return message - msg = response[2].read() - if msg == 'ok': - return - elif msg == 'revision conflict': - raise RevisionConflict() - except ResourceNotFound as e: - raise_missing_design_doc_error(e, ddoc_path) + self._database.resource.put_json( + doc.doc_id, body=buf.getvalue(), headers=envelope.headers) + except ResourceConflict: + raise RevisionConflict() def put_doc(self, doc): """ @@ -810,6 +941,25 @@ class CouchDatabase(CommonBackend): self._put_doc(old_doc, doc) return new_rev + def _build_conflicts(self, doc_id, attached_conflicts): + """ + Build the conflicted documents list from the conflicts attachment + fetched from a couch document. + + :param attached_conflicts: The document's conflicts as fetched from a + couch document attachment. + :type attached_conflicts: dict + """ + conflicts = [] + for doc_rev, content in attached_conflicts: + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + def _get_conflicts(self, doc_id, couch_rev=None): """ Get the conflicted versions of a document. @@ -830,16 +980,8 @@ class CouchDatabase(CommonBackend): resource = self._database.resource(doc_id, 'u1db_conflicts') try: response = resource.get_json(**params) - conflicts = [] - # build the conflicted versions - for doc_rev, content in json.loads(response[2].read()): - doc = self._factory(doc_id, doc_rev) - if content is None: - doc.make_tombstone() - else: - doc.content = content - conflicts.append(doc) - return conflicts + return self._build_conflicts( + doc_id, json.loads(response[2].read())) except ResourceNotFound: return [] @@ -970,7 +1112,7 @@ class CouchDatabase(CommonBackend): serialized string. :type my_content: str """ - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) doc.add_conflict( self._factory(doc_id=doc.doc_id, rev=my_doc_rev, json=my_content)) @@ -988,7 +1130,7 @@ class CouchDatabase(CommonBackend): :param conflict_revs: A list of the revisions to be deleted. :param conflict_revs: [str] """ - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) doc.delete_conflicts(conflict_revs) def _prune_conflicts(self, doc, doc_vcr): @@ -1067,33 +1209,24 @@ class CouchDatabase(CommonBackend): conflicted_doc_revs) superseded_revs = set(conflicted_doc_revs) doc.rev = new_rev + # this backend stores conflicts as properties of the documents, so we + # have to copy these conflicts over to the document being updated. if cur_doc.rev in superseded_revs: + # the newer doc version will supersede the one in the database, so + # we copy conflicts before updating the backend. + doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over. self._delete_conflicts(doc, superseded_revs) self._put_doc(cur_doc, doc) else: - self._add_conflict(doc, new_rev, doc.get_json()) - self._delete_conflicts(doc, superseded_revs) - # perform request to resolve document in server - ddoc_path = [ - '_design', 'docs', '_update', 'resolve_doc', doc.doc_id - ] - resource = self._database.resource(*ddoc_path) - conflicts = None - if doc.has_conflicts: - conflicts = binascii.b2a_base64( - json.dumps( - map(lambda cdoc: (cdoc.rev, cdoc.content), - doc.get_conflicts())) - )[:-1] # exclude \n - try: - response = resource.put_json( - body={ - 'couch_rev': cur_doc.couch_rev, - 'conflicts': conflicts, - }, - headers={'content-type': 'application/json'}) - except ResourceNotFound as e: - raise_missing_design_doc_error(e, ddoc_path) + # the newer doc version does not supersede the one in the + # database, so we will add a conflict to the database and copy + # those over to the document the user has in her hands. + self._add_conflict(cur_doc, new_rev, doc.get_json()) + self._delete_conflicts(cur_doc, superseded_revs) + self._put_doc(cur_doc, cur_doc) # just update conflicts + # backend has been updated with current conflicts, now copy them + # to the current document. + doc.set_conflicts(cur_doc.get_conflicts()) def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, replica_trans_id=''): @@ -1150,7 +1283,7 @@ class CouchDatabase(CommonBackend): if cur_doc is not None: doc.couch_rev = cur_doc.couch_rev # fetch conflicts because we will eventually manipulate them - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) # from now on, it works just like u1db sqlite backend doc_vcr = vectorclock.VectorClockRev(doc.rev) if cur_doc is None: diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js deleted file mode 100644 index 5a4647de..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js +++ /dev/null @@ -1,64 +0,0 @@ -function(doc, req){ - /* we expect to receive the following in `req.body`: - * { - * 'couch_rev': '<couch_rev>', - * 'u1db_rev': '<u1db_rev>', - * 'content': '<base64 encoded content>', - * 'trans_id': '<reansaction_id>' - * 'conflicts': '<base64 encoded conflicts>', - * 'update_conflicts': <boolean> - * } - */ - var body = JSON.parse(req.body); - - // create a new document document - if (!doc) { - doc = {} - doc['_id'] = req['id']; - } - // or fail if couch revisions do not match - else if (doc['_rev'] != body['couch_rev']) { - // of fail if revisions do not match - return [null, 'revision conflict'] - } - - // store u1db rev - doc.u1db_rev = body['u1db_rev']; - - // save content as attachment - if (body['content'] != null) { - // save u1db content as attachment - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_content = { - content_type: "application/octet-stream", - data: body['content'] // should be base64 encoded - }; - } - // or delete the attachment if document is tombstone - else if (doc._attachments && - doc._attachments.u1db_content) - delete doc._attachments.u1db_content; - - // store the transaction id - if (!doc.u1db_transactions) - doc.u1db_transactions = []; - var d = new Date(); - doc.u1db_transactions.push([d.getTime(), body['trans_id']]); - - // save conflicts as attachment if they were sent - if (body['update_conflicts']) - if (body['conflicts'] != null) { - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_conflicts = { - content_type: "application/octet-stream", - data: body['conflicts'] // should be base64 encoded - } - } else { - if(doc._attachments && doc._attachments.u1db_conflicts) - delete doc._attachments.u1db_conflicts - } - - return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js deleted file mode 100644 index 7ba66cf8..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js +++ /dev/null @@ -1,39 +0,0 @@ -function(doc, req){ - /* we expect to receive the following in `req.body`: - * { - * 'couch_rev': '<couch_rev>', - * 'conflicts': '<base64 encoded conflicts>', - * } - */ - var body = JSON.parse(req.body); - - // fail if no document was given - if (!doc) { - return [null, 'document does not exist'] - } - - // fail if couch revisions do not match - if (body['couch_rev'] != null - && doc['_rev'] != body['couch_rev']) { - return [null, 'revision conflict'] - } - - // fail if conflicts were not sent - if (body['conflicts'] == null) - return [null, 'missing conflicts'] - - // save conflicts as attachment if they were sent - if (body['conflicts'] != null) { - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_conflicts = { - content_type: "application/octet-stream", - data: body['conflicts'] // should be base64 encoded - } - } - // or delete attachment if there are no conflicts - else if (doc._attachments && doc._attachments.u1db_conflicts) - delete doc._attachments.u1db_conflicts; - - return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index dc0ea906..86bb4b93 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -495,10 +495,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocError, self.db._get_transaction_log) - # create_doc() - self.assertRaises( - errors.MissingDesignDocError, - self.db.create_doc, {}) # whats_changed() self.assertRaises( errors.MissingDesignDocError, @@ -507,14 +503,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocError, self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) - # fake a conflict so we can test resolve_doc() - first_rev = self.db._allocate_doc_rev(None) - doc = couch.CouchDocument( - doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev)) - self.db._get_doc = Mock(return_value=doc) - self.assertRaises( - errors.MissingDesignDocError, - self.db.resolve_doc, doc, [first_rev]) def test_missing_design_doc_functions_raises(self): """ @@ -645,10 +633,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocDeletedError, self.db._get_transaction_log) - # create_doc() - self.assertRaises( - errors.MissingDesignDocDeletedError, - self.db.create_doc, {}) # whats_changed() self.assertRaises( errors.MissingDesignDocDeletedError, @@ -657,14 +641,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocDeletedError, self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) - # fake a conflict so we can test resolve_doc() - first_rev = self.db._allocate_doc_rev(None) - doc = couch.CouchDocument( - doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev)) - self.db._get_doc = Mock(return_value=doc) - self.assertRaises( - errors.MissingDesignDocDeletedError, - self.db.resolve_doc, doc, [first_rev]) load_tests = tests.load_with_scenarios diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py deleted file mode 100644 index f1c20d87..00000000 --- a/scripts/migrate_dbs.py +++ /dev/null @@ -1,288 +0,0 @@ -#!/usr/bin/python - -import sys -import json -import logging -import argparse -import re -import threading -from urlparse import urlparse -from ConfigParser import ConfigParser -from couchdb.client import Server -from couchdb.http import ResourceNotFound, Resource, Session -from datetime import datetime - -from leap.soledad.common.couch import CouchDatabase - - -# parse command line for the log file name -logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \ - str(datetime.now()).replace(' ', '_') -parser = argparse.ArgumentParser() -parser.add_argument('--log', action='store', default=logger_fname, type=str, - required=False, help='the name of the log file', nargs=1) -args = parser.parse_args() - - -# configure the logger -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) -print "Logging to %s." % args.log -logging.basicConfig( - filename=args.log, - format="%(asctime)-15s %(message)s") - - -# configure threads -max_threads = 20 -semaphore_pool = threading.BoundedSemaphore(value=max_threads) - -# get couch url -cp = ConfigParser() -cp.read('/etc/leap/soledad-server.conf') -url = cp.get('soledad-server', 'couch_url') - -resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) -server = Server(url=resource) - -hidden_url = re.sub( - 'http://(.*):.*@', - 'http://\\1:xxxxx@', - url) - -print """ -========== -ATTENTION! -========== - -This script will modify Soledad's shared and user databases in: - - %s - -This script does not make a backup of the couch db data, so make sure youj -have a copy or you may loose data. -""" % hidden_url -confirm = raw_input("Proceed (type uppercase YES)? ") - -if confirm != "YES": - exit(1) - - -# -# Thread -# - -class DocWorkerThread(threading.Thread): - - def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len, - transaction_log, conflict_log, release_fun): - threading.Thread.__init__(self) - resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) - server = Server(url=resource) - self._dbname = dbname - self._cdb = server[self._dbname] - self._doc_id = doc_id - self._db_idx = db_idx - self._db_len = db_len - self._doc_idx = doc_idx - self._doc_len = doc_len - self._transaction_log = transaction_log - self._conflict_log = conflict_log - self._release_fun = release_fun - - def run(self): - - old_doc = self._cdb[self._doc_id] - - # skip non u1db docs - if 'u1db_rev' not in old_doc: - logger.debug('(%d/%d) (%d/%d) Skipping %s/%s).' % - (self._db_idx, self._db_len, self._doc_idx, - self._doc_len, self._dbname, self._doc_id)) - self._release_fun() - return - else: - logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' % - (self._db_idx, self._db_len, self._doc_idx, - self._doc_len, self._dbname, self._doc_id)) - - doc = { - '_id': self._doc_id, - '_rev': old_doc['_rev'], - 'u1db_rev': old_doc['u1db_rev'] - } - attachments = [] - - # add transactions - doc['u1db_transactions'] = map( - lambda (gen, doc_id, trans_id): (gen, trans_id), - filter( - lambda (gen, doc_id, trans_id): doc_id == doc['_id'], - self._transaction_log)) - if len(doc['u1db_transactions']) == 0: - del doc['u1db_transactions'] - - # add conflicts - if doc['_id'] in self._conflict_log: - attachments.append([ - conflict_log[doc['_id']], - 'u1db_conflicts', - "application/octet-stream"]) - - # move document's content to 'u1db_content' attachment - content = self._cdb.get_attachment(doc, 'u1db_json') - if content is not None: - attachments.append([ - content, - 'u1db_content', - "application/octet-stream"]) - #self._cdb.delete_attachment(doc, 'u1db_json') - - # save modified doc - self._cdb.save(doc) - - # save all doc attachments - for content, att_name, content_type in attachments: - self._cdb.put_attachment( - doc, - content, - filename=att_name, - content_type=content_type) - - # release the semaphore - self._release_fun() - - -db_idx = 0 -db_len = len(server) -for dbname in server: - - db_idx += 1 - - if not (dbname.startswith('user-') or dbname == 'shared') \ - or dbname == 'user-test-db': - logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname)) - continue - - logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname)) - - # get access to couch db - cdb = Server(url)[dbname] - - # get access to soledad db - sdb = CouchDatabase(url, dbname) - - # Migration table - # --------------- - # - # * Metadata that was previously stored in special documents migrate to - # inside documents, to allow for atomic doc-and-metadata updates. - # * Doc content attachment name changes. - # * Indexes are removed, to be implemented in the future possibly as - # design docs view functions. - # - # +-----------------+-------------------------+-------------------------+ - # | Data | old storage | new storage | - # |-----------------+-------------------------+-------------------------+ - # | doc content | <doc_id>/u1db_json | <doc_id>/u1db_content | - # | doc conflicts | u1db/_conflicts | <doc_id>/u1db_conflicts | - # | transaction log | u1db/_transaction_log | doc.u1db_transactions | - # | sync log | u1db/_other_generations | u1db_sync_log | - # | indexes | u1db/_indexes | not implemented | - # | replica uid | u1db/_replica_uid | u1db_config | - # +-----------------+-------------------------+-------------------------+ - - def get_att_content(db, doc_id, att_name): - try: - return json.loads( - db.get_attachment( - doc_id, att_name).read())['content'] - except: - import ipdb - ipdb.set_trace() - - # only migrate databases that have the 'u1db/_replica_uid' document - try: - metadoc = cdb.get('u1db/_replica_uid') - replica_uid = get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json') - except ResourceNotFound: - continue - - #--------------------------------------------------------------------- - # Step 1: Set replica uid. - #--------------------------------------------------------------------- - sdb._set_replica_uid(replica_uid) - - #--------------------------------------------------------------------- - # Step 2: Obtain metadata. - #--------------------------------------------------------------------- - - # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...] - transaction_log = get_att_content( - cdb, 'u1db/_transaction_log', 'u1db_json') - new_transaction_log = [] - gen = 1 - for (doc_id, trans_id) in transaction_log: - new_transaction_log.append((gen, doc_id, trans_id)) - gen += 1 - transaction_log = new_transaction_log - - # obtain the conflict log: {'<doc_id>': ['<rev>', '<content>'], ...} - conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json') - - # obtain the sync log: - # {'<replica_uid>': ['<gen>', '<transaction_id>'], ...} - other_generations = get_att_content( - cdb, 'u1db/_other_generations', 'u1db_json') - - #--------------------------------------------------------------------- - # Step 3: Iterate over all documents in database. - #--------------------------------------------------------------------- - doc_len = len(cdb) - logger.info("(%d, %d) Found %d documents." % (db_idx, db_len, doc_len)) - doc_idx = 0 - threads = [] - for doc_id in cdb: - doc_idx = doc_idx + 1 - - semaphore_pool.acquire() - thread = DocWorkerThread(dbname, doc_id, db_idx, db_len, - doc_idx, doc_len, transaction_log, - conflict_log, semaphore_pool.release) - thread.daemon = True - thread.start() - threads.append(thread) - - map(lambda thread: thread.join(), threads) - - #--------------------------------------------------------------------- - # Step 4: Move sync log. - #--------------------------------------------------------------------- - - # move sync log - sync_doc = { - '_id': 'u1db_sync_log', - 'syncs': [] - } - - for replica_uid in other_generations: - gen, transaction_id = other_generations[replica_uid] - sync_doc['syncs'].append([replica_uid, gen, transaction_id]) - cdb.save(sync_doc) - - #--------------------------------------------------------------------- - # Step 5: Delete old meta documents. - #--------------------------------------------------------------------- - - # remove unused docs - for doc_id in ['_transaction_log', '_conflicts', '_other_generations', - '_indexes', '_replica_uid']: - for prefix in ['u1db/', 'u1db%2F']: - try: - doc = cdb['%s%s' % (prefix, doc_id)] - logger.info( - "(%d/%d) Deleting %s/%s/%s." % - (db_idx, db_len, dbname, 'u1db', doc_id)) - cdb.delete(doc) - except ResourceNotFound: - pass diff --git a/scripts/update_design_docs.py b/scripts/update_design_docs.py new file mode 100644 index 00000000..e7b5a29c --- /dev/null +++ b/scripts/update_design_docs.py @@ -0,0 +1,147 @@ +#!/usr/bin/python + +# This script updates Soledad's design documents in the session database and +# all user databases with contents from the installed leap.soledad.common +# package. + +import json +import logging +import argparse +import re +import threading +import binascii + + +from getpass import getpass +from ConfigParser import ConfigParser +from couchdb.client import Server +from couchdb.http import Resource, Session +from datetime import datetime +from urlparse import urlparse + + +from leap.soledad.common import ddocs + + +# parse command line for the log file name +logger_fname = "/tmp/update-design-docs_%s.log" % \ + str(datetime.now()).replace(' ', '_') +parser = argparse.ArgumentParser() +parser.add_argument('--log', action='store', default=logger_fname, type=str, + required=False, help='the name of the log file', nargs=1) +args = parser.parse_args() + + +# configure the logger +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +print "Logging to %s." % args.log +logging.basicConfig( + filename=args.log, + format="%(asctime)-15s %(message)s") + + +# configure threads +max_threads = 20 +semaphore_pool = threading.BoundedSemaphore(value=max_threads) +threads = [] + +# get couch url +cp = ConfigParser() +cp.read('/etc/leap/soledad-server.conf') +url = urlparse(cp.get('soledad-server', 'couch_url')) + +# get admin password +netloc = re.sub('^.*@', '', url.netloc) +url = url._replace(netloc=netloc) +password = getpass("Admin password for %s: " % url.geturl()) +url = url._replace(netloc='admin:%s@%s' % (password, netloc)) + +resource = Resource(url.geturl(), Session(retry_delays=[1,2,4,8], timeout=10)) +server = Server(url=resource) + +hidden_url = re.sub( + 'http://(.*):.*@', + 'http://\\1:xxxxx@', + url.geturl()) + +print """ +========== +ATTENTION! +========== + +This script will modify Soledad's shared and user databases in: + + %s + +This script does not make a backup of the couch db data, so make sure you +have a copy or you may loose data. +""" % hidden_url +confirm = raw_input("Proceed (type uppercase YES)? ") + +if confirm != "YES": + exit(1) + +# convert design doc content + +design_docs = { + '_design/docs': json.loads(binascii.a2b_base64(ddocs.docs)), + '_design/syncs': json.loads(binascii.a2b_base64(ddocs.syncs)), + '_design/transactions': json.loads(binascii.a2b_base64(ddocs.transactions)), +} + +# +# Thread +# + +class DBWorkerThread(threading.Thread): + + def __init__(self, server, dbname, db_idx, db_len, release_fun): + threading.Thread.__init__(self) + self._dbname = dbname + self._cdb = server[self._dbname] + self._db_idx = db_idx + self._db_len = db_len + self._release_fun = release_fun + + def run(self): + + logger.info("(%d/%d) Updating db %s." % (self._db_idx, self._db_len, + self._dbname)) + + for doc_id in design_docs: + doc = self._cdb[doc_id] + for key in ['lists', 'views', 'updates']: + if key in design_docs[doc_id]: + doc[key] = design_docs[doc_id][key] + self._cdb.save(doc) + + # release the semaphore + self._release_fun() + + +db_idx = 0 +db_len = len(server) +for dbname in server: + + db_idx += 1 + + if not (dbname.startswith('user-') or dbname == 'shared') \ + or dbname == 'user-test-db': + logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname)) + continue + + + # get access to couch db + cdb = Server(url.geturl())[dbname] + + #--------------------------------------------------------------------- + # Start DB worker thread + #--------------------------------------------------------------------- + semaphore_pool.acquire() + thread = DBWorkerThread(server, dbname, db_idx, db_len, semaphore_pool.release) + thread.daemon = True + thread.start() + threads.append(thread) + +map(lambda thread: thread.join(), threads) |