summaryrefslogtreecommitdiff
path: root/server/src/leap
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2015-09-27 21:49:00 +0200
committerRuben Pollan <meskio@sindominio.net>2015-09-27 21:49:00 +0200
commit1566ac3885d566d0b3d70a44eaf0bd901b2d655e (patch)
treea2c7995d2ab1f2a95fdc85539b82b4746424937f /server/src/leap
parent4be6f05d91891122e83f74d21c83c5f8fcd3a618 (diff)
parent5b20b469f22ab99533135beefd49129db49064e5 (diff)
Merge branch 'feature/sync_state_in_memory' into develop
- Releases: 0.8.0
Diffstat (limited to 'server/src/leap')
-rw-r--r--server/src/leap/soledad/server/caching.py32
-rw-r--r--server/src/leap/soledad/server/state.py141
-rw-r--r--server/src/leap/soledad/server/sync.py179
3 files changed, 181 insertions, 171 deletions
diff --git a/server/src/leap/soledad/server/caching.py b/server/src/leap/soledad/server/caching.py
new file mode 100644
index 00000000..9a049a39
--- /dev/null
+++ b/server/src/leap/soledad/server/caching.py
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+# caching.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server side caching. Using beaker for now.
+"""
+from beaker.cache import CacheManager
+
+
+def setup_caching():
+ _cache_manager = CacheManager(type='memory')
+ return _cache_manager
+
+
+_cache_manager = setup_caching()
+
+
+def get_cache_for(key, expire=3600):
+ return _cache_manager.get_cache(key, expire=expire)
diff --git a/server/src/leap/soledad/server/state.py b/server/src/leap/soledad/server/state.py
new file mode 100644
index 00000000..f269b77e
--- /dev/null
+++ b/server/src/leap/soledad/server/state.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+# state.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server side synchronization infrastructure.
+"""
+from leap.soledad.server import caching
+
+
+class ServerSyncState(object):
+ """
+ The state of one sync session, as stored on backend server.
+
+ On server side, the ongoing syncs metadata is maintained in
+ a caching layer.
+ """
+
+ def __init__(self, source_replica_uid, sync_id):
+ """
+ Initialize the sync state object.
+
+ :param sync_id: The id of current sync
+ :type sync_id: str
+ :param source_replica_uid: The source replica uid
+ :type source_replica_uid: str
+ """
+ self._source_replica_uid = source_replica_uid
+ self._sync_id = sync_id
+ caching_key = source_replica_uid + sync_id
+ self._storage = caching.get_cache_for(caching_key)
+
+ def _put_dict_info(self, key, value):
+ """
+ Put some information about the sync state.
+
+ :param key: The key for the info to be put.
+ :type key: str
+ :param value: The value for the info to be put.
+ :type value: str
+ """
+ if key not in self._storage:
+ self._storage[key] = []
+ info_list = self._storage.get(key)
+ info_list.append(value)
+ self._storage[key] = info_list
+
+ def put_seen_id(self, seen_id, gen):
+ """
+ Put one seen id on the sync state.
+
+ :param seen_id: The doc_id of a document seen during sync.
+ :type seen_id: str
+ :param gen: The corresponding db generation.
+ :type gen: int
+ """
+ self._put_dict_info(
+ 'seen_id',
+ (seen_id, gen))
+
+ def seen_ids(self):
+ """
+ Return all document ids seen during the sync.
+
+ :return: A dict with doc ids seen during the sync.
+ :rtype: dict
+ """
+ if 'seen_id' in self._storage:
+ seen_ids = self._storage.get('seen_id')
+ else:
+ seen_ids = []
+ return dict(seen_ids)
+
+ def put_changes_to_return(self, gen, trans_id, changes_to_return):
+ """
+ Put the calculated changes to return in the backend sync state.
+
+ :param gen: The target database generation that will be synced.
+ :type gen: int
+ :param trans_id: The target database transaction id that will be
+ synced.
+ :type trans_id: str
+ :param changes_to_return: A list of tuples with the changes to be
+ returned during the sync process.
+ :type changes_to_return: list
+ """
+ self._put_dict_info(
+ 'changes_to_return',
+ {
+ 'gen': gen,
+ 'trans_id': trans_id,
+ 'changes_to_return': changes_to_return,
+ }
+ )
+
+ def sync_info(self):
+ """
+ Return information about the current sync state.
+
+ :return: The generation and transaction id of the target database
+ which will be synced, and the number of documents to return,
+ or a tuple of Nones if those have not already been sent to
+ server.
+ :rtype: tuple
+ """
+ gen = trans_id = number_of_changes = None
+ if 'changes_to_return' in self._storage:
+ info = self._storage.get('changes_to_return')[0]
+ gen = info['gen']
+ trans_id = info['trans_id']
+ number_of_changes = len(info['changes_to_return'])
+ return gen, trans_id, number_of_changes
+
+ def next_change_to_return(self, received):
+ """
+ Return the next change to be returned to the source syncing replica.
+
+ :param received: How many documents the source replica has already
+ received during the current sync process.
+ :type received: int
+ """
+ gen = trans_id = next_change_to_return = None
+ if 'changes_to_return' in self._storage:
+ info = self._storage.get('changes_to_return')[0]
+ gen = info['gen']
+ trans_id = info['trans_id']
+ if received < len(info['changes_to_return']):
+ next_change_to_return = (info['changes_to_return'][received])
+ return gen, trans_id, next_change_to_return
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 18c4ee40..619be565 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -17,183 +17,16 @@
"""
Server side synchronization infrastructure.
"""
-import json
-
-from leap.soledad.common.couch import CouchDatabase
from u1db import sync, Document
from u1db.remote import http_app
+from leap.soledad.server.state import ServerSyncState
+from leap.soledad.server.caching import get_cache_for
MAX_REQUEST_SIZE = 200 # in Mb
MAX_ENTRY_SIZE = 200 # in Mb
-class ServerSyncState(object):
- """
- The state of one sync session, as stored on backend server.
-
- This object performes queries to distinct design documents:
-
- _design/syncs/_update/state
- _design/syncs/_view/state
- _design/syncs/_view/seen_ids
- _design/syncs/_view/changes_to_return
-
- On server side, the ongoing syncs metadata is maintained in a document
- called 'u1db_sync_state'.
- """
-
- def __init__(self, db, source_replica_uid, sync_id):
- """
- Initialize the sync state object.
-
- :param db: The target syncing database.
- :type db: CouchDatabase.
- :param source_replica_uid: CouchDatabase
- :type source_replica_uid: str
- """
- self._db = db
- self._source_replica_uid = source_replica_uid
- self._sync_id = sync_id
-
- def _key(self, key):
- """
- Format a key to be used on couch views.
-
- :param key: The lookup key.
- :type key: json serializable object
-
- :return: The properly formatted key.
- :rtype: str
- """
- return json.dumps(key, separators=(',', ':'))
-
- def _put_info(self, key, value):
- """
- Put some information on the sync state document.
-
- This method works in conjunction with the
- _design/syncs/_update/state update handler couch backend.
-
- :param key: The key for the info to be put.
- :type key: str
- :param value: The value for the info to be put.
- :type value: str
- """
- ddoc_path = [
- '_design', 'syncs', '_update', 'state',
- 'u1db_sync_state']
- res = self._db._database.resource(*ddoc_path)
- with CouchDatabase.sync_info_lock[self._db.replica_uid]:
- res.put_json(
- body={
- 'sync_id': self._sync_id,
- 'source_replica_uid': self._source_replica_uid,
- key: value,
- },
- headers={'content-type': 'application/json'})
-
- def put_seen_id(self, seen_id, gen):
- """
- Put one seen id on the sync state document.
-
- :param seen_id: The doc_id of a document seen during sync.
- :type seen_id: str
- :param gen: The corresponding db generation for that document.
- :type gen: int
- """
- self._put_info(
- 'seen_id',
- [seen_id, gen])
-
- def seen_ids(self):
- """
- Return all document ids seen during the sync.
-
- :return: A list with doc ids seen during the sync.
- :rtype: list
- """
- ddoc_path = ['_design', 'syncs', '_view', 'seen_ids']
- resource = self._db._database.resource(*ddoc_path)
- response = resource.get_json(
- key=self._key([self._source_replica_uid, self._sync_id]))
- data = response[2]
- if data['rows']:
- entry = data['rows'].pop()
- return entry['value']['seen_ids']
- return []
-
- def put_changes_to_return(self, gen, trans_id, changes_to_return):
- """
- Put the calculated changes to return in the backend sync state
- document.
-
- :param gen: The target database generation that will be synced.
- :type gen: int
- :param trans_id: The target database transaction id that will be
- synced.
- :type trans_id: str
- :param changes_to_return: A list of tuples with the changes to be
- returned during the sync process.
- :type changes_to_return: list
- """
- self._put_info(
- 'changes_to_return',
- {
- 'gen': gen,
- 'trans_id': trans_id,
- 'changes_to_return': changes_to_return,
- }
- )
-
- def sync_info(self):
- """
- Return information about the current sync state.
-
- :return: The generation and transaction id of the target database
- which will be synced, and the number of documents to return,
- or a tuple of Nones if those have not already been sent to
- server.
- :rtype: tuple
- """
- ddoc_path = ['_design', 'syncs', '_view', 'state']
- resource = self._db._database.resource(*ddoc_path)
- response = resource.get_json(
- key=self._key([self._source_replica_uid, self._sync_id]))
- data = response[2]
- gen = None
- trans_id = None
- number_of_changes = None
- if data['rows'] and data['rows'][0]['value'] is not None:
- value = data['rows'][0]['value']
- gen = value['gen']
- trans_id = value['trans_id']
- number_of_changes = value['number_of_changes']
- return gen, trans_id, number_of_changes
-
- def next_change_to_return(self, received):
- """
- Return the next change to be returned to the source syncing replica.
-
- :param received: How many documents the source replica has already
- received during the current sync process.
- :type received: int
- """
- ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return']
- resource = self._db._database.resource(*ddoc_path)
- response = resource.get_json(
- key=self._key(
- [self._source_replica_uid, self._sync_id, received]))
- data = response[2]
- if not data['rows']:
- return None, None, None
- value = data['rows'][0]['value']
- gen = value['gen']
- trans_id = value['trans_id']
- next_change_to_return = value['next_change_to_return']
- return gen, trans_id, tuple(next_change_to_return)
-
-
class SyncExchange(sync.SyncExchange):
def __init__(self, db, source_replica_uid, last_known_generation, sync_id):
@@ -216,8 +49,7 @@ class SyncExchange(sync.SyncExchange):
self.new_trans_id = None
self._trace_hook = None
# recover sync state
- self._sync_state = ServerSyncState(
- self._db, self.source_replica_uid, sync_id)
+ self._sync_state = ServerSyncState(self.source_replica_uid, sync_id)
def find_changes_to_return(self, received):
"""
@@ -353,10 +185,15 @@ class SyncResource(http_app.SyncResource):
:type ensure: bool
"""
# create or open the database
+ cache = get_cache_for('db-' + sync_id + self.dbname)
if ensure:
db, self.replica_uid = self.state.ensure_database(self.dbname)
+ elif cache and 'instance' in cache:
+ db = cache['instance']
else:
db = self.state.open_database(self.dbname)
+ db.init_caching(cache)
+ cache['instance'] = db
# validate the information the client has about server replica
db.validate_gen_and_trans_id(
last_known_generation, last_known_trans_id)