diff options
Diffstat (limited to 'client/src/leap/soledad/client/http_target/fetch.py')
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py new file mode 100644 index 00000000..c4bb79a0 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -0,0 +1,237 @@ +# -*- coding: utf-8 -*- +# http_target.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import logging +import json +from u1db import errors +from u1db.remote import utils +from twisted.internet import defer +from leap.soledad.common.document import SoledadDocument +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import emit +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.encdecpool import SyncDecrypterPool +from leap.soledad.client.http_target.support import RequestBody + +logger = logging.getLogger(__name__) + + +class HTTPDocFetcher(object): + + @defer.inlineCallbacks + def _receive_docs(self, last_known_generation, last_known_trans_id, + ensure_callback, sync_id, defer_decryption): + + self._queue_for_decrypt = defer_decryption \ + and self._sync_db is not None + + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + + if self._queue_for_decrypt: + logger.debug( + "Soledad sync: will queue received docs for decrypting.") + + if defer_decryption: + self._setup_sync_decr_pool() + + # --------------------------------------------------------------------- + # maybe receive the first document + # --------------------------------------------------------------------- + + # we fetch the first document before fetching the rest because we need + # to know the total number of documents to be received, and this + # information comes as metadata to each request. + + doc = yield self._receive_one_doc( + last_known_generation, last_known_trans_id, + sync_id, 0) + self._received_docs = 0 + number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + + if defer_decryption: + self._sync_decr_pool.start(number_of_changes) + + # --------------------------------------------------------------------- + # maybe receive the rest of the documents + # --------------------------------------------------------------------- + + # launch many asynchronous fetches and inserts of received documents + # in the temporary sync db. Will wait for all results before + # continuing. + + received = 1 + deferreds = [] + while received < number_of_changes: + d = self._receive_one_doc( + last_known_generation, + last_known_trans_id, sync_id, received) + d.addCallback( + self._insert_received_doc, + received + 1, # the index of the current received doc + number_of_changes) + deferreds.append(d) + received += 1 + results = yield defer.gatherResults(deferreds) + + # get generation and transaction id of target after insertions + if deferreds: + _, new_generation, new_transaction_id = results.pop() + + # --------------------------------------------------------------------- + # wait for async decryption to finish + # --------------------------------------------------------------------- + + if defer_decryption: + yield self._sync_decr_pool.deferred + self._sync_decr_pool.stop() + + defer.returnValue([new_generation, new_transaction_id]) + + def _receive_one_doc(self, last_known_generation, + last_known_trans_id, sync_id, received): + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # inform server of how many documents have already been received + body.insert_info(received=received) + # send headers + return self._http_request( + self._url, + method='POST', + body=str(body), + content_type='application/x-soledad-sync-get') + + def _insert_received_doc(self, response, idx, total): + """ + Insert a received document into the local replica. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + """ + new_generation, new_transaction_id, number_of_changes, doc_id, \ + rev, content, gen, trans_id = \ + self._parse_received_doc_response(response) + if doc_id is not None: + # decrypt incoming document and insert into local database + # ------------------------------------------------------------- + # symmetric decryption of document's contents + # ------------------------------------------------------------- + # If arriving content was symmetrically encrypted, we decrypt it. + # We do it inline if defer_decryption flag is False or no sync_db + # was defined, otherwise we defer it writing it to the received + # docs table. + doc = SoledadDocument(doc_id, rev, content) + if is_symmetrically_encrypted(doc): + if self._queue_for_decrypt: + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + # defer_decryption is False or no-sync-db fallback + doc.set_json(self._crypto.decrypt_doc(doc)) + self._insert_doc_cb(doc, gen, trans_id) + else: + # not symmetrically encrypted doc, insert it directly + # or save it in the decrypted stage. + if self._queue_for_decrypt: + self._sync_decr_pool.insert_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + self._insert_doc_cb(doc, gen, trans_id) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + self._received_docs += 1 + _emit_received(self._received_docs, total) + return number_of_changes, new_generation, new_transaction_id + + def _parse_received_doc_response(self, response): + """ + Parse the response from the server containing the received document. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + + :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, + content, gen, trans_id) + :rtype: tuple + """ + # decode incoming stream + parts = response.splitlines() + if not parts or parts[0] != '[' or parts[-1] != ']': + raise errors.BrokenSyncStream + data = parts[1:-1] + # decode metadata + try: + line, comma = utils.check_and_strip_comma(data[0]) + metadata = None + except (IndexError): + raise errors.BrokenSyncStream + try: + metadata = json.loads(line) + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + number_of_changes = metadata['number_of_changes'] + except (ValueError, KeyError): + raise errors.BrokenSyncStream + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + # parse incoming document info + doc_id = None + rev = None + content = None + gen = None + trans_id = None + if number_of_changes > 0: + try: + entry = json.loads(data[1]) + doc_id = entry['id'] + rev = entry['rev'] + content = entry['content'] + gen = entry['gen'] + trans_id = entry['trans_id'] + except (IndexError, KeyError): + raise errors.BrokenSyncStream + return new_generation, new_transaction_id, number_of_changes, \ + doc_id, rev, content, gen, trans_id + + def _setup_sync_decr_pool(self): + """ + Set up the SyncDecrypterPool for deferred decryption. + """ + if self._sync_decr_pool is None and self._sync_db is not None: + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, + self._sync_db, + insert_doc_cb=self._insert_doc_cb, + source_replica_uid=self.source_replica_uid) + + +def _emit_received(received_docs, total): + msg = "%d/%d" % (received_docs, total) + emit(SOLEDAD_SYNC_RECEIVE_STATUS, msg) + logger.debug("Sync receive status: %s" % msg) |