summaryrefslogtreecommitdiff
path: root/server/src/leap/soledad/server/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/leap/soledad/server/sync.py')
-rw-r--r--server/src/leap/soledad/server/sync.py305
1 file changed, 0 insertions, 305 deletions
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
deleted file mode 100644
index 6791c06c..00000000
--- a/server/src/leap/soledad/server/sync.py
+++ /dev/null
@@ -1,305 +0,0 @@
-# -*- coding: utf-8 -*-
-# sync.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Server side synchronization infrastructure.
-"""
-import time
-from six.moves import zip as izip
-
-from leap.soledad.common.l2db import sync
-from leap.soledad.common.l2db.remote import http_app
-from leap.soledad.server.caching import get_cache_for
-from leap.soledad.server.state import ServerSyncState
-from leap.soledad.common.document import ServerDocument
-
-
-MAX_REQUEST_SIZE = float('inf') # It's a stream.
-MAX_ENTRY_SIZE = 200 # in Mb
-ENTRY_CACHE_SIZE = 8192 * 1024
-
-
class SyncExchange(sync.SyncExchange):
    """
    One server-side sync exchange session: first receives documents from
    the source replica (``insert_doc_from_source``), then computes and
    returns the documents the source is missing (``find_changes_to_return``
    / ``return_docs``).

    Progress is persisted in a ``ServerSyncState`` keyed by the source
    replica uid and the sync session id, so an interrupted session can be
    resumed without re-processing already-seen documents.
    """

    def __init__(self, db, source_replica_uid, last_known_generation, sync_id):
        """
        :param db: The target syncing database.
        :type db: SoledadBackend
        :param source_replica_uid: The uid of the source syncing replica.
        :type source_replica_uid: str
        :param last_known_generation: The last target replica generation the
                                      source replica knows about.
        :type last_known_generation: int
        :param sync_id: The id of the current sync session.
        :type sync_id: str
        """
        self._db = db
        self.source_replica_uid = source_replica_uid
        self.source_last_known_generation = last_known_generation
        self.sync_id = sync_id
        self.new_gen = None
        self.new_trans_id = None
        self._trace_hook = None
        # recover sync state -- may contain seen-ids and cached changes
        # from a previous, interrupted session with the same sync_id
        self._sync_state = ServerSyncState(self.source_replica_uid, sync_id)

    def find_changes_to_return(self):
        """
        Find changes to return.

        Find changes since last_known_generation in db generation
        order using whats_changed. It excludes documents ids that have
        already been considered (superseded by the sender, etc).

        :return: the generation of this database, which the caller can
                 consider themselves to be synchronized after processing
                 all returned documents, and the amount of documents to be
                 sent to the source syncing replica.
        :rtype: int
        """
        # check if changes to return have already been calculated and
        # persisted by an earlier (interrupted) run of this session
        new_gen, new_trans_id, number_of_changes = self._sync_state.sync_info()
        if number_of_changes is None:
            self._trace('before whats_changed')
            new_gen, new_trans_id, changes = self._db.whats_changed(
                self.source_last_known_generation)
            self._trace('after whats_changed')
            seen_ids = self._sync_state.seen_ids()
            # changed docs that weren't superseded by or converged with
            self.changes_to_return = [
                (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
                # there was a subsequent update
                if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
            self._sync_state.put_changes_to_return(
                new_gen, new_trans_id, self.changes_to_return)
            number_of_changes = len(self.changes_to_return)
        # NOTE(review): when sync_info() already had the change count cached,
        # self.changes_to_return is never assigned here, so a subsequent
        # return_docs() call would raise AttributeError -- confirm whether
        # resumed sessions retrieve the cached changes through another path.
        self.new_gen = new_gen
        self.new_trans_id = new_trans_id
        return self.new_gen, number_of_changes

    def return_docs(self, return_doc_cb):
        """Return the changed documents and their last change generation
        repeatedly invoking the callback return_doc_cb.

        The final step of a sync exchange.

        :param: return_doc_cb(doc, gen, trans_id): is a callback
                used to return the documents with their last change
                generation to the target replica.
        :return: None
        """
        changes_to_return = self.changes_to_return
        # return docs, including conflicts.
        # content as a file-object (will be read when writing)
        changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
        docs = self._db.get_docs(
            changed_doc_ids, check_for_conflicts=False,
            include_deleted=True, read_content=False)

        # re-attach each fetched doc to its (gen, trans_id) pair; get_docs
        # preserves the order of changed_doc_ids, so izip lines them up
        docs_by_gen = izip(
            docs, (gen for _, gen, _ in changes_to_return),
            (trans_id for _, _, trans_id in changes_to_return))
        for doc, gen, trans_id in docs_by_gen:
            return_doc_cb(doc, gen, trans_id)

    def batched_insert_from_source(self, entries, sync_id):
        # Insert a batch of staged entries inside a single backend batch
        # (batch_start/batch_end), avoiding one round-trip per document.
        if not entries:
            return
        self._db.batch_start()
        for entry in entries:
            doc, gen, trans_id, number_of_docs, doc_idx = entry
            self.insert_doc_from_source(doc, gen, trans_id, number_of_docs,
                                        doc_idx, sync_id)
        self._db.batch_end()

    def insert_doc_from_source(
            self, doc, source_gen, trans_id,
            number_of_docs=None, doc_idx=None, sync_id=None):
        """Try to insert synced document from source.

        Conflicting documents are not inserted but will be sent over
        to the sync source.

        It keeps track of progress by storing the document source
        generation as well.

        The 1st step of a sync exchange is to call this repeatedly to
        try insert all incoming documents from the source.

        :param doc: A Document object.
        :type doc: Document
        :param source_gen: The source generation of doc.
        :type source_gen: int
        :param trans_id: The transaction id of that document change.
        :type trans_id: str
        :param number_of_docs: The total amount of documents sent on this
                               sync session.
        :type number_of_docs: int
        :param doc_idx: The index of the current document.
        :type doc_idx: int
        :param sync_id: The id of the current sync session.
        :type sync_id: str
        """
        state, at_gen = self._db._put_doc_if_newer(
            doc, save_conflict=False, replica_uid=self.source_replica_uid,
            replica_gen=source_gen, replica_trans_id=trans_id,
            number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)
        if state == 'inserted':
            self._sync_state.put_seen_id(doc.doc_id, at_gen)
        elif state == 'converged':
            # magical convergence
            self._sync_state.put_seen_id(doc.doc_id, at_gen)
        elif state == 'superseded':
            # we have something newer that we will return
            pass
        else:
            # conflict that we will return
            # (assert is a no-op under -O; unexpected states then pass
            # silently)
            assert state == 'conflicted'
-
-
class SyncResource(http_app.SyncResource):
    """
    HTTP resource implementing the server side of the Soledad sync
    protocol.

    A sync POST is processed in phases: ``post_args`` opens the database
    and creates a ``SyncExchange``; ``post_put`` stages each incoming
    document (flushing in batches); ``post_get`` streams changed documents
    back to the client; ``post_end`` reports the final generation info.
    """

    # MAX_REQUEST_SIZE is float('inf') -- request bodies are streamed, so
    # no hard cap applies; entries are still individually bounded.
    max_request_size = MAX_REQUEST_SIZE * 1024 * 1024
    max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024

    sync_exchange_class = SyncExchange

    @http_app.http_method(
        last_known_generation=int, last_known_trans_id=http_app.none_or_str,
        sync_id=http_app.none_or_str, content_as_args=True)
    def post_args(self, last_known_generation, last_known_trans_id=None,
                  sync_id=None, ensure=False):
        """
        Handle the initial arguments for the sync POST request from client.

        :param last_known_generation: The last server replica generation
                                      the client knows about.
        :type last_known_generation: int
        :param last_known_trans_id: The last server replica transaction_id
                                    the client knows about.
        :type last_known_trans_id: str
        :param sync_id: The id of the current sync session.
        :type sync_id: str
        :param ensure: Whether the server replica should be created if it
                       does not already exist.
        :type ensure: bool
        """
        # create or open the database; guard against a missing sync_id so
        # the cache-key concatenation cannot raise TypeError (sync_id
        # defaults to None)
        cache = get_cache_for(
            'db-' + (sync_id or '') + self.dbname, expire=120)
        if ensure:
            db, self.replica_uid = self.state.ensure_database(self.dbname)
        else:
            db = self.state.open_database(self.dbname)
        db.init_caching(cache)
        # validate the information the client has about server replica
        db.validate_gen_and_trans_id(
            last_known_generation, last_known_trans_id)
        # get a sync exchange object
        self.sync_exch = self.sync_exchange_class(
            db, self.source_replica_uid, last_known_generation, sync_id)
        self._sync_id = sync_id
        # incoming docs are staged and flushed in batches (see post_put)
        self._staging = []
        self._staging_size = 0

    @http_app.http_method(content_as_args=True)
    def post_put(
            self, id, rev, content, gen,
            trans_id, number_of_docs, doc_idx):
        """
        Put one incoming document into the server replica.

        :param id: The id of the incoming document.
        :type id: str
        :param rev: The revision of the incoming document.
        :type rev: str
        :param content: The content of the incoming document.
        :type content: dict
        :param gen: The source replica generation corresponding to the
                    revision of the incoming document.
        :type gen: int
        :param trans_id: The source replica transaction id corresponding
                         to the revision of the incoming document.
        :type trans_id: str
        :param number_of_docs: The total amount of documents sent on this
                               sync session.
        :type number_of_docs: int
        :param doc_idx: The index of the current document.
        :type doc_idx: int
        """
        doc = ServerDocument(id, rev, json=content)
        self._staging_size += len(content or '')
        self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
        # flush when the staging buffer grows past ENTRY_CACHE_SIZE or on
        # the last document of the session
        if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs:
            self.sync_exch.batched_insert_from_source(self._staging,
                                                      self._sync_id)
            self._staging = []
            self._staging_size = 0

    def post_get(self):
        """
        Return syncing documents to the client as a streamed response.
        """
        def send_doc(doc, gen, trans_id):
            # metadata entry first, then the (possibly empty) content
            entry = dict(id=doc.doc_id, rev=doc.rev,
                         gen=gen, trans_id=trans_id)
            self.responder.stream_entry(entry)
            content_reader = doc.get_json()
            if content_reader:
                content = content_reader.read()
                self.responder.stream_entry(content)
                content_reader.close()
                # throttle at 5mb/s
                # FIXME: twistd cant control througput
                # we need to either use gunicorn or go async
                time.sleep(len(content) / (5.0 * 1024 * 1024))
            else:
                self.responder.stream_entry('')

        new_gen, number_of_changes = \
            self.sync_exch.find_changes_to_return()
        self.responder.content_type = 'application/x-u1db-sync-response'
        self.responder.start_response(200)
        # (fixed: original had a stray trailing comma here that built and
        # discarded a 1-tuple)
        self.responder.start_stream()
        header = {
            "new_generation": new_gen,
            "new_transaction_id": self.sync_exch.new_trans_id,
            "number_of_changes": number_of_changes,
        }
        # NOTE(review): self.replica_uid is only assigned in post_args when
        # ensure=True -- presumably the base class defaults it; confirm.
        if self.replica_uid is not None:
            header['replica_uid'] = self.replica_uid
        self.responder.stream_entry(header)
        self.sync_exch.return_docs(send_doc)
        self.responder.end_stream()
        self.responder.finish_response()

    def post_end(self):
        """
        Return the current generation and transaction_id after inserting
        one incoming document.
        """
        self.responder.content_type = 'application/x-soledad-sync-response'
        self.responder.start_response(200)
        # (fixed: stray trailing comma removed, as in post_get)
        self.responder.start_stream()
        new_gen, new_trans_id = self.sync_exch._db._get_generation_info()
        header = {
            "new_generation": new_gen,
            "new_transaction_id": new_trans_id,
        }
        if self.replica_uid is not None:
            header['replica_uid'] = self.replica_uid
        self.responder.stream_entry(header)
        self.responder.end_stream()
        self.responder.finish_response()