summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client/http_target/fetch.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2017-06-18 11:18:10 -0300
committerKali Kaneko <kali@leap.se>2017-06-24 00:49:17 +0200
commit3e94cafa43d464d73815e21810b97a4faf54136d (patch)
tree04f5152e3dfc9f27b7dca2368c0eb8b3f094f5b5 /src/leap/soledad/client/http_target/fetch.py
parent7d8ee786b086e47264619df3efa73e74440fd068 (diff)
[pkg] unify client and server into a single python package
We have been discussing about this merge for a while. Its main goal is to simplify things: code navigation, but also packaging. The rationale is that the code is more cohesive in this way, and there's only one source package to install. Dependencies that are only for the server or the client will not be installed by default, and they are expected to be provided by the environment. There are setuptools extras defined for the client and the server. Debianization is still expected to split the single source package into 3 binaries. Another avantage is that the documentation can now install a single package with a single step, and therefore include the docstrings into the generated docs. - Resolves: #8896
Diffstat (limited to 'src/leap/soledad/client/http_target/fetch.py')
-rw-r--r--src/leap/soledad/client/http_target/fetch.py161
1 files changed, 161 insertions, 0 deletions
diff --git a/src/leap/soledad/client/http_target/fetch.py b/src/leap/soledad/client/http_target/fetch.py
new file mode 100644
index 00000000..9d456830
--- /dev/null
+++ b/src/leap/soledad/client/http_target/fetch.py
@@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+# fetch.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+from twisted.internet import defer
+from twisted.internet import threads
+
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import emit_async
+from leap.soledad.client.http_target.support import RequestBody
+from leap.soledad.common.log import getLogger
+from leap.soledad.client._crypto import is_symmetrically_encrypted
+from leap.soledad.common.l2db import errors
+from leap.soledad.client import crypto as old_crypto
+
+from .._document import Document
+from . import fetch_protocol
+
+logger = getLogger(__name__)
+
+
+class HTTPDocFetcher(object):
+ """
+ Handles Document fetching from Soledad server, using HTTP as transport.
+ Steps:
+ * Prepares metadata by asking server for one document
+ * Fetch the total on response and prepare to ask all remaining
+ * (async) Documents will come encrypted.
+ So we parse, decrypt and insert locally as they arrive.
+ """
+
+ # The uuid of the local replica.
+ # Any class inheriting from this one should provide a meaningful attribute
+ # if the sync status event is meant to be used somewhere else.
+
+ uuid = 'undefined'
+ userid = 'undefined'
+
+ @defer.inlineCallbacks
+ def _receive_docs(self, last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id):
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+ # Acts as a queue, ensuring line order on async processing
+ # as `self._insert_doc_cb` cant be run concurrently or out of order.
+ # DeferredSemaphore solves the concurrency and its implementation uses
+ # a queue, solving the ordering.
+ # FIXME: Find a proper solution to avoid surprises on Twisted changes
+ self.semaphore = defer.DeferredSemaphore(1)
+
+ metadata = yield self._fetch_all(
+ last_known_generation, last_known_trans_id,
+ sync_id)
+ number_of_changes, ngen, ntrans = self._parse_metadata(metadata)
+
+ # wait for pending inserts
+ yield self.semaphore.acquire()
+
+ if ngen:
+ new_generation = ngen
+ new_transaction_id = ntrans
+
+ defer.returnValue([new_generation, new_transaction_id])
+
+ def _fetch_all(self, last_known_generation,
+ last_known_trans_id, sync_id):
+ # add remote replica metadata to the request
+ body = RequestBody(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ self._received_docs = 0
+ # build a stream reader with _doc_parser as a callback
+ body_reader = fetch_protocol.build_body_reader(self._doc_parser)
+ # start download stream
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=str(body),
+ content_type='application/x-soledad-sync-get',
+ body_reader=body_reader)
+
+ @defer.inlineCallbacks
+ def _doc_parser(self, doc_info, content, total):
+ """
+ Insert a received document into the local replica, decrypting
+ if necessary. The case where it's not decrypted is when a doc gets
+ inserted from Server side with a GPG encrypted content.
+
+ :param doc_info: Dictionary representing Document information.
+ :type doc_info: dict
+ :param content: The Document's content.
+ :type idx: str
+ :param total: The total number of operations.
+ :type total: int
+ """
+ yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
+ total)
+
+ @defer.inlineCallbacks
+ def __atomic_doc_parse(self, doc_info, content, total):
+ doc = Document(doc_info['id'], doc_info['rev'], content)
+ if is_symmetrically_encrypted(content):
+ content = (yield self._crypto.decrypt_doc(doc)).getvalue()
+ elif old_crypto.is_symmetrically_encrypted(doc):
+ content = self._deprecated_crypto.decrypt_doc(doc)
+ doc.set_json(content)
+
+ # TODO insert blobs here on the blob backend
+ # FIXME: This is wrong. Using the very same SQLite connection object
+ # from multiple threads is dangerous. We should bring the dbpool here
+ # or find an alternative. Deferring to a thread only helps releasing
+ # the reactor for other tasks as this is an IO intensive call.
+ yield threads.deferToThread(self._insert_doc_cb,
+ doc, doc_info['gen'], doc_info['trans_id'])
+ self._received_docs += 1
+ user_data = {'uuid': self.uuid, 'userid': self.userid}
+ _emit_receive_status(user_data, self._received_docs, total=total)
+
+ def _parse_metadata(self, metadata):
+ """
+ Parse the response from the server containing the sync metadata.
+
+ :param response: Metadata as string
+ :type response: str
+
+ :return: (number_of_changes, new_gen, new_trans_id)
+ :rtype: tuple
+ """
+ try:
+ metadata = json.loads(metadata)
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ return (metadata['number_of_changes'], metadata['new_generation'],
+ metadata['new_transaction_id'])
+ except (ValueError, KeyError):
+ raise errors.BrokenSyncStream('Metadata parsing failed')
+
+
+def _emit_receive_status(user_data, received_docs, total):
+ content = {'received': received_docs, 'total': total}
+ emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content)
+
+ if received_docs % 20 == 0:
+ msg = "%d/%d" % (received_docs, total)
+ logger.debug("Sync receive status: %s" % msg)