diff options
Diffstat (limited to 'client/src')
7 files changed, 128 insertions, 66 deletions
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index 62e8bcf0..94de2feb 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -26,6 +26,8 @@ import os from leap.soledad.common.log import getLogger from leap.common.http import HTTPClient +from twisted.web.client import HTTPConnectionPool +from twisted.internet import reactor from leap.soledad.client.http_target.send import HTTPDocSender from leap.soledad.client.http_target.api import SyncTargetAPI from leap.soledad.client.http_target.fetch import HTTPDocFetcher @@ -99,7 +101,8 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): # XXX Increasing timeout of simple requests to avoid chances of hitting # the duplicated syncing bug. This could be reduced to the 30s default # after implementing Cancellable Sync. See #7382 - self._http = HTTPClient(cert_file, timeout=90) + self._http = HTTPClient(cert_file, timeout=90, + pool=HTTPConnectionPool(reactor)) if DO_STATS: self.sync_exchange_phase = [0] diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 4e068523..00b943e1 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -22,6 +22,7 @@ from uuid import uuid4 from twisted.web.error import Error from twisted.internet import defer +from twisted.web.http_headers import Headers from leap.soledad.client.http_target.support import readBody from leap.soledad.common.errors import InvalidAuthTokenError @@ -72,11 +73,18 @@ class SyncTargetAPI(SyncTarget): return self._sync_enc_pool is not None def _http_request(self, url, method='GET', body=None, headers=None, - content_type=None, body_reader=readBody): + content_type=None, body_reader=readBody, + body_producer=None): headers = headers or self._base_header if content_type: headers.update({'content-type': [content_type]}) - d = self._http.request(url, method, body, headers, body_reader) + if not body_producer: + d = self._http.request(url, method, body, headers, body_reader) + else: + d = self._http._agent.request( + method, url, headers=Headers(headers), + bodyProducer=body_producer(body)) + d.addCallback(body_reader) d.addErrback(_unauth_to_invalid_token_error) return d diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 063082e5..50e89a2a 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -23,7 +23,6 @@ from leap.soledad.client.http_target.support import RequestBody from leap.soledad.common.log import getLogger from leap.soledad.common.document import SoledadDocument from leap.soledad.common.l2db import errors -from datetime import datetime from . import fetch_protocol diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index c7bd057e..fcda9bd7 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger from leap.soledad.client.events import emit_async from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.http_target.support import RequestBody +from .send_protocol import DocStreamProducer logger = getLogger(__name__) @@ -54,73 +55,56 @@ class HTTPDocSender(object): last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) - total = len(docs_by_generation) - while body.consumed < total: - result = yield self._send_batch(total, body, docs_by_generation) + result = yield self._send_batch(body, docs_by_generation) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] defer.returnValue([gen_after_send, trans_id_after_send]) - def _delete_sent(self, docs): - for doc, gen, trans_id in docs: - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) - @defer.inlineCallbacks - def _send_batch(self, total, body, docs): - sent = [] + def _send_batch(self, body, docs): + total = len(docs) missing = total - body.consumed + calls = [] for i in xrange(1, missing + 1): - if body.pending_size > self.MAX_BATCH_SIZE: - break idx = body.consumed + i entry = docs[idx - 1] - sent.append(entry) - yield self._prepare_one_doc(entry, body, idx, total) - result = yield self._send_request(body.pop()) - if self._defer_encryption: - self._delete_sent(sent) + calls.append((self._prepare_one_doc, + entry, body, idx, total)) + result = yield self._send_request(body, calls) _emit_send_status(self.uuid, body.consumed, total) defer.returnValue(result) - def _send_request(self, body): + def _send_request(self, body, calls): return self._http_request( self._url, method='POST', - body=body, - content_type='application/x-soledad-sync-put') + body=(body, calls), + content_type='application/x-soledad-sync-put', + body_producer=DocStreamProducer) @defer.inlineCallbacks def _prepare_one_doc(self, entry, body, idx, total): - doc, gen, trans_id = entry - content = yield self._encrypt_doc(doc) + get_doc, gen, trans_id = entry + doc, content = yield self._encrypt_doc(get_doc) body.insert_info( id=doc.doc_id, rev=doc.rev, content=content, gen=gen, trans_id=trans_id, number_of_docs=total, doc_idx=idx) - def _encrypt_doc(self, doc): - d = None + @defer.inlineCallbacks + def _encrypt_doc(self, get_doc): + if type(get_doc) == tuple: + f, args = get_doc + doc = yield f(args) + else: + # tests + doc = get_doc if doc.is_tombstone(): - d = defer.succeed(None) - elif not self._defer_encryption: - # fallback case, for tests - d = defer.succeed(self._crypto.encrypt_doc(doc)) + defer.returnValue((doc, None)) else: - - def _maybe_encrypt_doc_inline(doc_json): - if doc_json is None: - # the document is not marked as tombstone, but we got - # nothing from the sync db. As it is not encrypted - # yet, we force inline encryption. - return self._crypto.encrypt_doc(doc) - return doc_json - - d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) - d.addCallback(_maybe_encrypt_doc_inline) - return d + defer.returnValue((doc, self._crypto.encrypt_doc(doc))) def _emit_send_status(user_data, idx, total): diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py new file mode 100644 index 00000000..c72c6d13 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/send_protocol.py @@ -0,0 +1,61 @@ +from zope.interface import implements +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web.iweb import IBodyProducer +from twisted.web.iweb import UNKNOWN_LENGTH + + +class DocStreamProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, parser_producer): + """ + Initialize the string produer. + + :param body: The body of the request. + :type body: str + """ + self.body, self.producer = parser_producer + self.length = UNKNOWN_LENGTH + self.pause = False + self.stop = False + + @defer.inlineCallbacks + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A successful deferred. + :rtype: twisted.internet.defer.Deferred + """ + call = self.producer.pop(0) + yield call[0](*call[1:]) + while self.producer and not self.stop: + if self.pause: + yield self.sleep(0.01) + continue + call = self.producer.pop(0) + yield call[0](*call[1:]) + consumer.write(self.body.pop(1)) + consumer.write(self.body.pop(1)) + + def sleep(self, secs): + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + def pauseProducing(self): + self.pause = True + + def stopProducing(self): + self.stop = True + + def resumeProducing(self): + self.pause = False diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 6ec98ed4..40e5eb55 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -155,7 +155,6 @@ class RequestBody(object): self.headers = header_dict self.entries = [] self.consumed = 0 - self.pending_size = 0 def insert_info(self, **entry_dict): """ @@ -169,9 +168,8 @@ class RequestBody(object): """ entry = json.dumps(entry_dict) self.entries.append(entry) - self.pending_size += len(entry) - def pop(self): + def pop(self, amount=10): """ Removes all entries and returns it formatted and ready to be sent. @@ -182,19 +180,20 @@ class RequestBody(object): :return: formatted body ready to be sent :rtype: str """ - entries = self.entries[:] - self.entries = [] - self.pending_size = 0 - self.consumed += len(entries) - return self.entries_to_str(entries) + start = self.consumed == 0 + amount = min([len(self.entries), amount]) + entries = [self.entries.pop(0) for i in xrange(amount)] + self.consumed += amount + end = len(self.entries) == 0 + return self.entries_to_str(entries, start, end) def __str__(self): - return self.entries_to_str(self.entries) + return self.pop(len(self.entries)) def __len__(self): return len(self.entries) - def entries_to_str(self, entries=None): + def entries_to_str(self, entries=None, start=True, end=True): """ Format a list of entries into the body format expected by the server. @@ -205,6 +204,10 @@ class RequestBody(object): :return: formatted body ready to be sent :rtype: str """ - data = '[\r\n' + json.dumps(self.headers) + data = '' + if start: + data = '[\r\n' + json.dumps(self.headers) data += ''.join(',\r\n' + entry for entry in entries) - return data + '\r\n]' + if end: + data += '\r\n]' + return data diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index d3cfe029..9d237d98 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -142,17 +142,21 @@ class SoledadSynchronizer(Synchronizer): # -------------------------------------------------------------------- # prepare to send all the changed docs - changed_doc_ids = [doc_id for doc_id, _, _ in changes] - docs_to_send = self.source.get_docs( - changed_doc_ids, check_for_conflicts=False, include_deleted=True) - ids_sent = [] + # changed_doc_ids = [doc_id for doc_id, _, _ in changes] + # docs_to_send = self.source.get_docs( + # changed_doc_ids, check_for_conflicts=False, include_deleted=True) + ids_sent = [doc_id for doc_id, _, _ in changes] + # docs_by_generation = [] + # idx = 0 + # for doc in docs_to_send: + # _, gen, trans = changes[idx] + # docs_by_generation.append((doc, gen, trans)) + # idx += 1 + # ids_sent.append(doc.doc_id) docs_by_generation = [] - idx = 0 - for doc in docs_to_send: - _, gen, trans = changes[idx] - docs_by_generation.append((doc, gen, trans)) - idx += 1 - ids_sent.append(doc.doc_id) + for doc_id, gen, trans in changes: + get_doc = (self.source.get_doc, doc_id) + docs_by_generation.append((get_doc, gen, trans)) # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. |