diff options
Diffstat (limited to 'server/src')
| -rw-r--r-- | server/src/leap/soledad/server/caching.py | 32 | ||||
| -rw-r--r-- | server/src/leap/soledad/server/state.py | 143 | ||||
| -rw-r--r-- | server/src/leap/soledad/server/sync.py | 173 | 
3 files changed, 177 insertions, 171 deletions
| diff --git a/server/src/leap/soledad/server/caching.py b/server/src/leap/soledad/server/caching.py new file mode 100644 index 00000000..cd5d8dd4 --- /dev/null +++ b/server/src/leap/soledad/server/caching.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# caching.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Server side caching. Using beaker for now. +""" +from beaker.cache import CacheManager + + +def setup_caching(): +    _cache_manager = CacheManager(type='memory') +    return _cache_manager + + +_cache_manager = setup_caching() + + +def get_cache_for(key): +    return _cache_manager.get_cache(key) diff --git a/server/src/leap/soledad/server/state.py b/server/src/leap/soledad/server/state.py new file mode 100644 index 00000000..d1225170 --- /dev/null +++ b/server/src/leap/soledad/server/state.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# state.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Server side synchronization infrastructure. +""" +from leap.soledad.server import caching + + +class ServerSyncState(object): +    """ +    The state of one sync session, as stored on backend server. + +    On server side, the ongoing syncs metadata is maintained in a caching layer. +    """ + +    def __init__(self, source_replica_uid, sync_id): +        """ +        Initialize the sync state object. + +        :param sync_id: The id of current sync +        :type sync_id: str +        :param source_replica_uid: The source replica uid +        :type source_replica_uid: str +        """ +        self._source_replica_uid = source_replica_uid +        self._sync_id = sync_id +        self._storage = caching.get_cache_for(source_replica_uid + sync_id) + +    def _put_dict_info(self, key, value): +        """ +        Put some information about the sync state. + +        :param key: The key for the info to be put. +        :type key: str +        :param value: The value for the info to be put. +        :type value: str +        """ +        if key not in self._storage: +            self._storage[key] = [] +        info_list = self._storage.get(key) +        info_list.append(value) +        self._storage[key] = info_list + +    def put_seen_id(self, seen_id, gen): +        """ +        Put one seen id on the sync state. + +        :param seen_id: The doc_id of a document seen during sync. +        :type seen_id: str +        :param gen: The corresponding db generation for that document. +        :type gen: int +        """ +        self._put_dict_info( +            'seen_id', +            (seen_id, gen)) + +    def seen_ids(self): +        """ +        Return all document ids seen during the sync. + +        :return: A dict with doc ids seen during the sync. +        :rtype: dict +        """ +        if 'seen_id' in self._storage: +            seen_ids = self._storage.get('seen_id') +        else: +            seen_ids = [] +        return dict(seen_ids) + +    def put_changes_to_return(self, gen, trans_id, changes_to_return): +        """ +        Put the calculated changes to return in the backend sync state. + +        :param gen: The target database generation that will be synced. +        :type gen: int +        :param trans_id: The target database transaction id that will be +                         synced. +        :type trans_id: str +        :param changes_to_return: A list of tuples with the changes to be +                                  returned during the sync process. +        :type changes_to_return: list +        """ +        self._put_dict_info( +            'changes_to_return', +            { +                'gen': gen, +                'trans_id': trans_id, +                'changes_to_return': changes_to_return, +            } +        ) + +    def sync_info(self): +        """ +        Return information about the current sync state. + +        :return: The generation and transaction id of the target database +                 which will be synced, and the number of documents to return, +                 or a tuple of Nones if those have not already been sent to +                 server. +        :rtype: tuple +        """ +        info = self._storage.get('changes_to_return') if 'changes_to_return' in self._storage else None +        gen = None +        trans_id = None +        number_of_changes = None +        if info: +            info = info[0] +            gen = info['gen'] +            trans_id = info['trans_id'] +            number_of_changes = len(info['changes_to_return']) +        return gen, trans_id, number_of_changes + +    def next_change_to_return(self, received): +        """ +        Return the next change to be returned to the source syncing replica. + +        :param received: How many documents the source replica has already +                         received during the current sync process. +        :type received: int +        """ +        info = self._storage.get('changes_to_return') if 'changes_to_return' in self._storage else None +        gen = trans_id = next_change_to_return = None +        if info: +            info = info[0] +            gen = info['gen'] +            trans_id = info['trans_id'] +            if received < len(info['changes_to_return']): +                next_change_to_return = tuple(info['changes_to_return'][received]) +        return gen, trans_id, next_change_to_return diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 18c4ee40..64f7e4e7 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -17,183 +17,15 @@  """  Server side synchronization infrastructure.  """ -import json - -from leap.soledad.common.couch import CouchDatabase  from u1db import sync, Document  from u1db.remote import http_app +from leap.soledad.server.state import ServerSyncState  MAX_REQUEST_SIZE = 200  # in Mb  MAX_ENTRY_SIZE = 200  # in Mb -class ServerSyncState(object): -    """ -    The state of one sync session, as stored on backend server. - -    This object performes queries to distinct design documents: - -        _design/syncs/_update/state -        _design/syncs/_view/state -        _design/syncs/_view/seen_ids -        _design/syncs/_view/changes_to_return - -    On server side, the ongoing syncs metadata is maintained in a document -    called 'u1db_sync_state'. -    """ - -    def __init__(self, db, source_replica_uid, sync_id): -        """ -        Initialize the sync state object. - -        :param db: The target syncing database. -        :type db: CouchDatabase. -        :param source_replica_uid: CouchDatabase -        :type source_replica_uid: str -        """ -        self._db = db -        self._source_replica_uid = source_replica_uid -        self._sync_id = sync_id - -    def _key(self, key): -        """ -        Format a key to be used on couch views. - -        :param key: The lookup key. -        :type key: json serializable object - -        :return: The properly formatted key. -        :rtype: str -        """ -        return json.dumps(key, separators=(',', ':')) - -    def _put_info(self, key, value): -        """ -        Put some information on the sync state document. - -        This method works in conjunction with the -        _design/syncs/_update/state update handler couch backend. - -        :param key: The key for the info to be put. -        :type key: str -        :param value: The value for the info to be put. -        :type value: str -        """ -        ddoc_path = [ -            '_design', 'syncs', '_update', 'state', -            'u1db_sync_state'] -        res = self._db._database.resource(*ddoc_path) -        with CouchDatabase.sync_info_lock[self._db.replica_uid]: -            res.put_json( -                body={ -                    'sync_id': self._sync_id, -                    'source_replica_uid': self._source_replica_uid, -                    key: value, -                }, -                headers={'content-type': 'application/json'}) - -    def put_seen_id(self, seen_id, gen): -        """ -        Put one seen id on the sync state document. - -        :param seen_id: The doc_id of a document seen during sync. -        :type seen_id: str -        :param gen: The corresponding db generation for that document. -        :type gen: int -        """ -        self._put_info( -            'seen_id', -            [seen_id, gen]) - -    def seen_ids(self): -        """ -        Return all document ids seen during the sync. - -        :return: A list with doc ids seen during the sync. -        :rtype: list -        """ -        ddoc_path = ['_design', 'syncs', '_view', 'seen_ids'] -        resource = self._db._database.resource(*ddoc_path) -        response = resource.get_json( -            key=self._key([self._source_replica_uid, self._sync_id])) -        data = response[2] -        if data['rows']: -            entry = data['rows'].pop() -            return entry['value']['seen_ids'] -        return [] - -    def put_changes_to_return(self, gen, trans_id, changes_to_return): -        """ -        Put the calculated changes to return in the backend sync state -        document. - -        :param gen: The target database generation that will be synced. -        :type gen: int -        :param trans_id: The target database transaction id that will be -                         synced. -        :type trans_id: str -        :param changes_to_return: A list of tuples with the changes to be -                                  returned during the sync process. -        :type changes_to_return: list -        """ -        self._put_info( -            'changes_to_return', -            { -                'gen': gen, -                'trans_id': trans_id, -                'changes_to_return': changes_to_return, -            } -        ) - -    def sync_info(self): -        """ -        Return information about the current sync state. - -        :return: The generation and transaction id of the target database -                 which will be synced, and the number of documents to return, -                 or a tuple of Nones if those have not already been sent to -                 server. -        :rtype: tuple -        """ -        ddoc_path = ['_design', 'syncs', '_view', 'state'] -        resource = self._db._database.resource(*ddoc_path) -        response = resource.get_json( -            key=self._key([self._source_replica_uid, self._sync_id])) -        data = response[2] -        gen = None -        trans_id = None -        number_of_changes = None -        if data['rows'] and data['rows'][0]['value'] is not None: -            value = data['rows'][0]['value'] -            gen = value['gen'] -            trans_id = value['trans_id'] -            number_of_changes = value['number_of_changes'] -        return gen, trans_id, number_of_changes - -    def next_change_to_return(self, received): -        """ -        Return the next change to be returned to the source syncing replica. - -        :param received: How many documents the source replica has already -                         received during the current sync process. -        :type received: int -        """ -        ddoc_path = ['_design', 'syncs', '_view', 'changes_to_return'] -        resource = self._db._database.resource(*ddoc_path) -        response = resource.get_json( -            key=self._key( -                [self._source_replica_uid, self._sync_id, received])) -        data = response[2] -        if not data['rows']: -            return None, None, None -        value = data['rows'][0]['value'] -        gen = value['gen'] -        trans_id = value['trans_id'] -        next_change_to_return = value['next_change_to_return'] -        return gen, trans_id, tuple(next_change_to_return) - -  class SyncExchange(sync.SyncExchange):      def __init__(self, db, source_replica_uid, last_known_generation, sync_id): @@ -216,8 +48,7 @@ class SyncExchange(sync.SyncExchange):          self.new_trans_id = None          self._trace_hook = None          # recover sync state -        self._sync_state = ServerSyncState( -            self._db, self.source_replica_uid, sync_id) +        self._sync_state = ServerSyncState(self.source_replica_uid, sync_id)      def find_changes_to_return(self, received):          """ | 
