summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/http_target/__init__.py5
-rw-r--r--client/src/leap/soledad/client/http_target/api.py12
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py1
-rw-r--r--client/src/leap/soledad/client/http_target/send.py64
-rw-r--r--client/src/leap/soledad/client/http_target/send_protocol.py61
-rw-r--r--client/src/leap/soledad/client/http_target/support.py27
-rw-r--r--client/src/leap/soledad/client/sync.py24
-rw-r--r--server/src/leap/soledad/server/__init__.py8
-rw-r--r--server/src/leap/soledad/server/sync.py5
-rw-r--r--testing/tests/perf/conftest.py2
10 files changed, 135 insertions, 74 deletions
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py
index 62e8bcf0..94de2feb 100644
--- a/client/src/leap/soledad/client/http_target/__init__.py
+++ b/client/src/leap/soledad/client/http_target/__init__.py
@@ -26,6 +26,8 @@ import os
from leap.soledad.common.log import getLogger
from leap.common.http import HTTPClient
+from twisted.web.client import HTTPConnectionPool
+from twisted.internet import reactor
from leap.soledad.client.http_target.send import HTTPDocSender
from leap.soledad.client.http_target.api import SyncTargetAPI
from leap.soledad.client.http_target.fetch import HTTPDocFetcher
@@ -99,7 +101,8 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
# XXX Increasing timeout of simple requests to avoid chances of hitting
# the duplicated syncing bug. This could be reduced to the 30s default
# after implementing Cancellable Sync. See #7382
- self._http = HTTPClient(cert_file, timeout=90)
+ self._http = HTTPClient(cert_file, timeout=90,
+ pool=HTTPConnectionPool(reactor))
if DO_STATS:
self.sync_exchange_phase = [0]
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
index 4e068523..00b943e1 100644
--- a/client/src/leap/soledad/client/http_target/api.py
+++ b/client/src/leap/soledad/client/http_target/api.py
@@ -22,6 +22,7 @@ from uuid import uuid4
from twisted.web.error import Error
from twisted.internet import defer
+from twisted.web.http_headers import Headers
from leap.soledad.client.http_target.support import readBody
from leap.soledad.common.errors import InvalidAuthTokenError
@@ -72,11 +73,18 @@ class SyncTargetAPI(SyncTarget):
return self._sync_enc_pool is not None
def _http_request(self, url, method='GET', body=None, headers=None,
- content_type=None, body_reader=readBody):
+ content_type=None, body_reader=readBody,
+ body_producer=None):
headers = headers or self._base_header
if content_type:
headers.update({'content-type': [content_type]})
- d = self._http.request(url, method, body, headers, body_reader)
+ if not body_producer:
+ d = self._http.request(url, method, body, headers, body_reader)
+ else:
+ d = self._http._agent.request(
+ method, url, headers=Headers(headers),
+ bodyProducer=body_producer(body))
+ d.addCallback(body_reader)
d.addErrback(_unauth_to_invalid_token_error)
return d
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 063082e5..50e89a2a 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -23,7 +23,6 @@ from leap.soledad.client.http_target.support import RequestBody
from leap.soledad.common.log import getLogger
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.l2db import errors
-from datetime import datetime
from . import fetch_protocol
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
index c7bd057e..fcda9bd7 100644
--- a/client/src/leap/soledad/client/http_target/send.py
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger
from leap.soledad.client.events import emit_async
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.http_target.support import RequestBody
+from .send_protocol import DocStreamProducer
logger = getLogger(__name__)
@@ -54,73 +55,56 @@ class HTTPDocSender(object):
last_known_trans_id=last_known_trans_id,
sync_id=sync_id,
ensure=self._ensure_callback is not None)
- total = len(docs_by_generation)
- while body.consumed < total:
- result = yield self._send_batch(total, body, docs_by_generation)
+ result = yield self._send_batch(body, docs_by_generation)
response_dict = json.loads(result)[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
defer.returnValue([gen_after_send, trans_id_after_send])
- def _delete_sent(self, docs):
- for doc, gen, trans_id in docs:
- self._sync_enc_pool.delete_encrypted_doc(
- doc.doc_id, doc.rev)
-
@defer.inlineCallbacks
- def _send_batch(self, total, body, docs):
- sent = []
+ def _send_batch(self, body, docs):
+ total = len(docs)
missing = total - body.consumed
+ calls = []
for i in xrange(1, missing + 1):
- if body.pending_size > self.MAX_BATCH_SIZE:
- break
idx = body.consumed + i
entry = docs[idx - 1]
- sent.append(entry)
- yield self._prepare_one_doc(entry, body, idx, total)
- result = yield self._send_request(body.pop())
- if self._defer_encryption:
- self._delete_sent(sent)
+ calls.append((self._prepare_one_doc,
+ entry, body, idx, total))
+ result = yield self._send_request(body, calls)
_emit_send_status(self.uuid, body.consumed, total)
defer.returnValue(result)
- def _send_request(self, body):
+ def _send_request(self, body, calls):
return self._http_request(
self._url,
method='POST',
- body=body,
- content_type='application/x-soledad-sync-put')
+ body=(body, calls),
+ content_type='application/x-soledad-sync-put',
+ body_producer=DocStreamProducer)
@defer.inlineCallbacks
def _prepare_one_doc(self, entry, body, idx, total):
- doc, gen, trans_id = entry
- content = yield self._encrypt_doc(doc)
+ get_doc, gen, trans_id = entry
+ doc, content = yield self._encrypt_doc(get_doc)
body.insert_info(
id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
trans_id=trans_id, number_of_docs=total,
doc_idx=idx)
- def _encrypt_doc(self, doc):
- d = None
+ @defer.inlineCallbacks
+ def _encrypt_doc(self, get_doc):
+ if type(get_doc) == tuple:
+ f, args = get_doc
+ doc = yield f(args)
+ else:
+ # tests
+ doc = get_doc
if doc.is_tombstone():
- d = defer.succeed(None)
- elif not self._defer_encryption:
- # fallback case, for tests
- d = defer.succeed(self._crypto.encrypt_doc(doc))
+ defer.returnValue((doc, None))
else:
-
- def _maybe_encrypt_doc_inline(doc_json):
- if doc_json is None:
- # the document is not marked as tombstone, but we got
- # nothing from the sync db. As it is not encrypted
- # yet, we force inline encryption.
- return self._crypto.encrypt_doc(doc)
- return doc_json
-
- d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
- d.addCallback(_maybe_encrypt_doc_inline)
- return d
+ defer.returnValue((doc, self._crypto.encrypt_doc(doc)))
def _emit_send_status(user_data, idx, total):
diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py
new file mode 100644
index 00000000..c72c6d13
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/send_protocol.py
@@ -0,0 +1,61 @@
+from zope.interface import implements
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.web.iweb import IBodyProducer
+from twisted.web.iweb import UNKNOWN_LENGTH
+
+
+class DocStreamProducer(object):
+ """
+ A producer that writes the body of a request to a consumer.
+ """
+
+ implements(IBodyProducer)
+
+ def __init__(self, parser_producer):
+ """
+ Initialize the string produer.
+
+ :param body: The body of the request.
+ :type body: str
+ """
+ self.body, self.producer = parser_producer
+ self.length = UNKNOWN_LENGTH
+ self.pause = False
+ self.stop = False
+
+ @defer.inlineCallbacks
+ def startProducing(self, consumer):
+ """
+ Write the body to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A successful deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ call = self.producer.pop(0)
+ yield call[0](*call[1:])
+ while self.producer and not self.stop:
+ if self.pause:
+ yield self.sleep(0.01)
+ continue
+ call = self.producer.pop(0)
+ yield call[0](*call[1:])
+ consumer.write(self.body.pop(1))
+ consumer.write(self.body.pop(1))
+
+ def sleep(self, secs):
+ d = defer.Deferred()
+ reactor.callLater(secs, d.callback, None)
+ return d
+
+ def pauseProducing(self):
+ self.pause = True
+
+ def stopProducing(self):
+ self.stop = True
+
+ def resumeProducing(self):
+ self.pause = False
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index 6ec98ed4..40e5eb55 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -155,7 +155,6 @@ class RequestBody(object):
self.headers = header_dict
self.entries = []
self.consumed = 0
- self.pending_size = 0
def insert_info(self, **entry_dict):
"""
@@ -169,9 +168,8 @@ class RequestBody(object):
"""
entry = json.dumps(entry_dict)
self.entries.append(entry)
- self.pending_size += len(entry)
- def pop(self):
+ def pop(self, amount=10):
"""
Removes all entries and returns it formatted and ready
to be sent.
@@ -182,19 +180,20 @@ class RequestBody(object):
:return: formatted body ready to be sent
:rtype: str
"""
- entries = self.entries[:]
- self.entries = []
- self.pending_size = 0
- self.consumed += len(entries)
- return self.entries_to_str(entries)
+ start = self.consumed == 0
+ amount = min([len(self.entries), amount])
+ entries = [self.entries.pop(0) for i in xrange(amount)]
+ self.consumed += amount
+ end = len(self.entries) == 0
+ return self.entries_to_str(entries, start, end)
def __str__(self):
- return self.entries_to_str(self.entries)
+ return self.pop(len(self.entries))
def __len__(self):
return len(self.entries)
- def entries_to_str(self, entries=None):
+ def entries_to_str(self, entries=None, start=True, end=True):
"""
Format a list of entries into the body format expected
by the server.
@@ -205,6 +204,10 @@ class RequestBody(object):
:return: formatted body ready to be sent
:rtype: str
"""
- data = '[\r\n' + json.dumps(self.headers)
+ data = ''
+ if start:
+ data = '[\r\n' + json.dumps(self.headers)
data += ''.join(',\r\n' + entry for entry in entries)
- return data + '\r\n]'
+ if end:
+ data += '\r\n]'
+ return data
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d3cfe029..9d237d98 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -142,17 +142,21 @@ class SoledadSynchronizer(Synchronizer):
# --------------------------------------------------------------------
# prepare to send all the changed docs
- changed_doc_ids = [doc_id for doc_id, _, _ in changes]
- docs_to_send = self.source.get_docs(
- changed_doc_ids, check_for_conflicts=False, include_deleted=True)
- ids_sent = []
+ # changed_doc_ids = [doc_id for doc_id, _, _ in changes]
+ # docs_to_send = self.source.get_docs(
+ # changed_doc_ids, check_for_conflicts=False, include_deleted=True)
+ ids_sent = [doc_id for doc_id, _, _ in changes]
+ # docs_by_generation = []
+ # idx = 0
+ # for doc in docs_to_send:
+ # _, gen, trans = changes[idx]
+ # docs_by_generation.append((doc, gen, trans))
+ # idx += 1
+ # ids_sent.append(doc.doc_id)
docs_by_generation = []
- idx = 0
- for doc in docs_to_send:
- _, gen, trans = changes[idx]
- docs_by_generation.append((doc, gen, trans))
- idx += 1
- ids_sent.append(doc.doc_id)
+ for doc_id, gen, trans in changes:
+ get_doc = (self.source.get_doc, doc_id)
+ docs_by_generation.append((get_doc, gen, trans))
# exchange documents and try to insert the returned ones with
# the target, return target synced-up-to gen.
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index d154e3fe..088f45ca 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -198,7 +198,8 @@ class HTTPInvocationByMethodWithBody(
try:
content_length = int(self.environ['CONTENT_LENGTH'])
except (ValueError, KeyError):
- raise http_app.BadRequest
+ # raise http_app.BadRequest
+ content_length = self.max_request_size
if content_length <= 0:
raise http_app.BadRequest
if content_length > self.max_request_size:
@@ -224,7 +225,6 @@ class HTTPInvocationByMethodWithBody(
if content_type == 'application/x-soledad-sync-put':
meth_put = self._lookup('%s_put' % method)
meth_end = self._lookup('%s_end' % method)
- entries = []
while True:
line = body_getline()
entry = line.strip()
@@ -233,11 +233,9 @@ class HTTPInvocationByMethodWithBody(
if not entry or not comma: # empty or no prec comma
raise http_app.BadRequest
entry, comma = utils.check_and_strip_comma(entry)
- entries.append(entry)
+ meth_put({}, entry)
if comma or body_getline(): # extra comma or data
raise http_app.BadRequest
- for entry in entries:
- meth_put({}, entry)
return meth_end()
# handle outgoing documents
elif content_type == 'application/x-soledad-sync-get':
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index c958bfaa..0bf7b236 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -237,7 +237,9 @@ class SyncResource(http_app.SyncResource):
:type doc_idx: int
"""
doc = Document(id, rev, content)
- self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
+ self.sync_exch.insert_doc_from_source(
+ doc, gen, trans_id, number_of_docs=None,
+ doc_idx=None, sync_id=None)
@http_app.http_method(received=int, content_as_args=True)
def post_get(self, received):
@@ -282,7 +284,6 @@ class SyncResource(http_app.SyncResource):
Return the current generation and transaction_id after inserting one
incoming document.
"""
- self.sync_exch.batched_insert_from_source(self._staging, self._sync_id)
self.responder.content_type = 'application/x-soledad-sync-response'
self.responder.start_response(200)
self.responder.start_stream(),
diff --git a/testing/tests/perf/conftest.py b/testing/tests/perf/conftest.py
index 5ac1f3c0..f960bd32 100644
--- a/testing/tests/perf/conftest.py
+++ b/testing/tests/perf/conftest.py
@@ -243,7 +243,7 @@ def soledad_client(tmpdir, soledad_server, remote_db, soledad_dbs, request):
server_url=server_url,
cert_file=None,
auth_token=token,
- defer_encryption=True)
+ defer_encryption=False)
request.addfinalizer(soledad_client.close)
return soledad_client
return create