diff options
Diffstat (limited to 'client/src/leap/soledad/client/http_target/fetch.py')
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 161 |
1 files changed, 0 insertions, 161 deletions
# -*- coding: utf-8 -*-
# fetch.py
# Copyright (C) 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json

from twisted.internet import defer
from twisted.internet import threads

from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import emit_async
from leap.soledad.client.http_target.support import RequestBody
from leap.soledad.common.log import getLogger
from leap.soledad.client._crypto import is_symmetrically_encrypted
from leap.soledad.common.l2db import errors
from leap.soledad.client import crypto as old_crypto

from .._document import Document
from . import fetch_protocol

logger = getLogger(__name__)


class HTTPDocFetcher(object):
    """
    Handles Document fetching from Soledad server, using HTTP as transport.

    Steps:
      * Prepares metadata by asking server for one document
      * Fetch the total on response and prepare to ask all remaining
      * (async) Documents will come encrypted.
        So we parse, decrypt and insert locally as they arrive.
    """

    # The uuid of the local replica.
    # Any class inheriting from this one should provide a meaningful attribute
    # if the sync status event is meant to be used somewhere else.
    uuid = 'undefined'
    userid = 'undefined'

    @defer.inlineCallbacks
    def _receive_docs(self, last_known_generation, last_known_trans_id,
                      ensure_callback, sync_id):
        """
        Fetch and insert all documents the server has that we don't,
        returning the new local generation and transaction id.

        :param last_known_generation: Target replica generation the local
                                      replica knows about.
        :type last_known_generation: int
        :param last_known_trans_id: Target replica transaction id the local
                                    replica knows about.
        :type last_known_trans_id: str
        :param ensure_callback: Callback to set the replica uid on fresh dbs.
        :type ensure_callback: callable
        :param sync_id: The id of this sync session.
        :type sync_id: str

        :return: A deferred firing with [new_generation, new_transaction_id].
        :rtype: twisted.internet.defer.Deferred
        """
        new_generation = last_known_generation
        new_transaction_id = last_known_trans_id
        # Acts as a queue, ensuring line order on async processing
        # as `self._insert_doc_cb` cant be run concurrently or out of order.
        # DeferredSemaphore solves the concurrency and its implementation uses
        # a queue, solving the ordering.
        # FIXME: Find a proper solution to avoid surprises on Twisted changes
        self.semaphore = defer.DeferredSemaphore(1)

        metadata = yield self._fetch_all(
            last_known_generation, last_known_trans_id,
            sync_id)
        number_of_changes, ngen, ntrans = self._parse_metadata(metadata)

        # wait for pending inserts
        yield self.semaphore.acquire()

        if ngen:
            new_generation = ngen
            new_transaction_id = ntrans

        defer.returnValue([new_generation, new_transaction_id])

    def _fetch_all(self, last_known_generation,
                   last_known_trans_id, sync_id):
        """
        Start the download stream of all missing documents.

        Each document is parsed (and inserted) as it arrives, via the
        `_doc_parser` callback wired into the streaming body reader.

        :return: A deferred firing with the stream metadata string.
        :rtype: twisted.internet.defer.Deferred
        """
        # add remote replica metadata to the request
        body = RequestBody(
            last_known_generation=last_known_generation,
            last_known_trans_id=last_known_trans_id,
            sync_id=sync_id,
            ensure=self._ensure_callback is not None)
        self._received_docs = 0
        # build a stream reader with _doc_parser as a callback
        body_reader = fetch_protocol.build_body_reader(self._doc_parser)
        # start download stream
        return self._http_request(
            self._url,
            method='POST',
            body=str(body),
            content_type='application/x-soledad-sync-get',
            body_reader=body_reader)

    @defer.inlineCallbacks
    def _doc_parser(self, doc_info, content, total):
        """
        Insert a received document into the local replica, decrypting
        if necessary. The case where it's not decrypted is when a doc gets
        inserted from Server side with a GPG encrypted content.

        :param doc_info: Dictionary representing Document information.
        :type doc_info: dict
        :param content: The Document's content.
        :type content: str
        :param total: The total number of operations.
        :type total: int
        """
        # Serialize inserts through the semaphore so documents are applied
        # in arrival order even though parsing is asynchronous.
        yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
                                 total)

    @defer.inlineCallbacks
    def __atomic_doc_parse(self, doc_info, content, total):
        """
        Decrypt (if needed) and insert a single document, then emit a
        sync-receive status event. Runs under `self.semaphore`, one at a time.
        """
        doc = Document(doc_info['id'], doc_info['rev'], content)
        if is_symmetrically_encrypted(content):
            content = (yield self._crypto.decrypt_doc(doc)).getvalue()
        elif old_crypto.is_symmetrically_encrypted(doc):
            content = self._deprecated_crypto.decrypt_doc(doc)
        doc.set_json(content)

        # TODO insert blobs here on the blob backend
        # FIXME: This is wrong. Using the very same SQLite connection object
        # from multiple threads is dangerous. We should bring the dbpool here
        # or find an alternative. Deferring to a thread only helps releasing
        # the reactor for other tasks as this is an IO intensive call.
        yield threads.deferToThread(self._insert_doc_cb,
                                    doc, doc_info['gen'],
                                    doc_info['trans_id'])
        self._received_docs += 1
        user_data = {'uuid': self.uuid, 'userid': self.userid}
        _emit_receive_status(user_data, self._received_docs, total=total)

    def _parse_metadata(self, metadata):
        """
        Parse the response from the server containing the sync metadata.

        :param metadata: Metadata as string
        :type metadata: str

        :return: (number_of_changes, new_gen, new_trans_id)
        :rtype: tuple

        :raise errors.BrokenSyncStream: If the metadata is not valid JSON or
                                        does not contain the expected keys.
        """
        try:
            metadata = json.loads(metadata)
            # make sure we have replica_uid from fresh new dbs
            if self._ensure_callback and 'replica_uid' in metadata:
                self._ensure_callback(metadata['replica_uid'])
            return (metadata['number_of_changes'], metadata['new_generation'],
                    metadata['new_transaction_id'])
        except (ValueError, KeyError, TypeError):
            # TypeError covers valid JSON that is not an object (e.g. "[]"),
            # which would otherwise escape as an unexpected exception.
            raise errors.BrokenSyncStream('Metadata parsing failed')


def _emit_receive_status(user_data, received_docs, total):
    """
    Emit a sync-receive progress event and log every 20 documents.

    :param user_data: Dict with 'uuid' and 'userid' of the local user.
    :type user_data: dict
    :param received_docs: Number of documents received so far.
    :type received_docs: int
    :param total: Total number of documents expected.
    :type total: int
    """
    content = {'received': received_docs, 'total': total}
    emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content)

    if received_docs % 20 == 0:
        msg = "%d/%d" % (received_docs, total)
        logger.debug("Sync receive status: %s" % msg)