summaryrefslogtreecommitdiff
path: root/server/src/leap/soledad/server/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/leap/soledad/server/sync.py')
-rw-r--r--server/src/leap/soledad/server/sync.py192
1 files changed, 17 insertions, 175 deletions
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 18c4ee40..96f65912 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -17,189 +17,22 @@
"""
Server side synchronization infrastructure.
"""
-import json
-
-from leap.soledad.common.couch import CouchDatabase
from u1db import sync, Document
from u1db.remote import http_app
+from leap.soledad.server.state import ServerSyncState
+from leap.soledad.server.caching import get_cache_for
MAX_REQUEST_SIZE = 200 # in Mb
MAX_ENTRY_SIZE = 200 # in Mb
-class ServerSyncState(object):
- """
- The state of one sync session, as stored on backend server.
-
- This object performes queries to distinct design documents:
-
- _design/syncs/_update/state
- _design/syncs/_view/state
- _design/syncs/_view/seen_ids
- _design/syncs/_view/changes_to_return
-
- On server side, the ongoing syncs metadata is maintained in a document
- called 'u1db_sync_state'.
- """
-
- def __init__(self, db, source_replica_uid, sync_id):
- """
- Initialize the sync state object.
-
- :param db: The target syncing database.
- :type db: CouchDatabase.
- :param source_replica_uid: CouchDatabase
- :type source_replica_uid: str
- """
- self._db = db
- self._source_replica_uid = source_replica_uid
- self._sync_id = sync_id
-
- def _key(self, key):
- """
- Format a key to be used on couch views.
-
- :param key: The lookup key.
- :type key: json serializable object
-
- :return: The properly formatted key.
- :rtype: str
- """
- return json.dumps(key, separators=(',', ':'))
-
- def _put_info(self, key, value):
- """
- Put some information on the sync state document.
-
- This method works in conjunction with the
- _design/syncs/_update/state update handler couch backend.
-
- :param key: The key for the info to be put.
- :type key: str
- :param value: The value for the info to be put.
- :type value: str
- """
- ddoc_path = [
- '_design', 'syncs', '_update', 'state',
- 'u1db_sync_state']
- res = self._db._database.resource(*ddoc_path)
- with CouchDatabase.sync_info_lock[self._db.replica_uid]:
- res.put_json(
- body={
- 'sync_id': self._sync_id,
- 'source_replica_uid': self._source_replica_uid,
- key: value,
- },
- headers={'content-type': 'application/json'})
-
- def put_seen_id(self, seen_id, gen):
- """
- Put one seen id on the sync state document.
-
- :param seen_id: The doc_id of a document seen during sync.
- :type seen_id: str
- :param gen: The corresponding db generation for that document.
- :type gen: int
- """
- self._put_info(
- 'seen_id',
- [seen_id, gen])
-
- def seen_ids(self):
- """
- Return all document ids seen during the sync.
-
- :return: A list with doc ids seen during the sync.
- :rtype: list
- """
- ddoc_path = ['_design', 'syncs', '_view', 'seen_ids']
- resource = self._db._database.resource(*ddoc_path)
- response = resource.get_json(
- key=self._key([self._source_replica_uid, self._sync_id]))
- data = response[2]
- if data['rows']:
- entry = data['rows'].pop()
- return entry['value']['seen_ids']
- return []
-
- def put_changes_to_return(self, gen, trans_id, changes_to_return):
- """
- Put the calculated changes to return in the backend sync state
- document.
-
- :param gen: The target database generation that will be synced.
- :type gen: int
- :param trans_id: The target database transaction id that will be
- synced.
- :type trans_id: str
- :param changes_to_return: A list of tuples with the changes to be
- returned during the sync process.
- :type changes_to_return: list
- """
- self._put_info(
- 'changes_to_return',
- {
- 'gen': gen,
- 'trans_id': trans_id,
- 'changes_to_return': changes_to_return,
- }
- )
-
- def sync_info(self):
- """
- Return information about the current sync state.
-
- :return: The generation and transaction id of the target database
- which will be synced, and the number of documents to return,
- or a tuple of Nones if those have not already been sent to
- server.
- :rtype: tuple
- """
- ddoc_path = ['_design', 'syncs', '_view', 'state']
- resource = self._db._database.resource(*ddoc_path)
- response = resource.get_json(
- key=self._key([self._source_replica_uid, self._sync_id]))
- data = response[2]
- gen = None
- trans_id = None
- number_of_changes = None
- if data['rows'] and data['rows'][0]['value'] is not None:
- value = data['rows'][0]['value']
- gen = value['gen']
- trans_id = value['trans_id']
- number_of_changes = value['number_of_changes']
- return gen, trans_id, number_of_changes
-
- def next_change_to_return(self, received):
- """
- Return the next change to be returned to the source syncing replica.
-
- :param received: How many documents the source replica has already
- received during the current sync process.
- :type received: int
- """
- ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return']
- resource = self._db._database.resource(*ddoc_path)
- response = resource.get_json(
- key=self._key(
- [self._source_replica_uid, self._sync_id, received]))
- data = response[2]
- if not data['rows']:
- return None, None, None
- value = data['rows'][0]['value']
- gen = value['gen']
- trans_id = value['trans_id']
- next_change_to_return = value['next_change_to_return']
- return gen, trans_id, tuple(next_change_to_return)
-
-
class SyncExchange(sync.SyncExchange):
def __init__(self, db, source_replica_uid, last_known_generation, sync_id):
"""
:param db: The target syncing database.
- :type db: CouchDatabase
+ :type db: SoledadBackend
:param source_replica_uid: The uid of the source syncing replica.
:type source_replica_uid: str
:param last_known_generation: The last target replica generation the
@@ -216,8 +49,7 @@ class SyncExchange(sync.SyncExchange):
self.new_trans_id = None
self._trace_hook = None
# recover sync state
- self._sync_state = ServerSyncState(
- self._db, self.source_replica_uid, sync_id)
+ self._sync_state = ServerSyncState(self.source_replica_uid, sync_id)
def find_changes_to_return(self, received):
"""
@@ -280,6 +112,14 @@ class SyncExchange(sync.SyncExchange):
doc = self._db.get_doc(changed_doc_id, include_deleted=True)
return_doc_cb(doc, gen, trans_id)
+ def batched_insert_from_source(self, entries, sync_id):
+ self._db.batch_start()
+ for entry in entries:
+ doc, gen, trans_id, number_of_docs, doc_idx = entry
+ self.insert_doc_from_source(doc, gen, trans_id, number_of_docs,
+ doc_idx, sync_id)
+ self._db.batch_end()
+
def insert_doc_from_source(
self, doc, source_gen, trans_id,
number_of_docs=None, doc_idx=None, sync_id=None):
@@ -353,10 +193,12 @@ class SyncResource(http_app.SyncResource):
:type ensure: bool
"""
# create or open the database
+ cache = get_cache_for('db-' + sync_id + self.dbname, expire=120)
if ensure:
db, self.replica_uid = self.state.ensure_database(self.dbname)
else:
db = self.state.open_database(self.dbname)
+ db.init_caching(cache)
# validate the information the client has about server replica
db.validate_gen_and_trans_id(
last_known_generation, last_known_trans_id)
@@ -364,6 +206,7 @@ class SyncResource(http_app.SyncResource):
self.sync_exch = self.sync_exchange_class(
db, self.source_replica_uid, last_known_generation, sync_id)
self._sync_id = sync_id
+ self._staging = []
@http_app.http_method(content_as_args=True)
def post_put(
@@ -391,9 +234,7 @@ class SyncResource(http_app.SyncResource):
:type doc_idx: int
"""
doc = Document(id, rev, content)
- self.sync_exch.insert_doc_from_source(
- doc, gen, trans_id, number_of_docs=number_of_docs,
- doc_idx=doc_idx, sync_id=self._sync_id)
+ self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
@http_app.http_method(received=int, content_as_args=True)
def post_get(self, received):
@@ -432,6 +273,7 @@ class SyncResource(http_app.SyncResource):
Return the current generation and transaction_id after inserting one
incoming document.
"""
+ self.sync_exch.batched_insert_from_source(self._staging, self._sync_id)
self.responder.content_type = 'application/x-soledad-sync-response'
self.responder.start_response(200)
self.responder.start_stream(),