From 7de7b4653b1815818611d2a6302bd38d1e7a7ccc Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 21 Feb 2014 17:49:28 -0300 Subject: Use less memory when putting docs on couch. Closes #5011. --- common/src/leap/soledad/common/couch.py | 267 +++++++++++++++------ .../leap/soledad/common/ddocs/docs/updates/put.js | 64 ----- common/src/leap/soledad/common/tests/test_couch.py | 8 - 3 files changed, 198 insertions(+), 141 deletions(-) delete mode 100644 common/src/leap/soledad/common/ddocs/docs/updates/put.js (limited to 'common/src') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 40d64370..09fe1ca3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -24,10 +24,21 @@ import uuid import logging import binascii import socket +import time +import sys + + +from StringIO import StringIO from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized, ServerError, Session +from couchdb.http import ( + ResourceConflict, + ResourceNotFound, + Unauthorized, + ServerError, + Session, +) from u1db import query_parser, vectorclock from u1db.errors import ( DatabaseDoesNotExist, @@ -87,9 +98,9 @@ class CouchDocument(SoledadDocument): SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) self._couch_rev = None self._conflicts = None - self._modified_conflicts = False + self._transactions = None - def ensure_fetch_conflicts(self, get_conflicts_fun): + def _ensure_fetch_conflicts(self, get_conflicts_fun): """ Ensure conflict data has been fetched from the server. @@ -120,8 +131,7 @@ class CouchDocument(SoledadDocument): :type doc: CouchDocument """ if self._conflicts is None: - raise Exception("Run self.ensure_fetch_conflicts first!") - self._modified_conflicts = True + raise Exception("Run self._ensure_fetch_conflicts first!") self._conflicts.append(doc) self.has_conflicts = len(self._conflicts) > 0 @@ -133,25 +143,13 @@ class CouchDocument(SoledadDocument): :type conflict_revs: [str] """ if self._conflicts is None: - raise Exception("Run self.ensure_fetch_conflicts first!") + raise Exception("Run self._ensure_fetch_conflicts first!") conflicts_len = len(self._conflicts) self._conflicts = filter( lambda doc: doc.rev not in conflict_revs, self._conflicts) - if len(self._conflicts) < conflicts_len: - self._modified_conflicts = True self.has_conflicts = len(self._conflicts) > 0 - def modified_conflicts(self): - """ - Return whether this document's conflicts have been modified. - - :return: Whether this document's conflicts have been modified. - :rtype: bool - """ - return self._conflicts is not None and \ - self._modified_conflicts is True - def _get_couch_rev(self): return self._couch_rev @@ -160,6 +158,14 @@ class CouchDocument(SoledadDocument): couch_rev = property(_get_couch_rev, _set_couch_rev) + def _get_transactions(self): + return self._transactions + + def _set_transactions(self, rev): + self._transactions = rev + + transactions = property(_get_transactions, _set_transactions) + # monkey-patch the u1db http app to use CouchDocument http_app.Document = CouchDocument @@ -225,6 +231,94 @@ def raise_server_error(exc, ddoc_path): raise errors.DesignDocUnknownError(path) +class MultipartWriter(object): + """ + A multipart writer adapted from python-couchdb's one so we can PUT + documents using couch's multipart PUT. + + This stripped down version does not allow for nested structures, and + contains only the essential things we need to PUT SoledadDocuments to the + couch backend. + """ + + CRLF = '\r\n' + + def __init__(self, fileobj, headers=None, boundary=None): + """ + Initialize the multipart writer. + """ + self.fileobj = fileobj + if boundary is None: + boundary = self._make_boundary() + self._boundary = boundary + self._build_headers('related', headers) + + def add(self, mimetype, content, headers={}): + """ + Add a part to the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + self.fileobj.write(self.CRLF) + headers['Content-Type'] = mimetype + self._write_headers(headers) + if content: + # XXX: throw an exception if a boundary appears in the content?? + self.fileobj.write(content) + self.fileobj.write(self.CRLF) + + def close(self): + """ + Close the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + # be careful not to have anything after '--', otherwise old couch + # versions (including bigcouch) will fail. + self.fileobj.write('--') + + def _make_boundary(self): + """ + Create a boundary to discern multi parts. + """ + try: + from uuid import uuid4 + return '==' + uuid4().hex + '==' + except ImportError: + from random import randrange + token = randrange(sys.maxint) + format = '%%0%dd' % len(repr(sys.maxint - 1)) + return '===============' + (format % token) + '==' + + def _write_headers(self, headers): + """ + Write a part header in the buffer stream. + """ + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.fileobj.write(name) + self.fileobj.write(': ') + self.fileobj.write(value) + self.fileobj.write(self.CRLF) + self.fileobj.write(self.CRLF) + + def _build_headers(self, subtype, headers): + """ + Build the main headers of the multipart stream. + + This is here so we can send headers separete from content using + python-couchdb API. + """ + self.headers = {} + self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ + (subtype, self._boundary) + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.headers[name] = value + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. @@ -564,8 +658,12 @@ class CouchDatabase(CommonBackend): and '_attachments' in result \ and 'u1db_conflicts' in result['_attachments']: doc.has_conflicts = True + #doc.conflicts = self._build_conflicts( + # json.loads(result['_attachments']['u1db_conflicts'])) # store couch revision doc.couch_rev = result['_rev'] + # store transactions + doc.transactions = result['u1db_transactions'] return doc def get_doc(self, doc_id, include_deleted=False): @@ -616,6 +714,10 @@ class CouchDatabase(CommonBackend): """ Put the document in the Couch backend database. + Note that C{old_doc} must have been fetched with the parameter + C{check_for_conflicts} equal to True, so we can properly update the + new document using the conflict information from the old one. + :param old_doc: The old document version. :type old_doc: CouchDocument :param doc: The document to be put. @@ -637,45 +739,61 @@ class CouchDatabase(CommonBackend): design document for an yet unknown reason. """ - trans_id = self._allocate_transaction_id() - # encode content - content = doc.get_json() - if content is not None: - content = binascii.b2a_base64(content)[:-1] # exclude trailing \n - # encode conflicts - conflicts = None - update_conflicts = doc.modified_conflicts() - if update_conflicts is True: - if doc.has_conflicts: - conflicts = binascii.b2a_base64( - json.dumps( - map(lambda cdoc: (cdoc.rev, cdoc.content), - doc.get_conflicts())) - )[:-1] # exclude \n - # perform the request - ddoc_path = ['_design', 'docs', '_update', 'put', doc.doc_id] - resource = self._database.resource(*ddoc_path) + attachments = {} # we save content and conflicts as attachments + parts = [] # and we put it using couch's multipart PUT + # save content as attachment + if doc.is_tombstone() is False: + content = doc.get_json() + attachments['u1db_content'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(content), + } + parts.append(content) + # save conflicts as attachment + if doc.has_conflicts is True: + conflicts = json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + attachments['u1db_conflicts'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(conflicts), + } + parts.append(conflicts) + # store old transactions, if any + transactions = old_doc.transactions[:] if old_doc is not None else [] + # create a new transaction id and timestamp it so the transaction log + # is consistent when querying the database. + transactions.append( + # here we store milliseconds to keep consistent with javascript + # Date.prototype.getTime() which was used before inside a couchdb + # update handler. + (int(time.time() * 1000), + self._allocate_transaction_id())) + # build the couch document + couch_doc = { + '_id': doc.doc_id, + 'u1db_rev': doc.rev, + 'u1db_transactions': transactions, + '_attachments': attachments, + } + # if we are updating a doc we have to add the couch doc revision + if old_doc is not None: + couch_doc['_rev'] = old_doc.couch_rev + # prepare the multipart PUT + buf = StringIO() + envelope = MultipartWriter(buf) + envelope.add('application/json', json.dumps(couch_doc)) + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict try: - response = resource.put_json( - body={ - 'couch_rev': old_doc.couch_rev - if old_doc is not None else None, - 'u1db_rev': doc.rev, - 'content': content, - 'trans_id': trans_id, - 'conflicts': conflicts, - 'update_conflicts': update_conflicts, - }, - headers={'content-type': 'application/json'}) - # the document might have been updated in between, so we check for - # the return message - msg = response[2].read() - if msg == 'ok': - return - elif msg == 'revision conflict': - raise RevisionConflict() - except ResourceNotFound as e: - raise_missing_design_doc_error(e, ddoc_path) + self._database.resource.put_json( + doc.doc_id, body=buf.getvalue(), headers=envelope.headers) + except ResourceConflict: + raise RevisionConflict() def put_doc(self, doc): """ @@ -810,6 +928,25 @@ class CouchDatabase(CommonBackend): self._put_doc(old_doc, doc) return new_rev + def _build_conflicts(self, doc_id, attached_conflicts): + """ + Build the conflicted documents list from the conflicts attachment + fetched from a couch document. + + :param attached_conflicts: The document's conflicts as fetched from a + couch document attachment. + :type attached_conflicts: dict + """ + conflicts = [] + for doc_rev, content in attached_conflicts: + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + def _get_conflicts(self, doc_id, couch_rev=None): """ Get the conflicted versions of a document. @@ -830,16 +967,8 @@ class CouchDatabase(CommonBackend): resource = self._database.resource(doc_id, 'u1db_conflicts') try: response = resource.get_json(**params) - conflicts = [] - # build the conflicted versions - for doc_rev, content in json.loads(response[2].read()): - doc = self._factory(doc_id, doc_rev) - if content is None: - doc.make_tombstone() - else: - doc.content = content - conflicts.append(doc) - return conflicts + return self._build_conflicts( + doc_id, json.loads(response[2].read())) except ResourceNotFound: return [] @@ -970,7 +1099,7 @@ class CouchDatabase(CommonBackend): serialized string. :type my_content: str """ - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) doc.add_conflict( self._factory(doc_id=doc.doc_id, rev=my_doc_rev, json=my_content)) @@ -988,7 +1117,7 @@ class CouchDatabase(CommonBackend): :param conflict_revs: A list of the revisions to be deleted. :param conflict_revs: [str] """ - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) doc.delete_conflicts(conflict_revs) def _prune_conflicts(self, doc, doc_vcr): @@ -1150,7 +1279,7 @@ class CouchDatabase(CommonBackend): if cur_doc is not None: doc.couch_rev = cur_doc.couch_rev # fetch conflicts because we will eventually manipulate them - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) # from now on, it works just like u1db sqlite backend doc_vcr = vectorclock.VectorClockRev(doc.rev) if cur_doc is None: diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js deleted file mode 100644 index 5a4647de..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js +++ /dev/null @@ -1,64 +0,0 @@ -function(doc, req){ - /* we expect to receive the following in `req.body`: - * { - * 'couch_rev': '', - * 'u1db_rev': '', - * 'content': '', - * 'trans_id': '' - * 'conflicts': '', - * 'update_conflicts': - * } - */ - var body = JSON.parse(req.body); - - // create a new document document - if (!doc) { - doc = {} - doc['_id'] = req['id']; - } - // or fail if couch revisions do not match - else if (doc['_rev'] != body['couch_rev']) { - // of fail if revisions do not match - return [null, 'revision conflict'] - } - - // store u1db rev - doc.u1db_rev = body['u1db_rev']; - - // save content as attachment - if (body['content'] != null) { - // save u1db content as attachment - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_content = { - content_type: "application/octet-stream", - data: body['content'] // should be base64 encoded - }; - } - // or delete the attachment if document is tombstone - else if (doc._attachments && - doc._attachments.u1db_content) - delete doc._attachments.u1db_content; - - // store the transaction id - if (!doc.u1db_transactions) - doc.u1db_transactions = []; - var d = new Date(); - doc.u1db_transactions.push([d.getTime(), body['trans_id']]); - - // save conflicts as attachment if they were sent - if (body['update_conflicts']) - if (body['conflicts'] != null) { - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_conflicts = { - content_type: "application/octet-stream", - data: body['conflicts'] // should be base64 encoded - } - } else { - if(doc._attachments && doc._attachments.u1db_conflicts) - delete doc._attachments.u1db_conflicts - } - - return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index dc0ea906..97d744e4 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -495,10 +495,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocError, self.db._get_transaction_log) - # create_doc() - self.assertRaises( - errors.MissingDesignDocError, - self.db.create_doc, {}) # whats_changed() self.assertRaises( errors.MissingDesignDocError, @@ -645,10 +641,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocDeletedError, self.db._get_transaction_log) - # create_doc() - self.assertRaises( - errors.MissingDesignDocDeletedError, - self.db.create_doc, {}) # whats_changed() self.assertRaises( errors.MissingDesignDocDeletedError, -- cgit v1.2.3