summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2014-03-11 09:56:28 -0300
committerTomás Touceda <chiiph@leap.se>2014-03-11 09:56:28 -0300
commita8d002714e4ce2ff487785357cc01d082ffad537 (patch)
tree00bb495df23d7f826bebe85fb500501ca8f460b3
parent66c93f4515139fcf666948b31e09c5bfae91e3dc (diff)
parente5664fb1046e71589d0c41cd605761cc642ff28b (diff)
Merge remote-tracking branch 'refs/remotes/drebs/feature/5011_use-less-memory-when-putting-docs-on-couch-3' into develop
-rw-r--r--client/src/leap/soledad/client/__init__.py2
-rw-r--r--common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch1
-rw-r--r--common/src/leap/soledad/common/couch.py317
-rw-r--r--common/src/leap/soledad/common/ddocs/docs/updates/put.js64
-rw-r--r--common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js39
-rw-r--r--common/src/leap/soledad/common/tests/test_couch.py24
-rw-r--r--scripts/migrate_dbs.py288
-rw-r--r--scripts/update_design_docs.py147
8 files changed, 374 insertions, 508 deletions
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index b5ce7c32..a8d68c88 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -778,7 +778,7 @@ class Soledad(object):
============================== WARNING ==============================
This method converts the document's contents to unicode in-place. This
- meanse that after calling C{put_doc(doc)}, the contents of the
+ means that after calling C{put_doc(doc)}, the contents of the
document, i.e. C{doc.content}, might be different from before the
call.
============================== WARNING ==============================
diff --git a/common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch b/common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch
new file mode 100644
index 00000000..7d3f6e4f
--- /dev/null
+++ b/common/changes/feature_5011-use-less-memory-when-putting-docs-on-couch
@@ -0,0 +1 @@
+ o Use less memory when putting docs on couch (#5011).
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 40d64370..456d4fdf 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -24,10 +24,21 @@ import uuid
import logging
import binascii
import socket
+import time
+import sys
+
+
+from StringIO import StringIO
from couchdb.client import Server
-from couchdb.http import ResourceNotFound, Unauthorized, ServerError, Session
+from couchdb.http import (
+ ResourceConflict,
+ ResourceNotFound,
+ Unauthorized,
+ ServerError,
+ Session,
+)
from u1db import query_parser, vectorclock
from u1db.errors import (
DatabaseDoesNotExist,
@@ -87,9 +98,9 @@ class CouchDocument(SoledadDocument):
SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
self._couch_rev = None
self._conflicts = None
- self._modified_conflicts = False
+ self._transactions = None
- def ensure_fetch_conflicts(self, get_conflicts_fun):
+ def _ensure_fetch_conflicts(self, get_conflicts_fun):
"""
Ensure conflict data has been fetched from the server.
@@ -112,6 +123,16 @@ class CouchDocument(SoledadDocument):
"""
return self._conflicts
+ def set_conflicts(self, conflicts):
+ """
+ Set the conflicted versions of the document.
+
+ :param conflicts: The conflicted versions of the document.
+ :type conflicts: list
+ """
+ self._conflicts = conflicts
+ self.has_conflicts = len(self._conflicts) > 0
+
def add_conflict(self, doc):
"""
Add a conflict to this document.
@@ -120,8 +141,7 @@ class CouchDocument(SoledadDocument):
:type doc: CouchDocument
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
- self._modified_conflicts = True
+ raise Exception("Run self._ensure_fetch_conflicts first!")
self._conflicts.append(doc)
self.has_conflicts = len(self._conflicts) > 0
@@ -133,25 +153,13 @@ class CouchDocument(SoledadDocument):
:type conflict_revs: [str]
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
+ raise Exception("Run self._ensure_fetch_conflicts first!")
conflicts_len = len(self._conflicts)
self._conflicts = filter(
lambda doc: doc.rev not in conflict_revs,
self._conflicts)
- if len(self._conflicts) < conflicts_len:
- self._modified_conflicts = True
self.has_conflicts = len(self._conflicts) > 0
- def modified_conflicts(self):
- """
- Return whether this document's conflicts have been modified.
-
- :return: Whether this document's conflicts have been modified.
- :rtype: bool
- """
- return self._conflicts is not None and \
- self._modified_conflicts is True
-
def _get_couch_rev(self):
return self._couch_rev
@@ -160,6 +168,14 @@ class CouchDocument(SoledadDocument):
couch_rev = property(_get_couch_rev, _set_couch_rev)
+ def _get_transactions(self):
+ return self._transactions
+
+ def _set_transactions(self, rev):
+ self._transactions = rev
+
+ transactions = property(_get_transactions, _set_transactions)
+
# monkey-patch the u1db http app to use CouchDocument
http_app.Document = CouchDocument
@@ -225,6 +241,94 @@ def raise_server_error(exc, ddoc_path):
raise errors.DesignDocUnknownError(path)
+class MultipartWriter(object):
+ """
+ A multipart writer adapted from python-couchdb's one so we can PUT
+ documents using couch's multipart PUT.
+
+ This stripped down version does not allow for nested structures, and
+ contains only the essential things we need to PUT SoledadDocuments to the
+ couch backend.
+ """
+
+ CRLF = '\r\n'
+
+ def __init__(self, fileobj, headers=None, boundary=None):
+ """
+ Initialize the multipart writer.
+ """
+ self.fileobj = fileobj
+ if boundary is None:
+ boundary = self._make_boundary()
+ self._boundary = boundary
+ self._build_headers('related', headers)
+
+ def add(self, mimetype, content, headers={}):
+ """
+ Add a part to the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ self.fileobj.write(self.CRLF)
+ headers['Content-Type'] = mimetype
+ self._write_headers(headers)
+ if content:
+ # XXX: throw an exception if a boundary appears in the content??
+ self.fileobj.write(content)
+ self.fileobj.write(self.CRLF)
+
+ def close(self):
+ """
+ Close the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ # be careful not to have anything after '--', otherwise old couch
+ # versions (including bigcouch) will fail.
+ self.fileobj.write('--')
+
+ def _make_boundary(self):
+ """
+ Create a boundary to discern multi parts.
+ """
+ try:
+ from uuid import uuid4
+ return '==' + uuid4().hex + '=='
+ except ImportError:
+ from random import randrange
+ token = randrange(sys.maxint)
+ format = '%%0%dd' % len(repr(sys.maxint - 1))
+ return '===============' + (format % token) + '=='
+
+ def _write_headers(self, headers):
+ """
+ Write a part header in the buffer stream.
+ """
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.fileobj.write(name)
+ self.fileobj.write(': ')
+ self.fileobj.write(value)
+ self.fileobj.write(self.CRLF)
+ self.fileobj.write(self.CRLF)
+
+ def _build_headers(self, subtype, headers):
+ """
+ Build the main headers of the multipart stream.
+
+ This is here so we can send headers separete from content using
+ python-couchdb API.
+ """
+ self.headers = {}
+ self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \
+ (subtype, self._boundary)
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.headers[name] = value
+
+
class CouchDatabase(CommonBackend):
"""
A U1DB implementation that uses CouchDB as its persistence layer.
@@ -564,8 +668,15 @@ class CouchDatabase(CommonBackend):
and '_attachments' in result \
and 'u1db_conflicts' in result['_attachments']:
doc.has_conflicts = True
+ doc.set_conflicts(
+ self._build_conflicts(
+ doc.doc_id,
+ json.loads(binascii.a2b_base64(
+ result['_attachments']['u1db_conflicts']['data']))))
# store couch revision
doc.couch_rev = result['_rev']
+ # store transactions
+ doc.transactions = result['u1db_transactions']
return doc
def get_doc(self, doc_id, include_deleted=False):
@@ -616,6 +727,10 @@ class CouchDatabase(CommonBackend):
"""
Put the document in the Couch backend database.
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
:param old_doc: The old document version.
:type old_doc: CouchDocument
:param doc: The document to be put.
@@ -637,45 +752,61 @@ class CouchDatabase(CommonBackend):
design document for an yet
unknown reason.
"""
- trans_id = self._allocate_transaction_id()
- # encode content
- content = doc.get_json()
- if content is not None:
- content = binascii.b2a_base64(content)[:-1] # exclude trailing \n
- # encode conflicts
- conflicts = None
- update_conflicts = doc.modified_conflicts()
- if update_conflicts is True:
- if doc.has_conflicts:
- conflicts = binascii.b2a_base64(
- json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- )[:-1] # exclude \n
- # perform the request
- ddoc_path = ['_design', 'docs', '_update', 'put', doc.doc_id]
- resource = self._database.resource(*ddoc_path)
+ attachments = {} # we save content and conflicts as attachments
+ parts = [] # and we put it using couch's multipart PUT
+ # save content as attachment
+ if doc.is_tombstone() is False:
+ content = doc.get_json()
+ attachments['u1db_content'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(content),
+ }
+ parts.append(content)
+ # save conflicts as attachment
+ if doc.has_conflicts is True:
+ conflicts = json.dumps(
+ map(lambda cdoc: (cdoc.rev, cdoc.content),
+ doc.get_conflicts()))
+ attachments['u1db_conflicts'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(conflicts),
+ }
+ parts.append(conflicts)
+ # store old transactions, if any
+ transactions = old_doc.transactions[:] if old_doc is not None else []
+ # create a new transaction id and timestamp it so the transaction log
+ # is consistent when querying the database.
+ transactions.append(
+ # here we store milliseconds to keep consistent with javascript
+ # Date.prototype.getTime() which was used before inside a couchdb
+ # update handler.
+ (int(time.time() * 1000),
+ self._allocate_transaction_id()))
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ 'u1db_transactions': transactions,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None:
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
try:
- response = resource.put_json(
- body={
- 'couch_rev': old_doc.couch_rev
- if old_doc is not None else None,
- 'u1db_rev': doc.rev,
- 'content': content,
- 'trans_id': trans_id,
- 'conflicts': conflicts,
- 'update_conflicts': update_conflicts,
- },
- headers={'content-type': 'application/json'})
- # the document might have been updated in between, so we check for
- # the return message
- msg = response[2].read()
- if msg == 'ok':
- return
- elif msg == 'revision conflict':
- raise RevisionConflict()
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
+ self._database.resource.put_json(
+ doc.doc_id, body=buf.getvalue(), headers=envelope.headers)
+ except ResourceConflict:
+ raise RevisionConflict()
def put_doc(self, doc):
"""
@@ -810,6 +941,25 @@ class CouchDatabase(CommonBackend):
self._put_doc(old_doc, doc)
return new_rev
+ def _build_conflicts(self, doc_id, attached_conflicts):
+ """
+ Build the conflicted documents list from the conflicts attachment
+ fetched from a couch document.
+
+ :param attached_conflicts: The document's conflicts as fetched from a
+ couch document attachment.
+ :type attached_conflicts: dict
+ """
+ conflicts = []
+ for doc_rev, content in attached_conflicts:
+ doc = self._factory(doc_id, doc_rev)
+ if content is None:
+ doc.make_tombstone()
+ else:
+ doc.content = content
+ conflicts.append(doc)
+ return conflicts
+
def _get_conflicts(self, doc_id, couch_rev=None):
"""
Get the conflicted versions of a document.
@@ -830,16 +980,8 @@ class CouchDatabase(CommonBackend):
resource = self._database.resource(doc_id, 'u1db_conflicts')
try:
response = resource.get_json(**params)
- conflicts = []
- # build the conflicted versions
- for doc_rev, content in json.loads(response[2].read()):
- doc = self._factory(doc_id, doc_rev)
- if content is None:
- doc.make_tombstone()
- else:
- doc.content = content
- conflicts.append(doc)
- return conflicts
+ return self._build_conflicts(
+ doc_id, json.loads(response[2].read()))
except ResourceNotFound:
return []
@@ -970,7 +1112,7 @@ class CouchDatabase(CommonBackend):
serialized string.
:type my_content: str
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.add_conflict(
self._factory(doc_id=doc.doc_id, rev=my_doc_rev,
json=my_content))
@@ -988,7 +1130,7 @@ class CouchDatabase(CommonBackend):
:param conflict_revs: A list of the revisions to be deleted.
:param conflict_revs: [str]
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.delete_conflicts(conflict_revs)
def _prune_conflicts(self, doc, doc_vcr):
@@ -1067,33 +1209,24 @@ class CouchDatabase(CommonBackend):
conflicted_doc_revs)
superseded_revs = set(conflicted_doc_revs)
doc.rev = new_rev
+ # this backend stores conflicts as properties of the documents, so we
+ # have to copy these conflicts over to the document being updated.
if cur_doc.rev in superseded_revs:
+ # the newer doc version will supersede the one in the database, so
+ # we copy conflicts before updating the backend.
+ doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
self._delete_conflicts(doc, superseded_revs)
self._put_doc(cur_doc, doc)
else:
- self._add_conflict(doc, new_rev, doc.get_json())
- self._delete_conflicts(doc, superseded_revs)
- # perform request to resolve document in server
- ddoc_path = [
- '_design', 'docs', '_update', 'resolve_doc', doc.doc_id
- ]
- resource = self._database.resource(*ddoc_path)
- conflicts = None
- if doc.has_conflicts:
- conflicts = binascii.b2a_base64(
- json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- )[:-1] # exclude \n
- try:
- response = resource.put_json(
- body={
- 'couch_rev': cur_doc.couch_rev,
- 'conflicts': conflicts,
- },
- headers={'content-type': 'application/json'})
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
+ # the newer doc version does not supersede the one in the
+ # database, so we will add a conflict to the database and copy
+ # those over to the document the user has in her hands.
+ self._add_conflict(cur_doc, new_rev, doc.get_json())
+ self._delete_conflicts(cur_doc, superseded_revs)
+ self._put_doc(cur_doc, cur_doc) # just update conflicts
+ # backend has been updated with current conflicts, now copy them
+ # to the current document.
+ doc.set_conflicts(cur_doc.get_conflicts())
def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
replica_trans_id=''):
@@ -1150,7 +1283,7 @@ class CouchDatabase(CommonBackend):
if cur_doc is not None:
doc.couch_rev = cur_doc.couch_rev
# fetch conflicts because we will eventually manipulate them
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
# from now on, it works just like u1db sqlite backend
doc_vcr = vectorclock.VectorClockRev(doc.rev)
if cur_doc is None:
diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js
deleted file mode 100644
index 5a4647de..00000000
--- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js
+++ /dev/null
@@ -1,64 +0,0 @@
-function(doc, req){
- /* we expect to receive the following in `req.body`:
- * {
- * 'couch_rev': '<couch_rev>',
- * 'u1db_rev': '<u1db_rev>',
- * 'content': '<base64 encoded content>',
- * 'trans_id': '<reansaction_id>'
- * 'conflicts': '<base64 encoded conflicts>',
- * 'update_conflicts': <boolean>
- * }
- */
- var body = JSON.parse(req.body);
-
- // create a new document document
- if (!doc) {
- doc = {}
- doc['_id'] = req['id'];
- }
- // or fail if couch revisions do not match
- else if (doc['_rev'] != body['couch_rev']) {
- // of fail if revisions do not match
- return [null, 'revision conflict']
- }
-
- // store u1db rev
- doc.u1db_rev = body['u1db_rev'];
-
- // save content as attachment
- if (body['content'] != null) {
- // save u1db content as attachment
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_content = {
- content_type: "application/octet-stream",
- data: body['content'] // should be base64 encoded
- };
- }
- // or delete the attachment if document is tombstone
- else if (doc._attachments &&
- doc._attachments.u1db_content)
- delete doc._attachments.u1db_content;
-
- // store the transaction id
- if (!doc.u1db_transactions)
- doc.u1db_transactions = [];
- var d = new Date();
- doc.u1db_transactions.push([d.getTime(), body['trans_id']]);
-
- // save conflicts as attachment if they were sent
- if (body['update_conflicts'])
- if (body['conflicts'] != null) {
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_conflicts = {
- content_type: "application/octet-stream",
- data: body['conflicts'] // should be base64 encoded
- }
- } else {
- if(doc._attachments && doc._attachments.u1db_conflicts)
- delete doc._attachments.u1db_conflicts
- }
-
- return [doc, 'ok'];
-}
diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js
deleted file mode 100644
index 7ba66cf8..00000000
--- a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js
+++ /dev/null
@@ -1,39 +0,0 @@
-function(doc, req){
- /* we expect to receive the following in `req.body`:
- * {
- * 'couch_rev': '<couch_rev>',
- * 'conflicts': '<base64 encoded conflicts>',
- * }
- */
- var body = JSON.parse(req.body);
-
- // fail if no document was given
- if (!doc) {
- return [null, 'document does not exist']
- }
-
- // fail if couch revisions do not match
- if (body['couch_rev'] != null
- && doc['_rev'] != body['couch_rev']) {
- return [null, 'revision conflict']
- }
-
- // fail if conflicts were not sent
- if (body['conflicts'] == null)
- return [null, 'missing conflicts']
-
- // save conflicts as attachment if they were sent
- if (body['conflicts'] != null) {
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_conflicts = {
- content_type: "application/octet-stream",
- data: body['conflicts'] // should be base64 encoded
- }
- }
- // or delete attachment if there are no conflicts
- else if (doc._attachments && doc._attachments.u1db_conflicts)
- delete doc._attachments.u1db_conflicts;
-
- return [doc, 'ok'];
-}
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index dc0ea906..86bb4b93 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -495,10 +495,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
self.assertRaises(
errors.MissingDesignDocError,
self.db._get_transaction_log)
- # create_doc()
- self.assertRaises(
- errors.MissingDesignDocError,
- self.db.create_doc, {})
# whats_changed()
self.assertRaises(
errors.MissingDesignDocError,
@@ -507,14 +503,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
self.assertRaises(
errors.MissingDesignDocError,
self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
- # fake a conflict so we can test resolve_doc()
- first_rev = self.db._allocate_doc_rev(None)
- doc = couch.CouchDocument(
- doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev))
- self.db._get_doc = Mock(return_value=doc)
- self.assertRaises(
- errors.MissingDesignDocError,
- self.db.resolve_doc, doc, [first_rev])
def test_missing_design_doc_functions_raises(self):
"""
@@ -645,10 +633,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
self.assertRaises(
errors.MissingDesignDocDeletedError,
self.db._get_transaction_log)
- # create_doc()
- self.assertRaises(
- errors.MissingDesignDocDeletedError,
- self.db.create_doc, {})
# whats_changed()
self.assertRaises(
errors.MissingDesignDocDeletedError,
@@ -657,14 +641,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
self.assertRaises(
errors.MissingDesignDocDeletedError,
self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
- # fake a conflict so we can test resolve_doc()
- first_rev = self.db._allocate_doc_rev(None)
- doc = couch.CouchDocument(
- doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev))
- self.db._get_doc = Mock(return_value=doc)
- self.assertRaises(
- errors.MissingDesignDocDeletedError,
- self.db.resolve_doc, doc, [first_rev])
load_tests = tests.load_with_scenarios
diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py
deleted file mode 100644
index f1c20d87..00000000
--- a/scripts/migrate_dbs.py
+++ /dev/null
@@ -1,288 +0,0 @@
-#!/usr/bin/python
-
-import sys
-import json
-import logging
-import argparse
-import re
-import threading
-from urlparse import urlparse
-from ConfigParser import ConfigParser
-from couchdb.client import Server
-from couchdb.http import ResourceNotFound, Resource, Session
-from datetime import datetime
-
-from leap.soledad.common.couch import CouchDatabase
-
-
-# parse command line for the log file name
-logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \
- str(datetime.now()).replace(' ', '_')
-parser = argparse.ArgumentParser()
-parser.add_argument('--log', action='store', default=logger_fname, type=str,
- required=False, help='the name of the log file', nargs=1)
-args = parser.parse_args()
-
-
-# configure the logger
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
-print "Logging to %s." % args.log
-logging.basicConfig(
- filename=args.log,
- format="%(asctime)-15s %(message)s")
-
-
-# configure threads
-max_threads = 20
-semaphore_pool = threading.BoundedSemaphore(value=max_threads)
-
-# get couch url
-cp = ConfigParser()
-cp.read('/etc/leap/soledad-server.conf')
-url = cp.get('soledad-server', 'couch_url')
-
-resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10))
-server = Server(url=resource)
-
-hidden_url = re.sub(
- 'http://(.*):.*@',
- 'http://\\1:xxxxx@',
- url)
-
-print """
-==========
-ATTENTION!
-==========
-
-This script will modify Soledad's shared and user databases in:
-
- %s
-
-This script does not make a backup of the couch db data, so make sure youj
-have a copy or you may loose data.
-""" % hidden_url
-confirm = raw_input("Proceed (type uppercase YES)? ")
-
-if confirm != "YES":
- exit(1)
-
-
-#
-# Thread
-#
-
-class DocWorkerThread(threading.Thread):
-
- def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len,
- transaction_log, conflict_log, release_fun):
- threading.Thread.__init__(self)
- resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10))
- server = Server(url=resource)
- self._dbname = dbname
- self._cdb = server[self._dbname]
- self._doc_id = doc_id
- self._db_idx = db_idx
- self._db_len = db_len
- self._doc_idx = doc_idx
- self._doc_len = doc_len
- self._transaction_log = transaction_log
- self._conflict_log = conflict_log
- self._release_fun = release_fun
-
- def run(self):
-
- old_doc = self._cdb[self._doc_id]
-
- # skip non u1db docs
- if 'u1db_rev' not in old_doc:
- logger.debug('(%d/%d) (%d/%d) Skipping %s/%s).' %
- (self._db_idx, self._db_len, self._doc_idx,
- self._doc_len, self._dbname, self._doc_id))
- self._release_fun()
- return
- else:
- logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' %
- (self._db_idx, self._db_len, self._doc_idx,
- self._doc_len, self._dbname, self._doc_id))
-
- doc = {
- '_id': self._doc_id,
- '_rev': old_doc['_rev'],
- 'u1db_rev': old_doc['u1db_rev']
- }
- attachments = []
-
- # add transactions
- doc['u1db_transactions'] = map(
- lambda (gen, doc_id, trans_id): (gen, trans_id),
- filter(
- lambda (gen, doc_id, trans_id): doc_id == doc['_id'],
- self._transaction_log))
- if len(doc['u1db_transactions']) == 0:
- del doc['u1db_transactions']
-
- # add conflicts
- if doc['_id'] in self._conflict_log:
- attachments.append([
- conflict_log[doc['_id']],
- 'u1db_conflicts',
- "application/octet-stream"])
-
- # move document's content to 'u1db_content' attachment
- content = self._cdb.get_attachment(doc, 'u1db_json')
- if content is not None:
- attachments.append([
- content,
- 'u1db_content',
- "application/octet-stream"])
- #self._cdb.delete_attachment(doc, 'u1db_json')
-
- # save modified doc
- self._cdb.save(doc)
-
- # save all doc attachments
- for content, att_name, content_type in attachments:
- self._cdb.put_attachment(
- doc,
- content,
- filename=att_name,
- content_type=content_type)
-
- # release the semaphore
- self._release_fun()
-
-
-db_idx = 0
-db_len = len(server)
-for dbname in server:
-
- db_idx += 1
-
- if not (dbname.startswith('user-') or dbname == 'shared') \
- or dbname == 'user-test-db':
- logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname))
- continue
-
- logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname))
-
- # get access to couch db
- cdb = Server(url)[dbname]
-
- # get access to soledad db
- sdb = CouchDatabase(url, dbname)
-
- # Migration table
- # ---------------
- #
- # * Metadata that was previously stored in special documents migrate to
- # inside documents, to allow for atomic doc-and-metadata updates.
- # * Doc content attachment name changes.
- # * Indexes are removed, to be implemented in the future possibly as
- # design docs view functions.
- #
- # +-----------------+-------------------------+-------------------------+
- # | Data | old storage | new storage |
- # |-----------------+-------------------------+-------------------------+
- # | doc content | <doc_id>/u1db_json | <doc_id>/u1db_content |
- # | doc conflicts | u1db/_conflicts | <doc_id>/u1db_conflicts |
- # | transaction log | u1db/_transaction_log | doc.u1db_transactions |
- # | sync log | u1db/_other_generations | u1db_sync_log |
- # | indexes | u1db/_indexes | not implemented |
- # | replica uid | u1db/_replica_uid | u1db_config |
- # +-----------------+-------------------------+-------------------------+
-
- def get_att_content(db, doc_id, att_name):
- try:
- return json.loads(
- db.get_attachment(
- doc_id, att_name).read())['content']
- except:
- import ipdb
- ipdb.set_trace()
-
- # only migrate databases that have the 'u1db/_replica_uid' document
- try:
- metadoc = cdb.get('u1db/_replica_uid')
- replica_uid = get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json')
- except ResourceNotFound:
- continue
-
- #---------------------------------------------------------------------
- # Step 1: Set replica uid.
- #---------------------------------------------------------------------
- sdb._set_replica_uid(replica_uid)
-
- #---------------------------------------------------------------------
- # Step 2: Obtain metadata.
- #---------------------------------------------------------------------
-
- # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...]
- transaction_log = get_att_content(
- cdb, 'u1db/_transaction_log', 'u1db_json')
- new_transaction_log = []
- gen = 1
- for (doc_id, trans_id) in transaction_log:
- new_transaction_log.append((gen, doc_id, trans_id))
- gen += 1
- transaction_log = new_transaction_log
-
- # obtain the conflict log: {'<doc_id>': ['<rev>', '<content>'], ...}
- conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json')
-
- # obtain the sync log:
- # {'<replica_uid>': ['<gen>', '<transaction_id>'], ...}
- other_generations = get_att_content(
- cdb, 'u1db/_other_generations', 'u1db_json')
-
- #---------------------------------------------------------------------
- # Step 3: Iterate over all documents in database.
- #---------------------------------------------------------------------
- doc_len = len(cdb)
- logger.info("(%d, %d) Found %d documents." % (db_idx, db_len, doc_len))
- doc_idx = 0
- threads = []
- for doc_id in cdb:
- doc_idx = doc_idx + 1
-
- semaphore_pool.acquire()
- thread = DocWorkerThread(dbname, doc_id, db_idx, db_len,
- doc_idx, doc_len, transaction_log,
- conflict_log, semaphore_pool.release)
- thread.daemon = True
- thread.start()
- threads.append(thread)
-
- map(lambda thread: thread.join(), threads)
-
- #---------------------------------------------------------------------
- # Step 4: Move sync log.
- #---------------------------------------------------------------------
-
- # move sync log
- sync_doc = {
- '_id': 'u1db_sync_log',
- 'syncs': []
- }
-
- for replica_uid in other_generations:
- gen, transaction_id = other_generations[replica_uid]
- sync_doc['syncs'].append([replica_uid, gen, transaction_id])
- cdb.save(sync_doc)
-
- #---------------------------------------------------------------------
- # Step 5: Delete old meta documents.
- #---------------------------------------------------------------------
-
- # remove unused docs
- for doc_id in ['_transaction_log', '_conflicts', '_other_generations',
- '_indexes', '_replica_uid']:
- for prefix in ['u1db/', 'u1db%2F']:
- try:
- doc = cdb['%s%s' % (prefix, doc_id)]
- logger.info(
- "(%d/%d) Deleting %s/%s/%s." %
- (db_idx, db_len, dbname, 'u1db', doc_id))
- cdb.delete(doc)
- except ResourceNotFound:
- pass
diff --git a/scripts/update_design_docs.py b/scripts/update_design_docs.py
new file mode 100644
index 00000000..e7b5a29c
--- /dev/null
+++ b/scripts/update_design_docs.py
@@ -0,0 +1,147 @@
+#!/usr/bin/python
+
+# This script updates Soledad's design documents in the session database and
+# all user databases with contents from the installed leap.soledad.common
+# package.
+
+import json
+import logging
+import argparse
+import re
+import threading
+import binascii
+
+
+from getpass import getpass
+from ConfigParser import ConfigParser
+from couchdb.client import Server
+from couchdb.http import Resource, Session
+from datetime import datetime
+from urlparse import urlparse
+
+
+from leap.soledad.common import ddocs
+
+
+# parse command line for the log file name
+logger_fname = "/tmp/update-design-docs_%s.log" % \
+ str(datetime.now()).replace(' ', '_')
+parser = argparse.ArgumentParser()
+parser.add_argument('--log', action='store', default=logger_fname, type=str,
+ required=False, help='the name of the log file', nargs=1)
+args = parser.parse_args()
+
+
+# configure the logger
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+print "Logging to %s." % args.log
+logging.basicConfig(
+ filename=args.log,
+ format="%(asctime)-15s %(message)s")
+
+
+# configure threads
+max_threads = 20
+semaphore_pool = threading.BoundedSemaphore(value=max_threads)
+threads = []
+
+# get couch url
+cp = ConfigParser()
+cp.read('/etc/leap/soledad-server.conf')
+url = urlparse(cp.get('soledad-server', 'couch_url'))
+
+# get admin password
+netloc = re.sub('^.*@', '', url.netloc)
+url = url._replace(netloc=netloc)
+password = getpass("Admin password for %s: " % url.geturl())
+url = url._replace(netloc='admin:%s@%s' % (password, netloc))
+
+resource = Resource(url.geturl(), Session(retry_delays=[1,2,4,8], timeout=10))
+server = Server(url=resource)
+
+hidden_url = re.sub(
+ 'http://(.*):.*@',
+ 'http://\\1:xxxxx@',
+ url.geturl())
+
+print """
+==========
+ATTENTION!
+==========
+
+This script will modify Soledad's shared and user databases in:
+
+ %s
+
+This script does not make a backup of the couch db data, so make sure you
+have a copy or you may loose data.
+""" % hidden_url
+confirm = raw_input("Proceed (type uppercase YES)? ")
+
+if confirm != "YES":
+ exit(1)
+
+# convert design doc content
+
+design_docs = {
+ '_design/docs': json.loads(binascii.a2b_base64(ddocs.docs)),
+ '_design/syncs': json.loads(binascii.a2b_base64(ddocs.syncs)),
+ '_design/transactions': json.loads(binascii.a2b_base64(ddocs.transactions)),
+}
+
+#
+# Thread
+#
+
+class DBWorkerThread(threading.Thread):
+
+ def __init__(self, server, dbname, db_idx, db_len, release_fun):
+ threading.Thread.__init__(self)
+ self._dbname = dbname
+ self._cdb = server[self._dbname]
+ self._db_idx = db_idx
+ self._db_len = db_len
+ self._release_fun = release_fun
+
+ def run(self):
+
+ logger.info("(%d/%d) Updating db %s." % (self._db_idx, self._db_len,
+ self._dbname))
+
+ for doc_id in design_docs:
+ doc = self._cdb[doc_id]
+ for key in ['lists', 'views', 'updates']:
+ if key in design_docs[doc_id]:
+ doc[key] = design_docs[doc_id][key]
+ self._cdb.save(doc)
+
+ # release the semaphore
+ self._release_fun()
+
+
+db_idx = 0
+db_len = len(server)
+for dbname in server:
+
+ db_idx += 1
+
+ if not (dbname.startswith('user-') or dbname == 'shared') \
+ or dbname == 'user-test-db':
+ logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname))
+ continue
+
+
+ # get access to couch db
+ cdb = Server(url.geturl())[dbname]
+
+ #---------------------------------------------------------------------
+ # Start DB worker thread
+ #---------------------------------------------------------------------
+ semaphore_pool.acquire()
+ thread = DBWorkerThread(server, dbname, db_idx, db_len, semaphore_pool.release)
+ thread.daemon = True
+ thread.start()
+ threads.append(thread)
+
+map(lambda thread: thread.join(), threads)