diff options
| author | drebs <drebs@leap.se> | 2014-02-21 17:49:28 -0300 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2014-03-11 09:41:31 -0300 | 
| commit | 7de7b4653b1815818611d2a6302bd38d1e7a7ccc (patch) | |
| tree | 6f85c9d745a4935d27e41a05248ebc0915e2da84 /common/src/leap | |
| parent | 66c93f4515139fcf666948b31e09c5bfae91e3dc (diff) | |
Use less memory when putting docs on couch. Closes #5011.
Diffstat (limited to 'common/src/leap')
| -rw-r--r-- | common/src/leap/soledad/common/couch.py | 267 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/ddocs/docs/updates/put.js | 64 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/tests/test_couch.py | 8 | 
3 files changed, 198 insertions, 141 deletions
| diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 40d64370..09fe1ca3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -24,10 +24,21 @@ import uuid  import logging  import binascii  import socket +import time +import sys + + +from StringIO import StringIO  from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized, ServerError, Session +from couchdb.http import ( +    ResourceConflict, +    ResourceNotFound, +    Unauthorized, +    ServerError, +    Session, +)  from u1db import query_parser, vectorclock  from u1db.errors import (      DatabaseDoesNotExist, @@ -87,9 +98,9 @@ class CouchDocument(SoledadDocument):          SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)          self._couch_rev = None          self._conflicts = None -        self._modified_conflicts = False +        self._transactions = None -    def ensure_fetch_conflicts(self, get_conflicts_fun): +    def _ensure_fetch_conflicts(self, get_conflicts_fun):          """          Ensure conflict data has been fetched from the server. @@ -120,8 +131,7 @@ class CouchDocument(SoledadDocument):          :type doc: CouchDocument          """          if self._conflicts is None: -            raise Exception("Run self.ensure_fetch_conflicts first!") -        self._modified_conflicts = True +            raise Exception("Run self._ensure_fetch_conflicts first!")          self._conflicts.append(doc)          self.has_conflicts = len(self._conflicts) > 0 @@ -133,25 +143,13 @@ class CouchDocument(SoledadDocument):          :type conflict_revs: [str]          """          if self._conflicts is None: -            raise Exception("Run self.ensure_fetch_conflicts first!") +            raise Exception("Run self._ensure_fetch_conflicts first!")          conflicts_len = len(self._conflicts)          self._conflicts = filter(              lambda doc: doc.rev not in conflict_revs,              self._conflicts) -        if len(self._conflicts) < conflicts_len: -            self._modified_conflicts = True          self.has_conflicts = len(self._conflicts) > 0 -    def modified_conflicts(self): -        """ -        Return whether this document's conflicts have been modified. - -        :return: Whether this document's conflicts have been modified. -        :rtype: bool -        """ -        return self._conflicts is not None and \ -            self._modified_conflicts is True -      def _get_couch_rev(self):          return self._couch_rev @@ -160,6 +158,14 @@ class CouchDocument(SoledadDocument):      couch_rev = property(_get_couch_rev, _set_couch_rev) +    def _get_transactions(self): +        return self._transactions + +    def _set_transactions(self, rev): +        self._transactions = rev + +    transactions = property(_get_transactions, _set_transactions) +  # monkey-patch the u1db http app to use CouchDocument  http_app.Document = CouchDocument @@ -225,6 +231,94 @@ def raise_server_error(exc, ddoc_path):      raise errors.DesignDocUnknownError(path) +class MultipartWriter(object): +    """ +    A multipart writer adapted from python-couchdb's one so we can PUT +    documents using couch's multipart PUT. + +    This stripped down version does not allow for nested structures, and +    contains only the essential things we need to PUT SoledadDocuments to the +    couch backend. +    """ + +    CRLF = '\r\n' + +    def __init__(self, fileobj, headers=None, boundary=None): +        """ +        Initialize the multipart writer. +        """ +        self.fileobj = fileobj +        if boundary is None: +            boundary = self._make_boundary() +        self._boundary = boundary +        self._build_headers('related', headers) + +    def add(self, mimetype, content, headers={}): +        """ +        Add a part to the multipart stream. +        """ +        self.fileobj.write('--') +        self.fileobj.write(self._boundary) +        self.fileobj.write(self.CRLF) +        headers['Content-Type'] = mimetype +        self._write_headers(headers) +        if content: +            # XXX: throw an exception if a boundary appears in the content?? +            self.fileobj.write(content) +            self.fileobj.write(self.CRLF) + +    def close(self): +        """ +        Close the multipart stream. +        """ +        self.fileobj.write('--') +        self.fileobj.write(self._boundary) +        # be careful not to have anything after '--', otherwise old couch +        # versions (including bigcouch) will fail. +        self.fileobj.write('--') + +    def _make_boundary(self): +        """ +        Create a boundary to discern multi parts. +        """ +        try: +            from uuid import uuid4 +            return '==' + uuid4().hex + '==' +        except ImportError: +            from random import randrange +            token = randrange(sys.maxint) +            format = '%%0%dd' % len(repr(sys.maxint - 1)) +            return '===============' + (format % token) + '==' + +    def _write_headers(self, headers): +        """ +        Write a part header in the buffer stream. +        """ +        if headers: +            for name in sorted(headers.keys()): +                value = headers[name] +                self.fileobj.write(name) +                self.fileobj.write(': ') +                self.fileobj.write(value) +                self.fileobj.write(self.CRLF) +        self.fileobj.write(self.CRLF) + +    def _build_headers(self, subtype, headers): +        """ +        Build the main headers of the multipart stream. + +        This is here so we can send headers separete from content using +        python-couchdb API. +        """ +        self.headers = {} +        self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ +                                       (subtype, self._boundary) +        if headers: +            for name in sorted(headers.keys()): +                value = headers[name] +                self.headers[name] = value + +  class CouchDatabase(CommonBackend):      """      A U1DB implementation that uses CouchDB as its persistence layer. @@ -564,8 +658,12 @@ class CouchDatabase(CommonBackend):                  and '_attachments' in result \                  and 'u1db_conflicts' in result['_attachments']:              doc.has_conflicts = True +            #doc.conflicts = self._build_conflicts( +            #    json.loads(result['_attachments']['u1db_conflicts']))          # store couch revision          doc.couch_rev = result['_rev'] +        # store transactions +        doc.transactions = result['u1db_transactions']          return doc      def get_doc(self, doc_id, include_deleted=False): @@ -616,6 +714,10 @@ class CouchDatabase(CommonBackend):          """          Put the document in the Couch backend database. +        Note that C{old_doc} must have been fetched with the parameter +        C{check_for_conflicts} equal to True, so we can properly update the +        new document using the conflict information from the old one. +          :param old_doc: The old document version.          :type old_doc: CouchDocument          :param doc: The document to be put. @@ -637,45 +739,61 @@ class CouchDatabase(CommonBackend):                                               design document for an yet                                               unknown reason.          """ -        trans_id = self._allocate_transaction_id() -        # encode content -        content = doc.get_json() -        if content is not None: -            content = binascii.b2a_base64(content)[:-1]  # exclude trailing \n -        # encode conflicts -        conflicts = None -        update_conflicts = doc.modified_conflicts() -        if update_conflicts is True: -            if doc.has_conflicts: -                conflicts = binascii.b2a_base64( -                    json.dumps( -                        map(lambda cdoc: (cdoc.rev, cdoc.content), -                            doc.get_conflicts())) -            )[:-1]  # exclude \n -        # perform the request -        ddoc_path = ['_design', 'docs', '_update', 'put', doc.doc_id] -        resource = self._database.resource(*ddoc_path) +        attachments = {}  # we save content and conflicts as attachments +        parts = []  # and we put it using couch's multipart PUT +        # save content as attachment +        if doc.is_tombstone() is False: +            content = doc.get_json() +            attachments['u1db_content'] = { +                'follows': True, +                'content_type': 'application/octet-stream', +                'length': len(content), +            } +            parts.append(content) +        # save conflicts as attachment +        if doc.has_conflicts is True: +            conflicts = json.dumps( +                map(lambda cdoc: (cdoc.rev, cdoc.content), +                    doc.get_conflicts())) +            attachments['u1db_conflicts'] = { +                'follows': True, +                'content_type': 'application/octet-stream', +                'length': len(conflicts), +            } +            parts.append(conflicts) +        # store old transactions, if any +        transactions = old_doc.transactions[:] if old_doc is not None else [] +        # create a new transaction id and timestamp it so the transaction log +        # is consistent when querying the database. +        transactions.append( +            # here we store milliseconds to keep consistent with javascript +            # Date.prototype.getTime() which was used before inside a couchdb +            # update handler. +            (int(time.time() * 1000), +            self._allocate_transaction_id())) +        # build the couch document +        couch_doc = { +            '_id': doc.doc_id, +            'u1db_rev': doc.rev, +            'u1db_transactions': transactions, +            '_attachments': attachments, +        } +        # if we are updating a doc we have to add the couch doc revision +        if old_doc is not None: +            couch_doc['_rev'] = old_doc.couch_rev +        # prepare the multipart PUT +        buf = StringIO() +        envelope = MultipartWriter(buf) +        envelope.add('application/json', json.dumps(couch_doc)) +        for part in parts: +            envelope.add('application/octet-stream', part) +        envelope.close() +        # try to save and fail if there's a revision conflict          try: -            response = resource.put_json( -                body={ -                    'couch_rev': old_doc.couch_rev -                        if old_doc is not None else None, -                    'u1db_rev': doc.rev, -                    'content': content, -                    'trans_id': trans_id, -                    'conflicts': conflicts, -                    'update_conflicts': update_conflicts, -                }, -                headers={'content-type': 'application/json'}) -            # the document might have been updated in between, so we check for -            # the return message -            msg = response[2].read() -            if msg == 'ok': -                return -            elif msg == 'revision conflict': -                raise RevisionConflict() -        except ResourceNotFound as e: -            raise_missing_design_doc_error(e, ddoc_path) +            self._database.resource.put_json( +                doc.doc_id, body=buf.getvalue(), headers=envelope.headers) +        except ResourceConflict: +            raise RevisionConflict()      def put_doc(self, doc):          """ @@ -810,6 +928,25 @@ class CouchDatabase(CommonBackend):          self._put_doc(old_doc, doc)          return new_rev +    def _build_conflicts(self, doc_id, attached_conflicts): +        """ +        Build the conflicted documents list from the conflicts attachment +        fetched from a couch document. + +        :param attached_conflicts: The document's conflicts as fetched from a +                                   couch document attachment. +        :type attached_conflicts: dict +        """ +        conflicts = [] +        for doc_rev, content in attached_conflicts: +            doc = self._factory(doc_id, doc_rev) +            if content is None: +                doc.make_tombstone() +            else: +                doc.content = content +            conflicts.append(doc) +        return conflicts +      def _get_conflicts(self, doc_id, couch_rev=None):          """          Get the conflicted versions of a document. @@ -830,16 +967,8 @@ class CouchDatabase(CommonBackend):          resource = self._database.resource(doc_id, 'u1db_conflicts')          try:              response = resource.get_json(**params) -            conflicts = [] -            # build the conflicted versions -            for doc_rev, content in json.loads(response[2].read()): -                doc = self._factory(doc_id, doc_rev) -                if content is None: -                    doc.make_tombstone() -                else: -                    doc.content = content -                conflicts.append(doc) -            return conflicts +            return self._build_conflicts( +                doc_id, json.loads(response[2].read()))          except ResourceNotFound:              return [] @@ -970,7 +1099,7 @@ class CouchDatabase(CommonBackend):                             serialized string.          :type my_content: str          """ -        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc._ensure_fetch_conflicts(self._get_conflicts)          doc.add_conflict(              self._factory(doc_id=doc.doc_id, rev=my_doc_rev,                            json=my_content)) @@ -988,7 +1117,7 @@ class CouchDatabase(CommonBackend):          :param conflict_revs: A list of the revisions to be deleted.          :param conflict_revs: [str]          """ -        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc._ensure_fetch_conflicts(self._get_conflicts)          doc.delete_conflicts(conflict_revs)      def _prune_conflicts(self, doc, doc_vcr): @@ -1150,7 +1279,7 @@ class CouchDatabase(CommonBackend):          if cur_doc is not None:              doc.couch_rev = cur_doc.couch_rev          # fetch conflicts because we will eventually manipulate them -        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc._ensure_fetch_conflicts(self._get_conflicts)          # from now on, it works just like u1db sqlite backend          doc_vcr = vectorclock.VectorClockRev(doc.rev)          if cur_doc is None: diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js deleted file mode 100644 index 5a4647de..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js +++ /dev/null @@ -1,64 +0,0 @@ -function(doc, req){ -    /* we expect to receive the following in `req.body`: -     * { -     *     'couch_rev': '<couch_rev>', -     *     'u1db_rev': '<u1db_rev>', -     *     'content': '<base64 encoded content>', -     *     'trans_id': '<reansaction_id>' -     *     'conflicts': '<base64 encoded conflicts>', -     *     'update_conflicts': <boolean> -     * } -     */ -    var body = JSON.parse(req.body); - -    // create a new document document -    if (!doc) { -        doc = {} -        doc['_id'] = req['id']; -    } -    // or fail if couch revisions do not match -    else if (doc['_rev'] != body['couch_rev']) { -        // of fail if revisions do not match -        return [null, 'revision conflict'] -    } - -    // store u1db rev -    doc.u1db_rev = body['u1db_rev']; - -    // save content as attachment -    if (body['content'] != null) { -        // save u1db content as attachment -        if (!doc._attachments) -            doc._attachments = {}; -        doc._attachments.u1db_content =  { -            content_type: "application/octet-stream", -            data: body['content']  // should be base64 encoded -        }; -    } -    // or delete the attachment if document is tombstone -    else if (doc._attachments && -             doc._attachments.u1db_content) -        delete doc._attachments.u1db_content; - -    // store the transaction id -    if (!doc.u1db_transactions) -        doc.u1db_transactions = []; -    var d = new Date(); -    doc.u1db_transactions.push([d.getTime(), body['trans_id']]); - -    // save conflicts as attachment if they were sent -    if (body['update_conflicts']) -        if (body['conflicts'] != null) { -            if (!doc._attachments) -                doc._attachments = {}; -            doc._attachments.u1db_conflicts = { -                content_type: "application/octet-stream", -                data: body['conflicts']  // should be base64 encoded -            } -        } else { -            if(doc._attachments && doc._attachments.u1db_conflicts) -                delete doc._attachments.u1db_conflicts -        } - -    return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index dc0ea906..97d744e4 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -495,10 +495,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          self.assertRaises(              errors.MissingDesignDocError,              self.db._get_transaction_log) -        # create_doc() -        self.assertRaises( -            errors.MissingDesignDocError, -            self.db.create_doc, {})          # whats_changed()          self.assertRaises(              errors.MissingDesignDocError, @@ -645,10 +641,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          self.assertRaises(              errors.MissingDesignDocDeletedError,              self.db._get_transaction_log) -        # create_doc() -        self.assertRaises( -            errors.MissingDesignDocDeletedError, -            self.db.create_doc, {})          # whats_changed()          self.assertRaises(              errors.MissingDesignDocDeletedError, | 
