summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/couch/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/leap/soledad/common/couch/__init__.py')
-rw-r--r--common/src/leap/soledad/common/couch/__init__.py100
1 files changed, 82 insertions, 18 deletions
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py
index bd8b08b7..dae460cb 100644
--- a/common/src/leap/soledad/common/couch/__init__.py
+++ b/common/src/leap/soledad/common/couch/__init__.py
@@ -33,7 +33,6 @@ from multiprocessing.pool import ThreadPool
from couchdb.client import Server, Database
-from couchdb.multipart import MultipartWriter
from couchdb.http import (
ResourceConflict,
ResourceNotFound,
@@ -53,6 +52,7 @@ from u1db.remote import http_app
from leap.soledad.common import ddocs
from .errors import raise_server_error
from .errors import raise_missing_design_doc_error
+from .support import MultipartWriter
from leap.soledad.common.errors import InvalidURLError
from leap.soledad.common.document import ServerDocument
from leap.soledad.common.backend import SoledadBackend
@@ -153,10 +153,24 @@ class CouchDatabase(object):
self._url = url
self._dbname = dbname
self._database = self.get_couch_database(url, dbname)
+ self.batching = False
+ self.batch_generation = None
+ self.batch_docs = {}
if ensure_ddocs:
self.ensure_ddocs_on_db()
self.ensure_security_ddoc(database_security)
+ def batch_start(self):
+ self.batching = True
+ self.batch_generation = self.get_generation_info()
+ ids = set(row.id for row in self._database.view('_all_docs'))
+ self.batched_ids = ids
+
+ def batch_end(self):
+ self.batching = False
+ self.batch_generation = None
+ self.__perform_batch()
+
def get_couch_database(self, url, dbname):
"""
Generate a couchdb.Database instance given a url and dbname.
@@ -185,7 +199,8 @@ class CouchDatabase(object):
"""
for ddoc_name in ['docs', 'syncs', 'transactions']:
try:
- self.json_from_resource(['_design', ddoc_name, '_info'],
+ self.json_from_resource(['_design'] +
+ ddoc_name.split('/') + ['_info'],
check_missing_ddoc=False)
except ResourceNotFound:
ddoc = json.loads(
@@ -337,13 +352,49 @@ class CouchDatabase(object):
:return: The document.
:rtype: ServerDocument
"""
- # get document with all attachments (u1db content and eventual
- # conflicts)
+ doc_from_batch = self.__check_batch_before_get(doc_id)
+ if doc_from_batch:
+ return doc_from_batch
+ if self.batching and doc_id not in self.batched_ids:
+ return None
if doc_id not in self._database:
return None
+ # get document with all attachments (u1db content and eventual
+ # conflicts)
result = self.json_from_resource([doc_id], attachments=True)
return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
+ def __check_batch_before_get(self, doc_id):
+ """
+ If doc_id is staged for batching, then we need to commit the batch
+ before going ahead. This avoids consistency problems, like trying to
+ get a document that isn't persisted and processing like it is missing.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ """
+ if doc_id in self.batch_docs:
+ couch_doc = self.batch_docs[doc_id]
+ rev = self.__perform_batch(doc_id)
+ couch_doc['_rev'] = rev
+ self.batched_ids.add(doc_id)
+ return self.__parse_doc_from_couch(couch_doc, doc_id, True)
+ return None
+
+ def __perform_batch(self, doc_id=None):
+ status = self._database.update(self.batch_docs.values())
+ rev = None
+ for ok, stored_doc_id, rev_or_error in status:
+ if not ok:
+ error = rev_or_error
+ if type(error) is ResourceConflict:
+ raise RevisionConflict
+ raise error
+ elif doc_id == stored_doc_id:
+ rev = rev_or_error
+ self.batch_docs.clear()
+ return rev
+
def __parse_doc_from_couch(self, result, doc_id,
check_for_conflicts=False):
# restrict to u1db documents
@@ -571,6 +622,8 @@ class CouchDatabase(object):
:return: A tuple containing the current generation and transaction id.
:rtype: (int, str)
"""
+ if self.batching and self.batch_generation:
+ return self.batch_generation
# query a couch list function
ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
info = self.json_from_resource(ddoc_path)
@@ -691,20 +744,31 @@ class CouchDatabase(object):
if old_doc is not None and hasattr(old_doc, 'couch_rev'):
couch_doc['_rev'] = old_doc.couch_rev
# prepare the multipart PUT
- buf = StringIO()
- headers = {}
- envelope = MultipartWriter(buf, headers=headers, subtype='related')
- envelope.add('application/json', json.dumps(couch_doc))
- for part in parts:
- envelope.add('application/octet-stream', part)
- envelope.close()
- # try to save and fail if there's a revision conflict
- try:
- resource = self._new_resource()
- resource.put_json(
- doc.doc_id, body=str(buf.getvalue()), headers=headers)
- except ResourceConflict:
- raise RevisionConflict()
+ if not self.batching:
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ resource = self._new_resource()
+ resource.put_json(
+ doc.doc_id, body=str(buf.getvalue()),
+ headers=envelope.headers)
+ except ResourceConflict:
+ raise RevisionConflict()
+ else:
+ for name, attachment in attachments.items():
+ del attachment['follows']
+ del attachment['length']
+ index = 0 if name is 'u1db_content' else 1
+ attachment['data'] = binascii.b2a_base64(parts[index]).strip()
+ couch_doc['_attachments'] = attachments
+ self.batch_docs[doc.doc_id] = couch_doc
+ last_gen, last_trans_id = self.batch_generation
+ self.batch_generation = (last_gen + 1, transaction_id)
return transactions[-1][1]
def _new_resource(self, *path):