summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-02-21 17:49:28 -0300
committerdrebs <drebs@leap.se>2014-03-11 09:41:31 -0300
commit7de7b4653b1815818611d2a6302bd38d1e7a7ccc (patch)
tree6f85c9d745a4935d27e41a05248ebc0915e2da84 /common/src/leap/soledad
parent66c93f4515139fcf666948b31e09c5bfae91e3dc (diff)
Use less memory when putting docs on couch. Closes #5011.
Diffstat (limited to 'common/src/leap/soledad')
-rw-r--r--common/src/leap/soledad/common/couch.py267
-rw-r--r--common/src/leap/soledad/common/ddocs/docs/updates/put.js64
-rw-r--r--common/src/leap/soledad/common/tests/test_couch.py8
3 files changed, 198 insertions, 141 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 40d64370..09fe1ca3 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -24,10 +24,21 @@ import uuid
import logging
import binascii
import socket
+import time
+import sys
+
+
+from StringIO import StringIO
from couchdb.client import Server
-from couchdb.http import ResourceNotFound, Unauthorized, ServerError, Session
+from couchdb.http import (
+ ResourceConflict,
+ ResourceNotFound,
+ Unauthorized,
+ ServerError,
+ Session,
+)
from u1db import query_parser, vectorclock
from u1db.errors import (
DatabaseDoesNotExist,
@@ -87,9 +98,9 @@ class CouchDocument(SoledadDocument):
SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
self._couch_rev = None
self._conflicts = None
- self._modified_conflicts = False
+ self._transactions = None
- def ensure_fetch_conflicts(self, get_conflicts_fun):
+ def _ensure_fetch_conflicts(self, get_conflicts_fun):
"""
Ensure conflict data has been fetched from the server.
@@ -120,8 +131,7 @@ class CouchDocument(SoledadDocument):
:type doc: CouchDocument
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
- self._modified_conflicts = True
+ raise Exception("Run self._ensure_fetch_conflicts first!")
self._conflicts.append(doc)
self.has_conflicts = len(self._conflicts) > 0
@@ -133,25 +143,13 @@ class CouchDocument(SoledadDocument):
:type conflict_revs: [str]
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
+ raise Exception("Run self._ensure_fetch_conflicts first!")
conflicts_len = len(self._conflicts)
self._conflicts = filter(
lambda doc: doc.rev not in conflict_revs,
self._conflicts)
- if len(self._conflicts) < conflicts_len:
- self._modified_conflicts = True
self.has_conflicts = len(self._conflicts) > 0
- def modified_conflicts(self):
- """
- Return whether this document's conflicts have been modified.
-
- :return: Whether this document's conflicts have been modified.
- :rtype: bool
- """
- return self._conflicts is not None and \
- self._modified_conflicts is True
-
def _get_couch_rev(self):
return self._couch_rev
@@ -160,6 +158,14 @@ class CouchDocument(SoledadDocument):
couch_rev = property(_get_couch_rev, _set_couch_rev)
+ def _get_transactions(self):
+ return self._transactions
+
+ def _set_transactions(self, rev):
+ self._transactions = rev
+
+ transactions = property(_get_transactions, _set_transactions)
+
# monkey-patch the u1db http app to use CouchDocument
http_app.Document = CouchDocument
@@ -225,6 +231,94 @@ def raise_server_error(exc, ddoc_path):
raise errors.DesignDocUnknownError(path)
+class MultipartWriter(object):
+ """
+ A multipart writer adapted from python-couchdb's one so we can PUT
+ documents using couch's multipart PUT.
+
+ This stripped down version does not allow for nested structures, and
+ contains only the essential things we need to PUT SoledadDocuments to the
+ couch backend.
+ """
+
+ CRLF = '\r\n'
+
+ def __init__(self, fileobj, headers=None, boundary=None):
+ """
+ Initialize the multipart writer.
+ """
+ self.fileobj = fileobj
+ if boundary is None:
+ boundary = self._make_boundary()
+ self._boundary = boundary
+ self._build_headers('related', headers)
+
+ def add(self, mimetype, content, headers={}):
+ """
+ Add a part to the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ self.fileobj.write(self.CRLF)
+ headers['Content-Type'] = mimetype
+ self._write_headers(headers)
+ if content:
+ # XXX: throw an exception if a boundary appears in the content??
+ self.fileobj.write(content)
+ self.fileobj.write(self.CRLF)
+
+ def close(self):
+ """
+ Close the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ # be careful not to have anything after '--', otherwise old couch
+ # versions (including bigcouch) will fail.
+ self.fileobj.write('--')
+
+ def _make_boundary(self):
+ """
+ Create a boundary to discern multi parts.
+ """
+ try:
+ from uuid import uuid4
+ return '==' + uuid4().hex + '=='
+ except ImportError:
+ from random import randrange
+ token = randrange(sys.maxint)
+ format = '%%0%dd' % len(repr(sys.maxint - 1))
+ return '===============' + (format % token) + '=='
+
+ def _write_headers(self, headers):
+ """
+ Write a part header in the buffer stream.
+ """
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.fileobj.write(name)
+ self.fileobj.write(': ')
+ self.fileobj.write(value)
+ self.fileobj.write(self.CRLF)
+ self.fileobj.write(self.CRLF)
+
+ def _build_headers(self, subtype, headers):
+ """
+ Build the main headers of the multipart stream.
+
+ This is here so we can send headers separete from content using
+ python-couchdb API.
+ """
+ self.headers = {}
+ self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \
+ (subtype, self._boundary)
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.headers[name] = value
+
+
class CouchDatabase(CommonBackend):
"""
A U1DB implementation that uses CouchDB as its persistence layer.
@@ -564,8 +658,12 @@ class CouchDatabase(CommonBackend):
and '_attachments' in result \
and 'u1db_conflicts' in result['_attachments']:
doc.has_conflicts = True
+ #doc.conflicts = self._build_conflicts(
+ # json.loads(result['_attachments']['u1db_conflicts']))
# store couch revision
doc.couch_rev = result['_rev']
+ # store transactions
+ doc.transactions = result['u1db_transactions']
return doc
def get_doc(self, doc_id, include_deleted=False):
@@ -616,6 +714,10 @@ class CouchDatabase(CommonBackend):
"""
Put the document in the Couch backend database.
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
:param old_doc: The old document version.
:type old_doc: CouchDocument
:param doc: The document to be put.
@@ -637,45 +739,61 @@ class CouchDatabase(CommonBackend):
design document for an yet
unknown reason.
"""
- trans_id = self._allocate_transaction_id()
- # encode content
- content = doc.get_json()
- if content is not None:
- content = binascii.b2a_base64(content)[:-1] # exclude trailing \n
- # encode conflicts
- conflicts = None
- update_conflicts = doc.modified_conflicts()
- if update_conflicts is True:
- if doc.has_conflicts:
- conflicts = binascii.b2a_base64(
- json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- )[:-1] # exclude \n
- # perform the request
- ddoc_path = ['_design', 'docs', '_update', 'put', doc.doc_id]
- resource = self._database.resource(*ddoc_path)
+ attachments = {} # we save content and conflicts as attachments
+ parts = [] # and we put it using couch's multipart PUT
+ # save content as attachment
+ if doc.is_tombstone() is False:
+ content = doc.get_json()
+ attachments['u1db_content'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(content),
+ }
+ parts.append(content)
+ # save conflicts as attachment
+ if doc.has_conflicts is True:
+ conflicts = json.dumps(
+ map(lambda cdoc: (cdoc.rev, cdoc.content),
+ doc.get_conflicts()))
+ attachments['u1db_conflicts'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(conflicts),
+ }
+ parts.append(conflicts)
+ # store old transactions, if any
+ transactions = old_doc.transactions[:] if old_doc is not None else []
+ # create a new transaction id and timestamp it so the transaction log
+ # is consistent when querying the database.
+ transactions.append(
+ # here we store milliseconds to keep consistent with javascript
+ # Date.prototype.getTime() which was used before inside a couchdb
+ # update handler.
+ (int(time.time() * 1000),
+ self._allocate_transaction_id()))
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ 'u1db_transactions': transactions,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None:
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
try:
- response = resource.put_json(
- body={
- 'couch_rev': old_doc.couch_rev
- if old_doc is not None else None,
- 'u1db_rev': doc.rev,
- 'content': content,
- 'trans_id': trans_id,
- 'conflicts': conflicts,
- 'update_conflicts': update_conflicts,
- },
- headers={'content-type': 'application/json'})
- # the document might have been updated in between, so we check for
- # the return message
- msg = response[2].read()
- if msg == 'ok':
- return
- elif msg == 'revision conflict':
- raise RevisionConflict()
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
+ self._database.resource.put_json(
+ doc.doc_id, body=buf.getvalue(), headers=envelope.headers)
+ except ResourceConflict:
+ raise RevisionConflict()
def put_doc(self, doc):
"""
@@ -810,6 +928,25 @@ class CouchDatabase(CommonBackend):
self._put_doc(old_doc, doc)
return new_rev
+ def _build_conflicts(self, doc_id, attached_conflicts):
+ """
+ Build the conflicted documents list from the conflicts attachment
+ fetched from a couch document.
+
+ :param attached_conflicts: The document's conflicts as fetched from a
+ couch document attachment.
+ :type attached_conflicts: dict
+ """
+ conflicts = []
+ for doc_rev, content in attached_conflicts:
+ doc = self._factory(doc_id, doc_rev)
+ if content is None:
+ doc.make_tombstone()
+ else:
+ doc.content = content
+ conflicts.append(doc)
+ return conflicts
+
def _get_conflicts(self, doc_id, couch_rev=None):
"""
Get the conflicted versions of a document.
@@ -830,16 +967,8 @@ class CouchDatabase(CommonBackend):
resource = self._database.resource(doc_id, 'u1db_conflicts')
try:
response = resource.get_json(**params)
- conflicts = []
- # build the conflicted versions
- for doc_rev, content in json.loads(response[2].read()):
- doc = self._factory(doc_id, doc_rev)
- if content is None:
- doc.make_tombstone()
- else:
- doc.content = content
- conflicts.append(doc)
- return conflicts
+ return self._build_conflicts(
+ doc_id, json.loads(response[2].read()))
except ResourceNotFound:
return []
@@ -970,7 +1099,7 @@ class CouchDatabase(CommonBackend):
serialized string.
:type my_content: str
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.add_conflict(
self._factory(doc_id=doc.doc_id, rev=my_doc_rev,
json=my_content))
@@ -988,7 +1117,7 @@ class CouchDatabase(CommonBackend):
:param conflict_revs: A list of the revisions to be deleted.
:param conflict_revs: [str]
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.delete_conflicts(conflict_revs)
def _prune_conflicts(self, doc, doc_vcr):
@@ -1150,7 +1279,7 @@ class CouchDatabase(CommonBackend):
if cur_doc is not None:
doc.couch_rev = cur_doc.couch_rev
# fetch conflicts because we will eventually manipulate them
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
# from now on, it works just like u1db sqlite backend
doc_vcr = vectorclock.VectorClockRev(doc.rev)
if cur_doc is None:
diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js
deleted file mode 100644
index 5a4647de..00000000
--- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js
+++ /dev/null
@@ -1,64 +0,0 @@
-function(doc, req){
- /* we expect to receive the following in `req.body`:
- * {
- * 'couch_rev': '<couch_rev>',
- * 'u1db_rev': '<u1db_rev>',
- * 'content': '<base64 encoded content>',
- * 'trans_id': '<reansaction_id>'
- * 'conflicts': '<base64 encoded conflicts>',
- * 'update_conflicts': <boolean>
- * }
- */
- var body = JSON.parse(req.body);
-
- // create a new document document
- if (!doc) {
- doc = {}
- doc['_id'] = req['id'];
- }
- // or fail if couch revisions do not match
- else if (doc['_rev'] != body['couch_rev']) {
- // of fail if revisions do not match
- return [null, 'revision conflict']
- }
-
- // store u1db rev
- doc.u1db_rev = body['u1db_rev'];
-
- // save content as attachment
- if (body['content'] != null) {
- // save u1db content as attachment
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_content = {
- content_type: "application/octet-stream",
- data: body['content'] // should be base64 encoded
- };
- }
- // or delete the attachment if document is tombstone
- else if (doc._attachments &&
- doc._attachments.u1db_content)
- delete doc._attachments.u1db_content;
-
- // store the transaction id
- if (!doc.u1db_transactions)
- doc.u1db_transactions = [];
- var d = new Date();
- doc.u1db_transactions.push([d.getTime(), body['trans_id']]);
-
- // save conflicts as attachment if they were sent
- if (body['update_conflicts'])
- if (body['conflicts'] != null) {
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_conflicts = {
- content_type: "application/octet-stream",
- data: body['conflicts'] // should be base64 encoded
- }
- } else {
- if(doc._attachments && doc._attachments.u1db_conflicts)
- delete doc._attachments.u1db_conflicts
- }
-
- return [doc, 'ok'];
-}
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index dc0ea906..97d744e4 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -495,10 +495,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
self.assertRaises(
errors.MissingDesignDocError,
self.db._get_transaction_log)
- # create_doc()
- self.assertRaises(
- errors.MissingDesignDocError,
- self.db.create_doc, {})
# whats_changed()
self.assertRaises(
errors.MissingDesignDocError,
@@ -645,10 +641,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
self.assertRaises(
errors.MissingDesignDocDeletedError,
self.db._get_transaction_log)
- # create_doc()
- self.assertRaises(
- errors.MissingDesignDocDeletedError,
- self.db.create_doc, {})
# whats_changed()
self.assertRaises(
errors.MissingDesignDocDeletedError,