summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/http_target
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/http_target')
-rw-r--r--client/src/leap/soledad/client/http_target/__init__.py94
-rw-r--r--client/src/leap/soledad/client/http_target/api.py248
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py161
-rw-r--r--client/src/leap/soledad/client/http_target/fetch_protocol.py157
-rw-r--r--client/src/leap/soledad/client/http_target/send.py107
-rw-r--r--client/src/leap/soledad/client/http_target/send_protocol.py75
-rw-r--r--client/src/leap/soledad/client/http_target/support.py220
7 files changed, 0 insertions, 1062 deletions
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py
deleted file mode 100644
index b67d03f6..00000000
--- a/client/src/leap/soledad/client/http_target/__init__.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# -*- coding: utf-8 -*-
-# __init__.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-A U1DB backend for encrypting data before sending to server and decrypting
-after receiving.
-"""
-
-
-import os
-
-from twisted.web.client import Agent
-from twisted.internet import reactor
-
-from leap.common.certs import get_compatible_ssl_context_factory
-from leap.soledad.common.log import getLogger
-from leap.soledad.client.http_target.send import HTTPDocSender
-from leap.soledad.client.http_target.api import SyncTargetAPI
-from leap.soledad.client.http_target.fetch import HTTPDocFetcher
-from leap.soledad.client import crypto as old_crypto
-
-
-logger = getLogger(__name__)
-
-
-# we may want to collect statistics from the sync process
-DO_STATS = False
-if os.environ.get('SOLEDAD_STATS'):
- DO_STATS = True
-
-
-class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
-
- """
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
-
- Normally encryption will have been written to the sync database upon
- document modification. The sync database is also used to write temporarily
- the parsed documents that the remote send us, before being decrypted and
- written to the main database.
- """
- def __init__(self, url, source_replica_uid, creds, crypto, cert_file):
- """
- Initialize the sync target.
-
- :param url: The server sync url.
- :type url: str
- :param source_replica_uid: The source replica uid which we use when
- deferring decryption.
- :type source_replica_uid: str
- :param creds: A dictionary containing the uuid and token.
- :type creds: creds
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad._crypto.SoledadCrypto
- :param cert_file: Path to the certificate of the ca used to validate
- the SSL certificate used by the remote soledad
- server.
- :type cert_file: str
- """
- if url.endswith("/"):
- url = url[:-1]
- self._url = str(url) + "/sync-from/" + str(source_replica_uid)
- self.source_replica_uid = source_replica_uid
- self._auth_header = None
- self._uuid = None
- self.set_creds(creds)
- self._crypto = crypto
- # TODO: DEPRECATED CRYPTO
- self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret)
- self._insert_doc_cb = None
-
- # Twisted default Agent with our own ssl context factory
- factory = get_compatible_ssl_context_factory(cert_file)
- self._http = Agent(reactor, factory)
-
- if DO_STATS:
- self.sync_exchange_phase = [0]
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
deleted file mode 100644
index c68185c6..00000000
--- a/client/src/leap/soledad/client/http_target/api.py
+++ /dev/null
@@ -1,248 +0,0 @@
-# -*- coding: utf-8 -*-
-# api.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import json
-import base64
-
-from six import StringIO
-from uuid import uuid4
-
-from twisted.internet import defer
-from twisted.web.http_headers import Headers
-from twisted.web.client import FileBodyProducer
-
-from leap.soledad.client.http_target.support import readBody
-from leap.soledad.common.errors import InvalidAuthTokenError
-from leap.soledad.common.l2db.errors import HTTPError
-from leap.soledad.common.l2db import SyncTarget
-
-
-# we may want to collect statistics from the sync process
-DO_STATS = False
-if os.environ.get('SOLEDAD_STATS'):
- DO_STATS = True
-
-
-class SyncTargetAPI(SyncTarget):
- """
- Declares public methods and implements u1db.SyncTarget.
- """
-
- @property
- def uuid(self):
- return self._uuid
-
- def set_creds(self, creds):
- """
- Update credentials.
-
- :param creds: A dictionary containing the uuid and token.
- :type creds: dict
- """
- uuid = creds['token']['uuid']
- token = creds['token']['token']
- self._uuid = uuid
- auth = '%s:%s' % (uuid, token)
- b64_token = base64.b64encode(auth)
- self._auth_header = {'Authorization': ['Token %s' % b64_token]}
-
- @property
- def _base_header(self):
- return self._auth_header.copy() if self._auth_header else {}
-
- def _http_request(self, url, method='GET', body=None, headers=None,
- content_type=None, body_reader=readBody,
- body_producer=None):
- headers = headers or self._base_header
- if content_type:
- headers.update({'content-type': [content_type]})
- if not body_producer and body:
- body = FileBodyProducer(StringIO(body))
- elif body_producer:
- # Upload case, check send.py
- body = body_producer(body)
- d = self._http.request(
- method, url, headers=Headers(headers), bodyProducer=body)
- d.addCallback(body_reader)
- d.addErrback(_unauth_to_invalid_token_error)
- return d
-
- @defer.inlineCallbacks
- def get_sync_info(self, source_replica_uid):
- """
- Return information about known state of remote database.
-
- Return the replica_uid and the current database generation of the
- remote database, and its last-seen database generation for the client
- replica.
-
- :param source_replica_uid: The client-size replica uid.
- :type source_replica_uid: str
-
- :return: A deferred which fires with (target_replica_uid,
- target_replica_generation, target_trans_id,
- source_replica_last_known_generation,
- source_replica_last_known_transaction_id)
- :rtype: twisted.internet.defer.Deferred
- """
- raw = yield self._http_request(self._url)
- res = json.loads(raw)
- defer.returnValue((
- res['target_replica_uid'],
- res['target_replica_generation'],
- res['target_replica_transaction_id'],
- res['source_replica_generation'],
- res['source_transaction_id']
- ))
-
- def record_sync_info(
- self, source_replica_uid, source_replica_generation,
- source_replica_transaction_id):
- """
- Record tip information for another replica.
-
- After sync_exchange has been processed, the caller will have
- received new content from this replica. This call allows the
- source replica instigating the sync to inform us what their
- generation became after applying the documents we returned.
-
- This is used to allow future sync operations to not need to repeat data
- that we just talked about. It also means that if this is called at the
- wrong time, there can be database records that will never be
- synchronized.
-
- :param source_replica_uid: The identifier for the source replica.
- :type source_replica_uid: str
- :param source_replica_generation: The database generation for the
- source replica.
- :type source_replica_generation: int
- :param source_replica_transaction_id: The transaction id associated
- with the source replica
- generation.
- :type source_replica_transaction_id: str
-
- :return: A deferred which fires with the result of the query.
- :rtype: twisted.internet.defer.Deferred
- """
- data = json.dumps({
- 'generation': source_replica_generation,
- 'transaction_id': source_replica_transaction_id
- })
- return self._http_request(
- self._url,
- method='PUT',
- body=data,
- content_type='application/json')
-
- @defer.inlineCallbacks
- def sync_exchange(self, docs_by_generation, source_replica_uid,
- last_known_generation, last_known_trans_id,
- insert_doc_cb, ensure_callback=None,
- sync_id=None):
- """
- Find out which documents the remote database does not know about,
- encrypt and send them. After that, receive documents from the remote
- database.
-
- :param docs_by_generations: A list of (doc_id, generation, trans_id)
- of local documents that were changed since
- the last local generation the remote
- replica knows about.
- :type docs_by_generations: list of tuples
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
-
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
-
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :param insert_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type insert_doc_cb: function
-
- :param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just
- created.
- :type ensure_callback: function
-
- :return: A deferred which fires with the new generation and
- transaction id of the target replica.
- :rtype: twisted.internet.defer.Deferred
- """
- # ---------- phase 1: send docs to server ----------------------------
- if DO_STATS:
- self.sync_exchange_phase[0] += 1
- # --------------------------------------------------------------------
-
- self._ensure_callback = ensure_callback
-
- if sync_id is None:
- sync_id = str(uuid4())
- self.source_replica_uid = source_replica_uid
-
- # save a reference to the callback so we can use it after decrypting
- self._insert_doc_cb = insert_doc_cb
-
- gen_after_send, trans_id_after_send = yield self._send_docs(
- docs_by_generation,
- last_known_generation,
- last_known_trans_id,
- sync_id)
-
- # ---------- phase 2: receive docs -----------------------------------
- if DO_STATS:
- self.sync_exchange_phase[0] += 1
- # --------------------------------------------------------------------
-
- cur_target_gen, cur_target_trans_id = yield self._receive_docs(
- last_known_generation, last_known_trans_id,
- ensure_callback, sync_id)
-
- # update gen and trans id info in case we just sent and did not
- # receive docs.
- if gen_after_send is not None and gen_after_send > cur_target_gen:
- cur_target_gen = gen_after_send
- cur_target_trans_id = trans_id_after_send
-
- # ---------- phase 3: sync exchange is over --------------------------
- if DO_STATS:
- self.sync_exchange_phase[0] += 1
- # --------------------------------------------------------------------
-
- defer.returnValue([cur_target_gen, cur_target_trans_id])
-
-
-def _unauth_to_invalid_token_error(failure):
- """
- An errback to translate unauthorized errors to our own invalid token
- class.
-
- :param failure: The original failure.
- :type failure: twisted.python.failure.Failure
-
- :return: Either the original failure or an invalid auth token error.
- :rtype: twisted.python.failure.Failure
- """
- failure.trap(HTTPError)
- if failure.value.status == 401:
- raise InvalidAuthTokenError
- return failure
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
deleted file mode 100644
index 9d456830..00000000
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# -*- coding: utf-8 -*-
-# fetch.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import json
-from twisted.internet import defer
-from twisted.internet import threads
-
-from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
-from leap.soledad.client.events import emit_async
-from leap.soledad.client.http_target.support import RequestBody
-from leap.soledad.common.log import getLogger
-from leap.soledad.client._crypto import is_symmetrically_encrypted
-from leap.soledad.common.l2db import errors
-from leap.soledad.client import crypto as old_crypto
-
-from .._document import Document
-from . import fetch_protocol
-
-logger = getLogger(__name__)
-
-
-class HTTPDocFetcher(object):
- """
- Handles Document fetching from Soledad server, using HTTP as transport.
- Steps:
- * Prepares metadata by asking server for one document
- * Fetch the total on response and prepare to ask all remaining
- * (async) Documents will come encrypted.
- So we parse, decrypt and insert locally as they arrive.
- """
-
- # The uuid of the local replica.
- # Any class inheriting from this one should provide a meaningful attribute
- # if the sync status event is meant to be used somewhere else.
-
- uuid = 'undefined'
- userid = 'undefined'
-
- @defer.inlineCallbacks
- def _receive_docs(self, last_known_generation, last_known_trans_id,
- ensure_callback, sync_id):
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
- # Acts as a queue, ensuring line order on async processing
- # as `self._insert_doc_cb` cant be run concurrently or out of order.
- # DeferredSemaphore solves the concurrency and its implementation uses
- # a queue, solving the ordering.
- # FIXME: Find a proper solution to avoid surprises on Twisted changes
- self.semaphore = defer.DeferredSemaphore(1)
-
- metadata = yield self._fetch_all(
- last_known_generation, last_known_trans_id,
- sync_id)
- number_of_changes, ngen, ntrans = self._parse_metadata(metadata)
-
- # wait for pending inserts
- yield self.semaphore.acquire()
-
- if ngen:
- new_generation = ngen
- new_transaction_id = ntrans
-
- defer.returnValue([new_generation, new_transaction_id])
-
- def _fetch_all(self, last_known_generation,
- last_known_trans_id, sync_id):
- # add remote replica metadata to the request
- body = RequestBody(
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- self._received_docs = 0
- # build a stream reader with _doc_parser as a callback
- body_reader = fetch_protocol.build_body_reader(self._doc_parser)
- # start download stream
- return self._http_request(
- self._url,
- method='POST',
- body=str(body),
- content_type='application/x-soledad-sync-get',
- body_reader=body_reader)
-
- @defer.inlineCallbacks
- def _doc_parser(self, doc_info, content, total):
- """
- Insert a received document into the local replica, decrypting
- if necessary. The case where it's not decrypted is when a doc gets
- inserted from Server side with a GPG encrypted content.
-
- :param doc_info: Dictionary representing Document information.
- :type doc_info: dict
- :param content: The Document's content.
- :type idx: str
- :param total: The total number of operations.
- :type total: int
- """
- yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
- total)
-
- @defer.inlineCallbacks
- def __atomic_doc_parse(self, doc_info, content, total):
- doc = Document(doc_info['id'], doc_info['rev'], content)
- if is_symmetrically_encrypted(content):
- content = (yield self._crypto.decrypt_doc(doc)).getvalue()
- elif old_crypto.is_symmetrically_encrypted(doc):
- content = self._deprecated_crypto.decrypt_doc(doc)
- doc.set_json(content)
-
- # TODO insert blobs here on the blob backend
- # FIXME: This is wrong. Using the very same SQLite connection object
- # from multiple threads is dangerous. We should bring the dbpool here
- # or find an alternative. Deferring to a thread only helps releasing
- # the reactor for other tasks as this is an IO intensive call.
- yield threads.deferToThread(self._insert_doc_cb,
- doc, doc_info['gen'], doc_info['trans_id'])
- self._received_docs += 1
- user_data = {'uuid': self.uuid, 'userid': self.userid}
- _emit_receive_status(user_data, self._received_docs, total=total)
-
- def _parse_metadata(self, metadata):
- """
- Parse the response from the server containing the sync metadata.
-
- :param response: Metadata as string
- :type response: str
-
- :return: (number_of_changes, new_gen, new_trans_id)
- :rtype: tuple
- """
- try:
- metadata = json.loads(metadata)
- # make sure we have replica_uid from fresh new dbs
- if self._ensure_callback and 'replica_uid' in metadata:
- self._ensure_callback(metadata['replica_uid'])
- return (metadata['number_of_changes'], metadata['new_generation'],
- metadata['new_transaction_id'])
- except (ValueError, KeyError):
- raise errors.BrokenSyncStream('Metadata parsing failed')
-
-
-def _emit_receive_status(user_data, received_docs, total):
- content = {'received': received_docs, 'total': total}
- emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content)
-
- if received_docs % 20 == 0:
- msg = "%d/%d" % (received_docs, total)
- logger.debug("Sync receive status: %s" % msg)
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
deleted file mode 100644
index 851eb3a1..00000000
--- a/client/src/leap/soledad/client/http_target/fetch_protocol.py
+++ /dev/null
@@ -1,157 +0,0 @@
-# -*- coding: utf-8 -*-
-# fetch_protocol.py
-# Copyright (C) 2016 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import json
-from functools import partial
-from six import StringIO
-from twisted.web._newclient import ResponseDone
-from leap.soledad.common.l2db import errors
-from leap.soledad.common.l2db.remote import utils
-from leap.soledad.common.log import getLogger
-from .support import ReadBodyProtocol
-from .support import readBody
-
-logger = getLogger(__name__)
-
-
-class DocStreamReceiver(ReadBodyProtocol):
- """
- A protocol implementation that can parse incoming data from server based
- on a line format specified on u1db implementation. Except that we split doc
- attributes from content to ease parsing and increment throughput for larger
- documents.
- [\r\n
- {metadata},\r\n
- {doc_info},\r\n
- {content},\r\n
- ...
- {doc_info},\r\n
- {content},\r\n
- ]
- """
-
- def __init__(self, response, deferred, doc_reader):
- self.deferred = deferred
- self.status = response.code if response else None
- self.message = response.phrase if response else None
- self.headers = response.headers if response else {}
- self.delimiter = '\r\n'
- self.metadata = ''
- self._doc_reader = doc_reader
- self.reset()
-
- def reset(self):
- self._line = 0
- self._buffer = StringIO()
- self._properly_finished = False
-
- def connectionLost(self, reason):
- """
- Deliver the accumulated response bytes to the waiting L{Deferred}, if
- the response body has been completely received without error.
- """
- if self.deferred.called:
- return
- try:
- if reason.check(ResponseDone):
- self.dataBuffer = self.metadata
- else:
- self.dataBuffer = self.finish()
- except errors.BrokenSyncStream as e:
- return self.deferred.errback(e)
- return ReadBodyProtocol.connectionLost(self, reason)
-
- def consumeBufferLines(self):
- """
- Consumes lines from buffer and rewind it, writing remaining data
- that didn't formed a line back into buffer.
- """
- content = self._buffer.getvalue()[0:self._buffer.tell()]
- self._buffer.seek(0)
- lines = content.split(self.delimiter)
- self._buffer.write(lines.pop(-1))
- return lines
-
- def dataReceived(self, data):
- """
- Buffer incoming data until a line breaks comes in. We check only
- the incoming data for efficiency.
- """
- self._buffer.write(data)
- if '\n' not in data:
- return
- lines = self.consumeBufferLines()
- while lines:
- line, _ = utils.check_and_strip_comma(lines.pop(0))
- self.lineReceived(line)
- self._line += 1
-
- def lineReceived(self, line):
- """
- Protocol implementation.
- 0: [\r\n
- 1: {metadata},\r\n
- (even): {doc_info},\r\n
- (odd): {data},\r\n
- (last): ]
- """
- if self._properly_finished:
- raise errors.BrokenSyncStream("Reading a finished stream")
- if ']' == line:
- self._properly_finished = True
- elif self._line == 0:
- if line is not '[':
- raise errors.BrokenSyncStream("Invalid start")
- elif self._line == 1:
- self.metadata = line
- if 'error' in self.metadata:
- raise errors.BrokenSyncStream("Error from server: %s" % line)
- self.total = json.loads(line).get('number_of_changes', -1)
- elif (self._line % 2) == 0:
- self.current_doc = json.loads(line)
- if 'error' in self.current_doc:
- raise errors.BrokenSyncStream("Error from server: %s" % line)
- else:
- d = self._doc_reader(
- self.current_doc, line.strip() or None, self.total)
- d.addErrback(self.deferred.errback)
-
- def finish(self):
- """
- Checks that ']' came and stream was properly closed.
- """
- if not self._properly_finished:
- raise errors.BrokenSyncStream('Stream not properly closed')
- content = self._buffer.getvalue()[0:self._buffer.tell()]
- self._buffer.close()
- return content
-
-
-def build_body_reader(doc_reader):
- """
- Get the documents from a sync stream and call doc_reader on each
- doc received.
-
- @param doc_reader: Function to be called for processing an incoming doc.
- Will be called with doc metadata (dict parsed from 1st line) and doc
- content (string)
- @type doc_reader: function
-
- @return: A function that can be called by the http Agent to create and
- configure the proper protocol.
- """
- protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader)
- return partial(readBody, protocolClass=protocolClass)
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
deleted file mode 100644
index 2b286ec5..00000000
--- a/client/src/leap/soledad/client/http_target/send.py
+++ /dev/null
@@ -1,107 +0,0 @@
-# -*- coding: utf-8 -*-
-# send.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import json
-
-from twisted.internet import defer
-
-from leap.soledad.common.log import getLogger
-from leap.soledad.client.events import emit_async
-from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
-from leap.soledad.client.http_target.support import RequestBody
-from .send_protocol import DocStreamProducer
-
-logger = getLogger(__name__)
-
-
-class HTTPDocSender(object):
- """
- Handles Document uploading from Soledad server, using HTTP as transport.
- They need to be encrypted and metadata prepared before sending.
- """
-
- # The uuid of the local replica.
- # Any class inheriting from this one should provide a meaningful attribute
- # if the sync status event is meant to be used somewhere else.
-
- uuid = 'undefined'
- userid = 'undefined'
-
- @defer.inlineCallbacks
- def _send_docs(self, docs_by_generation, last_known_generation,
- last_known_trans_id, sync_id):
-
- if not docs_by_generation:
- defer.returnValue([None, None])
-
- # add remote replica metadata to the request
- body = RequestBody(
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- result = yield self._send_batch(body, docs_by_generation)
- response_dict = json.loads(result)[0]
- gen_after_send = response_dict['new_generation']
- trans_id_after_send = response_dict['new_transaction_id']
- defer.returnValue([gen_after_send, trans_id_after_send])
-
- @defer.inlineCallbacks
- def _send_batch(self, body, docs):
- total, calls = len(docs), []
- for i, entry in enumerate(docs):
- calls.append((self._prepare_one_doc,
- entry, body, i + 1, total))
- result = yield self._send_request(body, calls)
- _emit_send_status(self.uuid, body.consumed, total)
-
- defer.returnValue(result)
-
- def _send_request(self, body, calls):
- return self._http_request(
- self._url,
- method='POST',
- body=(body, calls),
- content_type='application/x-soledad-sync-put',
- body_producer=DocStreamProducer)
-
- @defer.inlineCallbacks
- def _prepare_one_doc(self, entry, body, idx, total):
- get_doc_call, gen, trans_id = entry
- doc, content = yield self._encrypt_doc(get_doc_call)
- body.insert_info(
- id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
- trans_id=trans_id, number_of_docs=total,
- doc_idx=idx)
- _emit_send_status(self.uuid, body.consumed, total)
-
- @defer.inlineCallbacks
- def _encrypt_doc(self, get_doc_call):
- f, args, kwargs = get_doc_call
- doc = yield f(*args, **kwargs)
- if doc.is_tombstone():
- defer.returnValue((doc, None))
- else:
- content = yield self._crypto.encrypt_doc(doc)
- defer.returnValue((doc, content))
-
-
-def _emit_send_status(user_data, idx, total):
- content = {'sent': idx, 'total': total}
- emit_async(SOLEDAD_SYNC_SEND_STATUS, user_data, content)
-
- msg = "%d/%d" % (idx, total)
- logger.debug("Sync send status: %s" % msg)
diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py
deleted file mode 100644
index 4941aa34..00000000
--- a/client/src/leap/soledad/client/http_target/send_protocol.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# -*- coding: utf-8 -*-
-# send_protocol.py
-# Copyright (C) 2016 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from zope.interface import implementer
-from twisted.internet import defer
-from twisted.internet import reactor
-from twisted.web.iweb import IBodyProducer
-from twisted.web.iweb import UNKNOWN_LENGTH
-
-
-@implementer(IBodyProducer)
-class DocStreamProducer(object):
- """
- A producer that writes the body of a request to a consumer.
- """
-
- def __init__(self, producer):
- """
- Initialize the string produer.
-
- :param producer: A RequestBody instance and a list of producer calls
- :type producer: (.support.RequestBody, [(function, *args)])
- """
- self.body, self.producer = producer
- self.length = UNKNOWN_LENGTH
- self.pause = False
- self.stop = False
-
- @defer.inlineCallbacks
- def startProducing(self, consumer):
- """
- Write the body to the consumer.
-
- :param consumer: Any IConsumer provider.
- :type consumer: twisted.internet.interfaces.IConsumer
-
- :return: A Deferred that fires when production ends.
- :rtype: twisted.internet.defer.Deferred
- """
- while self.producer and not self.stop:
- if self.pause:
- yield self.sleep(0.001)
- continue
- call = self.producer.pop(0)
- fun, args = call[0], call[1:]
- yield fun(*args)
- consumer.write(self.body.pop(1, leave_open=True))
- consumer.write(self.body.pop(0)) # close stream
-
- def sleep(self, secs):
- d = defer.Deferred()
- reactor.callLater(secs, d.callback, None)
- return d
-
- def pauseProducing(self):
- self.pause = True
-
- def stopProducing(self):
- self.stop = True
-
- def resumeProducing(self):
- self.pause = False
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
deleted file mode 100644
index d8d8e420..00000000
--- a/client/src/leap/soledad/client/http_target/support.py
+++ /dev/null
@@ -1,220 +0,0 @@
-# -*- coding: utf-8 -*-
-# support.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import warnings
-import json
-
-from twisted.internet import defer
-from twisted.web.client import _ReadBodyProtocol
-from twisted.web.client import PartialDownloadError
-from twisted.web._newclient import ResponseDone
-from twisted.web._newclient import PotentialDataLoss
-
-from leap.soledad.common.l2db import errors
-from leap.soledad.common.l2db.remote import http_errors
-
-# we want to make sure that HTTP errors will raise appropriate u1db errors,
-# that is, fire errbacks with the appropriate failures, in the context of
-# twisted. Because of that, we redefine the http body reader used by the HTTP
-# client below.
-
-
-class ReadBodyProtocol(_ReadBodyProtocol):
- """
- From original Twisted implementation, focused on adding our error
- handling and ensuring that the proper u1db error is raised.
- """
-
- def __init__(self, response, deferred):
- """
- Initialize the protocol, additionally storing the response headers.
- """
- _ReadBodyProtocol.__init__(
- self, response.code, response.phrase, deferred)
- self.headers = response.headers
-
- # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
- def _error(self, respdic):
- descr = respdic.get("error")
- exc_cls = errors.wire_description_to_exc.get(descr)
- if exc_cls is not None:
- message = respdic.get("message")
- self.deferred.errback(exc_cls(message))
- else:
- self.deferred.errback(
- errors.HTTPError(self.status, respdic, self.headers))
- # ---8<--- end of snippet from u1db.remote.http_client
-
- def connectionLost(self, reason):
- """
- Deliver the accumulated response bytes to the waiting L{Deferred}, if
- the response body has been completely received without error.
- """
- if reason.check(ResponseDone):
-
- body = b''.join(self.dataBuffer)
-
- # ---8<--- snippet from u1db.remote.http_client
- if self.status in (200, 201):
- self.deferred.callback(body)
- elif self.status in http_errors.ERROR_STATUSES:
- try:
- respdic = json.loads(body)
- except ValueError:
- self.deferred.errback(
- errors.HTTPError(self.status, body, self.headers))
- else:
- self._error(respdic)
- # special cases
- elif self.status == 503:
- self.deferred.errback(errors.Unavailable(body, self.headers))
- else:
- self.deferred.errback(
- errors.HTTPError(self.status, body, self.headers))
- # ---8<--- end of snippet from u1db.remote.http_client
-
- elif reason.check(PotentialDataLoss):
- self.deferred.errback(
- PartialDownloadError(self.status, self.message,
- b''.join(self.dataBuffer)))
- else:
- self.deferred.errback(reason)
-
-
-def readBody(response, protocolClass=ReadBodyProtocol):
- """
- Get the body of an L{IResponse} and return it as a byte string.
-
- This is a helper function for clients that don't want to incrementally
- receive the body of an HTTP response.
-
- @param response: The HTTP response for which the body will be read.
- @type response: L{IResponse} provider
-
- @return: A L{Deferred} which will fire with the body of the response.
- Cancelling it will close the connection to the server immediately.
- """
- def cancel(deferred):
- """
- Cancel a L{readBody} call, close the connection to the HTTP server
- immediately, if it is still open.
-
- @param deferred: The cancelled L{defer.Deferred}.
- """
- abort = getAbort()
- if abort is not None:
- abort()
-
- d = defer.Deferred(cancel)
- protocol = protocolClass(response, d)
-
- def getAbort():
- return getattr(protocol.transport, 'abortConnection', None)
-
- response.deliverBody(protocol)
-
- if protocol.transport is not None and getAbort() is None:
- warnings.warn(
- 'Using readBody with a transport that does not have an '
- 'abortConnection method',
- category=DeprecationWarning,
- stacklevel=2)
-
- return d
-
-
-class RequestBody(object):
- """
- This class is a helper to generate send and fetch requests.
- The expected format is something like:
- [
- {headers},
- {entry1},
- {...},
- {entryN},
- ]
- """
-
- def __init__(self, **header_dict):
- """
- Creates a new RequestBody holding header information.
-
- :param header_dict: A dictionary with the headers.
- :type header_dict: dict
- """
- self.headers = header_dict
- self.entries = []
- self.consumed = 0
-
- def insert_info(self, **entry_dict):
- """
- Dumps an entry into JSON format and add it to entries list.
- Adds 'content' key on a new line if it's present.
-
- :param entry_dict: Entry as a dictionary
- :type entry_dict: dict
- """
- content = ''
- if 'content' in entry_dict:
- content = ',\r\n' + (entry_dict['content'] or '')
- entry = json.dumps(entry_dict) + content
- self.entries.append(entry)
-
- def pop(self, amount=10, leave_open=False):
- """
- Removes entries and returns it formatted and ready
- to be sent.
-
- :param amount: number of entries to pop and format
- :type amount: int
-
- :param leave_open: flag to skip stream closing
- :type amount: bool
-
- :return: formatted body ready to be sent
- :rtype: str
- """
- start = self.consumed == 0
- amount = min([len(self.entries), amount])
- entries = [self.entries.pop(0) for i in xrange(amount)]
- self.consumed += amount
- end = len(self.entries) == 0 if not leave_open else False
- return self.entries_to_str(entries, start, end)
-
- def __str__(self):
- return self.pop(len(self.entries))
-
- def __len__(self):
- return len(self.entries)
-
- def entries_to_str(self, entries=None, start=True, end=True):
- """
- Format a list of entries into the body format expected
- by the server.
-
- :param entries: entries to format
- :type entries: list
-
- :return: formatted body ready to be sent
- :rtype: str
- """
- data = ''
- if start:
- data = '[\r\n' + json.dumps(self.headers)
- data += ''.join(',\r\n' + entry for entry in entries)
- if end:
- data += '\r\n]'
- return data